00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "NetworkConnection.h"
00026
00027 namespace cmlabs {
00028
00029
00030
00031
00032
00033 NetworkConnection::NetworkConnection(JSocket* con, NetProtocol* netProtocol, TCPReceiver* rec) : JThread() {
00034
00035 allowMessageDrop = false;
00036 toSendMaxQueueSize = 10;
00037 createdAsReceiver = true;
00038 mySocket = con;
00039 keepAliveTimeout = 60000;
00040 netprotocol = netProtocol;
00041
00042
00043 init(rec);
00044 }
00045
00046 NetworkConnection::NetworkConnection(const TCPLocation& loc, NetProtocol* netProtocol, TCPReceiver* rec) : JThread() {
00047
00048 allowMessageDrop = false;
00049 toSendMaxQueueSize = 10;
00050 createdAsReceiver = false;
00051
00052 netprotocol = netProtocol;
00053 mySocket = new JSocket(loc);
00054 lastConnectedTo = loc;
00055 keepAliveTimeout = 60000;
00056 if (mySocket->connectNow())
00057 remoteName = loc.name;
00058 init(rec);
00059 }
00060
00061 NetworkConnection::NetworkConnection(const JString& addr, int port, NetProtocol* netProtocol, TCPReceiver* rec) : JThread() {
00062
00063 allowMessageDrop = false;
00064 toSendMaxQueueSize = 10;
00065 createdAsReceiver = false;
00066 netprotocol = netProtocol;
00067 mySocket = new JSocket(addr, port);
00068 lastConnectedTo = TCPLocation(addr, port, "***NoNameProvided***");
00069 keepAliveTimeout = 60000;
00070 if (mySocket->connectNow())
00071 remoteName = "***NoNameProvided***";
00072 init(rec);
00073 }
00074
00075 NetworkConnection::~NetworkConnection() {
00076 if (recHTTP != NULL)
00077 delete(recHTTP);
00078 if (DEBUGLEVEL(STATUS))
00079 printf("NetCon (%s) terminating...\n", (char*) print());
00080 terminate();
00081 if (netprotocol != NULL)
00082 delete(netprotocol);
00083 if (mySocket != NULL) {
00084 if (DEBUGLEVEL(STATUS))
00085 printf("NetCon (%s) disconnecting socket...\n", (char*) print());
00086 mySocket->disconnectNow();
00087 delete(mySocket);
00088 mySocket = NULL;
00089 }
00090
00091 if (DEBUGLEVEL(STATUS))
00092 printf("NetCon (%s) terminated...\n", (char*) print());
00093 }
00094
00095
00096
00097
00098 bool NetworkConnection::restart(JSocket* con, NetProtocol* netProtocol, TCPReceiver* rec) {
00099
00100 allowMessageDrop = false;
00101 createdAsReceiver = true;
00102
00103 if (!conMutex.EnterMutex(2000)) {
00104 if (DEBUGLEVEL(DEBUG)) {
00105 printf("[%p][%s]->[%s] Could not get Mutex in Restart\n", this, (char*) name, (char*) remoteName);
00106 }
00107 return false;
00108 }
00109 if (netprotocol != NULL)
00110 delete(netprotocol);
00111 netprotocol = netProtocol;
00112 remoteName = "";
00113 mySocket = con;
00114
00115 init(rec);
00116 conMutex.LeaveMutex();
00117 return true;
00118 }
00119
00120 bool NetworkConnection::restart(const TCPLocation& loc, NetProtocol* netProtocol, TCPReceiver* rec) {
00121
00122 allowMessageDrop = false;
00123 createdAsReceiver = false;
00124
00125 if (!conMutex.EnterMutex(2000)) {
00126 if (DEBUGLEVEL(DEBUG)) {
00127
00128 printf("[%p][%s]->[%s] Could not get Mutex in Restart\n", this, (char*) name, (char*) remoteName);
00129 }
00130 return false;
00131 }
00132 mySocket = new JSocket(loc);
00133 if (netprotocol != NULL)
00134 delete(netprotocol);
00135 netprotocol = netProtocol;
00136 if (mySocket->connectNow()) {
00137 init(rec);
00138 remoteName = loc.name;
00139 conMutex.LeaveMutex();
00140 return true;
00141 }
00142 else {
00143 remoteName = "";
00144 conMutex.LeaveMutex();
00145 return false;
00146 }
00147 }
00148
00149 bool NetworkConnection::restart(const JString& addr, int port, NetProtocol* netProtocol, TCPReceiver* rec) {
00150
00151 allowMessageDrop = false;
00152 createdAsReceiver = false;
00153
00154 if (!conMutex.EnterMutex(2000)) {
00155 if (DEBUGLEVEL(DEBUG)) {
00156 printf("[%p][%s]->[%s] Could not get Mutex in Restart\n", this, (char*) name, (char*) remoteName);
00157
00158 }
00159 return false;
00160 }
00161 mySocket = new JSocket(addr, port);
00162 mySocket->connectNow();
00163 if (netprotocol != NULL)
00164 delete(netprotocol);
00165 netprotocol = netProtocol;
00166 if (mySocket->connectNow()) {
00167 init(rec);
00168 remoteName = "***NoNameProvided***";
00169 conMutex.LeaveMutex();
00170 return true;
00171 }
00172 else {
00173 remoteName = "";
00174 conMutex.LeaveMutex();
00175 return false;
00176 }
00177 }
00178
00179
00180
00181
00182 void NetworkConnection::init(TCPReceiver* rec) {
00183 parent = NULL;
00184 recHTTP = NULL;
00185
00186 conID = createUniqueID("connection");
00187
00188 lastRead = JTime();
00189 lastWrite = JTime();
00190
00191 isDoneRunning = true;
00192 isTerminated = false;
00193 receiver = rec;
00194
00195 if (netprotocol != NULL) {
00196 timeout = netprotocol->netTimeout;
00197 if ((netprotocol->isRemoteCallback) && (receiver != NULL)) {
00198
00199 receiver = NULL;
00200 if (DEBUGLEVEL(DEBUG)) {
00201 printf("### Incoming Receiver: %s ###\n", (char*) remoteName);
00202 }
00203 }
00204 if (DEBUGLEVEL(DEBUG)) {
00205 printf("[init]\n");fflush(stdout);
00206 }
00207 Message *msg = receiveObject(200);
00208 if (msg != NULL)
00209 receivedMessages.add(msg);
00210 }
00211 else {
00212 timeout = 8000;
00213 }
00214
00215 shouldContinue = (receiver != NULL);
00216
00217 this->start();
00218 this->setPriority(90);
00219
00220 wait(0);
00221 }
00222
00223 bool NetworkConnection::initializeAsReceiver(TCPReceiver* rec) {
00224
00225 if (netprotocol == NULL)
00226 return false;
00227
00228 if (!netprotocol->initializeAsReceiver(mySocket, rec->getName()))
00229 return false;
00230
00231 receiver = rec;
00232 shouldContinue = true;
00233 return true;
00234 }
00235
00236
00237 void NetworkConnection::run()
00238 {
00239 JTime t;
00240 if (isTerminated) {
00241 if (DEBUGLEVEL(DEBUG)) {
00242 printf("********** [NetCon] Connection terminated before start...\n");
00243 }
00244 return;
00245 }
00246 JString str = "";
00247 JString from;
00248 JString id;
00249 Message* sendmsg = NULL;
00250 Message* hmsg = NULL;
00251
00252 while (!isTerminated) {
00253
00254 int c = 0;
00255 while ((!shouldContinue) && (!isTerminated)) {
00256
00257 while ( (sendmsg = (Message*) toSendMessages.retrieveEntry(0)) != NULL) {
00258 toSendSemaphore.post();
00259 hmsg = this->sendReceiveObject(sendmsg, 1000);
00260 if (hmsg == NULL)
00261 delete(sendmsg);
00262 else
00263 delete(sendmsg);
00264
00265
00266
00267 delete(hmsg);
00268
00269 }
00270 toSendMessages.waitForNewEntryToAppear(10);
00271
00272 if ((c++ <= 10) && (!shouldContinue) && (!isTerminated)) {
00273 if ( (!this->isRemoteCallback()) && (!this->isLocalCallback()) && (isConnected()) && (this->msSinceLastActivity() > keepAliveTimeout) )
00274 sendKeepAlive();
00275 c = 0;
00276 }
00277 }
00278
00279 isDoneRunning = false;
00280
00281 if (DEBUGLEVEL(HEARTBEAT))
00282 printf("NetworkCon just starting run [%p]\n", mySocket); fflush(stdout);
00283
00284 double* ioCounts;
00285 while ((shouldContinue) && (isConnected())) {
00286 if (conMutex.EnterMutex(300)) {
00287 if (this->netprotocol != NULL) {
00288 if (mySocket != NULL)
00289 mySocket->resetIOCounts();
00290
00291
00292 hmsg = receiveObject(100);
00293 if (hmsg != NULL) {
00294 lastRead = JTime();
00295 if (!shouldContinue) break;
00296 from = hmsg->getFrom();
00297 if ( (from.length() > 0) && ( (remoteName.length() == 0) || (remoteName.equalsIgnoreCase("***NoNameProvided***")) ) )
00298 remoteName = from;
00299
00300 if (hmsg->getType().containsIgnoreCase("benchmark"))
00301 sendmsg = performBenchmark(hmsg);
00302 else
00303 sendmsg = receiver->netObjectReceive(hmsg, this);
00304
00305 if (sendmsg != NULL) {
00306 if (!netprotocol->sendObject(mySocket, sendmsg, true)) {
00307
00308
00309 if (DEBUGLEVEL(STATUS)) {
00310 printf("######### [NetCon] Could not reply to received message\n");
00311 }
00312
00313
00314 delete(sendmsg);
00315 }
00316 else {
00317 delete(sendmsg);
00318 lastWrite = JTime();
00319 }
00320 }
00321 if (!shouldContinue) break;
00322 hmsg = NULL;
00323 ioCounts = mySocket->getIOCounts();
00324 localProfile.addDataStat((long)ioCounts[1], (long)ioCounts[0]);
00325 }
00326 else {
00327
00328 }
00329 }
00330 else {
00331 str = readln(10);
00332 if ((str.length() > 0) || (str.isCRTerminated)) {
00333 receiver->tcpInput(str, this);
00334 }
00335 else {
00336 if ((timeout > 0) && (msSinceLastActivity() > timeout)) {
00337 if (DEBUGLEVEL(DEBUG)) {
00338 printf("[NetCon] ### TIMEOUT ### length: %d timeout: %d msSince: %d this: %p\n", str.length(), timeout, msSinceLastActivity(), this->parent);
00339 }
00340 conMutex.LeaveMutex();
00341 shouldContinue = false;
00342 if (mySocket != NULL)
00343 mySocket->disconnectNow();
00344 reset();
00345 break;
00346 }
00347 }
00348 }
00349 if ( (this->isRemoteCallback()) && (this->msSinceLastActivity() > keepAliveTimeout) )
00350 sendKeepAlive();
00351 conMutex.LeaveMutex();
00352
00353 }
00354 else {
00355 if (DEBUGLEVEL(DETAILEDSTATUS))
00356 printf("[%p][%s]->[%s] Could not get Mutex in Run\n", this, (char*) name, (char*) remoteName);
00357 }
00358 }
00359 if (DEBUGLEVEL(HEARTBEAT))
00360 printf("NetworkCon out of main run [%p]\n", mySocket);
00361
00362 conMutex.LeaveMutex();
00363
00364 isDoneRunning = true;
00365
00366
00367 if (!isTerminated) {
00368 if (receiver != NULL)
00369 receiver->disconnectNotify(remoteName, this);
00370 if (DEBUGLEVEL(HEARTBEAT))
00371 printf("NetworkCon out of main run resetting [%p]\n", mySocket); fflush(stdout);
00372 reset();
00373 if (DEBUGLEVEL(HEARTBEAT))
00374 printf("NetworkCon out of main run reset! [%p]\n", mySocket); fflush(stdout);
00375 }
00376
00377 if (DEBUGLEVEL(HEARTBEAT))
00378 printf("NetworkCon out of main run end [%p]\n", mySocket); fflush(stdout);
00379 }
00380
00381 if (DEBUGLEVEL(HEARTBEAT))
00382 printf("NetworkCon run exit [%p]\n", mySocket); fflush(stdout);
00383 }
00384
00385
00386 bool NetworkConnection::terminate()
00387 {
00388 if ((isDoneRunning) && (!isTerminated)) {
00389 isTerminated = true;
00390 wait(20);
00391 }
00392 else {
00393 isTerminated = true;
00394 }
00395
00396 reset();
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409 return true;
00410 }
00411
00412 bool NetworkConnection::reset() {
00413
00414 allowMessageDrop = false;
00415 if (!conMutex.EnterMutex(2000)) {
00416 if (DEBUGLEVEL(DEBUG)) {
00417 printf("[%p][%s]->[%s] Could not get Mutex in Reset\n", this, (char*) name, (char*) remoteName);
00418
00419 }
00420 return false;
00421 }
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436 remoteName = "";
00437
00438
00439
00440
00441 shouldContinue = false;
00442
00443 if (mySocket != NULL) {
00444 mySocket->disconnectNow();
00445 }
00446
00447 JTime t;
00448
00449
00450 while (t.getAge() < 1000) {
00451 if (isDoneRunning)
00452 break;
00453 wait(5);
00454 }
00455
00456
00457
00458
00459 delete(mySocket);
00460 mySocket = NULL;
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470 receivedMessages.removeAll();
00471 toSendMessages.removeAll();
00472 conMutex.LeaveMutex();
00473 return true;
00474 }
00475
00476 bool NetworkConnection::isLocalCallback() {
00477 if (netprotocol == NULL)
00478 return false;
00479 return (netprotocol->isLocalCallback);
00480 }
00481
00482 bool NetworkConnection::isRemoteCallback() {
00483 if (netprotocol == NULL)
00484 return false;
00485 return (netprotocol->isRemoteCallback);
00486 }
00487
00488 bool NetworkConnection::isIncomingConnection() {
00489 if (netprotocol == NULL)
00490 return false;
00491 return ( (!netprotocol->isRemoteCallback) && (receiver != NULL));
00492
00493 }
00494
00495 JString NetworkConnection::print() {
00496
00497
00498 if (!isConnected()) {
00499 return "NetworkConnection not connected";
00500 }
00501
00502 if (netprotocol == NULL) {
00503 return JString::format("Raw connection from me to %s (%d)", (char*) getRemoteHostname(), (int) mySocket->mySocket);
00504 }
00505
00506
00507 if ( (!netprotocol->isLocalCallback) && (!netprotocol->isRemoteCallback) && (receiver == NULL)) {
00508 return JString::format("%s connection from me to %s (%d)", (char*) netprotocol->name, (char*) remoteName, (int) mySocket->mySocket);
00509 }
00510
00511
00512 else if ( (netprotocol->isLocalCallback) && (!netprotocol->isRemoteCallback) && (receiver != NULL)) {
00513 return JString::format("%s callback from me to %s (%d)", (char*) netprotocol->name, (char*) remoteName, (int) mySocket->mySocket);
00514 }
00515
00516
00517 else if ( (!netprotocol->isLocalCallback) && (!netprotocol->isRemoteCallback) && (receiver != NULL)) {
00518 return JString::format("%s connection from %s to me (%d)", (char*) netprotocol->name, (char*) remoteName, (int) mySocket->mySocket);
00519 }
00520
00521
00522 else if ( (!netprotocol->isLocalCallback) && (netprotocol->isRemoteCallback) && (receiver == NULL)) {
00523 return JString::format("%s callback from %s to me (%d)", (char*) netprotocol->name, (char*) remoteName, (int) mySocket->mySocket);
00524 }
00525
00526 return "Unknown NetworkConnection";
00527 }
00528
00529 bool NetworkConnection::isReceiver() {
00530 return (receiver != NULL);
00531 }
00532
00533
00534
00535 bool NetworkConnection::unitTest() {
00536
00537 return true;
00538 }
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549 Message* NetworkConnection::sendReceiveObject(Message* msg, unsigned int msTimeout) {
00550
00551
00552
00553 receivedMessages.removeAll();
00554
00555 if (!conMutex.EnterMutex(2000)) {
00556 if (DEBUGLEVEL(DEBUG)) {
00557
00558 printf("[%p][%s]->[%s] Could not get Mutex in SendReceiveObject\n", this, (char*) name, (char*) remoteName);
00559 }
00560
00561 return NULL;
00562 }
00563
00564 Message* hmsg = NULL;
00565
00566
00567 bool noreply = (msg->noreply.length() > 0);
00568
00569
00570
00571
00572 int count = 0;
00573 double* ioCounts;
00574 if (mySocket != NULL)
00575 mySocket->resetIOCounts();
00576 while (!sendObject(msg, false)) {
00577
00578 count++;
00579 conMutex.LeaveMutex();
00580 wait(10);
00581 if (!conMutex.EnterMutex(2000)) {
00582 if (DEBUGLEVEL(DEBUG)) {
00583
00584 printf("[%p][%s]->[%s] Could not get second Mutex in SendReceiveObject\n", this, (char*) name, (char*) remoteName);
00585 }
00586 return NULL;
00587 }
00588 if (count > 4) {
00589 if (DEBUGLEVEL(STATUS)) {
00590 printf("[NetCon] Could not send in SendReceiveObject\n");
00591 }
00592 conMutex.LeaveMutex();
00593 return NULL;
00594 }
00595 }
00596
00597 if (noreply) {
00598 conMutex.LeaveMutex();
00599 return new Message("", "", "RECEIVE_ACCEPT");
00600 }
00601
00602 JTime start;
00603
00604
00605
00606 hmsg = receiveObject(msTimeout);
00607
00608 int count2 = 0;
00609 while (hmsg == NULL) {
00610 if (start.getAge() > (int)msTimeout) break;
00611
00612
00613 hmsg = receiveObject(msTimeout);
00614
00615 if (count2++ > 5)
00616 break;
00617 }
00618
00619 if (hmsg == NULL) {
00620
00621 if (DEBUGLEVEL(STATUS)) {
00622 printf("[NetCon] Could not receive in SendReceiveObject\n");
00623 }
00624 }
00625 else {
00626 ioCounts = mySocket->getIOCounts();
00627 localProfile.addDataStat((long)ioCounts[1], (long)ioCounts[0]);
00628 }
00629
00630
00631 conMutex.LeaveMutex();
00632
00633 if (hmsg == NULL)
00634 hmsg = new Message("", "", "SENDRECEIVE_FAILED");
00635 return hmsg;
00636 }
00637
00638 Message* NetworkConnection::receiveObject(unsigned int msTimeout) {
00639
00640 Message* msg = (Message*) receivedMessages.retrieveEntry(0);
00641 if (msg != NULL) {
00642 return msg;
00643 }
00644
00645 if (netprotocol == NULL)
00646 return NULL;
00647
00648 if (mySocket == NULL)
00649 return NULL;
00650
00651 if (!mySocket->isConnected())
00652 return NULL;
00653
00654
00655
00656 msg = netprotocol->receiveObject(mySocket, msTimeout);
00657
00658 if (msg != NULL) {
00659 JString type = msg->getType();
00660 JString from = msg->getFrom();
00661 lastRead = JTime();
00662 if ( (from.length() > 0) && ( (remoteName.length() == 0) || (remoteName.equalsIgnoreCase("***NoNameProvided***")) ) )
00663 remoteName = from;
00664 if (type.equalsIgnoreCase("RECEIVE_NAME_NOTIFICATION")) {
00665
00666
00667
00668 delete(msg);
00669 return NULL;
00670 }
00671 else if (type.equalsIgnoreCase("KEEPALIVE")) {
00672 delete(msg);
00673 msg = new Message("", "", "KEEPALIVESUCCESS");
00674 if (!sendObject(msg, true))
00675 delete(msg);
00676 else
00677 delete(msg);
00678 msg = NULL;
00679 }
00680 }
00681
00682 return msg;
00683 }
00684
00685 bool NetworkConnection::sendObject(Message* msg, bool isReply) {
00686
00687 if (netprotocol == NULL) {
00688
00689 return false;
00690 }
00691
00692 if (mySocket == NULL)
00693 return false;
00694
00695 if (!mySocket->isConnected()) {
00696
00697 return false;
00698 }
00699
00700 if (netprotocol->sendObject(mySocket, msg, isReply)) {
00701 lastWrite = JTime();
00702 return true;
00703 }
00704 else {
00705
00706 return false;
00707 }
00708 }
00709
00710 bool NetworkConnection::allowMessageDropping(bool allow) {
00711 allowMessageDrop = allow;
00712 return true;
00713 }
00714
00715 bool NetworkConnection::sendObjectWhenReady(Message* msg, int timeout) {
00716
00717 if (toSendMessages.getCount() >= toSendMaxQueueSize) {
00718 if (allowMessageDrop) {
00719 delete(toSendMessages.retrieveEntry(0));
00720 }
00721 else {
00722 if (!toSendSemaphore.wait(timeout))
00723 if (toSendMessages.getCount() >= toSendMaxQueueSize)
00724 return false;
00725 }
00726 }
00727 toSendMessages.add(msg);
00728 return true;
00729 }
00730
00731 int NetworkConnection::read(char* buffer, int maxcount, long timeout)
00732 {
00733 if (!isConnected())
00734 return -1;
00735
00736 if (mySocket == NULL)
00737 return -1;
00738
00739 int res = mySocket->read(buffer, maxcount, timeout);
00740
00741 if (res > 0)
00742 lastRead = JTime();
00743
00744 return res;
00745 }
00746
00747
00748 JString NetworkConnection::readString(unsigned int numBytes, unsigned int msTimeout)
00749 {
00750 if (!isConnected())
00751 return "";
00752
00753 JString text = mySocket->readString(numBytes, msTimeout);
00754
00755 if (text.length() > 0)
00756 lastRead = JTime();
00757
00758 return text;
00759 }
00760
00761 JString NetworkConnection::readln(unsigned int msTimeout)
00762 {
00763 if (!isConnected())
00764 return "";
00765
00766 JString text = mySocket->readln(msTimeout);
00767
00768 if ((text.length() > 0) || (text.isCRTerminated))
00769 lastRead = JTime();
00770
00771 return text;
00772 }
00773
00774 bool NetworkConnection::write(char* buffer, int length)
00775 {
00776 if (!isConnected())
00777 return false;
00778
00779 if (mySocket->write(buffer, length) < 1)
00780 return false;
00781
00782 lastWrite = JTime();
00783
00784 return true;
00785 }
00786
00787 bool NetworkConnection::writeString(const JString& str)
00788 {
00789 if (!isConnected())
00790 return false;
00791
00792 if (mySocket->writeString(str) < 1)
00793 return false;
00794
00795 lastWrite = JTime();
00796
00797 return true;
00798 }
00799
00800 bool NetworkConnection::writeln(const JString& str)
00801 {
00802 if (!isConnected())
00803 return false;
00804
00805 if (mySocket->writeln(str) < 1)
00806 return false;
00807
00808 lastWrite = JTime();
00809
00810 return true;
00811 }
00812
00813 bool NetworkConnection::isConnected()
00814 {
00815 if (isTerminated)
00816 return(false);
00817
00818 if (!conMutex.EnterMutex(0)) {
00819 if (DEBUGLEVEL(DEBUG)) {
00820
00821
00822 }
00823 try {
00824 if (mySocket == NULL)
00825 return false;
00826 if (!mySocket->isSocketValid())
00827 return false;
00828 }
00829 catch(...) {
00830 return false;
00831 }
00832
00833
00834 return true;
00835 }
00836
00837 if (mySocket == NULL) {
00838 conMutex.LeaveMutex();
00839 return false;
00840 }
00841
00842 bool res = mySocket->isConnected();
00843 conMutex.LeaveMutex();
00844 return res;
00845
00846
00847
00848
00849
00850
00851
00852
00853 }
00854
00855 bool NetworkConnection::isConnectedTo(const TCPLocation& loc) {
00856 return isConnectedTo(loc.name);
00857 }
00858
00859 bool NetworkConnection::isConnectedTo(const JString& name) {
00860 if ((remoteName.length() > 0) && (name.equals(remoteName))) {
00861 return isConnected();
00862 }
00863
00864
00865
00866
00867 return false ;
00868 }
00869
00870 bool NetworkConnection::isPersistent() {
00871
00872
00873
00874 return true;
00875 }
00876
00877
00878 JString NetworkConnection::getLocalHostname() {
00879
00880 JString hostname;
00881
00882 if (mySocket != NULL) {
00883 hostname = mySocket->getLocalHostname();
00884 }
00885 else {
00886 JSocket* sock = new JSocket("", 80);
00887 hostname = sock->getLocalHostname();
00888 delete sock;
00889 }
00890
00891 return hostname;
00892 }
00893
00894 JString NetworkConnection::getLocalIPAddress() {
00895
00896 JString hostname;
00897
00898 if (mySocket != NULL) {
00899 hostname = mySocket->getLocalIPAddress();
00900 }
00901 else {
00902 JSocket* sock = new JSocket("", 80);
00903 hostname = sock->getLocalIPAddress();
00904 delete sock;
00905 }
00906
00907 return hostname;
00908 }
00909
00910 JString NetworkConnection::getRemoteHostname() {
00911
00912 if (mySocket == NULL)
00913 return "";
00914
00915 if (!mySocket->isConnected())
00916 return "";
00917
00918 return mySocket->getRemoteHostname();
00919 }
00920
00921 TCPLocation NetworkConnection::getLastConnectedTo() {
00922 return lastConnectedTo;
00923 }
00924
00925 TCPLocation NetworkConnection::getRemoteLocation() {
00926 if (isConnected())
00927 return lastConnectedTo;
00928 else
00929 return TCPLocation();
00930 }
00931
00932
00933 bool NetworkConnection::isErrorRecoverable()
00934 {
00935 if (mySocket == NULL)
00936 return false;
00937 return mySocket->isLastErrorRecoverable();
00938 }
00939
00940
00941 bool NetworkConnection::hasTerminated() {
00942 return isTerminated;
00943 }
00944
00945
00946 long NetworkConnection::msSinceLastRead() {
00947
00948
00949 return lastRead.getAge();
00950 }
00951
00952 long NetworkConnection::msSinceLastWrite() {
00953
00954
00955 return lastWrite.getAge();
00956 }
00957
00958 long NetworkConnection::msSinceLastActivity() {
00959 long a = msSinceLastRead();
00960 long b = msSinceLastWrite();
00961 return(a > b ? a:b);
00962 }
00963
00964
00965 bool NetworkConnection::hasNoParent() {
00966 return (parent == NULL);
00967 }
00968
00969
00970 bool NetworkConnection::isReset() {
00971 return ((isDoneRunning) && (mySocket == NULL));
00972 }
00973
00974 bool NetworkConnection::isBroken() {
00975 return ((!isConnected()) && (mySocket != NULL));
00976 }
00977
00978 double NetworkConnection::getSendProgress() {
00979 if (netprotocol != NULL)
00980 return netprotocol->getSendProgress();
00981 else
00982 return 1;
00983 }
00984
00985 double NetworkConnection::getReceiveProgress() {
00986 if (netprotocol != NULL)
00987 return netprotocol->getReceiveProgress();
00988 else
00989 return 1;
00990 }
00991
00992
00993 bool NetworkConnection::setKeepAliveTimeout(long timeout) {
00994 keepAliveTimeout = timeout;
00995 return true;
00996 }
00997
00998 long NetworkConnection::getKeepAliveTimeout() {
00999 return keepAliveTimeout;
01000 }
01001
01002 bool NetworkConnection::useKeepAlive(bool doKeepAlive) {
01003 if ((doKeepAlive) && (keepAliveTimeout <= 0))
01004 keepAliveTimeout = 60000;
01005 else if (!doKeepAlive)
01006 keepAliveTimeout = -1;
01007 return true;
01008 }
01009
01010 bool NetworkConnection::ping(const JString& from, const JString& to, int timeout) {
01011 if (!isConnected())
01012 return false;
01013
01014 bool res = false;
01015 Message* msg = new Message(from, to, "KEEPALIVE");
01016 Message* rmsg = sendReceiveObject(msg, timeout);
01017
01018 if (rmsg == NULL) {
01019 delete(msg);
01020 return false;
01021 }
01022 else if (!rmsg->getType().equalsIgnoreCase("KEEPALIVESUCCESS"))
01023 res = false;
01024 else
01025 res = true;
01026 delete(msg);
01027 delete(rmsg);
01028 return res;
01029 }
01030
01031 bool NetworkConnection::sendKeepAlive() {
01032
01033 if (keepAliveTimeout <= 0)
01034 return true;
01035
01036 if (!isConnected())
01037 return false;
01038
01039
01040 bool res = false;
01041 Message* msg = new Message("", "", "KEEPALIVE");
01042 Message* rmsg = sendReceiveObject(msg, 5000);
01043
01044 if (rmsg == NULL) {
01045 delete(msg);
01046 return false;
01047 }
01048 else if (!rmsg->getType().equalsIgnoreCase("KEEPALIVESUCCESS"))
01049 res = false;
01050 else
01051 res = true;
01052 delete(msg);
01053 delete(rmsg);
01054 return res;
01055 }
01056
01057
01058 bool NetworkConnection::trainNetworkProfile() {
01059 return trainNetworkProfile(&localProfile);
01060 }
01061
01062 bool NetworkConnection::trainNetworkProfile(ConnectionProfile* profile) {
01063 if ((profile == NULL) || (!this->isConnected())) {
01064
01065 return false;
01066 }
01067
01068
01069
01070
01071
01072
01073
01074
01075
01076
01077 double micro = pingRemotePeer();
01078 if (micro < 0) {
01079
01080 return false;
01081 }
01082
01083 int n;
01084 micro = 0;
01085 for (n=0; n<10; n++) {
01086 micro += pingRemotePeer();
01087 }
01088 double avgPing = ((double)micro) / 10.0;
01089
01090 int multi = 0;
01091 micro = -1;
01092 long datasize = 512;
01093 while (micro < 100000) {
01094 if (micro <= 0)
01095 multi = 2;
01096 else {
01097 multi = (int)(100000.0/micro);
01098 if (multi < 2)
01099 multi = 2;
01100 else if (multi > 64)
01101 multi = 64;
01102 }
01103
01104 datasize = datasize * multi;
01105
01106 micro = uploadServerTest(datasize);
01107 if (micro < 0) {
01108
01109 return false;
01110 }
01111 micro -= avgPing;
01112 }
01113
01114 if (datasize < 10)
01115 datasize = 10;
01116 else if (datasize > 1024*1024*1024)
01117 datasize = 1024*1024*1024;
01118
01119 double res;
01120 micro = 0;
01121 for (n=0; n<5; n++) {
01122 res = uploadServerTest(datasize);
01123 if (res < 0) {
01124
01125 return false;
01126 }
01127 micro += (res - avgPing);
01128 }
01129 double avgUpload = ((double)micro) / n;
01130
01131 micro = 0;
01132 for (n=0; n<5; n++) {
01133 res = downloadServerTest(datasize);
01134 if (res < 0) {
01135
01136 return false;
01137 }
01138 micro += (res - avgPing);
01139 }
01140 double avgDownload = ((double)micro) / n;
01141
01142 profile->setSpeed( (long)(datasize*(1000000.0/avgUpload)) , (long)(datasize*(1000000.0/avgDownload)) );
01143
01144 sendLocalProfile();
01145
01146 if (DEBUGLEVEL(DEBUG))
01147 printf("NetworkConnection benchmark [%s]: %s\n", (char*) JString::bytifySize(datasize), (char*) profile->getSpeedString()); fflush(stdout);
01148
01149
01150
01151 if (&localProfile != profile)
01152 localProfile = *profile;
01153
01154 return true;
01155 }
01156
01157
01158 Message* NetworkConnection::performBenchmark(Message* msg) {
01159
01160
01161
01162 JString type = msg->getType();
01163 Message* outMsg = new Message("", "", "");
01164 DataSample* sample = NULL;
01165
01166 if (type.equalsIgnoreCase("BENCHMARK_PING")) {
01167 outMsg->setType("BENCHMARK_PING_SUCCESS");
01168 }
01169 else if (type.equalsIgnoreCase("BENCHMARK_UPSTREAM_TEST")) {
01170 sample = (DataSample*) msg->getObject();
01171 if (sample == NULL)
01172 outMsg->setType("BENCHMARK_FAILED_NO_DATA");
01173 else if (sample->getDataSize() == 0)
01174 outMsg->setType("BENCHMARK_FAILED_WRONG_DATA_SIZE");
01175 else
01176 outMsg->setType("BENCHMARK_UPSTREAM_TEST_SUCCESS");
01177 }
01178 else if (type.equalsIgnoreCase("BENCHMARK_DOWNSTREAM_TEST")) {
01179 long reqsize = msg->getContent().toLong();
01180 if ((reqsize <= 0) || (reqsize > 1024*1024*10))
01181 outMsg->setType("BENCHMARK_FAILED_WRONG_DATA_SIZE");
01182
01183 sample = new DataSample();
01184 char* dat = new char[reqsize];
01185 sample->giveData(dat, reqsize);
01186 outMsg->setObject(sample);
01187 outMsg->setType("BENCHMARK_DOWNSTREAM_TEST_SUCCESS");
01188 }
01189 else if (type.equalsIgnoreCase("BENCHMARK_SET_CONNECTION_PROFILE")) {
01190 ConnectionProfile* profile = (ConnectionProfile*) msg->getObject();
01191 if (profile != NULL) {
01192 remoteProfile = *profile;
01193 outMsg->setType("BENCHMARK_SET_CONNECTION_PROFILE_SUCCESS");
01194 }
01195 else {
01196 outMsg->setType("BENCHMARK_SET_CONNECTION_PROFILE_FAILED");
01197 }
01198 }
01199 else if (type.equalsIgnoreCase("BENCHMARK_GET_CONNECTION_PROFILE")) {
01200 outMsg->setObject(localProfile.clone());
01201 outMsg->setType("BENCHMARK_GET_CONNECTION_PROFILE_SUCCESS");
01202 }
01203 delete(msg);
01204 return outMsg;
01205 }
01206
01207
01208 bool NetworkConnection::sendLocalProfile() {
01209 Message* msg = new Message("", "", "BENCHMARK_SET_CONNECTION_PROFILE", localProfile.clone());
01210
01211 Message* rmsg = sendReceiveObject(msg, 1000);
01212 if (rmsg == NULL) {
01213 delete(msg);
01214 return false;
01215 }
01216 delete(msg);
01217 if (!rmsg->getType().equalsIgnoreCase("BENCHMARK_SET_CONNECTION_PROFILE_SUCCESS")) {
01218 delete(rmsg);
01219 return false;
01220 }
01221 delete(rmsg);
01222 return true;
01223 }
01224
01225 bool NetworkConnection::getRemoteProfile() {
01226 Message* msg = new Message("", "", "BENCHMARK_GET_CONNECTION_PROFILE");
01227 Message* rmsg = sendReceiveObject(msg, 1000);
01228 if (rmsg == NULL) {
01229 delete(msg);
01230 return false;
01231 }
01232 delete(msg);
01233 if (!rmsg->getType().equalsIgnoreCase("BENCHMARK_GET_CONNECTION_PROFILE_SUCCESS")) {
01234 delete(rmsg);
01235 return false;
01236 }
01237 ConnectionProfile* profile = (ConnectionProfile*) rmsg->getObject();
01238 if (profile == NULL) {
01239 delete(rmsg);
01240 return false;
01241 }
01242 remoteProfile = *profile;
01243 delete(rmsg);
01244 return true;
01245 }
01246
01247 long NetworkConnection::pingRemotePeer() {
01248 if (!isConnected()) {
01249
01250 return -1;
01251 }
01252 Message* msg = new Message("", "", "BENCHMARK_PING");
01253 JTime start;
01254
01255 Message* rmsg = sendReceiveObject(msg, 1000);
01256 if (rmsg == NULL) {
01257
01258 delete(msg);
01259 return -1;
01260 }
01261 delete(msg);
01262 JTime end;
01263 long res = -1;
01264 if (rmsg->getType().equalsIgnoreCase("BENCHMARK_PING_SUCCESS"))
01265 res = end.microDifference(start);
01266
01267
01268 delete(rmsg);
01269 return res;
01270 }
01271
01272 long NetworkConnection::uploadServerTest(long size) {
01273
01274 DataSample* sample = new DataSample();
01275 char* dat = new char[size];
01276 sample->giveData(dat, size);
01277 Message* msg = new Message("", "", "BENCHMARK_UPSTREAM_TEST", sample);
01278
01279 JTime start;
01280 Message* rmsg = sendReceiveObject(msg, 1000);
01281 if (rmsg == NULL) {
01282 delete(msg);
01283 return -1;
01284 }
01285 delete(msg);
01286 JTime end;
01287 long res = -1;
01288 if (rmsg->getType().equalsIgnoreCase("BENCHMARK_UPSTREAM_TEST_SUCCESS"))
01289 res = end.microDifference(start);
01290 delete(rmsg);
01291 return res;
01292 }
01293
01294 long NetworkConnection::downloadServerTest(long size) {
01295
01296 Message* msg = new Message("", "", "BENCHMARK_DOWNSTREAM_TEST", JString(size), "");
01297
01298 JTime start;
01299 Message* rmsg = sendReceiveObject(msg, 1000);
01300 if (rmsg == NULL) {
01301 delete(msg);
01302 return -1;
01303 }
01304 delete(msg);
01305 JTime end;
01306 long res = -1;
01307 if (rmsg->getType().equalsIgnoreCase("BENCHMARK_DOWNSTREAM_TEST_SUCCESS")) {
01308 DataSample* sample = (DataSample*) rmsg->getObject();
01309 if (sample != NULL)
01310 if (sample->getDataSize() == size)
01311 res = end.microDifference(start);
01312 }
01313 delete(rmsg);
01314 return res;
01315 }
01316
01317
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329
01330
01331
01332
01333
01334 }