00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #ifndef _PROCESSING_QUEUE_H
00034 #define _PROCESSING_QUEUE_H
00035
00036 #include <pthread.h>
00037 #include <cassert>
00038
00039 #include "CircularQueueList.h"
00040 #include "Exception.h"
00041 #include "ThreadCreate.h"
00042
00049 namespace ProcessingQueueCallbacks {
00050 template<class P, class C, class R, class B1>
00051 class callback_cref {
00052 typedef R (C::*cb_t) (const B1&);
00053 P c;
00054 cb_t f;
00055 public:
00056 callback_cref(const P &cc, cb_t ff)
00057 : c (cc), f (ff) {}
00058 R operator() (const B1& b1)
00059 { return ((*c).*f) (b1); }
00060 };
00061
00062 template<class P, class C, class R, class B1>
00063 class callback_copy {
00064 typedef R (C::*cb_t) (B1);
00065 P c;
00066 cb_t f;
00067 public:
00068 callback_copy(const P &cc, cb_t ff)
00069 : c (cc), f (ff) {}
00070 R operator() (B1 b1)
00071 { return ((*c).*f) (b1); }
00072 };
00073 };
00074
00075 class ProcessingQueueException : public virtual Exception {
00076 public:
00077 ProcessingQueueException(const std::string& m) : Exception(m) { }
00078 void rethrow() const { throw *this; }
00079 };
00080
00081 template <class C, class R, class T>
00082 class ProcessingQueue {
00083
00084 public:
00085 ProcessingQueue(C p, pthread_cond_t* sig = 0, pthread_mutex_t* siglock = 0) :
00086 proc(p), resultSignal(sig), resultLock(siglock), halt(false), busy(false) {
00087
00088 pthread_mutex_init(&taskLock, 0);
00089 pthread_cond_init(&taskSignal, 0);
00090
00091 _runNewThread(&processingThread, startProcessingThread, this, 0,
00092 "startProcessingThread", false);
00093 }
00094
00095 virtual ~ProcessingQueue() {
00096 shutdown();
00097
00098 assert(pthread_join(processingThread, NULL) == 0);
00099 pthread_mutex_destroy(&taskLock);
00100 pthread_cond_destroy(&taskSignal);
00101 }
00102
00103 void shutdown() {
00104 halt = true;
00105 signalTask();
00106 }
00107
00108 void push(const T& v) {
00109 busy = true;
00110 tasks.push(v);
00111 signalTask();
00112 }
00113
00114 const R& front() const throw (ProcessingQueueException) {
00115 if (results.empty()) {
00116 throw ProcessingQueueException("dequeue called on empty queue");
00117 }
00118 const R& r = results.front();
00119 return r;
00120 }
00121
00122 void pop() throw (ProcessingQueueException) {
00123 if (results.empty()) {
00124 throw ProcessingQueueException("dequeue called on empty queue");
00125 }
00126 results.pop();
00127 }
00128
00129
00130 void clear() {
00131 results.clear();
00132 }
00133
00134
00135 bool empty() {
00136 return results.empty();
00137 }
00138
00139
00140 size_t size() {
00141 return results.size();
00142 }
00143
00144 void clearQueue() {
00145 tasks.clear();
00146 }
00147
00148
00149 bool queueEmpty() {
00150 return tasks.empty();
00151 }
00152
00153
00154 size_t queueSize() {
00155 return tasks.size();
00156 }
00157
00158 bool isBusy() const {
00159 return busy;
00160 }
00161
00162 void setResultSignal(pthread_cond_t* sig, pthread_mutex_t* mutex) {
00163 resultSignal = sig;
00164 resultLock = mutex;
00165 }
00166
00167 void clearResultSignal() {
00168 resultSignal = 0;
00169 }
00170
00171 protected:
00172 static void* startProcessingThread(void* arg) {
00173 ProcessingQueue<C, R, T>* pq = (ProcessingQueue<C, R, T> *)arg;
00174 pq->processQueue();
00175 return 0;
00176 }
00177
00178 private:
00179 void signalTask() {
00180 acquireLock(taskLock);
00181 signalCond(taskSignal);
00182 releaseLock(taskLock);
00183 }
00184
00185 void processQueue() {
00186 while (1) {
00187 while (tasks.empty() && !halt) {
00188 busy = false;
00189 acquireLock(taskLock);
00190 waitForCond(taskSignal, taskLock);
00191 releaseLock(taskLock);
00192 }
00193 if (halt) {
00194 busy = false;
00195 return;
00196 }
00197
00198 T& t = tasks.front();
00199
00200 R r = proc(t);
00201
00202 tasks.pop();
00203
00204 results.push(r);
00205
00206 if (resultSignal) {
00207 acquireLock(*resultLock);
00208 signalCond(*resultSignal);
00209 releaseLock(*resultLock);
00210 }
00211 }
00212 }
00213
00214 void acquireLock(pthread_mutex_t& l) const {
00215 assert(pthread_mutex_lock(&l) == 0);
00216 }
00217
00218 void releaseLock(pthread_mutex_t& l) const {
00219 assert(pthread_mutex_unlock(&l) == 0);
00220 }
00221
00222 void signalCond(pthread_cond_t& c) {
00223 assert(pthread_cond_signal(&c) == 0);
00224 }
00225
00226 void waitForCond(pthread_cond_t& c, pthread_mutex_t& l) {
00227
00228 assert(pthread_cond_wait(&c, &l) == 0);
00229 }
00230
00231 private:
00232 C proc;
00233 pthread_cond_t* resultSignal;
00234 pthread_mutex_t* resultLock;
00235 bool halt;
00236 bool busy;
00237
00238 pthread_t processingThread;
00239 mutable pthread_mutex_t taskLock;
00240 mutable pthread_cond_t taskSignal;
00241
00242 CircularQueueList<T> tasks;
00243 CircularQueueList<R> results;
00244
00245
00246 };
00247
00248 #endif // _PROCESSING_QUEUE_H