00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <inttypes.h>
00034 #include "Accumulator.h"
00035 #include "TimeUtil.h"
00036 #include "SysUtil.h"
00037 #include "ThreadCreate.h"
00038 #include "mace-macros.h"
00039 #include "DummyServiceMapper.h"
00040
00041 using std::cerr;
00042 using std::endl;
00043
00044 Accumulator::AccumulatorMap Accumulator::instances;
00045 bool Accumulator::isLogging = false;
00046 pthread_t Accumulator::athread;
00047 pthread_mutex_t Accumulator::alock = PTHREAD_MUTEX_INITIALIZER;
00048 const std::string Accumulator::NETWORK_READ = "NETWORK_READ";
00049 const std::string Accumulator::NETWORK_READ_SELECTOR = "Accumulator::NETWORK_READ";
00050 const std::string Accumulator::NETWORK_WRITE = "NETWORK_WRITE";
00051 const std::string Accumulator::NETWORK_WRITE_SELECTOR = "Accumulator::NETWORK_WRITE";
00052 const std::string Accumulator::TCP_READ = "TCP_READ";
00053 const std::string Accumulator::TCP_READ_SELECTOR = "Accumulator::TCP_READ";
00054 const std::string Accumulator::TCP_WRITE = "TCP_WRITE";
00055 const std::string Accumulator::TCP_WRITE_SELECTOR = "Accumulator::TCP_WRITE";
00056 const std::string Accumulator::UDP_READ = "UDP_READ";
00057 const std::string Accumulator::UDP_READ_SELECTOR = "Accumulator::UDP_READ";
00058 const std::string Accumulator::UDP_WRITE = "UDP_WRITE";
00059 const std::string Accumulator::UDP_WRITE_SELECTOR = "Accumulator::UDP_WRITE";
00060 const std::string Accumulator::HTTP_CLIENT_READ = "HTTP_CLIENT_READ";
00061 const std::string Accumulator::HTTP_CLIENT_READ_SELECTOR =
00062 "Accumulator::HTTP_CLIENT_READ";
00063 const std::string Accumulator::HTTP_CLIENT_WRITE = "HTTP_CLIENT_WRITE";
00064 const std::string Accumulator::HTTP_CLIENT_WRITE_SELECTOR =
00065 "Accumulator::HTTP_CLIENT_WRITE";
00066 const std::string Accumulator::HTTP_SERVER_READ = "HTTP_SERVER_READ";
00067 const std::string Accumulator::HTTP_SERVER_READ_SELECTOR =
00068 "Accumulator::HTTP_SERVER_READ";
00069 const std::string Accumulator::HTTP_SERVER_WRITE = "HTTP_SERVER_WRITE";
00070 const std::string Accumulator::HTTP_SERVER_WRITE_SELECTOR =
00071 "Accumulator::HTTP_SERVER_WRITE";
00072 const std::string Accumulator::FILE_WRITE = "FILE_WRITE";
00073 const std::string Accumulator::FILE_WRITE_SELECTOR = "Accumulator::FILE_WRITE";
00074 const std::string Accumulator::TRANSPORT_SEND = "TRANSPORT_SEND";
00075 const std::string Accumulator::TRANSPORT_SEND_SELECTOR = "Accumulator::TRANSPORT_SEND";
00076 const std::string Accumulator::TRANSPORT_RECV = "TRANSPORT_RECV";
00077 const std::string Accumulator::TRANSPORT_RECV_SELECTOR = "Accumulator::TRANSPORT_RECV";
00078 const std::string Accumulator::TRANSPORT_RECV_CANCELED = "TRANSPORT_RECV_CANCELED";
00079 const std::string Accumulator::TRANSPORT_RECV_CANCELED_SELECTOR = "Accumulator::TRANSPORT_RECV_CANCELED";
00080 const std::string Accumulator::APPLICATION_SEND = "APPLICATION_SEND";
00081 const std::string Accumulator::APPLICATION_SEND_SELECTOR = "Accumulator::APPLICATION_SEND";
00082 const std::string Accumulator::APPLICATION_RECV = "APPLICATION_RECV";
00083 const std::string Accumulator::APPLICATION_RECV_SELECTOR = "Accumulator::APPLICATION_RECV";
00084 const std::string Accumulator::DB_WRITE_BYTES = "DB_WRITE_BYTES";
00085 const std::string Accumulator::DB_WRITE_BYTES_SELECTOR = "Accumulator::DB_WRITE_BYTES";
00086 const std::string Accumulator::DB_WRITE_COUNT = "DB_WRITE_COUNT";
00087 const std::string Accumulator::DB_WRITE_COUNT_SELECTOR = "Accumulator::DB_WRITE_COUNT";
00088 const std::string Accumulator::DB_ERASE_COUNT = "DB_ERASE_COUNT";
00089 const std::string Accumulator::DB_ERASE_COUNT_SELECTOR = "Accumulator::DB_ERASE_COUNT";
00090
00091 void* Accumulator::startLoggingThread(void* arg) {
00092 uint64_t interval = *(uint64_t*)arg;
00093
00094 while (isLogging) {
00095 uint64_t now = TimeUtil::timeu();
00096 uint64_t sleepTime = interval;
00097 if (interval == 1*1000*1000) {
00098 sleepTime -= (now % (1000 * 1000));
00099 if (sleepTime < 500*1000) {
00100 sleepTime += interval;
00101 }
00102 }
00103 SysUtil::sleepu(sleepTime);
00104 lock();
00105 for (AccumulatorMap::iterator i = instances.begin();
00106 i != instances.end(); i++) {
00107 Accumulator* a = i->second;
00108 Log::binaryLog(a->logId, AccumulatorLogObject(a->getAmount(), a->resetDiff()));
00109
00110
00111 }
00112 unlock();
00113 }
00114
00115 return 0;
00116 }
00117
00118 void Accumulator::dumpAll() {
00119 ADD_SELECTORS("Accumulator::dumpAll");
00120 for(AccumulatorMap::const_iterator i = instances.begin();
00121 i != instances.end(); i++) {
00122 maceLog("accumulator %s amount %" PRIu64,
00123 i->first.c_str(), i->second->getAmount());
00124 }
00125 }
00126
00127 void Accumulator::logAll() {
00128 for (AccumulatorMap::const_iterator i = instances.begin();
00129 i != instances.end(); i++) {
00130 const Accumulator* a = i->second;
00131 Log::binaryLog(a->logId, AccumulatorLogObject(a->getAmount(), a->getDiff()));
00132
00133
00134 }
00135 }
00136
00137 void Accumulator::startLogging(uint64_t interval) {
00138 static uint64_t sint = 0;
00139 if (!isLogging) {
00140 isLogging = true;
00141 sint = interval;
00142 pthread_attr_t threadAttr;
00143 pthread_attr_init(&threadAttr);
00144 pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
00145
00146 runNewThread(&athread, Accumulator::startLoggingThread, &sint, &threadAttr);
00147 }
00148 else {
00149 Log::err() << "cannot start Accumulator logging more than once" << Log::endl;
00150 }
00151 }
00152
00153 void Accumulator::stopLogging() {
00154 isLogging = false;
00155 }
00156
00157 mace::BinaryLogObject* accumulatorLogFactory() {
00158 return new AccumulatorLogObject();
00159 }
00160
00161 void AccumulatorLogObject::init() {
00162 AccumulatorLogObject objPtr;
00163 mace::DummyServiceMapper mapper;
00164 mapper.addFactory(objPtr.getLogType(), &accumulatorLogFactory);
00165 }