00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <inttypes.h>
00034 #include "Log.h"
00035 #include "BufferedBlockManager.h"
00036 #include "ThreadCreate.h"
00037 #include "mace-macros.h"
00038 #include "HashUtil.h"
00039
00040 using namespace std;
00041
00042 BufferedBlockManager::BufferedBlockManager(FileBlockManager& m, uint capacity)
00043 : fbm(&m), bufferCache(capacity), isOpenFlag(false),
00044 processQueuesFlag(false) {
00045
00046 }
00047
00048 BufferedBlockManager::~BufferedBlockManager() {
00049 close();
00050 }
00051
00052 void BufferedBlockManager::prefetchBlock(uint64_t index) {
00053 Log::logf("BufferedBlockManager::prefetchBlock", 2, "enqueuing %" PRIu64, index);
00054 acquireQueueLock();
00055 readQueue.push_back(index);
00056 signalQueueData();
00057 releaseQueueLock();
00058 }
00059
00060 string BufferedBlockManager::getBlock(uint64_t index) {
00061 ADD_SELECTORS("BufferedBlockManager::getBlock");
00062 ASSERT(isOpen());
00063 macedbg(2) << "index=" << index << Log::endl;
00064 acquireQueueLock();
00065 readIntoCache(index);
00066 string tmp = bufferCache.get(index);
00067 releaseQueueLock();
00068
00069 return tmp;
00070 }
00071
00072 size_t BufferedBlockManager::setBlock(uint64_t index, const string& buf) {
00073 ADD_SELECTORS("BufferedBlockManager::setBlock");
00074 ASSERT(isOpen());
00075 macedbg(2) << "index=" << index << Log::endl;
00076
00077 acquireQueueLock();
00078 while (bufferCache.isFullDirty()) {
00079
00080 writeLastDirty();
00081 }
00082 bufferCache.add(index, buf, true);
00083 signalQueueData();
00084 releaseQueueLock();
00085 return buf.size();
00086 }
00087
00088 bool BufferedBlockManager::isOpen() const {
00089 return isOpenFlag;
00090 }
00091
00092 const string& BufferedBlockManager::getPath() const {
00093 return fbm->getPath();
00094 }
00095
00096 std::string BufferedBlockManager::getFileName() const {
00097 return fbm->getFileName();
00098 }
00099
00100 int BufferedBlockManager::open(const std::string& p, const char* mode) {
00101 if (isOpen()) {
00102 return 0;
00103 }
00104
00105 int openrc = fbm->open(p, mode);
00106 if (openrc != 0) {
00107 fprintf(stderr, "BufferedBlockManager::open: error opening FileBlockManager\n");
00108 return openrc;
00109 }
00110
00111 isOpenFlag = true;
00112
00113 Log::logf("BufferedBlockManager::open", 1,
00114 "blockSize=%zu blockCount=%" PRIu64 " size=%" PRIu64,
00115 getBlockSize(), getBlockCount(), getSize());
00116
00117
00118 processQueuesFlag = true;
00119
00120 pthread_attr_t threadAttr;
00121 pthread_attr_init(&threadAttr);
00122 pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
00123
00124 pthread_mutex_init(&queueLock, 0);
00125 pthread_cond_init(&queueCond, 0);
00126
00127 runNewThread(&queueThread, &BufferedBlockManager::startQueueThread, this, &threadAttr);
00128 pthread_attr_destroy(&threadAttr);
00129
00130 return 0;
00131 }
00132
00133 int BufferedBlockManager::close() {
00134 if (!isOpen()) {
00135 return 0;
00136 }
00137
00138 isOpenFlag = false;
00139
00140 flush();
00141
00142 acquireQueueLock();
00143 processQueuesFlag = false;
00144 signalQueueData();
00145 releaseQueueLock();
00146
00147 pthread_cond_destroy(&queueCond);
00148 pthread_mutex_destroy(&queueLock);
00149
00150 Log::log("BufferedBlockManager::close", 1, "all data written, closing");
00151 return fbm->close();
00152 }
00153
00154 int BufferedBlockManager::flush() {
00155 int ret;
00156
00157 Log::log("BufferedBlockManager::flush", 2, "flushing cache");
00158 acquireQueueLock();
00159 while (bufferCache.hasDirty()) {
00160 writeLastDirty();
00161 }
00162
00163 ret = fbm->flush();
00164 releaseQueueLock();
00165 return ret;
00166 }
00167
00168 size_t BufferedBlockManager::getBlockSize() const {
00169
00170 return fbm->getBlockSize();
00171 }
00172
00173 void BufferedBlockManager::setBlockSize(size_t s) {
00174 fbm->setBlockSize(s);
00175 }
00176
00177 void BufferedBlockManager::processQueues() {
00178 while (processQueuesFlag) {
00179 acquireQueueLock();
00180 while (!bufferCache.hasDirty() && readQueue.empty() && processQueuesFlag) {
00181
00182 waitForQueueData();
00183
00184 }
00185
00186 if (processQueuesFlag) {
00187 if (bufferCache.hasDirty()) {
00188 Log::log("BufferedBlockManager::processQueues", 3, "writing last dirty block");
00189 writeLastDirty();
00190 }
00191 else {
00192 ASSERT(!readQueue.empty());
00193 int index;
00194 do {
00195 index = readQueue.front();
00196 Log::logf("BufferedBlockManager::processQueues", 2, "dequeued %d", index);
00197 readQueue.pop_front();
00198 } while (!bufferCache.containsKey(index) && !readQueue.empty());
00199
00200 readIntoCache(index);
00201 }
00202 }
00203
00204 releaseQueueLock();
00205 }
00206
00207 }
00208
00209 void BufferedBlockManager::readIntoCache(uint index) {
00210
00211
00212 if (bufferCache.containsKey(index)) {
00213 return;
00214 }
00215
00216
00217 Log::logf("BufferedBlockManager::readIntoCache", 2, "reading block %d", index);
00218 string buf = fbm->getBlock(index);
00219
00220 while (bufferCache.isFullDirty()) {
00221
00222 writeLastDirty();
00223 }
00224 bufferCache.add(index, buf, false);
00225 }
00226
00227 void BufferedBlockManager::writeLastDirty() {
00228
00229
00230 int index = bufferCache.getLastDirtyKey();
00231 Log::logf("BufferedBlockManager::writeLastDirty", 2, "writing block %d", index);
00232 string buf = bufferCache.getDirty(index);
00233 bufferCache.clearDirty(index);
00234
00235 ASSERT(fbm->setBlock(index, buf) == buf.size());
00236 Log::logf("BufferedBlockManager::writeLastDirty", 2, "wrote %zu bytes", buf.size());
00237
00238 }
00239
00240 void BufferedBlockManager::acquireQueueLock() {
00241 ASSERT(pthread_mutex_lock(&queueLock) == 0);
00242 }
00243
00244 void BufferedBlockManager::signalQueueData() {
00245 ASSERT(pthread_cond_signal(&queueCond) == 0);
00246 }
00247
00248 void BufferedBlockManager::waitForQueueData() {
00249
00250 ASSERT(pthread_cond_wait(&queueCond, &queueLock) == 0);
00251 }
00252
00253 void BufferedBlockManager::releaseQueueLock() {
00254 ASSERT(pthread_mutex_unlock(&queueLock) == 0);
00255 }
00256
00257 void* BufferedBlockManager::startQueueThread(void* t) {
00258 BufferedBlockManager* self = (BufferedBlockManager*)t;
00259 ASSERT(self);
00260 self->processQueues();
00261
00262 return t;
00263 }
00264