00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "NetMessageProtocol.h"
00026
00027 namespace cmlabs {
00028
00029
00030
00031
00032
00033 NetMessageProtocol::NetMessageProtocol():NetProtocol("Message")
00034 {
00035 netTimeout = 60000;
00036 }
00037
00038 NetMessageProtocol::~NetMessageProtocol()
00039 {
00040
00041 }
00042
00043 Object* NetMessageProtocol::clone() const {
00044 NetMessageProtocol* nmp = new NetMessageProtocol();
00045 nmp->isLocalCallback = isLocalCallback;
00046 nmp->isRemoteCallback = isRemoteCallback;
00047 return nmp;
00048 }
00049
00050 bool NetMessageProtocol::checkBufferForCompatibility(char* buffer, int length) {
00051
00052 isRemoteCallback = false;
00053 if (length < 7) return false;
00054
00055 JString str = buffer;
00056 if (str.startsWith("Message"))
00057 return true;
00058 else if (str.startsWith("Receive")) {
00059 isRemoteCallback = true;
00060 isLocalCallback = false;
00061 return true;
00062 }
00063
00064 return false;
00065 }
00066
00067 bool NetMessageProtocol::initializeConversation(JSocket* socket) {
00068 if (socket == NULL)
00069 return false;
00070
00071 return true;
00072 }
00073
00074 bool NetMessageProtocol::initializeAsReceiver(JSocket* socket, JString from) {
00075
00076 int res;
00077
00078 Message msg = Message(from, "", "RECEIVE_NAME_NOTIFICATION");
00079 JString str = msg.toXML();
00080 long size = str.length();
00081
00082 char* buffer = new char[12 + size + 1];
00083
00084 strcpy(buffer, "Receive");
00085
00086 writeLongToBytes(buffer+8, size);
00087
00088 memcpy(buffer+12, str.charpoint(), size);
00089
00090 if ((res=socket->write(buffer, 12+size) != (12+size))) {
00091 delete [] buffer;
00092 return false;
00093 }
00094
00095 isLocalCallback = true;
00096 isRemoteCallback = false;
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 if (DEBUGLEVEL(DEBUG)) {
00128 printf("######### [NetMessageProtocol] InitAsRec success!\n");
00129 }
00130
00131 delete [] buffer;
00132 return true;
00133 }
00134
00135
00136 bool NetMessageProtocol::sendObject(JSocket* socket, Message* msg, bool isReply) {
00137
00138
00139
00140
00141 currentUnsentBufferSize = 0;
00142 currentSentBufferSize = 0;
00143
00144
00145 socket->readIntoBuffer();
00146 if (socket->charBufferCount > 0) {
00147 if (DEBUGLEVEL(STATUS)) {
00148 printf("######### [NetMessageProtocol] Something already in input buffer\n");
00149 }
00150 return false;
00151 }
00152
00153 int res;
00154
00155
00156
00157
00158
00159
00160 bool binaryOnly = false;
00161
00162 DataSample* sample;
00163 int binChunkCount = msg->getBinaryChunkCount();
00164 int binSize = 0;
00165 Collection bufSizes;
00166 if ( (binChunkCount == 1) && (msg->getType().equalsIgnoreCase("BinaryOnly") ) ) {
00167 sample = (DataSample*) msg->getObject();
00168 if (sample == NULL)
00169 return false;
00170 if (msg->noreply.length() > 0)
00171 sample->setParam("---MsgNoReply---", msg->noreply);
00172 binSize = msg->getBinarySize(0);
00173 binaryOnly = true;
00174 }
00175 else if (binChunkCount > 0) {
00176 int subBinSize = 0;
00177 JString bufString;
00178 for (int i=0; i<binChunkCount; i++) {
00179 subBinSize = msg->getBinarySize(i);
00180 binSize += subBinSize;
00181 bufSizes.add(JString(subBinSize));
00182 }
00183 if (bufSizes.getCount() > 0) {
00184 bufString = bufSizes.printListLine(",");
00185 msg->set("BinarySizes", bufString);
00186 msg->set("TotalBinarySize", JString(binSize));
00187 }
00188 else {
00189 binChunkCount = 0;
00190 binSize = 0;
00191 }
00192 }
00193
00194 JString str;
00195 long size = 0;
00196
00197 if (!binaryOnly) {
00198 str = msg->toXML();
00199 size = str.length();
00200
00201 }
00202
00203 long totalBytes = size + 12 + binSize;
00204 currentUnsentBufferSize = totalBytes;
00205 char* buffer = new char[totalBytes+1];
00206 long bytesSoFar = 0;
00207
00208 if (binChunkCount > 0) {
00209
00210 if (!binaryOnly) {
00211 strcpy(buffer, "Message");
00212 writeLongToBytes(buffer+8, size);
00213 memcpy(buffer+12, str.charpoint(), size);
00214
00215 JString bufString;
00216 long bufsize;
00217
00218 for (int j=0; j<binChunkCount; j++) {
00219 bufsize = bufSizes.get(j).toLong();
00220 msg->toBinaryBuffer(j, buffer+12+size+bytesSoFar, bufsize);
00221
00222 bytesSoFar += bufsize;
00223 }
00224
00225 }
00226 else {
00227
00228 sample = (DataSample*) msg->getObject();
00229 if (sample == NULL) {
00230 return false;
00231 }
00232 if (!sample->toBinaryBuffer(0, buffer+12, binSize)) {
00233
00234 }
00235 strcpy(buffer, "BinaryM");
00236 writeLongToBytes(buffer+8, binSize);
00237
00238 }
00239
00240 if (totalBytes <= socket->getSendBufferSize()) {
00241 if ((res=socket->write(buffer, totalBytes) != (totalBytes))) {
00242 currentUnsentBufferSize = 0;
00243 currentSentBufferSize = 0;
00244 delete [] buffer;
00245 return false;
00246 }
00247 currentSentBufferSize = totalBytes;
00248 }
00249 else {
00250 long breakmax = socket->getSendBufferSize();
00251 res = 0;
00252 int subres = 0;
00253 char* lpbuffer = buffer;
00254 int l = totalBytes;
00255 int n = l / breakmax;
00256 for (int i=0; i < n; i++) {
00257 subres = socket->write(&lpbuffer[i*breakmax], breakmax);
00258 if (subres == breakmax) {
00259 res += subres;
00260 currentSentBufferSize = res;
00261 }
00262 else {
00263 printf("*** Only partial messasge sent - %d out of %d ***\n", res, totalBytes);
00264 currentUnsentBufferSize = 0;
00265 currentSentBufferSize = 0;
00266 delete [] buffer;
00267 return false;
00268 }
00269
00270 }
00271
00272 n = n * breakmax;
00273
00274 subres = socket->write(&lpbuffer[n], l-n);
00275 if (subres == l-n) {
00276 res += subres;
00277 currentSentBufferSize = res;
00278 }
00279 else {
00280 printf("*** Only partial messasge sent - %d out of %d ***\n", res, totalBytes);
00281 currentUnsentBufferSize = 0;
00282 currentSentBufferSize = 0;
00283 delete [] buffer;
00284 return false;
00285 }
00286 }
00287
00288 }
00289 else {
00290 strcpy(buffer, "Message");
00291
00292 writeLongToBytes(buffer+8, size);
00293 memcpy(buffer+12, str.charpoint(), size);
00294
00295
00296 if ((res=socket->write(buffer, 12+size) != (12+size))) {
00297 currentUnsentBufferSize = 0;
00298 currentSentBufferSize = 0;
00299 delete [] buffer;
00300 return false;
00301 }
00302 currentSentBufferSize = size+12;
00303 }
00304
00305
00306
00307
00308
00309
00310
00311 delete [] buffer;
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323 return true;
00324
00325 }
00326
00327 Message* NetMessageProtocol::receiveObject(JSocket* socket, int timeout) {
00328 Message* msg = NULL;
00329 JTime start;
00330
00331
00332
00333
00334 currentUnreceivedBufferSize = 0;
00335 currentReceivedBufferSize = 0;
00336
00337
00338 socket->readIntoBuffer();
00339 if (socket->charBufferCount < 12) {
00340
00341 socket->waitForReadability(timeout);
00342 socket->readIntoBuffer();
00343 if (socket->charBufferCount < 12) {
00344 if (DEBUGLEVEL(KITCHENSINK)) {
00345
00346 }
00347 if (socket->charBufferCount > 0) {
00348
00349 printf("Incomplete Buffer: {%c %c %c %c %c %c %c %c %c %c %c %c}",
00350 socket->charBuffer[0],
00351 socket->charBuffer[1],
00352 socket->charBuffer[2],
00353 socket->charBuffer[3],
00354 socket->charBuffer[4],
00355 socket->charBuffer[5],
00356 socket->charBuffer[6],
00357 socket->charBuffer[7],
00358 socket->charBuffer[8],
00359 socket->charBuffer[9],
00360 socket->charBuffer[10],
00361 socket->charBuffer[11]);fflush(stdout);
00362 }
00363
00364
00365 return NULL;
00366 }
00367 }
00368
00369
00370 char* buffer = new char[12];
00371 int count = 0;
00372
00373 count = socket->read(buffer, 12, timeout);
00374
00375
00376
00377
00378
00379 if (count <= 0) {
00380 delete [] buffer;
00381 if (DEBUGLEVEL(KITCHENSINK)) {
00382
00383 }
00384
00385
00386 return NULL;
00387 }
00388 else if (count < 12) {
00389 if (DEBUGLEVEL(STATUS)) {
00390 printf("[NetMsg] Received Something: %d chars!\n", count);
00391 }
00392 delete [] buffer;
00393 return NULL;
00394 }
00395
00396
00397
00398
00399
00400
00401 JString str = buffer;
00402
00403
00404
00405
00406 if ( (!str.equals("Message")) && (!str.equals("Receive")) && (!str.equals("BinaryM")) ) {
00407 wait(10);
00408 int fc = socket->flush();
00409 if (DEBUGLEVEL(STATUS)) {
00410 printf("[NetMsg] Received Something not Message - Read '%s' [%c %c %c] - flushed %d!\n", (char*)str.getFirst(10), buffer[0], buffer[1], buffer[2], fc);
00411 }
00412 delete [] buffer;
00413 return NULL;
00414 }
00415
00416
00417 long size = getLongFromBytes(buffer+8);
00418 delete [] buffer;
00419
00420 if (size < 0) {
00421 wait(10);
00422 int fc = socket->flush();
00423 if (DEBUGLEVEL(STATUS)) {
00424 printf("[NetMsg] Received wrong length %d!\n", size);
00425 }
00426 return NULL;
00427 }
00428
00429 currentUnreceivedBufferSize = 0;
00430 currentReceivedBufferSize = 0;
00431
00432
00433
00434 buffer = new char[size+1];
00435
00436 int tm = timeout - start.getAge();
00437 if (tm <= 0) tm = 10;
00438 count = socket->read(buffer, size, MSGTIMEOUT);
00439
00440 if (count != size) {
00441 currentUnreceivedBufferSize = 0;
00442 currentReceivedBufferSize = 0;
00443 wait(10);
00444 int fc = socket->flush();
00445 if (DEBUGLEVEL(STATUS)) {
00446 printf("[NetMsg] Received wrong amount %d ! %d!\n", count, size);
00447 }
00448 delete [] buffer;
00449 return NULL;
00450 }
00451
00452
00453
00454 buffer[count] = 0;
00455
00456 if (!str.equals("BinaryM")) {
00457 msg = new Message(buffer);
00458
00459
00460 }
00461 else {
00462 DataSample* sample = new DataSample();
00463
00464 if (!sample->fromBinaryBuffer(0, buffer, size)) {
00465 delete(sample);
00466
00467
00468 return new Message("" ,"", "BINARY_RECEIVE_FAILED");
00469 }
00470 msg = new Message("" ,"", "BinaryOnly", sample);
00471 if (sample->getParam("---MsgNoReply---").length() > 0) {
00472 msg->noreply = sample->getParam("---MsgNoReply---");
00473 sample->params.remove("---MsgNoReply---");
00474 }
00475
00476 }
00477
00478
00479 JString hostname = socket->getRemoteIPAddress();
00480 if (hostname.equals("localhost"))
00481 hostname = socket->getLocalIPAddress();
00482 if (msg->getOrigin().length() == 0)
00483 msg->setOrigin(hostname);
00484 msg->set("netprotocol", name);
00485
00486 delete [] buffer;
00487
00488 JString binString = msg->get("BinarySizes");
00489 if (binString.length() == 0) {
00490
00491 currentUnreceivedBufferSize = size+12;
00492 currentReceivedBufferSize = size+12;
00493 return msg;
00494 }
00495
00496 Collection binSizes = binString.split(",");
00497
00498
00499 long totalBinSize = msg->get("TotalBinarySize").toLong();
00500
00501 currentUnreceivedBufferSize = totalBinSize + size + 12;
00502 currentReceivedBufferSize = size+12;
00503
00504
00505
00506 long subBinSize;
00507 for (int n=0; n<binSizes.getCount(); n++) {
00508
00509 subBinSize = binSizes.get(n).toLong();
00510 if (subBinSize <= 0) {
00511
00512 }
00513 else {
00514 buffer = new char[subBinSize];
00515 count = socket->read(buffer, subBinSize, 10000);
00516 if (count != subBinSize) {
00517 delete [] buffer;
00518
00519 }
00520 else {
00521
00522 if (!msg->fromBinaryBuffer(n, buffer, subBinSize)) {
00523
00524 }
00525 delete [] buffer;
00526 }
00527 currentReceivedBufferSize += subBinSize;
00528 }
00529 }
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548 return msg;
00549 }
00550
00551
00552 long NetMessageProtocol::getLongFromBytes(char* buffer) {
00553 long val = ( (unsigned char) buffer[0] + ( (unsigned char) buffer[1] << 8) + ( (unsigned char) buffer[2] << 16) + ( (unsigned char) buffer[3] << 24) );
00554
00555
00556 return val;
00557 }
00558
00559 bool NetMessageProtocol::writeLongToBytes(char* buffer, long val) {
00560 buffer[0] = (unsigned char) val;
00561 buffer[1] = (unsigned char) (val >> 8);
00562 buffer[2] = (unsigned char) (val >> 16);
00563 buffer[3] = (unsigned char) (val >> 24);
00564 return true;
00565 }
00566
00567
00568
00569
00570
00571
00572 }