00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "CommunicationRecorder.h"
00026
00027 namespace cmlabs {
00028
00029
00030
00031
00032
00033 CommunicationRecorder::CommunicationRecorder(const JString& storage, bool overwrite) : JThread() {
00034 resetFlag = false;
00035 overwriteRecording = overwrite;
00036 shouldContinue = true;
00037 accessType = COMRECNOINIT;
00038 if ( (storage.endsWith("/")) || (storage.endsWith("\\")) )
00039 this->storage = storage.substring(0, storage.length()-2);
00040 else
00041 this->storage = storage;
00042 indexFilename = "_index.xml";
00043 lastMsgIndex = -1;
00044 lastSampleIndex = -1;
00045 masterOffset = 0;
00046 paused = false;
00047 }
00048
00049 CommunicationRecorder::~CommunicationRecorder() {
00050 finishWriting(3000);
00051 shouldContinue = false;
00052 wait(50);
00053 terminateWait(50);
00054 }
00055
00056 Object* CommunicationRecorder::clone() const {
00057 return NULL;
00058 }
00059
00060 bool CommunicationRecorder::initReading() {
00061 accessType = COMRECREADER;
00062
00063
00064 if (!JFile::doesADirExist(storage))
00065 return false;
00066
00067 if (!loadIndexFile())
00068 return false;
00069 return true;
00070 }
00071
00072 InfoItem* CommunicationRecorder::getRecordingInfo(const JString& storage) {
00073 if (!JFile::doesADirExist(storage))
00074 return false;
00075 InfoItem* info = getIndexFileInfo(storage, "_index.xml");
00076 if (info == NULL)
00077 return NULL;
00078 Collection files = JFile::getFilesInADir(storage);
00079 int msgFiles = 0, sampleFiles = 0, binFiles = 0;
00080 for (JString file = files.getFirst(); file.length() > 0; file = files.getNext()) {
00081 if (file.endsWith(".msg"))
00082 msgFiles++;
00083 else if (file.endsWith(".sample"))
00084 sampleFiles++;
00085 else if (file.endsWith(".bin"))
00086 binFiles++;
00087 }
00088
00089 info->setEntry("Number of Messages", JString(msgFiles));
00090
00091 info->setEntry("Number of DataSamples", JString(sampleFiles));
00092
00093 info->setEntry("Number of Binary Attachments", JString(binFiles));
00094 return info;
00095 }
00096
00097 bool CommunicationRecorder::initWriting() {
00098 accessType = COMRECWRITER;
00099
00100
00101 int n = 1;
00102 if (JFile::doesADirExist(storage)) {
00103
00104 if (overwriteRecording) {
00105 if (!JFile::deleteADir(storage, true))
00106 return false;
00107 }
00108 else {
00109
00110 JString moveTo = JString::format("%s.bak", (char*) storage);
00111 while (JFile::doesADirExist(moveTo))
00112 moveTo = JString::format("%s.bak%d", (char*) storage, n++);
00113 if (!JFile::moveADir(storage, moveTo))
00114 return false;
00115 }
00116 }
00117
00118 if (!JFile::createADir(storage, true))
00119 return false;
00120 if (!saveIndexFile())
00121 return false;
00122
00123
00124 return true;
00125 }
00126
00127 void CommunicationRecorder::run() {
00128
00129 JTime lastMessageWarning;
00130 JTime lastSampleWarning;
00131 Message* msg;
00132 DataSample* sample;
00133 JString basefilename;
00134 while (shouldContinue) {
00135 if ( (msg = (Message*) msgQueue.waitForNewEntry(20)) != NULL ) {
00136
00137 lastMsgIndex++;
00138
00139 basefilename = JString::format("%s/Message%.6d", (char*) storage, lastMsgIndex);
00140 if (!saveMessage(basefilename, msg)) {
00141 printf("CommunicationRecorder: Error saving '%s' to '%d'\n",
00142 (char*) basefilename, (char*) storage);
00143 delete(msg);
00144 }
00145 }
00146 if ( (sample = (DataSample*) sampleQueue.waitForNewEntry(20)) != NULL ) {
00147
00148 lastSampleIndex++;
00149
00150 basefilename = JString::format("%s/Sample%.6d", (char*) storage, lastSampleIndex);
00151 if (!saveSample(basefilename, sample)) {
00152 printf("CommunicationRecorder: Error saving '%s' to '%d'\n",
00153 (char*) basefilename, (char*) storage);
00154 delete(sample);
00155 }
00156 }
00157 if ((msgQueue.getCount() > 100) && (lastMessageWarning.getAge() > 3000)) {
00158 printf("CommunicationRecorder: Message Queue Warning - %d messages waiting to be written to disk\n", msgQueue.getCount());
00159 lastMessageWarning.reset();
00160 }
00161 if ((sampleQueue.getCount() > 100) && (lastSampleWarning.getAge() > 3000)) {
00162 printf("CommunicationRecorder: Stream Queue Warning - %d samples waiting to be written to disk\n", sampleQueue.getCount());
00163 lastSampleWarning.reset();
00164 }
00165 }
00166 }
00167
00168 bool CommunicationRecorder::wasReset() {
00169 if (!resetFlag)
00170 return false;
00171 resetFlag = false;
00172 return true;
00173 }
00174
00175 bool CommunicationRecorder::isPaused() {
00176 return paused;
00177 }
00178
00179 bool CommunicationRecorder::isInitialised() {
00180 return (accessType != COMRECNOINIT);
00181 }
00182
00183 bool CommunicationRecorder::isWriter() {
00184 return (accessType == COMRECWRITER);
00185 }
00186
00187 bool CommunicationRecorder::isReader() {
00188 return (accessType == COMRECREADER);
00189 }
00190
00191
00192
00193 bool CommunicationRecorder::setNewStorageLocation(const JString& storage) {
00194 if (storage.length() == 0)
00195 return false;
00196 if ( (storage.endsWith("/")) || (storage.endsWith("\\")) )
00197 this->storage = storage.substring(0, storage.length()-2);
00198 else
00199 this->storage = storage;
00200 indexFilename = "_index.xml";
00201 lastMsgIndex = -1;
00202 lastSampleIndex = -1;
00203 masterOffset = 0;
00204 resetFlag = true;
00205 return true;
00206 }
00207
00208 bool CommunicationRecorder::startRecording() {
00209 paused = false;
00210 return true;
00211 }
00212
00213 bool CommunicationRecorder::pauseRecording() {
00214 paused = true;
00215 return true;
00216 }
00217
00218 bool CommunicationRecorder::restartRecording() {
00219 paused = false;
00220 lastMsgIndex = -1;
00221 lastSampleIndex = -1;
00222 resetFlag = true;
00223 return initWriting();
00224 }
00225
00226 bool CommunicationRecorder::startPlayback() {
00227 paused = false;
00228 return true;
00229 }
00230
00231 bool CommunicationRecorder::pausePlayback() {
00232 paused = true;
00233 return true;
00234 }
00235
00236 bool CommunicationRecorder::restartPlayback() {
00237 paused = false;
00238 resetFlag = true;
00239 lastMsgIndex = -1;
00240 lastSampleIndex = -1;
00241 return initReading();
00242 }
00243
00244
00245
00246 long CommunicationRecorder::getCurrentOffset() {
00247 return currentStartTime.getAge();
00248 }
00249
00250 JTime CommunicationRecorder::getStartTime() {
00251 return currentStartTime;
00252 }
00253
00254 JTime CommunicationRecorder::calcOriginalTime(const JTime& time) {
00255 return time - masterOffset;
00256 }
00257
00258 JTime CommunicationRecorder::calcCurrentTime(const JTime& originalTime) {
00259 return originalTime + masterOffset;
00260 }
00261
00262 int CommunicationRecorder::getMessageCount() {
00263 return lastMsgIndex;
00264 }
00265
00266 int CommunicationRecorder::getSampleCount() {
00267 return lastSampleIndex;
00268 }
00269
00270 JTime CommunicationRecorder::getFirstMessageTime() {
00271
00272 return JTime::createInvalid();
00273 }
00274
00275 JTime CommunicationRecorder::getLastMessageTime() {
00276
00277 return JTime::createInvalid();
00278 }
00279
00280 JTime CommunicationRecorder::getFirstSampleTime() {
00281
00282 return JTime::createInvalid();
00283 }
00284
00285 JTime CommunicationRecorder::getLastSampleTime() {
00286
00287 return JTime::createInvalid();
00288 }
00289
00290 long CommunicationRecorder::getFirstMessageOffset() {
00291
00292 return -1;
00293 }
00294
00295 long CommunicationRecorder::getLastMessageOffset() {
00296
00297 return -1;
00298 }
00299
00300 long CommunicationRecorder::getFirstSampleOffset() {
00301
00302 return -1;
00303 }
00304
00305 long CommunicationRecorder::getLastSampleOffset() {
00306
00307 return -1;
00308 }
00309
00310
00311 bool CommunicationRecorder::addNewMessage(Message* msg) {
00312 if (msg == NULL)
00313 return false;
00314 msgQueue.add(msg);
00315 return true;
00316 }
00317
00318 bool CommunicationRecorder::addNewSample(DataSample* sample) {
00319 if (sample == NULL)
00320 return false;
00321 sampleQueue.add(sample);
00322 return true;
00323 }
00324
00325
00326 Message* CommunicationRecorder::retrieveMessage(int n) {
00327 return loadMessage(JString::format("%s/Message%.6d", (char*) storage, n));
00328 }
00329
00330 DataSample* CommunicationRecorder::retrieveSample(int n) {
00331 return loadSample(JString::format("%s/Sample%.6d", (char*) storage, n));
00332 }
00333
00334 Message* CommunicationRecorder::retrieveNextMessage(const JTime& time) {
00335
00336 return NULL;
00337 }
00338
00339 DataSample* CommunicationRecorder::retrieveNextSample(const JTime& time) {
00340
00341 return NULL;
00342 }
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352 bool CommunicationRecorder::saveSegment(int start, int end) {
00353
00354 return false;
00355 }
00356 bool CommunicationRecorder::loadSegment(int start, int end) {
00357
00358 return false;
00359 }
00360
00361
00362
00363 Message* CommunicationRecorder::loadMessage(const JString& basefilename) {
00364
00365 JString xml = JFile::readAFileASCII(JString::format("%s.msg", (char*) basefilename));
00366 if (xml.length() == 0)
00367 return NULL;
00368
00369 Message* msg = new Message();
00370 if (!msg->fromXML(xml)) {
00371 delete(msg);
00372 return NULL;
00373 }
00374
00375
00376 char* buffer;
00377 int size;
00378 int i = 0;
00379
00380 buffer = JFile::readAFileBinary(JString::format("%s.%d.bin", (char*) basefilename, i), size);
00381 while ( (buffer != NULL) && (size > 0)) {
00382 msg->fromBinaryBuffer(i, buffer, size);
00383 delete [] buffer;
00384 i++;
00385 buffer = JFile::readAFileBinary(JString::format("%s.%d.bin", (char*) basefilename, i), size);
00386 }
00387
00388
00389 if (msg->postedTime.isValid())
00390 msg->postedTime += masterOffset;
00391 if (msg->receivedTime.isValid())
00392 msg->receivedTime += masterOffset;
00393
00394 return msg;
00395 }
00396
00397 DataSample* CommunicationRecorder::loadSample(const JString& basefilename) {
00398
00399 int size;
00400 char* buffer = JFile::readAFileBinary(JString::format("%s.sample", (char*) basefilename), size);
00401
00402 if ( (buffer == NULL) || (size <= 0)) {
00403 delete [] buffer;
00404 return NULL;
00405 }
00406
00407 DataSample* sample = new DataSample();
00408 if (!sample->fromBinaryBuffer(0, buffer, size)) {
00409 delete [] buffer;
00410 delete(sample);
00411 return NULL;
00412 }
00413 delete [] buffer;
00414
00415
00416 if (sample->timestamp.isValid())
00417 sample->timestamp += masterOffset;
00418
00419 return sample;
00420 }
00421
00422
00423 bool CommunicationRecorder::saveMessage(const JString& basefilename, Message* msg) {
00424 if (msg == NULL) return false;
00425
00426 char* buffer;
00427 int binChunkCount = msg->getBinaryChunkCount();
00428 int maxBinSize = 0;
00429 JString filename;
00430
00431 if (binChunkCount > 0) {
00432 long* bufSizes = new long[binChunkCount];
00433 long subBinSize;
00434 int i;
00435 for (i=0; i<binChunkCount; i++) {
00436 subBinSize = msg->getBinarySize(i);
00437 bufSizes[i] = subBinSize;
00438 if (subBinSize > maxBinSize)
00439 maxBinSize = subBinSize;
00440 }
00441 buffer = new char[maxBinSize];
00442 for (i=0; i<binChunkCount; i++) {
00443 subBinSize = bufSizes[i];
00444 msg->toBinaryBuffer(i, buffer, maxBinSize);
00445 if (!JFile::writeAFileBinary(JString::format("%s.%d.bin", (char*) basefilename, i), buffer, subBinSize)) {
00446 delete [] bufSizes;
00447 delete [] buffer;
00448 return false;
00449 }
00450 }
00451 delete [] bufSizes;
00452 delete [] buffer;
00453 }
00454
00455 JString xml = msg->toXML();
00456 if (!JFile::writeAFileASCII(JString::format("%s.msg", (char*) basefilename), xml))
00457 return false;
00458
00459 delete(msg);
00460 return true;
00461 }
00462
00463 bool CommunicationRecorder::saveSample(const JString& basefilename, DataSample* sample) {
00464
00465 if (sample == NULL) return false;
00466
00467 long size = sample->getBinarySize(0);
00468 char* buffer = new char[size];
00469 sample->toBinaryBuffer(0, buffer, size);
00470 if (!JFile::writeAFileBinary(JString::format("%s.sample", (char*) basefilename), buffer, size)) {
00471 delete [] buffer;
00472 return false;
00473 }
00474
00475 delete [] buffer;
00476 delete(sample);
00477 return true;
00478 }
00479
00480 bool CommunicationRecorder::finishWriting(int timeout) {
00481 JTime start;
00482 bool result = false;
00483 while (start.getAge() < timeout) {
00484 if ( (msgQueue.getCount() == 0) && (sampleQueue.getCount() == 0) ) {
00485 result = true;
00486 break;
00487 }
00488 wait(10);
00489 }
00490 if (!result)
00491 printf("FinishWriting: Still %d messages and %d samples waiting...\n",
00492 msgQueue.getCount(), sampleQueue.getCount());
00493 return result;
00494 }
00495
00496 bool CommunicationRecorder::saveIndexFile() {
00497 if (!isWriter()) return false;
00498 JString xml = JString::format("<!-- Original Start Time: %s -->\n", (char*) originalStartTime.print());
00499 xml += originalStartTime.toXML("originaltime");
00500 JString data = JString::format("<CommunicationRecorder>\n%s</CommunicationRecorder>\n", (char*) xml.indentXML());
00501 JString filename = JString::format("%s/%s", (char*) storage, (char*) indexFilename);
00502 return JFile::writeAFileASCII(filename, data);
00503 }
00504
00505 InfoItem* CommunicationRecorder::getIndexFileInfo(const JString& location, const JString& indexfile) {
00506 JString filename = JString::format("%s/%s", (char*) location, (char*) indexfile);
00507 JString xml = JFile::readAFileASCII(filename);
00508 if (xml.length() == 0)
00509 return NULL;
00510 XMLParser parser;
00511 if (!parser.parseXML(xml))
00512 return NULL;
00513 XMLNode* rootNode = parser.getRootNode();
00514 if (rootNode == NULL)
00515 return NULL;
00516 if (!rootNode->getTag().equalsIgnoreCase("CommunicationRecorder"))
00517 return NULL;
00518 XMLNode* node = rootNode->getChildNode("originaltime");
00519 if (node == NULL)
00520 return NULL;
00521 JTime oriTime;
00522 if (!oriTime.fromXML(node))
00523 return NULL;
00524 InfoItem* info = new InfoItem();
00525 info->setTime("Original Start Time", oriTime);
00526 info->setEntry("Storage Location", location);
00527 info->setEntry("Index File", indexfile);
00528 return info;
00529 }
00530
00531 bool CommunicationRecorder::loadIndexFile() {
00532 if (!isReader()) return false;
00533 JString filename = JString::format("%s/%s", (char*) storage, (char*) indexFilename);
00534 JString xml = JFile::readAFileASCII(filename);
00535 if (xml.length() == 0)
00536 return false;
00537 XMLParser parser;
00538 if (!parser.parseXML(xml))
00539 return false;
00540 XMLNode* rootNode = parser.getRootNode();
00541 if (rootNode == NULL)
00542 return false;
00543 if (!rootNode->getTag().equalsIgnoreCase("CommunicationRecorder"))
00544 return false;
00545 XMLNode* node = rootNode->getChildNode("originaltime");
00546 if (node == NULL)
00547 return false;
00548 if (!originalStartTime.fromXML(node))
00549 return false;
00550 masterOffset = currentStartTime - originalStartTime;
00551 return true;
00552 }
00553
00554
00555 bool CommunicationRecorder::unitTest() {
00556
00557
00558 Message* msg;
00559 DataSample* sample;
00560 char* data;
00561 int n;
00562
00563
00564 addUnitTestLog("Adding Messages and Samples to the Recorder...");
00565
00566 JTime stamp;
00567 for (n=0; n<10; n++) {
00568 msg = new Message("Me", "You", "MyType", JString("Message: ") + JString(n), "");
00569 msg->postedTime = stamp;
00570 this->addNewMessage(msg);
00571 sample = new DataSample("Raw", JString("Sample: ") + JString(n));
00572
00573 stamp += 10;
00574 data = new char[1024];
00575 memset(data, n, 1024);
00576 sample->giveData(data, 1024);
00577 sample->timestamp = stamp;
00578 this->addNewSample(sample);
00579 stamp += 10;
00580 }
00581
00582 wait(200);
00583
00584 if (!finishWriting(1000)) {
00585 addUnitTestLog("Recorder finishWriting failed...");
00586 return false;
00587 }
00588
00589 CommunicationRecorder* comReader = new CommunicationRecorder(this->storage);
00590 if (!comReader->initReading()) {
00591 delete(comReader);
00592 addUnitTestLog("Initialising Reader failed...");
00593 return false;
00594 }
00595 comReader->start();
00596
00597
00598 for (n=0; n<10; n++) {
00599 msg = comReader->retrieveMessage(n);
00600 if (msg == NULL) {
00601 addUnitTestLog(JString::format("Reading message %d failed...", n));
00602 delete(comReader);
00603 return false;
00604 }
00605 else if (!msg->content.equals(JString("Message: ") + JString(n))) {
00606 addUnitTestLog(JString::format("Reading message %d was wrong: '%s'...", n, (char*) msg->content));
00607 delete(msg);
00608 delete(comReader);
00609 return false;
00610 }
00611 delete(msg);
00612
00613 sample = comReader->retrieveSample(n);
00614 if (sample == NULL) {
00615 addUnitTestLog(JString::format("Reading sample %d failed...", n));
00616 delete(comReader);
00617 return false;
00618 }
00619 else if (!sample->name.equals(JString("Sample: ") + JString(n))) {
00620 addUnitTestLog(JString::format("Reading sample %d was wrong: '%s'...", n, (char*) sample->name));
00621 delete(sample);
00622 delete(comReader);
00623 return false;
00624 }
00625 delete(sample);
00626 }
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660 delete(comReader);
00661 return true;
00662 }
00663
00664 }