00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "Scheduler.h"
00034 #include "TimeUtil.h"
00035
00036 Scheduler* Scheduler::scheduler = 0;
00037
00038 Scheduler::Scheduler() : running(true), next(0) {
00039 pthread_mutex_init(&slock, 0);
00040 }
00041
00042 Scheduler::~Scheduler() {
00043 haltScheduler();
00044 }
00045
00046 void Scheduler::haltScheduler() {
00047 assert(scheduler);
00048 scheduler->lock();
00049 bool r = scheduler->running;
00050 scheduler->running = false;
00051 scheduler->unlock();
00052 if (r) {
00053 scheduler->sig.signal();
00054 assert(pthread_join(scheduler->schedulerThread, NULL) == 0);
00055 }
00056 }
00057
00058 uint64_t Scheduler::schedule(TimerHandler& timer, uint64_t time, bool abs) {
00059 ADD_SELECTORS("Scheduler::schedule");
00060 ScopedLock sl(slock);
00061 if (!abs) {
00062 time += TimeUtil::timeu();
00063 }
00064
00065
00066 timers.insert(std::make_pair(time, &timer));
00067
00068 if (time < next || next == 0) {
00069
00070 sig.signal();
00071 }
00072
00073 return time;
00074 }
00075
00076 void Scheduler::cancel(TimerHandler& timer) {
00077 ADD_SELECTORS("Scheduler::cancel");
00078 ScopedLock sl(slock);
00079
00080
00081
00082 TimerMap::iterator i = timers.begin();
00083 while (i != timers.end()) {
00084
00085 if (i->second->getId() == timer.getId()) {
00086 timers.erase(i);
00087 if (!timers.empty()) {
00088 next = timers.begin()->first;
00089
00090 }
00091
00092
00093 return;
00094 }
00095 i++;
00096 }
00097 }
00098
00099 void Scheduler::runSchedulerThread() {
00100 ADD_SELECTORS("Scheduler::runSchedulerThread");
00101
00102 while (running) {
00103 uint64_t pending = 0;
00104
00105 lock();
00106 joinThreads(joinSet);
00107
00108 if (!timers.empty()) {
00109 pending = timers.begin()->first;
00110 next = pending;
00111
00112 }
00113 else {
00114 next = 0;
00115 unlock();
00116
00117 sig.wait();
00118 continue;
00119 }
00120 uint64_t now = TimeUtil::timeu();
00121 uint64_t sleeptime = next - now;
00122
00123 if ((now + CLOCK_RESOLUTION) > next) {
00124
00125 fireTimer(true);
00126 }
00127 else {
00128 unlock();
00129
00130 int n = sig.wait(sleeptime);
00131
00132 if (n) {
00133
00134 continue;
00135 }
00136 if (pending == next) {
00137
00138
00139 fireTimer(false);
00140 }
00141 }
00142 }
00143
00144 lock();
00145 joinThreads(joinSet);
00146 unlock();
00147 joinThreads(shutdownJoinSet);
00148 }
00149
00150 void Scheduler::fireTimer(bool locked) {
00151 ADD_SELECTORS("Scheduler::fireTimer");
00152
00153 if (!locked) {
00154 lock();
00155 }
00156 if (timers.empty()) {
00157 unlock();
00158
00159 return;
00160 }
00161 TimerMap::iterator i = timers.begin();
00162 TimerHandler* t = i->second;
00163 timers.erase(i);
00164 unlock();
00165
00166 t->fire();
00167 }
00168
00169 size_t Scheduler::joinThreads(IdThreadMap& s) {
00170 size_t joinCount = 0;
00171 for (IdThreadMap::const_iterator i = s.begin(); i != s.end(); i++) {
00172 assert(pthread_join(i->second, NULL) == 0);
00173 joinCount++;
00174 }
00175 if (joinCount) {
00176 s.clear();
00177 }
00178 return joinCount;
00179 }
00180
00181 void Scheduler::joinThread(uint64_t id, pthread_t tid) {
00182 ScopedLock sl(slock);
00183 if (running) {
00184 ASSERT(pthread_equal(shutdownJoinSet[id], tid));
00185 shutdownJoinSet.erase(id);
00186 joinSet[id] = tid;
00187 }
00188 }
00189
00190 void Scheduler::shutdownJoinThread(uint64_t id, pthread_t tid) {
00191 ScopedLock sl(slock);
00192
00193 if (running) {
00194 shutdownJoinSet[id] = tid;
00195 }
00196 }
00197
00198 void* Scheduler::startSchedulerThread(void* arg) {
00199 Scheduler* s = (Scheduler*)arg;
00200 s->runSchedulerThread();
00201 return 0;
00202 }
00203