00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #ifndef _DYNAMIC_PROCESSING_POOL_H
00034 #define _DYNAMIC_PROCESSING_POOL_H
00035
00036 #include <deque>
00037
00038 #include "ThreadCreate.h"
00039 #include "ScopedLock.h"
00040 #include "mset.h"
00041
00049 namespace DynamicProcessingPoolCallbacks {
00050
/**
 * Functor that binds a pointer-like object to a one-argument member function
 * taking its argument by const reference.
 *
 * P  - pointer-like type; dereferenced with unary * on every call.
 * C  - class that declares the member function.
 * B1 - argument type, forwarded as const B1&.
 */
template<class P, class C, class B1>
class callback_cref {
  typedef void (C::*method_t) (const B1&);

  P target;        // copy of the pointer-like object handed to the constructor
  method_t method; // member function invoked on *target

public:
  /** Stores a copy of cc and the member-function pointer ff. */
  callback_cref(const P &cc, method_t ff)
    : target(cc), method(ff)
  {
  }

  /** Calls the bound member function on *target, passing b1 through. */
  void operator() (const B1& b1)
  {
    ((*target).*method)(b1);
  }
};
00062
/**
 * Functor that binds a pointer-like object to a one-argument member function
 * taking its argument by value.
 *
 * P  - pointer-like type; dereferenced with unary * on every call.
 * C  - class that declares the member function.
 * B1 - argument type, passed by value.
 */
template<class P, class C, class B1>
class callback_copy {
  typedef void (C::*method_t) (B1);

  P target;        // copy of the pointer-like object handed to the constructor
  method_t method; // member function invoked on *target

public:
  /** Stores a copy of cc and the member-function pointer ff. */
  callback_copy(const P &cc, method_t ff)
    : target(cc), method(ff)
  {
  }

  /** Calls the bound member function on *target with a copy of b1. */
  void operator() (B1 b1)
  {
    ((*target).*method)(b1);
  }
};
00074 };
00075
00076
00077 template <class C, class T>
00078 class DynamicProcessingPool {
00079
00080 public:
00081 DynamicProcessingPool(C p, size_t numThreads, size_t maxThreads) :
00082 proc(p), threadCount(numThreads), maxThreads(maxThreads), stop(false) {
00083
00084 ASSERT(pthread_mutex_init(&poolMutex, 0) == 0);
00085 ASSERT(pthread_cond_init(&signalv, 0) == 0);
00086
00087 for (size_t i = 0; i < threadCount; i++) {
00088 createThread();
00089 }
00090 }
00091
00092 virtual ~DynamicProcessingPool() {
00093 halt();
00094 }
00095
00096 void halt() {
00097 ScopedLock sl(poolMutex);
00098 stop = true;
00099 signal();
00100 }
00101
00102 void execute(const T& v) {
00103 ScopedLock sl(poolMutex);
00104
00105 tasks.push_back(v);
00106 if (sleeping == 0 && threads < maxThreads) {
00107 createThread();
00108 }
00109 signal();
00110 }
00111
00112 protected:
00113 static void* startThread(void* arg) {
00114 DynamicProcessingPool<C, T>* pq = (DynamicProcessingPool<C, T> *)arg;
00115 pq->run();
00116 return 0;
00117 }
00118
00119 private:
00120 void createThread() {
00121 pthread_t t;
00122 runNewThread(&t, DynamicProcessingPool::startThread, this, 0);
00123 threads++;
00124 sleeping++;
00125 }
00126
00127 void lock() const {
00128 ASSERT(pthread_mutex_lock(&poolMutex) == 0);
00129 }
00130
00131 void unlock() const {
00132 ASSERT(pthread_mutex_unlock(&poolMutex) == 0);
00133 }
00134
00135 void signal() {
00136
00137 pthread_cond_broadcast(&signalv);
00138 }
00139
00140 void wait() {
00141 ASSERT(pthread_cond_wait(&signalv, &poolMutex) == 0);
00142 }
00143
00144 void run() {
00145 ScopedLock sl(poolMutex);
00146
00147 while (!stop) {
00148 if (tasks.empty()) {
00149 wait();
00150 continue;
00151 }
00152
00153 ASSERT(sleeping > 0);
00154 sleeping--;
00155 T t = tasks.front();
00156 tasks.pop_front();
00157
00158 sl.unlock();
00159
00160 proc(t);
00161
00162 sl.lock();
00163
00164 if (tasks.empty() && threads > threadCount) {
00165 ASSERT(threads > 0);
00166 threads--;
00167 return;
00168 }
00169
00170 sleeping++;
00171 ASSERT(sleeping <= threads);
00172 }
00173 }
00174
00175
00176 private:
00177 C proc;
00178 size_t threadCount;
00179 size_t maxThreads;
00180 bool stop;
00181
00182 typedef mace::set<pthread_t> ThreadSet;
00183 size_t threads;
00184 size_t sleeping;
00185
00186 mutable pthread_mutex_t poolMutex;
00187 pthread_cond_t signalv;
00188
00189 std::deque<T> tasks;
00190
00191 };
00192
00193 #endif // _DYNAMIC_PROCESSING_POOL_H