Update stuff
This commit is contained in:
+16
-21
@@ -7,25 +7,24 @@
|
||||
|
||||
struct thread_pool_queue {
|
||||
Task task;
|
||||
void* arg;
|
||||
void *arg;
|
||||
FreeFunc ff;
|
||||
struct thread_pool_queue* next;
|
||||
struct thread_pool_queue *next;
|
||||
};
|
||||
|
||||
struct _ThreadPool {
|
||||
bool running;
|
||||
size_t nthreads;
|
||||
sigset_t thread_sig_mask;
|
||||
pthread_t* threads;
|
||||
pthread_t *threads;
|
||||
|
||||
pthread_cond_t queue_cnd;
|
||||
pthread_mutex_t queue_mtx;
|
||||
struct thread_pool_queue* queue;
|
||||
struct thread_pool_queue *queue;
|
||||
};
|
||||
|
||||
// return false if we need to stop
|
||||
static bool get_task(ThreadPool* pool, Task* task, void** task_arg)
|
||||
{
|
||||
static bool get_task(ThreadPool *pool, Task *task, void **task_arg) {
|
||||
pthread_mutex_lock(&pool->queue_mtx);
|
||||
if (!pool->running) {
|
||||
pthread_mutex_unlock(&pool->queue_mtx);
|
||||
@@ -37,7 +36,7 @@ static bool get_task(ThreadPool* pool, Task* task, void** task_arg)
|
||||
pthread_mutex_unlock(&pool->queue_mtx);
|
||||
return false;
|
||||
}
|
||||
struct thread_pool_queue* ent = pool->queue;
|
||||
struct thread_pool_queue *ent = pool->queue;
|
||||
if (ent) {
|
||||
pool->queue = pool->queue->next;
|
||||
pthread_mutex_unlock(&pool->queue_mtx);
|
||||
@@ -50,21 +49,19 @@ static bool get_task(ThreadPool* pool, Task* task, void** task_arg)
|
||||
abort();
|
||||
}
|
||||
|
||||
static void* pool_thread_function(void* arg)
|
||||
{
|
||||
ThreadPool* pool = arg;
|
||||
static void *pool_thread_function(void *arg) {
|
||||
ThreadPool *pool = arg;
|
||||
pthread_sigmask(SIG_SETMASK, &pool->thread_sig_mask, NULL);
|
||||
Task task;
|
||||
void* task_arg;
|
||||
void *task_arg;
|
||||
while (get_task(pool, &task, &task_arg)) {
|
||||
task(task_arg);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ThreadPool* make_thread_pool(size_t parallelism, sigset_t sig_mask)
|
||||
{
|
||||
ThreadPool* pool = malloc_safe(sizeof(ThreadPool));
|
||||
ThreadPool *make_thread_pool(size_t parallelism, sigset_t sig_mask) {
|
||||
ThreadPool *pool = malloc_safe(sizeof(ThreadPool));
|
||||
pthread_mutex_init(&pool->queue_mtx, NULL);
|
||||
pthread_cond_init(&pool->queue_cnd, NULL);
|
||||
pool->running = true;
|
||||
@@ -84,8 +81,7 @@ ThreadPool* make_thread_pool(size_t parallelism, sigset_t sig_mask)
|
||||
return pool;
|
||||
}
|
||||
|
||||
void destroy_thread_pool(ThreadPool* pool)
|
||||
{
|
||||
void destroy_thread_pool(ThreadPool *pool) {
|
||||
pthread_mutex_lock(&pool->queue_mtx);
|
||||
pool->running = false;
|
||||
pthread_cond_broadcast(&pool->queue_cnd);
|
||||
@@ -96,9 +92,9 @@ void destroy_thread_pool(ThreadPool* pool)
|
||||
free(pool->threads);
|
||||
pthread_mutex_destroy(&pool->queue_mtx);
|
||||
pthread_cond_destroy(&pool->queue_cnd);
|
||||
struct thread_pool_queue* queue = pool->queue;
|
||||
struct thread_pool_queue *queue = pool->queue;
|
||||
while (queue) {
|
||||
struct thread_pool_queue* next = queue->next;
|
||||
struct thread_pool_queue *next = queue->next;
|
||||
if (queue->ff) {
|
||||
queue->ff(queue->arg);
|
||||
}
|
||||
@@ -108,10 +104,9 @@ void destroy_thread_pool(ThreadPool* pool)
|
||||
free(pool);
|
||||
}
|
||||
|
||||
void thread_pool_enqueue(ThreadPool* pool, Task task, void* arg, FreeFunc ff)
|
||||
{
|
||||
void thread_pool_enqueue(ThreadPool *pool, Task task, void *arg, FreeFunc ff) {
|
||||
pthread_mutex_lock(&pool->queue_mtx);
|
||||
struct thread_pool_queue* new = malloc_safe(sizeof(struct thread_pool_queue));
|
||||
struct thread_pool_queue *new = malloc_safe(sizeof(struct thread_pool_queue));
|
||||
new->task = task;
|
||||
new->arg = arg;
|
||||
new->ff = ff;
|
||||
|
||||
Reference in New Issue
Block a user