threadpool.c (4341B)
1 #include "threadpool.h" 2 3 #include "memory.h" 4 5 #include <pthread.h> 6 #include <stdbool.h> 7 #include <stdlib.h> 8 9 typedef struct ThreadWork ThreadWork; 10 11 struct ThreadWork { 12 ThreadFunc func; 13 void *arg; 14 ThreadWork *next; 15 }; 16 17 static ThreadWork *ThreadWork_make(ThreadFunc func, void *arg) 18 { 19 ThreadWork *work = s_alloc(sizeof *work, MEMORY_TAG_JOB); 20 work->func = func; 21 work->arg = arg; 22 work->next = NULL; 23 24 return work; 25 } 26 27 static void ThreadWork_free(ThreadWork *work) 28 { 29 if (work) 30 { 31 s_free(work, sizeof *work, MEMORY_TAG_JOB); 32 } 33 } 34 35 struct ThreadPool { 36 ThreadWork *work_first; 37 ThreadWork *work_last; 38 pthread_mutex_t work_mutex; 39 pthread_cond_t work_cond; 40 pthread_cond_t active_cond; 41 unsigned int thread_count; 42 unsigned int active_count; 43 bool stop; 44 }; 45 46 void ThreadPool_addWork(ThreadPool *pool, ThreadFunc func, void *arg) 47 { 48 ThreadWork *work = ThreadWork_make(func, arg); 49 50 pthread_mutex_lock(&pool->work_mutex); 51 if (pool->work_first == NULL) 52 { 53 pool->work_first = work; 54 pool->work_last = pool->work_first; 55 } 56 else 57 { 58 pool->work_last->next = work; 59 pool->work_last = work; 60 } 61 62 pthread_cond_broadcast(&pool->work_cond); 63 pthread_mutex_unlock(&pool->work_mutex); 64 } 65 66 static ThreadWork *ThreadPool_getWork(ThreadPool *pool) 67 { 68 ThreadWork *work = pool->work_first; 69 if (work == NULL) 70 { 71 return NULL; 72 } 73 74 if (work->next == NULL) 75 { 76 pool->work_first = NULL; 77 pool->work_last = NULL; 78 } 79 else 80 { 81 pool->work_first = work->next; 82 } 83 84 return work; 85 } 86 87 static void *ThreadPool_worker(void *arg) 88 { 89 ThreadPool *pool = arg; 90 ThreadWork *work; 91 92 while (true) 93 { 94 pthread_mutex_lock(&pool->work_mutex); 95 96 while (pool->work_first == NULL && !pool->stop) 97 { 98 pthread_cond_wait(&pool->work_cond, &pool->work_mutex); 99 } 100 101 if (pool->stop) 102 { 103 break; 104 } 105 106 work = ThreadPool_getWork(pool); 107 pool->active_count += 1; 108 109 pthread_mutex_unlock(&pool->work_mutex); 110 111 if (work != NULL) 112 { 113 work->func(work->arg); 114 ThreadWork_free(work); 115 } 116 117 pthread_mutex_lock(&pool->work_mutex); 118 119 pool->active_count -= 1; 120 if (!pool->stop && pool->active_count == 0 && pool->work_first == NULL) 121 { 122 pthread_cond_signal(&pool->active_cond); 123 } 124 125 pthread_mutex_unlock(&pool->work_mutex); 126 } 127 128 pool->thread_count -= 1; 129 pthread_cond_signal(&pool->active_cond); 130 pthread_mutex_unlock(&pool->work_mutex); 131 132 return NULL; 133 } 134 135 ThreadPool *ThreadPool_make(unsigned int thread_count) 136 { 137 ThreadPool *pool = s_alloc(sizeof *pool, MEMORY_TAG_JOB); 138 pool->work_first = NULL; 139 pool->work_last = NULL; 140 pthread_mutex_init(&pool->work_mutex, NULL); 141 pthread_cond_init(&pool->work_cond, NULL); 142 pthread_cond_init(&pool->active_cond, NULL); 143 pool->thread_count = thread_count; 144 pool->active_count = 0; 145 pool->stop = false; 146 147 pthread_t thread; 148 for (unsigned int i = 0; i < thread_count; i++) 149 { 150 pthread_create(&thread, NULL, ThreadPool_worker, pool); 151 pthread_detach(thread); 152 } 153 154 return pool; 155 } 156 157 void ThreadPool_wait(ThreadPool *pool) 158 { 159 pthread_mutex_lock(&pool->work_mutex); 160 while (true) 161 { 162 if ((!pool->stop && pool->active_count != 0) 163 || (pool->stop && pool->thread_count != 0)) 164 { 165 pthread_cond_wait(&pool->active_cond, &pool->work_mutex); 166 } 167 else 168 { 169 break; 170 } 171 } 172 pthread_mutex_unlock(&pool->work_mutex); 173 } 174 175 void ThreadPool_free(ThreadPool *pool) 176 { 177 ThreadWork *work1; 178 ThreadWork *work2; 179 180 if (pool == NULL) 181 { 182 return; 183 } 184 185 pthread_mutex_lock(&pool->work_mutex); 186 work1 = pool->work_first; 187 while (work1 != NULL) 188 { 189 work2 = work1->next; 190 ThreadWork_free(work1); 191 work1 = work2; 192 } 193 194 pool->stop = true; 195 pthread_cond_broadcast(&pool->work_cond); 196 pthread_mutex_unlock(&pool->work_mutex); 197 198 ThreadPool_wait(pool); 199 200 pthread_mutex_destroy(&pool->work_mutex); 201 pthread_cond_destroy(&pool->work_cond); 202 pthread_cond_destroy(&pool->active_cond); 203 }