00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #ifndef _MACE_THREAD_POOL_H
00034 #define _MACE_THREAD_POOL_H
00035
00036 #include <list>
00037 #include <pthread.h>
00038
00039 #include "mvector.h"
00040 #include "mhash_set.h"
00041 #include "mace-macros.h"
00042
00043 #include "ScopedLock.h"
00044 #include "ScopedLog.h"
00045 #include "ThreadCreate.h"
00046 #include "Scheduler.h"
00047
00054 namespace mace {
00055
00056 template<class C, class D>
00057 class ThreadPool {
00058
00059 public:
00060 typedef bool (C::*ConditionFP)(uint);
00061 typedef void (C::*WorkFP)(uint);
00062
00063 private:
00064 struct ThreadArg {
00065 ThreadPool* p;
00066 uint i;
00067 };
00068
00069 public:
00070 ThreadPool(C& o,
00071 ConditionFP cond,
00072 WorkFP process,
00073 WorkFP setup = 0,
00074 WorkFP finish = 0,
00075 uint16_t numThreads = 1) :
00076 obj(o), dstore(0), cond(cond), process(process), setup(setup), finish(finish),
00077 threadCount(numThreads), sleeping(0), stop(false) {
00078
00079 dstore = new D[threadCount];
00080 sleeping = new uint[threadCount];
00081 for (uint i = 0; i < threadCount; i++) {
00082 sleeping[i] = 0;
00083 }
00084
00085 assert(pthread_key_create(&key, 0) == 0);
00086 assert(pthread_mutex_init(&poolMutex, 0) == 0);
00087 assert(pthread_cond_init(&signalv, 0) == 0);
00088
00089 for (uint i = 0; i < threadCount; i++) {
00090
00091
00092
00093 pthread_t t;
00094 ThreadArg* ta = new ThreadArg;
00095 ta->p = this;
00096 ta->i = i;
00097 runNewThread(&t, ThreadPool::startThread, ta, 0);
00098 threads.push_back(t);
00099 }
00100 assert(threadCount == threads.size());
00101 }
00102
00103 virtual ~ThreadPool() {
00104 halt();
00105
00106
00107
00108
00109
00110 assert(pthread_mutex_destroy(&poolMutex) == 0);
00111 assert(pthread_cond_destroy(&signalv) == 0);
00112 assert(pthread_key_delete(key) == 0);
00113
00114 delete [] dstore;
00115 delete [] sleeping;
00116 }
00117
00118 uint getThreadCount() const {
00119 return threadCount;
00120 }
00121
00122 size_t size() const {
00123 return threadCount;
00124 }
00125
00126 size_t sleepingSize() const {
00127 size_t r = 0;
00128 for (uint i = 0; i < threadCount; i++) {
00129 r += sleeping[i];
00130 }
00131 return r;
00132 }
00133
00134 bool isDone() const {
00135 return (sleepingSize() == threadCount);
00136 }
00137
00138 void halt() {
00139 stop = true;
00140 signal();
00141 }
00142
00143 void signal() {
00144 lock();
00145
00146
00147
00148 pthread_cond_broadcast(&signalv);
00149 unlock();
00150 }
00151
00152 D& data(uint i) const {
00153 assert(i < threadCount);
00154 return dstore[i];
00155 }
00156
00157 void* getSpecific() {
00158 return pthread_getspecific(key);
00159 }
00160
00161 void setSpecific(const void* v) {
00162 assert(pthread_setspecific(key, v) == 0);
00163 }
00164
00165 void lock() const {
00166 assert(pthread_mutex_lock(&poolMutex) == 0);
00167 }
00168
00169 void unlock() const {
00170 assert(pthread_mutex_unlock(&poolMutex) == 0);
00171 }
00172
00173 protected:
00174 static void* startThread(void* arg) {
00175 ThreadArg* a = (ThreadArg*)arg;
00176 ThreadPool* t = (ThreadPool*)(a->p);
00177 t->run(a->i);
00178 delete a;
00179 return 0;
00180 }
00181
00182
00183 void wait(uint index) {
00184
00185
00186
00187
00188
00189
00190
00191 assert(pthread_cond_wait(&signalv, &poolMutex) == 0);
00192
00193 }
00194
00195 private:
00196 void run(uint index) {
00197 assert(index < threadCount);
00198 ScopedLock sl(poolMutex);
00199
00200 while (!stop) {
00201 if (!(obj.*cond)(index)) {
00202 sleeping[index] = 1;
00203 wait(index);
00204 continue;
00205 }
00206
00207 sleeping[index] = 0;
00208
00209 if (setup) {
00210 (obj.*setup)(index);
00211 }
00212 sl.unlock();
00213 (obj.*process)(index);
00214 sl.lock();
00215 if (finish) {
00216 (obj.*finish)(index);
00217 }
00218 }
00219 }
00220
00221 private:
00222 C& obj;
00223 D* dstore;
00224 ConditionFP cond;
00225 WorkFP process;
00226 WorkFP setup;
00227 WorkFP finish;
00228 uint threadCount;
00229 uint* sleeping;
00230 bool stop;
00231 pthread_key_t key;
00232 mutable pthread_mutex_t poolMutex;
00233 pthread_cond_t signalv;
00234
00235 typedef vector<pthread_t> ThreadList;
00236 ThreadList threads;
00237
00238
00239 };
00240
00241 }
00242
00243 #endif // _MACE_THREAD_POOL_H