00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #ifndef _MACE_DB_H
00034 #define _MACE_DB_H
00035
00042 #ifdef NO_DB_CXX
00043 #error "db_cxx.h could not be found. Reconfigure Mace with the path to db_cxx"
00044 #endif
00045
#include <db_cxx.h>
#include <errno.h>
#include <cassert>
#include <cstring>
#include <sstream>
#include <string>
#include <vector>
#include <boost/shared_ptr.hpp>
#include "Serializable.h"
#include "ScopedLock.h"
#include "basemap.h"
#include "massert.h"
#include "FileUtil.h"
#include "Log.h"
#include "Accumulator.h"
00059
// Berkeley DB releases before 4.3 report "user buffer too small" as ENOMEM and
// do not define DB_BUFFER_SMALL. Test for the macro itself rather than the minor
// version number: a minor-only check also fires on 5.0-5.2 / 6.0-6.2, where the
// macro already exists and would be redefined.
#ifndef DB_BUFFER_SMALL
#define DB_BUFFER_SMALL ENOMEM
#endif
00063 namespace mace {
00064
00071
00072
00074 template<class Key, class Data>
00075 class DB : public virtual Map<Key, Data>, public virtual PrintPrintable {
00076 public:
00077 typedef Map<Key, Data> baseMap;
00078 typedef typename baseMap::const_iterator const_iterator;
00079 typedef typename baseMap::inner_iterator inner_iterator;
00080
00081 public:
00082 class _inner_iterator : public virtual inner_iterator {
00083 friend class DB<Key, Data>;
00084
00085 public:
00086 typedef typename baseMap::ipair ipair;
00087 typedef typename baseMap::const_key_ipair const_key_ipair;
00088 typedef boost::shared_ptr<const_key_ipair> kdpairptr;
00089
00090 public:
00091 virtual ~_inner_iterator() {
00092 close();
00093 if (kbuf) {
00094 delete[] kbuf;
00095 }
00096 if (vbuf) {
00097 delete[] vbuf;
00098 }
00099 }
00100
00101 protected:
00102 const const_key_ipair* arrow() const {
00103 ASSERT(c);
00104 return &(*cur);
00105 }
00106
00107 const const_key_ipair& star() const {
00108 ASSERT(c);
00109 return *cur;
00110 }
00111
00112 void next() const {
00113 if (c) {
00114 readCur(DB_NEXT);
00115 }
00116 }
00117
00118 bool equals(const inner_iterator& right) const {
00119 return c == dynamic_cast<const _inner_iterator&>(right).c;
00120 }
00121
00122 void assign(const inner_iterator& other) {
00123 copy(dynamic_cast<const _inner_iterator&>(other));
00124 }
00125
00126 inner_iterator* clone() const {
00127 return new _inner_iterator(*this);
00128 }
00129
00130 void erase() {
00131 ASSERT(c);
00132 ASSERT(c->del(0) == 0);
00133 int r = readDbc(DB_NEXT);
00134 if (r == DB_NOTFOUND) {
00135 close();
00136 return;
00137 }
00138 ASSERT(r == 0);
00139 }
00140
00141 private:
00142 _inner_iterator(const _inner_iterator& other) :
00143 c(0), kbuf(0), vbuf(0), kused(0), ksize(0), vsize(0),
00144 KEY_SIZE(other.KEY_SIZE), DATA_SIZE(other.DATA_SIZE) {
00145
00146 copy(other);
00147 }
00148
00149 _inner_iterator(Dbc* c, size_t ksize, size_t dsize) :
00150 c(c), kbuf(0), vbuf(0), kused(0), ksize(0), vsize(0), KEY_SIZE(ksize),
00151 DATA_SIZE(dsize) {
00152
00153 next();
00154 }
00155
00156 _inner_iterator(Dbc* c, const Key& sk, size_t ksize, size_t dsize,
00157 u_int32_t flags = DB_SET) :
00158 c(c), kbuf(0), vbuf(0), kused(0), ksize(0), vsize(0), KEY_SIZE(ksize),
00159 DATA_SIZE(dsize) {
00160
00161 ASSERT(c);
00162 setCur(sk, flags);
00163 }
00164
00165 void copy(const _inner_iterator& other) {
00166 if (other.c) {
00167 other.c->dup(&c, DB_POSITION);
00168 }
00169
00170 if (c) {
00171 readCur(DB_CURRENT);
00172 }
00173 }
00174
00175 int readDbc(u_int32_t flags) const {
00176 ASSERT(c);
00177
00178 allocKbuf();
00179
00180 if (vbuf == 0) {
00181 if (DATA_SIZE) {
00182 vsize = DATA_SIZE;
00183 }
00184 else {
00185 vsize = DEFAULT_DATA_SIZE;
00186 }
00187 vbuf = new char[vsize];
00188 }
00189
00190 DB::initDbt(k, kbuf, ksize);
00191 if (flags == DB_SET || flags == DB_SET_RANGE) {
00192 k.set_size(kused);
00193 }
00194
00195 DB::initDbt(v, vbuf, vsize);
00196
00197 while (true) {
00198 try {
00199 int r = c->get(&k, &v, flags);
00200 return r;
00201 }
00202 catch (DbException& e) {
00203 if (e.get_errno() == DB_BUFFER_SMALL || e.get_errno() == ENOMEM) {
00204 if (k.get_size() > ksize) {
00205 ASSERT(KEY_SIZE == 0);
00206 ksize = k.get_size();
00207 delete[] kbuf;
00208 kbuf = new char[ksize];
00209 DB::initDbt(k, kbuf, ksize);
00210 }
00211 if (v.get_size() > vsize) {
00212 ASSERT(DATA_SIZE == 0);
00213 vsize = v.get_size();
00214 delete[] vbuf;
00215 vbuf = new char[vsize];
00216 DB::initDbt(v, vbuf, vsize);
00217 }
00218 }
00219 else {
00220 throw e;
00221 }
00222 }
00223 }
00224 }
00225
00226 void allocKbuf(size_t sz = 0) const {
00227 if (sz > 0) {
00228 ASSERT(KEY_SIZE == 0 || sz <= KEY_SIZE);
00229 }
00230 if (kbuf == 0) {
00231 if (KEY_SIZE) {
00232 ksize = KEY_SIZE;
00233 }
00234 else {
00235 ksize = DEFAULT_KEY_SIZE;
00236 if (sz > ksize) {
00237 ksize = sz;
00238 }
00239 }
00240 kbuf = new char[ksize];
00241 }
00242 }
00243
00244 void readCur(u_int32_t flags) const {
00245 int r = readDbc(flags);
00246 if (r == DB_NOTFOUND) {
00247 close();
00248 return;
00249 }
00250 ASSERT(r == 0);
00251 fillCur();
00252 }
00253
00254 void setCur(const Key& kt, u_int32_t flags = DB_SET) {
00255 std::string sk = serialize(&kt);
00256 allocKbuf(sk.size());
00257 kused = sk.size();
00258 memcpy(kbuf, sk.data(), sk.size());
00259 readCur(flags);
00260 }
00261
00262 void fillCur() const {
00263 kused = k.get_size();
00264 std::string sk((const char*)k.get_data(), kused);
00265 deserialize(sk, &ktmp);
00266 std::string s((const char*)v.get_data(), v.get_size());
00267 deserialize(s, &dtmp);
00268 cur = kdpairptr(new const_key_ipair(ktmp, dtmp));
00269 }
00270
00271 void close() const {
00272 if (c) {
00273 ASSERT(c->close() == 0);
00274 c = 0;
00275 }
00276 }
00277
00278 private:
00279 mutable Dbc* c;
00280 mutable kdpairptr cur;
00281 mutable Key ktmp;
00282 mutable Data dtmp;
00283 mutable char *kbuf;
00284 mutable char *vbuf;
00285 mutable size_t kused;
00286 mutable size_t ksize;
00287 mutable size_t vsize;
00288 const size_t KEY_SIZE;
00289 const size_t DATA_SIZE;
00290 mutable Dbt k;
00291 mutable Dbt v;
00292 };
00293
00294 public:
00295 DB(uint32_t cacheSize = 0, bool autosync = true, size_t keySize = 0,
00296 size_t dataSize = 0, const std::string& errorPrefix = "mace::DB: ") :
00297 isOpen(false), privateEnv(true), alwaysSync(autosync), dbenv(0), db(0),
00298 KEY_SIZE(keySize), DATA_SIZE(dataSize), databuf(0) {
00299
00300 dbenv = new DbEnv(0);
00301
00302 init(cacheSize, errorPrefix);
00303 }
00304
00305 DB(DbEnv* env, uint32_t cacheSize, bool autosync,
00306 size_t keySize = 0, size_t dataSize = 0,
00307 const std::string& errorPrefix = "mace::DB: ") :
00308 isOpen(false), privateEnv(false), alwaysSync(autosync), dbenv(env), db(0),
00309 KEY_SIZE(keySize), DATA_SIZE(dataSize), databuf(0) {
00310
00311 ASSERT(dbenv);
00312 init(cacheSize, errorPrefix);
00313 }
00314
00315 DB(DbEnv* env, bool autosync, size_t keySize = 0, size_t dataSize = 0) :
00316 isOpen(false), privateEnv(false), alwaysSync(autosync), dbenv(env), db(0),
00317 KEY_SIZE(keySize), DATA_SIZE(dataSize), databuf(0) {
00318
00319 ASSERT(dbenv);
00320 init();
00321 }
00322
00323 DB(size_t keySize, size_t dataSize) :
00324 isOpen(false), privateEnv(false), alwaysSync(false), dbenv(0), db(0),
00325 KEY_SIZE(keySize), DATA_SIZE(dataSize), databuf(0) {
00326 init();
00327 }
00328
00329 virtual ~DB() {
00330 close();
00331 if (DATA_SIZE) {
00332 delete[] databuf;
00333 databuf = 0;
00334 }
00335 if (privateEnv) {
00336 delete dbenv;
00337 }
00338 dbenv = 0;
00339 delete db;
00340 db = 0;
00341 }
00342
00343 virtual void open(const std::string& dir, const std::string& file,
00344 DBTYPE dbtype = DB_BTREE, bool create = false) {
00345 ScopedLock sl(lock);
00346
00347 if (create && !FileUtil::fileExists(dir)) {
00348 FileUtil::mkdir(dir, 0755, true);
00349 }
00350
00351 ASSERT(!isOpen);
00352
00353
00354
00355
00356 ASSERT(dbenv->open(dir.c_str(),
00357 DB_CREATE | DB_INIT_LOG | DB_INIT_MPOOL |
00358 DB_INIT_TXN | DB_RECOVER |
00359
00360 DB_PRIVATE, 0) == 0);
00361
00362 dbfile = file;
00363 _open(dbtype);
00364 }
00365
00366 virtual void open(const std::string& file, DBTYPE dbtype = DB_BTREE) {
00367 ScopedLock sl(lock);
00368
00369 ASSERT(!isOpen);
00370 ASSERT(!privateEnv);
00371
00372 dbfile = file;
00373 _open(dbtype);
00374 }
00375
00376 virtual void open(DBTYPE dbtype = DB_BTREE) {
00377 ScopedLock sl(lock);
00378 ASSERT(!isOpen);
00379 ASSERT(!privateEnv);
00380 ASSERT(dbenv == 0);
00381
00382 _open(dbtype);
00383 }
00384
00385 virtual void close() {
00386 ScopedLock sl(lock);
00387 if (isOpen) {
00388 isOpen = false;
00389 db->close(0);
00390 if (privateEnv) {
00391 dbenv->close(0);
00392 }
00393 }
00394 }
00395
00396 virtual bool get(const Key& kt, Data& d) const {
00397 static char buf[DEFAULT_DATA_SIZE];
00398 ASSERT(isOpen);
00399
00400 std::string sk = serialize(&kt);
00401 Dbt k((void*)sk.data(), sk.size());
00402
00403 int r = 0;
00404 char* dbuf = 0;
00405 bool del = false;
00406 Dbt v;
00407 if (DATA_SIZE) {
00408 dbuf = databuf;
00409 initDbt(v, dbuf, DATA_SIZE);
00410
00411 r = db->get(NULL, &k, &v, 0);
00412 }
00413 else {
00414 dbuf = buf;
00415 initDbt(v, dbuf, DEFAULT_DATA_SIZE);
00416
00417 try {
00418 r = db->get(NULL, &k, &v, 0);
00419 }
00420 catch (DbMemoryException& e) {
00421 if (e.get_errno() == DB_BUFFER_SMALL || e.get_errno() == ENOMEM) {
00422 size_t sz = v.get_size();
00423 dbuf = new char[sz];
00424 del = true;
00425 initDbt(v, dbuf, sz);
00426 r = db->get(NULL, &k, &v, 0);
00427 }
00428 else {
00429 throw e;
00430 }
00431 }
00432 catch (DbException& e) {
00433 throw e;
00434 }
00435 }
00436
00437 if (r == DB_NOTFOUND) {
00438 if (del) {
00439 delete[] dbuf;
00440 }
00441 return false;
00442 }
00443 ASSERT(r == 0);
00444
00445 std::string s(dbuf, v.get_size());
00446 deserialize(s, &d);
00447
00448 if (del) {
00449 delete[] dbuf;
00450 }
00451
00452 return true;
00453 }
00454
00455 virtual bool containsKey(const Key& kt) const {
00456 Data d;
00457 return get(kt, d);
00458 }
00459
00460 virtual void put(const Key& kt, const Data& d) {
00461 static Accumulator* dbWriteBytes = Accumulator::Instance(Accumulator::DB_WRITE_BYTES);
00462 static Accumulator* dbWriteCount = Accumulator::Instance(Accumulator::DB_WRITE_COUNT);
00463
00464 ASSERT(isOpen);
00465
00466
00467
00468 std::string sk = serialize(&kt);
00469 if (KEY_SIZE) {
00470 ASSERT(sk.size() <= KEY_SIZE);
00471 }
00472
00473 Dbt k((void*)sk.data(), sk.size());
00474
00475 std::string s = serialize(&d);
00476 if (DATA_SIZE) {
00477 ASSERT(s.size() <= DATA_SIZE);
00478 }
00479
00480
00481 Dbt v((void*)s.data(), s.size());
00482
00483 ASSERT(db->put(NULL, &k, &v, 0) == 0);
00484 dbWriteCount->accumulate(1);
00485 dbWriteBytes->accumulate(s.size());
00486
00487 if (alwaysSync) {
00488 sync();
00489 }
00490 }
00491
00492 virtual void sync() {
00493 ASSERT(isOpen);
00494 ASSERT(db->sync(0) == 0);
00495 }
00496
00497 virtual baseMap& assign(const baseMap& other) {
00498 ABORT("XXX assign not supported");
00499 }
00500
00501 virtual void clear() {
00502 ASSERT(isOpen);
00503 DBTYPE dbtype;
00504 ASSERT(db->get_type(&dbtype) == 0);
00505 db->close(0);
00506 if (dbenv) {
00507 ASSERT(dbenv->dbremove(0, dbfile.empty() ? NULL : dbfile.c_str(), NULL, 0) == 0);
00508 }
00509 else {
00510 ABORT("clear not supported yet");
00511 ASSERT(db->remove(dbfile.empty() ? NULL : dbfile.c_str(), NULL, 0) == 0);
00512 }
00513 delete db;
00514 db = 0;
00515 _open(dbtype);
00516 }
00517
00518 virtual size_t erase(const Key& kt) {
00519 static Accumulator* dbEraseCount = Accumulator::Instance(Accumulator::DB_ERASE_COUNT);
00520 ASSERT(isOpen);
00521
00522 std::string sk = serialize(&kt);
00523
00524 Dbt k((void*)sk.data(), sk.size());
00525 int r = db->del(NULL, &k, 0);
00526 dbEraseCount->accumulate(1);
00527 ASSERT(r == 0 || r == DB_NOTFOUND);
00528 if (alwaysSync) {
00529 sync();
00530 }
00531 return (r == 0 ? 1 : 0);
00532 }
00533
00534 virtual const_iterator erase(const_iterator i) {
00535 static Accumulator* dbEraseCount = Accumulator::Instance(Accumulator::DB_ERASE_COUNT);
00536 dbEraseCount->accumulate(1);
00537 return i.erase();
00538 }
00539
00540 virtual size_t size() const {
00541 size_t s = 0;
00542 for (const_iterator i = begin(); i != end(); i++) {
00543 s++;
00544 }
00545 return s;
00546 }
00547
00548 virtual bool empty() const {
00549 return begin() == end();
00550 }
00551
00552 const_iterator begin() const {
00553 Dbc* c;
00554 ASSERT(db->cursor(NULL, &c, 0) == 0);
00555 return const_iterator(
00556 typename const_iterator::IteratorPtr(
00557 new _inner_iterator(c, KEY_SIZE, DATA_SIZE)));
00558 }
00559
00560 const_iterator end() const {
00561 return const_iterator(
00562 typename const_iterator::IteratorPtr(
00563 new _inner_iterator(0, KEY_SIZE, DATA_SIZE)));
00564 }
00565
00566 const_iterator find(const Key& kt) const {
00567 Dbc* c;
00568 ASSERT(db->cursor(NULL, &c, 0) == 0);
00569 return const_iterator(
00570 typename const_iterator::IteratorPtr(
00571 new _inner_iterator(c, kt, KEY_SIZE, DATA_SIZE)));
00572 }
00573
00574 const_iterator lower_bound(const Key& kt) const {
00575 Dbc* c;
00576 ASSERT(db->cursor(NULL, &c, 0) == 0);
00577 return const_iterator(
00578 typename const_iterator::IteratorPtr(
00579 new _inner_iterator(c, kt, KEY_SIZE, DATA_SIZE, DB_SET_RANGE)));
00580 }
00581
00582 bool equals(const baseMap& other) const {
00583 ABORT("equals not implemented");
00584 }
00585
00586 void print(std::ostream& printer) const {
00587
00588 printer << "mdb<does not support printing!>";
00589 }
00590
00591 void print(PrintNode& printer, const std::string& name) const {
00592 printMap(printer, name, "DB<" + getTypeName() + ">", begin(), end());
00593 }
00594
00595 const std::string& getTypeName() const {
00596 const char* types[] = { "Key", "Data", 0 };
00597 static const StrUtilNamespace::StdStringList myTypes = StrUtilNamespace::getTypeFromTemplate(__PRETTY_FUNCTION__, types);
00598 static string r = myTypes[0]+"->"+myTypes[1];
00599 return r;
00600 }
00601
00602
00603 protected:
00604 void init() {
00605 pthread_mutex_init(&lock, 0);
00606 if (DATA_SIZE) {
00607 databuf = new char[DATA_SIZE];
00608 }
00609 }
00610
00611 void init(uint32_t cacheSize, const std::string& errorPrefix) {
00612 init();
00613
00614 dbenv->set_error_stream(&std::cerr);
00615 dbenv->set_errpfx(errorPrefix.c_str());
00616
00617 if (cacheSize) {
00618 ASSERT(dbenv->set_cachesize(0, cacheSize, 1) == 0);
00619 }
00620 }
00621
00622 static void initDbt(Dbt& dbt, char* buf, size_t sz) {
00623 dbt.set_data(buf);
00624 dbt.set_size(sz);
00625 dbt.set_ulen(sz);
00626 dbt.set_flags(DB_DBT_USERMEM);
00627 }
00628
00629 virtual void _open(DBTYPE dbtype) {
00630 if (db == 0) {
00631 db = new Db(dbenv, 0);
00632 db->set_error_stream(&std::cerr);
00633 }
00634
00635
00636 ASSERT(db->open(NULL, dbfile.empty() ? NULL : dbfile.c_str(), NULL, dbtype,
00637
00638 DB_CREATE, 0) == 0);
00639
00640
00641
00642 isOpen = true;
00643 }
00644
00645
00646 private:
00647 bool isOpen;
00648 bool privateEnv;
00649 bool alwaysSync;
00650 std::string dbfile;
00651 DbEnv *dbenv;
00652 Db *db;
00653
00654 const size_t KEY_SIZE;
00655 const size_t DATA_SIZE;
00656 char* databuf;
00657
00658 pthread_mutex_t lock;
00659
00660 static const size_t DEFAULT_KEY_SIZE = 1024;
00661 static const size_t DEFAULT_DATA_SIZE = 4096;
00662
00663 };
00664
00667 }
00668
00669 #endif // _MACE_DB_H