Skip to content

Commit

Permalink
Implement constructor, run, post, and destructor.
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-j-ibanez committed Dec 23, 2023
1 parent c7ef4c0 commit 253a068
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 16 deletions.
7 changes: 4 additions & 3 deletions include/whirlpool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@

class ThreadPool {
private:
std::atomic_bool active { true };
std::atomic_bool active;
std::vector<std::thread> thread_pool;
std::queue<std::packaged_task<void()>> job_queue;
std::queue<std::function<void()>> job_queue;
std::mutex pool_lock;
std::condition_variable cv;
void run();
public:
explicit ThreadPool(int num_threads = 1);
~ThreadPool();
void post(std::packaged_task<void()>);
template <typename Function, typename... Args>
auto post(Function&& f, Args&&... args) -> std::future<decltype(f(args...))>;
void stop();
bool busy();
};
63 changes: 50 additions & 13 deletions src/whirlpool.cpp
Original file line number Diff line number Diff line change
@@ -1,35 +1,72 @@
#include "../include/whirlpool.hpp"
#include <future>
#include <functional>
#include <unistd.h>

ThreadPool::ThreadPool(int num_threads) {
active = true;
for (int i = 0; i < num_threads; i++) {
std::thread t(&ThreadPool::run, this);
thread_pool.push_back(t);
thread_pool.emplace_back(&ThreadPool::run, this);
}
}

ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(pool_lock);
active = false;
cv.notify_all();
for (int i = 0; i < thread_pool.size(); i++) {
thread_pool[i].join();
}

for (auto& thread : thread_pool) {
thread.join();
}
}

void ThreadPool::post(std::packaged_task<void()> task) {
std::unique_lock lock(pool_lock);
job_queue.emplace(task);
cv.notify_one();
template <typename Function, typename... Args>
auto ThreadPool::post(Function&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using ReturnType = decltype(f(args...));

// Create packaged task using function
auto func_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(
std::bind(std::forward<Function>(f), std::forward<Args>(args)...));
auto func_wrapper = [func_ptr]() {
(*func_ptr)();
};

{
std::unique_lock<std::mutex> lock(pool_lock);
job_queue.push(func_wrapper);
cv.notify_one();
}

// Get task future
auto future = func_ptr->get_future();
return future;
}

void ThreadPool::run() {
while(active) {
std::unique_lock lock(pool_lock);
cv.wait(lock, [&] { return !job_queue.empty() || !active; });
if(!active) break;
// Get next job from queue
std::packaged_task<void()> job;
job.swap(job_queue.front());
job_queue.pop();
// Get job if thread pool is still active
if(active) {
auto job = std::move(job_queue.front());
job_queue.pop();
lock.unlock();
job();
}
else {
break;
}
}
}

void ThreadPool::stop() {
while(!job_queue.empty()) { }
active = false;
cv.notify_all();
}

bool ThreadPool::busy() {
return active;
}

0 comments on commit 253a068

Please sign in to comment.