00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "Network.h"
00027 #include "DataSample.h"
00028
00029 namespace cmlabs {
00030
00031
00032
00033
00034
00035 Network::Network(TCPReceiver* recv) : JThread() {
00036 netCheckTimeout = 5000;
00037 isInitialized = false;
00038 allowNetConnections = true;
00039 netProtocols = NULL;
00040 primaryCon = NULL;
00041 primaryLoc = NULL;
00042 preferredNetProtocol = new NetMessageProtocol();
00043
00044 mySocket = NULL;
00045 cons = new ObjectCollection();
00046 cons->doDelete();
00047 receiverCons = new ObjectDictionary();
00048 receiverCons->doDelete();
00049 remoteReceiverCons = new ObjectDictionary();
00050 remoteReceiverCons->noDelete();
00051 port = 0;
00052 receiver = recv;
00053 shouldContinue = true;
00054 isDoneRunning = false;
00055 }
00056
00057 Network::~Network() {
00058 shouldContinue = false;
00059 allowNetConnections = false;
00060
00061 if (DEBUGLEVEL(DETAILEDSTATUS))
00062 printf("Network %s deleting...\n", (char*) name);
00063 if (isInitialized) {
00064 JTime t;
00065
00066 while (JTime() - t < 1000) {
00067 if (isDoneRunning)
00068 break;
00069 wait(5);
00070 }
00071 }
00072 if (mySocket != NULL)
00073 delete(mySocket);
00074 if (DEBUGLEVEL(DETAILEDSTATUS))
00075 printf("Network %s terminating...\n", (char*) name);
00076 waitThread(10);
00077 this->terminate();
00078 if (DEBUGLEVEL(DETAILEDSTATUS))
00079 printf("Network %s deleting connections...\n", (char*) name);
00080 deleteConnections();
00081 this->removeAllProtocols();
00082 delete(preferredNetProtocol);
00083 preferredNetProtocol = NULL;
00084 delete(primaryLoc);
00085 if (DEBUGLEVEL(DETAILEDSTATUS))
00086 printf("Network %s deleted...\n", (char*) name);
00087 }
00088
00089 bool Network::init(int portNum) {
00090
00091 if (portNum > 0)
00092 port = portNum;
00093
00094
00095
00096 isInitialized = false;
00097
00098 if (mySocket != NULL) {
00099 wait(200);
00100 mySocket->disconnectNow();
00101 delete(mySocket);
00102 mySocket = NULL;
00103 }
00104
00105 if (port > 0) {
00106
00107 mySocket = new JSocket("", port, "");
00108
00109 if (!mySocket->setUpListening()) {
00110 if (DEBUGLEVEL(STATUS)) {
00111 printf("\n [%s] Port %d not available.\n", (char*) name, port);
00112 }
00113 port = 0;
00114 delete(mySocket);
00115 mySocket = NULL;
00116 return false;
00117 }
00118 }
00119
00120 isInitialized = true;
00121 return true;
00122 }
00123
00124 bool Network::allowConnections(bool allow) {
00125 allowNetConnections = allow;
00126 return true;
00127 }
00128
00129
00130 void Network::run()
00131 {
00132 NetworkConnection* con;
00133 JSocket* c = NULL;
00134 NetProtocol* protocol;
00135
00136
00137 JTime lastCheck;
00138 JTime t;
00139
00140 this->setPriority(90);
00141 while (shouldContinue) {
00142
00143 while (!isInitialized) {
00144 if (!shouldContinue) {
00145 isDoneRunning = true;
00146 return;
00147 }
00148 wait(50);
00149 }
00150
00151 if (mySocket != NULL) {
00152 c = mySocket->waitForConnection(100);
00153 if (c != NULL) {
00154 if (!allowNetConnections) {
00155 c->terminate();
00156 delete(c);
00157 }
00158 else if (c->isConnected()) {
00159 protocol = detectProtocol(c, MSGTIMEOUT);
00160 if (protocol != NULL) {
00161 protocol->initializeConversation(c);
00162 con = this->openIncomingConnection(c, protocol);
00163 }
00164 else {
00165 if (c->isConnected()) {
00166 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00167 printf("[%s] Not able to identify protocol, using low level...\n", (char*)name);
00168 }
00169 con = this->openIncomingConnection(c, NULL);
00170 }
00171 else {
00172 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00173 printf("[%s] Connection closed before identifying protocol...\n", (char*)name);
00174 }
00175 delete(c);
00176 }
00177 }
00178 }
00179 }
00180 }
00181 else {
00182 wait(100);
00183 }
00184
00185
00186 if (lastCheck.getAge() > netCheckTimeout) {
00187 if (!checkPrimaryConnection()) {
00188 t = JTime(netCheckTimeout);
00189 if (DEBUGLEVEL(DEBUG)) {
00190 JTime now;
00191 printf("%s: Main Connection is down and reconnect failed - recheck in %s\n", (char*) now.print(), (char*) t.printDifTime());
00192 }
00193 }
00194 checkReceivingConnections();
00195 checkOpenConnections();
00196 tryReconnectingLostConnections();
00197 lastCheck = JTime();
00198 }
00199 }
00200 isDoneRunning = true;
00201
00202 }
00203
00204
00205 void Network::deleteConnections() {
00206 remoteReceiverCons->removeAllNoDelete();
00207
00208 int n;
00209 NetworkConnection* con;
00210 for (n=0; n<cons->getCount(); n++) {
00211 con = (NetworkConnection*) cons->get(n);
00212 if (con != NULL) {
00213 if (DEBUGLEVEL(MOREDETAILED)) {
00214 printf("Network terminating the following connection:\n %s\n", (char*) con->print());
00215 }
00216 con->terminate();
00217 }
00218 }
00219 for (n=0; n<receiverCons->getCount(); n++) {
00220 con = (NetworkConnection*) receiverCons->get(n);
00221 if (con != NULL) {
00222 if (DEBUGLEVEL(MOREDETAILED)) {
00223 printf("Network terminating the following connection:\n %s\n", (char*) con->print());
00224 }
00225 con->terminate();
00226 }
00227 }
00228 delete(cons);
00229 delete(receiverCons);
00230 if (primaryCon != NULL)
00231 delete(primaryCon);
00232 delete(remoteReceiverCons);
00233 }
00234
00235 JString Network::print() {
00236 if ( (cons == NULL) || (cons->getCount() == 0)) {
00237 return JString::format("[%s] No network connections", (char*) name);
00238 }
00239 else {
00240 return JString::format("[%s] %d connections:\n\t%s\n",
00241 (char*) name, cons->getCount(), (char*) cons->printListLine("\n\t"));
00242 }
00243 }
00244
00245
00246
00247
00248
00249
00250 bool Network::addProtocol(NetProtocol* protocol) {
00251
00252 if (netProtocols == NULL) {
00253 netProtocols = protocol;
00254 protocol->next = protocol->prev = NULL;
00255 }
00256 else {
00257 NetProtocol* p = netProtocols;
00258 while (p->next != NULL) p = p->next;
00259 p->next = protocol;
00260 protocol->prev = p;
00261 protocol->next = NULL;
00262 }
00263 return true;
00264 }
00265
00266 bool Network::removeProtocol(NetProtocol* protocol) {
00267
00268 if (netProtocols == NULL) return true;
00269
00270 NetProtocol* p = netProtocols;
00271 while (p != NULL) {
00272 if (p->equals(protocol)) {
00273 if ( (p->prev == NULL) && (p->next == NULL) )
00274 netProtocols = NULL;
00275 else {
00276 if (p->prev != NULL)
00277 p->prev->next = p->next;
00278 if (p->next != NULL)
00279 p->next->prev = p->prev;
00280 }
00281 delete(p);
00282 return true;
00283 }
00284 p = p->next;
00285 }
00286
00287 return false;
00288 }
00289
00290 bool Network::removeAllProtocols() {
00291
00292 NetProtocol* p = netProtocols;
00293
00294 while (netProtocols != NULL) {
00295 p = netProtocols->next;
00296 delete(netProtocols);
00297 netProtocols = p;
00298 }
00299 return true;
00300 }
00301
00302 NetProtocol* Network::detectProtocol(JSocket* socket, long timeout) {
00303 if (netProtocols == NULL) return NULL;
00304 JTime start;
00305
00306 NetProtocol* p = NULL;
00307 while (p == NULL) {
00308 socket->readIntoBuffer();
00309 if (!socket->isConnected()) {
00310 if (DEBUGLEVEL(DETAILEDSTATUS))
00311 printf("[%s] detectProtocol - not connected...\n", (char*)name);
00312 return NULL;
00313 }
00314 p = netProtocols;
00315 while (p != NULL) {
00316 if (p->checkBufferForCompatibility(socket->charBuffer, socket->charBufferCount)) {
00317 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00318 printf("[Network] Found matching protocol for new con: %s\n", (char*) p->name);
00319 }
00320 break;
00321 }
00322 p = p->next;
00323 }
00324
00325 if (start.getAge() > timeout)
00326 break;
00327 if (p == NULL)
00328 wait(10);
00329 }
00330 if (p == NULL) {
00331 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00332 JTime now;
00333 printf("[%s] detectProtocol - timeout detecting protocol (%s - %s)...\n", (char*)name, (char*) start.printTimeMS(), (char*) now.printTimeMS());
00334 }
00335 return NULL;
00336 }
00337 else {
00338 if (DEBUGLEVEL(DETAILEDSTATUS))
00339 printf("[%s] detectProtocol - found protocol...\n", (char*)name);
00340 return (NetProtocol*) p->clone();
00341 }
00342 }
00343
00344
00345
00346
00347
00348 JString Network::getLocalHostname() {
00349
00350 if (localHostname.length() > 0)
00351 return localHostname;
00352
00353 if (mySocket != NULL)
00354 localHostname = mySocket->getLocalHostname();
00355 else {
00356 JSocket* sock = new JSocket("", 80);
00357 localHostname = sock->getLocalHostname();
00358 delete sock;
00359 }
00360 return localHostname;
00361 }
00362
00363 JString Network::getLocalIPAddress() {
00364
00365 if (localIPAddress.length() > 0)
00366 return localIPAddress;
00367
00368 if (mySocket != NULL)
00369 localIPAddress = mySocket->getLocalIPAddress();
00370 else {
00371 JSocket* sock = new JSocket("", 80);
00372 localIPAddress = sock->getLocalIPAddress();
00373 delete sock;
00374 }
00375 return localIPAddress;
00376 }
00377
00378 Collection Network::getLocalIPAddresses() {
00379
00380 if (localIPAddresses.getCount() > 0)
00381 return localIPAddresses;
00382
00383 if (mySocket != NULL)
00384 localIPAddresses = mySocket->getLocalIPAddresses();
00385 else {
00386 JSocket* sock = new JSocket("", 80);
00387 localIPAddresses = sock->getLocalIPAddresses();
00388 delete sock;
00389 }
00390 if (localIPAddresses.getCount() == 0)
00391 localIPAddresses.add("127.0.0.1");
00392 return localIPAddresses;
00393 }
00394
00395 JString Network::getRemoteHostname() {
00396
00397 if (mySocket == NULL)
00398 return "";
00399
00400
00401
00402
00403 return mySocket->getRemoteHostname();
00404 }
00405
00406 IPNumber Network::getIPNumberFromName(const JString& name) {
00407 IPNumber ipnum;
00408
00409 if (name.length() == 0) {
00410 if (DEBUGLEVEL(STATUS)) {
00411 printf("[%s] IPNumber for '', defaulting to 127.0.0.1...", (char*) name);
00412 }
00413 ipnum.a = 127;
00414 ipnum.b = 0;
00415 ipnum.c = 0;
00416 ipnum.d = 1;
00417 return ipnum;
00418 }
00419
00420 if (mySocket != NULL) {
00421 ipnum = mySocket->getIPNumberFromName(name);
00422 }
00423 else {
00424 JSocket* sock = new JSocket("", 0);
00425 ipnum = sock->getIPNumberFromName(name);
00426 delete(sock);
00427 }
00428 return ipnum;
00429 }
00430
00431 JString Network::getIPAddressFromName(const JString& name) {
00432 JString ipaddr;
00433
00434 if (mySocket != NULL) {
00435 ipaddr = mySocket->getIPAddressFromName(name);
00436 }
00437 else {
00438 JSocket* sock = new JSocket("", 0);
00439 ipaddr = sock->getIPAddressFromName(name);
00440 delete(sock);
00441 }
00442 return ipaddr;
00443 }
00444
00445
00446
00447
00448
00449
00450 NetProtocol* Network::getPreferredNetProtocol() {
00451 return (NetProtocol*) preferredNetProtocol->clone();
00452 }
00453
00454 bool Network::setPreferredNetProtocol(NetProtocol* prefProtocol) {
00455 if (preferredNetProtocol != NULL)
00456 delete(preferredNetProtocol);
00457 preferredNetProtocol = prefProtocol;
00458 return true;
00459 }
00460
00461 NetworkConnection* Network::openConnection(const TCPLocation& loc) {
00462
00463 if ((loc.netProtocolName.length() == 0) || (loc.netProtocolName.equalsIgnoreCase(preferredNetProtocol->name)))
00464 return openConnection(loc, getPreferredNetProtocol());
00465 else
00466 return openConnection(loc, new NetMessageProtocol());
00467
00468 }
00469
00470 NetworkConnection* Network::openConnection(const TCPLocation& loc, NetProtocol* netprotocol) {
00471
00472
00473
00474 if (loc.name.equalsIgnoreCase(name)) {
00475
00476 return false;
00477 }
00478
00479
00480
00481
00482
00483 NetworkConnection* con = findConnectedConnection(loc.name);
00484 if (con != NULL) {
00485
00486 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00487 if (con->isRemoteCallback()) {
00488
00489 printf("Found Remote Callback to send on to %s out of: %s\n\t%s\n", (char*) loc.name, (char*) con->print(), (char*) cons->printListLine("\n\t"));
00490 }
00491 else {
00492 printf("Found own connection to send on to %s out of: %s\n\t%s\n", (char*) loc.name, (char*) con->print(), (char*) cons->printListLine("\n\t"));
00493
00494 }
00495 }
00496 delete(netprotocol);
00497
00498
00499
00500
00501 return con;
00502 }
00503 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00504 printf("[%s] Didn't find connection to send on to (%s) out of: \n\t%s\n", (char*) name, (char*) loc.asText(), (char*) cons->printListLine("\n\t"));
00505
00506 }
00507
00508
00509 con = findResetConnection();
00510
00511
00512 if (con != NULL) {
00513
00514
00515
00516
00517
00518 con->restart(loc, netprotocol, NULL);
00519 }
00520 else {
00521
00522
00523
00524
00525
00526 con = new NetworkConnection(loc, netprotocol, NULL);
00527 con->parent = this;
00528 con->name = name;
00529 cons->add(con);
00530 }
00531
00532 if (con->isConnected())
00533 return con;
00534 else {
00535 con->reset();
00536 return NULL;
00537 }
00538 }
00539
00540 NetworkConnection* Network::createNetworkConnectionTo(const TCPLocation& loc, TCPReceiver* rec) {
00541
00542 return openConnection(loc, NULL);
00543 }
00544
00545 NetworkConnection* Network::openIncomingConnection(JSocket* sock, NetProtocol* protocol) {
00546
00547
00548
00549
00550 NetworkConnection* con = findResetConnection();
00551
00552
00553 if (con != NULL) {
00554
00555 con->restart(sock, protocol, receiver);
00556 }
00557 else {
00558
00559 con = new NetworkConnection(sock, protocol, receiver);
00560 con->parent = this;
00561 con->name = name;
00562 cons->add(con);
00563 }
00564
00565 if (receiver != NULL) {
00566 if (!receiver->connectNotify(con->remoteName, con)) {
00567 con->reset();
00568 return NULL;
00569 }
00570 }
00571
00572 if (con->isConnected()) {
00573
00574 if (!con->isReceiver()) {
00575 if (DEBUGLEVEL(DEBUG)) {
00576 printf("### [%s] Registered Receiver: %s ###\n", (char*) name, (char*) con->remoteName);
00577 fflush(stdout);
00578 }
00579
00580 if (DEBUGLEVEL(DETAILEDSTATUS))
00581 printf("### [%s] Now have these connections: ### \n\t%s\n", (char*) name, (char*) cons->printListLine("\n\t"));
00582
00583 remoteReceiverCons->put(con->remoteName, con);
00584
00585
00586 NetworkConnection* con2 = NULL;
00587 for (int n=0; n<remoteReceiverCons->getCount(); n++) {
00588 con2 = (NetworkConnection*) remoteReceiverCons->get(n);
00589 if ((con2 != NULL) && (con2 != con))
00590 if (!con2->isReceiver())
00591 if (con2->isConnectedTo(con->remoteName))
00592 if (con2->isPersistent()) {
00593 if (DEBUGLEVEL(DEBUG)) {
00594 printf("### [%s] Disconnected previous receiver: %s ###\n", (char*) name, (char*) con->remoteName);
00595 }
00596
00597 con2->reset();
00598 remoteReceiverCons->removeNoDelete(n);
00599 n--;
00600 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00601 printf("### [%s] Now have these connections: ### \n\t%s\n", (char*) name, (char*) cons->printListLine("\n\t"));
00602 }
00603 }
00604 }
00605 }
00606
00607 return con;
00608 }
00609 else {
00610 con->reset();
00611 return NULL;
00612 }
00613 }
00614
00615 NetworkConnection* Network::openReceivingConnection(const TCPLocation& loc) {
00616 if ((loc.netProtocolName.length() == 0) || (loc.netProtocolName.equalsIgnoreCase(preferredNetProtocol->name)))
00617 return openReceivingConnection(loc, getPreferredNetProtocol());
00618 else
00619 return openReceivingConnection(loc, new NetMessageProtocol());
00620 }
00621
00622 NetworkConnection* Network::openReceivingConnection(const TCPLocation& loc, NetProtocol* netprotocol) {
00623
00624 if (loc.name.equalsIgnoreCase(name)) {
00625
00626 return false;
00627 }
00628
00629 NetworkConnection* con = (NetworkConnection*) receiverCons->get(loc.name);
00630 if (con != NULL) {
00631 if (con->isConnected())
00632 return con;
00633 }
00634
00635 con = new NetworkConnection(loc, netprotocol, NULL);
00636 con->name = name;
00637
00638 if (!con->initializeAsReceiver(receiver)) {
00639 delete(con);
00640 return NULL;
00641 }
00642
00643 con->parent = this;
00644 if (loc.name.length() > 0)
00645 receiverCons->put(loc.name, con);
00646 else
00647 receiverCons->put(loc.asText(), con);
00648 return con;
00649 }
00650
00651 bool Network::checkReceivingConnections() {
00652
00653 NetworkConnection* con = NULL;
00654 NetworkConnection* newcon = NULL;
00655 JString key;
00656 for (int n=0; n<receiverCons->getCount(); n++) {
00657 key = receiverCons->getKey(n);
00658 con = (NetworkConnection*) receiverCons->get(key);
00659 newcon = NULL;
00660 if (con != NULL) {
00661 if (con->isConnected()) {
00662 if (DEBUGLEVEL(HEARTBEAT)) {
00663 printf("[%s] Callback to '%s' still valid!\n", (char*) name, (char*) key);
00664 }
00665 }
00666
00667 else {
00668 TCPLocation loc = con->getLastConnectedTo();
00669 NetProtocol* netprot = (NetProtocol*) con->netprotocol->clone();
00670 con->reset();
00671 receiverCons->remove(key);
00672 if (loc.isValid()) {
00673 if (netprot != NULL)
00674 newcon = openReceivingConnection(loc, netprot);
00675 else
00676 newcon = openReceivingConnection(loc);
00677 }
00678 if (newcon != NULL) {
00679 if (DEBUGLEVEL(STATUS)) {
00680 printf("### Lost callback to '%s' (%s), re-established successfully!\n", (char*) key, (char*) loc.print());
00681 }
00682 }
00683 else {
00684 if (DEBUGLEVEL(STATUS))
00685 printf("### Lost callback to '%s' (%s) and was unable to re-establish!\n", (char*) key, (char*) loc.print());
00686 lostReceiveConnections.put(key, loc.clone());
00687
00688
00689 }
00690
00691 return checkReceivingConnections();
00692 }
00693 }
00694 }
00695
00696 return true;
00697 }
00698
00699 bool Network::checkOpenConnections() {
00700
00701 NetworkConnection* con = NULL;
00702 NetworkConnection* newcon = NULL;
00703 TCPLocation loc;
00704 NetProtocol* netprot = NULL;
00705 JString remoteName;
00706 for (int n=0; n<cons->getCount(); n++) {
00707 con = (NetworkConnection*) cons->get(n);
00708 newcon = NULL;
00709 if (con != NULL) {
00710 remoteName = con->remoteName;
00711 loc = con->getLastConnectedTo();
00712 if (con->isConnected()) {
00713 if (DEBUGLEVEL(HEARTBEAT)) {
00714 printf("%d: [%s] Connection to '%s' still valid!\n", n, (char*) name, (char*) remoteName);
00715 }
00716 }
00717 else if (con->isBroken()) {
00718 if (DEBUGLEVEL(STATUS)) {
00719 if (con->isRemoteCallback()) {
00720 printf("%d: [%s] Callback from '%s' broken!\n", n, (char*) name, (char*) remoteName);
00721 con->reset();
00722 continue;
00723 }
00724 else
00725 printf("%d: [%s] Connection to '%s' broken!\n", n, (char*) name, (char*) remoteName);
00726 }
00727
00728 if (loc.isValid()) {
00729
00730 if (con->netprotocol == NULL)
00731 netprot = NULL;
00732 else
00733 netprot = (NetProtocol*) con->netprotocol->clone();
00734 con->reset();
00735
00736 if (netprot != NULL)
00737 newcon = openConnection(loc, netprot);
00738 else
00739 newcon = openConnection(loc);
00740 if ( (newcon != NULL) && (!newcon->ping(name, loc.name)) ) {
00741
00742 newcon->reset();
00743 newcon = NULL;
00744 }
00745
00746 if (newcon != NULL) {
00747 if (DEBUGLEVEL(STATUS)) {
00748 printf("### Lost connection to '%s' (%s), re-established successfully!\n", (char*) remoteName, (char*) loc.print());
00749 }
00750 }
00751 else {
00752 if (DEBUGLEVEL(STATUS))
00753 printf("### Lost connection to '%s' (%s) and was unable to re-establish!\n", (char*) remoteName, (char*) loc.print());
00754 lostSendConnections.put(remoteName, loc.clone());
00755 if (receiver != NULL)
00756 receiver->disconnectNotify(remoteName, NULL);
00757 }
00758 }
00759 else
00760 con->reset();
00761 }
00762 else {
00763 if (DEBUGLEVEL(HEARTBEAT)) {
00764 printf("%d: [%s] Connection to '%s' reset!\n", n, (char*) name, (char*) remoteName);
00765 }
00766 }
00767 }
00768 }
00769
00770 return true;
00771 }
00772
00773 bool Network::tryReconnectingLostConnections() {
00774 int n;
00775 TCPLocation* loc;
00776 NetworkConnection* con;
00777 for (n=0; n<lostSendConnections.getCount(); n++) {
00778 loc = (TCPLocation*) lostSendConnections.get(n);
00779 if (loc != NULL) {
00780 if ( (con = openConnection(*loc)) != NULL ) {
00781 if (con->ping(name, loc->name)) {
00782 if (DEBUGLEVEL(STATUS)) {
00783 printf("### Lost connection to '%s' (%s) was reconnected successfully!\n", (char*) loc->name, (char*) loc->print());
00784 }
00785 lostSendConnections.remove(n);
00786 n--;
00787 }
00788 else {
00789 con->reset();
00790
00791 }
00792 }
00793 }
00794 }
00795
00796 for (n=0; n<lostReceiveConnections.getCount(); n++) {
00797 loc = (TCPLocation*) lostReceiveConnections.get(n);
00798 if (loc != NULL) {
00799 if (openReceivingConnection(*loc)) {
00800 if (DEBUGLEVEL(STATUS)) {
00801 printf("### Lost callback to '%s' (%s) was reconnected successfully!\n", (char*) loc->name, (char*) loc->print());
00802 }
00803 lostReceiveConnections.remove(n);
00804 n--;
00805 }
00806 }
00807 }
00808 return true;
00809
00810 }
00811
00812
00813 bool Network::checkPrimaryConnection() {
00814 if (primaryLoc == NULL)
00815 return true;
00816
00817 if (primaryCon == NULL) {
00818 if (DEBUGLEVEL(STATUS))
00819 printf("### Primary connection NULL, trying to connect...\n");
00820 if (setPrimaryConnection(*primaryLoc)) {
00821 if (DEBUGLEVEL(STATUS))
00822 printf("### Primary connection reconnected after NULL...\n");
00823 if (receiver != NULL)
00824 receiver->primaryReconnectNotify();
00825 return true;
00826 }
00827 else {
00828 if (DEBUGLEVEL(STATUS))
00829 printf("### Primary connection failed after NULL...\n");
00830 if (receiver != NULL)
00831 receiver->primaryDisconnectNotify();
00832 return false;
00833 }
00834
00835 }
00836
00837 if (!primaryMutex.EnterMutex(1000))
00838 return false;
00839 if (!primaryCon->isConnectedTo(primaryLoc->name)) {
00840 bool wasConnected = true;
00841 if (primaryCon->isReset()) {
00842 if (DEBUGLEVEL(STATUS))
00843 printf("### Primary connection no longer connected and reset...\n");
00844 wasConnected = false;
00845 }
00846 else {
00847 if (DEBUGLEVEL(STATUS))
00848 printf("### Primary connection broken...\n");
00849 primaryCon->reset();
00850 }
00851 if (primaryCon->restart(*primaryLoc, getPreferredNetProtocol(), NULL)) {
00852 primaryCon->parent = this;
00853 if (primaryCon->ping(name, primaryLoc->name)) {
00854 if (DEBUGLEVEL(STATUS))
00855 printf("### Primary connection reconnected and ping success...\n");
00856 primaryMutex.LeaveMutex();
00857 if (receiver != NULL)
00858 receiver->primaryReconnectNotify();
00859 return true;
00860 }
00861 else {
00862 if (DEBUGLEVEL(STATUS))
00863 printf("### Primary connection reconnected, but ping failed, resetting......\n");
00864 primaryCon->reset();
00865 primaryMutex.LeaveMutex();
00866 if ((receiver != NULL) && (wasConnected))
00867 receiver->primaryDisconnectNotify();
00868 return false;
00869 }
00870 }
00871 else {
00872 if (DEBUGLEVEL(STATUS))
00873 printf("### Primary connection did not reconnect...\n");
00874 primaryCon->reset();
00875 primaryMutex.LeaveMutex();
00876 if ((receiver != NULL) && (wasConnected))
00877 receiver->primaryDisconnectNotify();
00878 return false;
00879 }
00880 }
00881 primaryMutex.LeaveMutex();
00882 return true;
00883 }
00884
00885 bool Network::setPrimaryConnection(const JString& addr, int port, const JString& name) {
00886 TCPLocation loc = TCPLocation(addr, port, name);
00887 return setPrimaryConnection(loc);
00888 }
00889
00890 bool Network::setPrimaryConnection(const TCPLocation& loc) {
00891 if (!loc.isValid())
00892 return false;
00893 if (primaryLoc != NULL)
00894 delete(primaryLoc);
00895 primaryLoc = (TCPLocation*) loc.clone();
00896
00897 if (!primaryMutex.EnterMutex(1000))
00898 return false;
00899 if (primaryCon != NULL) {
00900 if (primaryCon->isConnectedTo(loc.name)) {
00901 primaryMutex.LeaveMutex();
00902 return true;
00903 }
00904 else {
00905 primaryCon->reset();
00906 primaryCon->restart(*primaryLoc, getPreferredNetProtocol(), NULL);
00907 }
00908 }
00909 else {
00910 primaryCon = new NetworkConnection(*primaryLoc, getPreferredNetProtocol(), NULL);
00911 primaryCon->name = name;
00912 }
00913
00914 primaryCon->parent = this;
00915 if (primaryCon->ping(name, primaryLoc->name)) {
00916 primaryMutex.LeaveMutex();
00917 return true;
00918 }
00919 else {
00920 primaryMutex.LeaveMutex();
00921 return false;
00922 }
00923 }
00924
00925 bool Network::isPrimaryConnection(const JString& name) {
00926 if (primaryLoc == NULL)
00927 return false;
00928 else
00929 return (primaryLoc->name.equalsIgnoreCase(name));
00930 }
00931
00932
00933
00934 bool Network::isCurrentlyConnectedTo(const JString& name) {
00935
00936 if ( name.length() == 0 )
00937 return false;
00938
00939 if ( (isPrimaryConnection(name)) && (checkPrimaryConnection()) )
00940 return (primaryCon != NULL);
00941
00942 NetworkConnection* con = NULL;
00943 for (int n=cons->getCount()-1; n>=0; n--) {
00944 con = (NetworkConnection*) cons->get(n);
00945 if (con != NULL)
00946 if (con->isConnectedTo(name))
00947 if (!con->isReceiver())
00948 if (con->isPersistent()) {
00949 return true;
00950 }
00951 }
00952
00953 return false;
00954 }
00955
00956
00957 NetworkConnection* Network::findConnectedConnection(const JString& name) {
00958
00959 if ( name.length() == 0 )
00960 return NULL;
00961
00962 if (DEBUGLEVEL(KITCHENSINK)) {
00963 printf("[%s] Trying to find connection to send on to %s out of: \n\t%s\n", (char*) this->name, (char*) name, (char*) cons->printListLine("\n\t"));
00964 }
00965
00966 if ( (isPrimaryConnection(name)) && (checkPrimaryConnection()) )
00967 return primaryCon;
00968
00969 NetworkConnection* con = NULL;
00970 for (int n=cons->getCount()-1; n>=0; n--) {
00971 con = (NetworkConnection*) cons->get(n);
00972 if (con != NULL) {
00973 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00974
00975 }
00976 if (con->isConnectedTo(name)) {
00977 if (!con->isReceiver()) {
00978 if (con->isPersistent()) {
00979 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00980 printf("[%s] Found connection to send on to %s: %s\n", (char*) this->name, (char*) name, (char*) con->print());
00981 }
00982 return con;
00983 }
00984 }
00985 }
00986 }
00987 }
00988
00989 if (DEBUGLEVEL(DETAILEDSTATUS)) {
00990 printf("[%s] Found no connection to send on to %s\n", (char*) this->name, (char*) name);
00991 }
00992 return NULL;
00993
00994 }
00995
00996 NetworkConnection* Network::findConnectedConnection(const TCPLocation& loc, NetProtocol* netprotocol) {
00997
00998 if ( (!loc.isValid()) || (netprotocol == NULL) )
00999 return NULL;
01000
01001 if ( (isPrimaryConnection(loc.name)) && (checkPrimaryConnection()) && (netprotocol->equals(primaryCon->netprotocol)) )
01002 return primaryCon;
01003
01004 NetworkConnection* con = NULL;
01005 for (int n=cons->getCount()-1; n>=0; n--) {
01006 con = (NetworkConnection*) cons->get(n);
01007 if (con != NULL)
01008 if (con->isConnectedTo(loc))
01009 if (netprotocol->equals(con->netprotocol))
01010 if (!con->isReceiver())
01011 if (con->isPersistent())
01012 return con;
01013 }
01014
01015 return NULL;
01016 }
01017
01018 NetworkConnection* Network::findResetConnection() {
01019
01020 NetworkConnection* con = NULL;
01021
01022 int count = cons->getCount();
01023
01024
01025
01026 for (int n=0; n<count; n++) {
01027 con = (NetworkConnection*) cons->get(n);
01028 if (con->isReset()) {
01029 return con;
01030 }
01031 }
01032
01033
01034 return NULL;
01035 }
01036
01037
01038 Message* Network::receiveTCPInput(Message* msg) {
01039 return NULL;
01040 }
01041
01042
01043 Message* Network::netObjectReceive(Message* msg, NetworkConnection* con) {
01044
01045 if (msg == NULL)
01046 return NULL;
01047
01048 bool noreply = (msg->noreply.length() > 0);
01049
01050 Message* rmsg = new Message();
01051
01052 JString type = msg->getType();
01053 JString content = msg->getContent();
01054 if ((type.equals("HTTP")) && (msg->getObject() != NULL)) {
01055 printf("Tester received HTTP message!\n");
01056 }
01057 else if ((type.equals("TelnetInput")) && (content.length() > 0)) {
01058
01059 rmsg->setContent(JString("You said: ") + msg->getContent(), "");
01060 }
01061 else {
01062 if (type.equalsIgnoreCase("MediaTest")) {
01063 DataSample* sample = (DataSample*) msg->takeObject();
01064 if (sample == NULL) {
01065 printf("Tester received empty Media message!\n");
01066 }
01067 else if (sample->getDataLink() == NULL) {
01068 printf("Tester received Media message with no data!\n");
01069 }
01070 else {
01071 char ch = sample->getDataLink()[0];
01072 memset(sample->getDataLink(), ch+1, sample->getDataSize());
01073 rmsg->setType(type);
01074 rmsg->setObject(sample);
01075 rmsg->setType("BinaryOnly");
01076 }
01077 }
01078 else if (type.equalsIgnoreCase("MediaTest2")) {
01079 ObjectCollection* col = (ObjectCollection*) msg->takeObject();
01080 if (col != NULL) {
01081 rmsg->setType(type);
01082 rmsg->setObject(col);
01083 }
01084 else {
01085 printf("Tester received empty Media Collection Message!\n");
01086 }
01087 }
01088 else {
01089 rmsg->setType(type);
01090 rmsg->setContent(content + "x", "");
01091
01092 }
01093 }
01094
01095 delete(msg);
01096 return rmsg;
01097 }
01098
01099 JString Network::getName() {
01100 return name;
01101 }
01102
01103 int Network::getPort() {
01104 return port;
01105 }
01106
01107 int Network::getConnectionCount() {
01108 return cons->getCount();
01109 }
01110
01111 int Network::getSendingConnectionCount(const JString& name) {
01112
01113 if ( name.length() == 0 )
01114 return 0;
01115
01116 if ( (isPrimaryConnection(name)) && (checkPrimaryConnection()) ) {
01117 if (primaryCon != NULL)
01118 return 1;
01119 else
01120 return 0;
01121 }
01122
01123
01124 int count = 0;
01125 NetworkConnection* con = NULL;
01126 for (int n=cons->getCount()-1; n>=0; n--) {
01127 con = (NetworkConnection*) cons->get(n);
01128 if ( (con != NULL) && (!con->isReceiver()) ) {
01129
01130 if (con->remoteName.equals(name)) {
01131 if (con->isConnected())
01132 count++;
01133 }
01134 }
01135 }
01136
01137
01138 return count;
01139 }
01140
01141 int Network::getReceivingConnectionCount(const JString& name) {
01142
01143 if ( name.length() == 0 )
01144 return 0;
01145
01146 if ( (isPrimaryConnection(name)) && (checkPrimaryConnection()) ) {
01147 if (primaryCon != NULL)
01148 return 1;
01149 else
01150 return 0;
01151 }
01152
01153
01154 int count = 0;
01155 NetworkConnection* con = NULL;
01156 for (int n=cons->getCount()-1; n>=0; n--) {
01157 con = (NetworkConnection*) cons->get(n);
01158 if ( (con != NULL) && (con->isReceiver()) ) {
01159
01160 if (con->remoteName.equals(name)) {
01161 if (con->isConnected())
01162 count++;
01163 }
01164 }
01165 }
01166
01167
01168 return count;
01169 }
01170
01171 bool Network::trainNetworkProfile(const TCPLocation& loc, ConnectionProfile* profile) {
01172 if ((profile == NULL) || (!loc.isValid()))
01173 return false;
01174 NetworkConnection* con = openConnection(loc);
01175 if (con == NULL)
01176 return false;
01177 return con->trainNetworkProfile(profile);
01178 }
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189
01190
01191
01192
01193
01194
01195
01196
01197
01198
01199 bool Network::unitTest() {
01200
01201 printf("\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b"); fflush(stdout);
01202
01203
01204 this->receiver = this;
01205
01206 this->addProtocol(new NetMessageProtocol());
01207 this->addProtocol(new NetOriginalMessageProtocol());
01208 this->addProtocol(new NetHTTPProtocol());
01209 this->addProtocol(new NetTelnetProtocol());
01210
01211
01212
01213
01214 Message* hmsg = new Message("Test", "Test", "Test", "Test", "");;
01215 JTime t1, t2;
01216 int i;
01217
01218 NetworkConnection* con = NULL;
01219 JString str = "Test";
01220 JString str2 = "Test";
01221
01222 int maxcount = 20;
01223
01224 str = "Test";
01225 str2 = "Test";
01226 t1 = JTime();
01227 for (i = 0; i < maxcount; i++) {
01228 printf(".\b"); fflush(stdout);
01229 con = new NetworkConnection("localhost", port, new NetOriginalMessageProtocol(), NULL);
01230 con->name = name;
01231 printf("o\b"); fflush(stdout);
01232 if (con->sendObject(hmsg, false)) {
01233 printf("O\b"); fflush(stdout);
01234 delete(hmsg);
01235 hmsg = con->receiveObject(1000);
01236 if (hmsg == NULL) {
01237 this->addUnitTestLog(JString("Error: No response message! "));
01238 printf("[Network] Error: No response message!\n");
01239 delete(con);
01240 return false;
01241 }
01242 else {
01243 str = hmsg->getContent();
01244 str2 = str2 + "x";
01245 if (!str.equals(str2)) {
01246 this->addUnitTestLog(JString("Error in receiving message2: ") + hmsg->getContent());
01247 printf("[Network] Error in receiving message2: %s\n", (char*) str);
01248 delete(con);
01249 delete(hmsg);
01250 return false;
01251 }
01252 }
01253 printf(" \b"); fflush(stdout);
01254 }
01255 else {
01256 delete(hmsg);
01257 printf("\n [Network] Could not write data to socket, giving up...\n\n");
01258 }
01259 wait(5);
01260 delete(con);
01261 }
01262 t2 = JTime();
01263
01264
01265 delete(hmsg);
01266
01267
01268
01269 hmsg = new Message("Test", "Test", "Test", "Test", "");
01270 str = "Test";
01271 str2 = "Test";
01272 t1 = JTime();
01273 for (i = 0; i < maxcount; i++) {
01274 printf("O.\b\b"); fflush(stdout);
01275
01276 con = new NetworkConnection("localhost", port, new NetMessageProtocol(), NULL);
01277 con->name = name;
01278
01279
01280 printf("Oo\b\b"); fflush(stdout);
01281 if (con->sendObject(hmsg, false)) {
01282
01283 printf("OO\b\b"); fflush(stdout);
01284 delete(hmsg);
01285 hmsg = con->receiveObject(200);
01286 if (hmsg == NULL) {
01287
01288 this->addUnitTestLog(JString("Error: No response message! "));
01289 printf("[Network] Error: No response message!\n");
01290 delete(con);
01291 return false;
01292 }
01293 else {
01294
01295 str = hmsg->getContent();
01296 str2 = str2 + "x";
01297
01298 if (!str.equals(str2)) {
01299 this->addUnitTestLog(JString("Error in receiving message: ") + hmsg->getContent());
01300 printf("[Network] Error in receiving message: %s\n", (char*) str);
01301 delete(con);
01302 delete(hmsg);
01303 return false;
01304 }
01305 }
01306 printf("O \b\b"); fflush(stdout);
01307
01308 }
01309 else {
01310 delete(hmsg);
01311 printf("\nCould not write data to socket, giving up...\n\n");
01312 }
01313 delete(con);
01314 wait(5);
01315 }
01316 t2 = JTime();
01317
01318
01319
01320 delete(hmsg);
01321
01322
01323
01324
01325
01326 maxcount = 10;
01327 int datasize = 50000;
01328 char* data;
01329 DataSample* sample;
01330 char ch;
01331 ObjectCollection* datacol;
01332 con = new NetworkConnection("localhost", port, new NetMessageProtocol(), NULL);
01333 con->name = name;
01334
01335 printf("OOm\b\b\b"); fflush(stdout);
01336 t1 = JTime();
01337 for (i=0; i<maxcount; i++) {
01338
01339
01340 data = new char[datasize];
01341 memset(data, '1', datasize);
01342 sample = new DataSample("1");
01343 sample->giveData(data, datasize, 5);
01344
01345 hmsg = new Message("", "", "MediaTest", sample);
01346
01347 if (con->sendObject(hmsg, false)) {
01348
01349
01350 delete(hmsg);
01351 hmsg = con->receiveObject(2000);
01352 if (hmsg == NULL) {
01353 this->addUnitTestLog(JString("Error in receiving media message 1"));
01354 printf("[Network] Error in receiving media message 1 [%d]\n", i);
01355 delete(con);
01356 return false;
01357 }
01358 sample = (DataSample*) hmsg->takeObject();
01359 if (sample == NULL) {
01360 printf("UnitTest received empty Media message!\n");
01361 delete(con);
01362 delete(hmsg);
01363 return false;
01364 }
01365 else if (sample->getDataLink() == NULL) {
01366 printf("UnitTest received Media message with no data!\n");
01367 delete(con);
01368 delete(hmsg);
01369 delete(sample);
01370 return false;
01371 }
01372 else {
01373 ch = sample->getDataLink()[0];
01374 if (ch != '2') {
01375 printf("UnitTest received wrong Media sample!\n");
01376 delete(con);
01377 delete(hmsg);
01378 delete(sample);
01379 return false;
01380 }
01381 }
01382 delete(hmsg);
01383 delete(sample);
01384 }
01385 else {
01386 this->addUnitTestLog(JString("Error in sending media message 1"));
01387 printf("[Network] Error in sending media message 1 [%d]\n", i);
01388 delete(con);
01389 delete(hmsg);
01390 return false;
01391 }
01392
01393 t2 = JTime();
01394 }
01395
01396 double perf = (datasize*maxcount*2) * (1000000.0/t2.microDifference(t1));
01397
01398
01399
01400
01401 printf("OOMm\b\b\b\b"); fflush(stdout);
01402 int chunks = 10;
01403 t1 = JTime();
01404 for (i=0; i<maxcount; i++) {
01405
01406
01407
01408 datacol = new ObjectCollection();
01409 for (int q=0; q<chunks; q++) {
01410 ch = 48+q;
01411 data = new char[datasize];
01412 memset(data, ch, datasize);
01413 sample = new DataSample(JString(q));
01414 sample->giveData(data, datasize, 5);
01415 datacol->add(sample);
01416 }
01417
01418 hmsg = new Message("", "", "MediaTest2", datacol);
01419
01420 if (con->sendObject(hmsg, false)) {
01421
01422
01423 delete(hmsg);
01424 hmsg = con->receiveObject(2000);
01425 if (hmsg == NULL) {
01426 this->addUnitTestLog(JString("Error in receiving media message 2"));
01427 printf("[Network] Error in receiving media message 2\n");
01428 delete(con);
01429 delete(hmsg);
01430 return false;
01431 }
01432 datacol = (ObjectCollection*) hmsg->takeObject();
01433 if (datacol == NULL) {
01434 printf("UnitTest received empty Media Collection message!\n");
01435 delete(con);
01436 delete(hmsg);
01437 return false;
01438 }
01439 else {
01440 for (int qq = 0; qq<datacol->getCount(); qq++) {
01441 sample = (DataSample*) datacol->get(qq);
01442 if (sample == NULL) {
01443 printf("UnitTest received no Media Collection sample!\n");
01444 delete(datacol);
01445 delete(con);
01446 delete(hmsg);
01447 return false;
01448 }
01449 else if (sample->getDataLink() == NULL) {
01450 printf("UnitTest received empty Media Collection sample!\n");
01451 delete(datacol);
01452 delete(con);
01453 delete(hmsg);
01454 return false;
01455 }
01456 else {
01457 ch = sample->getDataLink()[0];
01458 if (ch != 48+qq) {
01459 printf("UnitTest received wrong Media Collection sample!\n");
01460 delete(datacol);
01461 delete(con);
01462 delete(hmsg);
01463 return false;
01464 }
01465 }
01466 }
01467 }
01468 delete(datacol);
01469 delete(hmsg);
01470 }
01471 else {
01472 this->addUnitTestLog(JString("Error in sending media message 1"));
01473 printf("[Network] Error in sending media message1\n");
01474 delete(con);
01475 delete(hmsg);
01476 return false;
01477 }
01478
01479
01480 }
01481 t2 = JTime();
01482 printf("OOMM\b\b\b\b"); fflush(stdout);
01483
01484 delete(con);
01485
01486 JString perfStr = JString::bytifyRates(perf,
01487 (datasize*chunks*maxcount*2) * (1000000.0/t2.microDifference(t1)) );
01488
01489 printf("%s", (char*) JString(perfStr,47, 0));
01490
01491 return true;
01492 }
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506 }