• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

UDPForwarder.cpp

Go to the documentation of this file.
00001 #include "UDPForwarder.h"
00002 #include "GetTime.h"
00003 #include "MTUSize.h"
00004 #include "SocketLayer.h"
00005 #include "WSAStartupSingleton.h"
00006 #include "RakSleep.h"
00007 #include "DS_OrderedList.h"
00008 
// Bring the RakNet namespace into scope for the definitions in this file.
00009 using namespace RakNet;
// Initial value the UDPForwarder constructor assigns to maxForwardEntries.
00010 static const unsigned short DEFAULT_MAX_FORWARD_ENTRIES=64;
00011 
// Forward declaration of the worker-thread entry point. Its body is at the end of this file
// and is only compiled when UDP_FORWARDER_EXECUTE_THREADED is defined.
00012 RAK_THREAD_DECLARATION(UpdateUDPForwarder);
00013 
00014 bool operator<( const DataStructures::MLKeyRef<UDPForwarder::SrcAndDest> &inputKey, const UDPForwarder::ForwardEntry *cls )
00015 {
00016         return inputKey.Get().source < cls->srcAndDest.source ||
00017                 (inputKey.Get().source == cls->srcAndDest.source && inputKey.Get().destination < cls->srcAndDest.destination);
00018 }
00019 bool operator>( const DataStructures::MLKeyRef<UDPForwarder::SrcAndDest> &inputKey, const UDPForwarder::ForwardEntry *cls )
00020 {
00021         return inputKey.Get().source > cls->srcAndDest.source ||
00022                 (inputKey.Get().source == cls->srcAndDest.source && inputKey.Get().destination > cls->srcAndDest.destination);
00023 }
00024 bool operator==( const DataStructures::MLKeyRef<UDPForwarder::SrcAndDest> &inputKey, const UDPForwarder::ForwardEntry *cls )
00025 {
00026         return inputKey.Get().source == cls->srcAndDest.source && inputKey.Get().destination == cls->srcAndDest.destination;
00027 }
00028 
00029 
00030 UDPForwarder::ForwardEntry::ForwardEntry() {readSocket=INVALID_SOCKET; timeLastDatagramForwarded=RakNet::GetTimeMS();}
00031 UDPForwarder::ForwardEntry::~ForwardEntry() {
00032         if (readSocket!=INVALID_SOCKET)
00033                 closesocket(readSocket);
00034 }
00035 
00036 UDPForwarder::UDPForwarder()
00037 {
00038 #ifdef _WIN32
00039         WSAStartupSingleton::AddRef();
00040 #endif
00041 
00042         maxForwardEntries=DEFAULT_MAX_FORWARD_ENTRIES;
00043         isRunning=false;
00044         threadRunning=false;
00045 }
00046 UDPForwarder::~UDPForwarder()
00047 {
00048         Shutdown();
00049 
00050 #ifdef _WIN32
00051         WSAStartupSingleton::Deref();
00052 #endif
00053 }
00054 void UDPForwarder::Startup(void)
00055 {
00056         if (isRunning==true)
00057                 return;
00058 
00059         isRunning=true;
00060         threadRunning=false;
00061 
00062 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00063         int errorCode = RakNet::RakThread::Create(UpdateUDPForwarder, this);
00064         if ( errorCode != 0 )
00065         {
00066                 RakAssert(0);
00067                 return;
00068         }
00069 
00070         while (threadRunning==false)
00071                 RakSleep(30);
00072 #endif
00073 }
00074 void UDPForwarder::Shutdown(void)
00075 {
00076         if (isRunning==false)
00077                 return;
00078 
00079         isRunning=false;
00080 
00081 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00082         while (threadRunning==true)
00083                 RakSleep(30);
00084 #endif
00085 
00086         forwardList.ClearPointers(true,__FILE__,__LINE__);
00087 }
00088 void UDPForwarder::Update(void)
00089 {
00090 #ifndef UDP_FORWARDER_EXECUTE_THREADED
00091         UpdateThreaded();
00092 #endif
00093 
00094 }
00095 void UDPForwarder::UpdateThreaded(void)
00096 {
00097         fd_set      readFD;
00098         //fd_set exceptionFD;
00099         FD_ZERO(&readFD);
00100 //      FD_ZERO(&exceptionFD);
00101         timeval tv;
00102         int selectResult;
00103         tv.tv_sec=0;
00104         tv.tv_usec=0;
00105 
00106         RakNetTimeMS curTime = RakNet::GetTimeMS();
00107 
00108         SOCKET largestDescriptor=0;
00109         DataStructures::DefaultIndexType i;
00110 
00111         // Remove unused entries
00112         i=0;
00113         while (i < forwardList.GetSize())
00114         {
00115                 if (curTime > forwardList[i]->timeLastDatagramForwarded && // Account for timestamp wrap
00116                         curTime > forwardList[i]->timeLastDatagramForwarded+forwardList[i]->timeoutOnNoDataMS)
00117                 {
00118                         RakNet::OP_DELETE(forwardList[i],__FILE__,__LINE__);
00119                         forwardList.RemoveAtIndex(i,__FILE__,__LINE__);
00120                 }
00121                 else
00122                         i++;
00123         }
00124 
00125         if (forwardList.GetSize()==0)
00126                 return;
00127 
00128         for (i=0; i < forwardList.GetSize(); i++)
00129         {
00130 #ifdef _MSC_VER
00131 #pragma warning( disable : 4127 ) // warning C4127: conditional expression is constant
00132 #endif
00133                 FD_SET(forwardList[i]->readSocket, &readFD);
00134 //              FD_SET(forwardList[i]->readSocket, &exceptionFD);
00135 
00136                 if (forwardList[i]->readSocket > largestDescriptor)
00137                         largestDescriptor = forwardList[i]->readSocket;
00138         }
00139 
00140 #if defined(_PS3) || defined(__PS3__) || defined(SN_TARGET_PS3)
00141                                                                     
00142 #else
00143         selectResult=(int) select((int) largestDescriptor+1, &readFD, 0, 0, &tv);
00144 #endif
00145 
00146         char data[ MAXIMUM_MTU_SIZE ];
00147         sockaddr_in sa;
00148         socklen_t len2;
00149 
00150         if (selectResult > 0)
00151         {
00152                 DataStructures::Queue<ForwardEntry*> entriesToRead;
00153                 ForwardEntry *feSource;
00154 
00155                 for (i=0; i < forwardList.GetSize(); i++)
00156                 {
00157                         feSource = forwardList[i];
00158                         // I do this because I'm updating the forwardList, and don't want to lose FD_ISSET as the list is no longer in order
00159                         if (FD_ISSET(feSource->readSocket, &readFD))
00160                                 entriesToRead.Push(feSource,__FILE__,__LINE__);
00161                 }
00162 
00163                 while (entriesToRead.IsEmpty()==false)
00164                 {
00165                         feSource=entriesToRead.Pop();
00166 
00167                         const int flag=0;
00168                         int receivedDataLen, len=0;
00169                         unsigned short portnum=0;
00170                         len2 = sizeof( sa );
00171                         sa.sin_family = AF_INET;
00172                         receivedDataLen = recvfrom( feSource->readSocket, data, MAXIMUM_MTU_SIZE, flag, ( sockaddr* ) & sa, ( socklen_t* ) & len2 );
00173                         portnum = ntohs( sa.sin_port );
00174 
00175                         if (feSource->srcAndDest.source.binaryAddress==sa.sin_addr.s_addr)
00176                         {
00177                                 if (feSource->updatedSourceAddress==false)
00178                                 {
00179                                         feSource->updatedSourceAddress=true;
00180 
00181                                         if (feSource->srcAndDest.source.port!=portnum)
00182                                         {
00183                                                 // Remove both source and dest from list, update addresses, and reinsert in order
00184                                                 DataStructures::DefaultIndexType sourceIndex, destIndex;
00185                                                 SrcAndDest srcAndDest;
00186                                                 srcAndDest.source=feSource->srcAndDest.destination;
00187                                                 srcAndDest.destination=feSource->srcAndDest.source;
00188                                                 destIndex=forwardList.GetIndexOf(srcAndDest);
00189                                                 ForwardEntry *feDest = forwardList[destIndex];
00190 
00191                                                 forwardList.RemoveAtIndex(destIndex,__FILE__,__LINE__);
00192                                                 srcAndDest.source=feSource->srcAndDest.source;
00193                                                 srcAndDest.destination=feSource->srcAndDest.destination;
00194                                                 sourceIndex=forwardList.GetIndexOf(srcAndDest);
00195                                                 forwardList.RemoveAtIndex(sourceIndex,__FILE__,__LINE__);
00196 
00197                                                 feSource->srcAndDest.source.port=portnum;
00198                                                 feDest->srcAndDest.destination.port=portnum;
00199 
00200                                                 feSource->timeLastDatagramForwarded=curTime;
00201                                                 feDest->timeLastDatagramForwarded=curTime;
00202                                                 
00203                                                 forwardList.Push(feSource,feSource->srcAndDest,__FILE__,__LINE__);
00204                                                 forwardList.Push(feDest,feDest->srcAndDest,__FILE__,__LINE__);
00205 
00206                                         }
00207                                 }
00208 
00209                                 if (feSource->srcAndDest.source.port==portnum)
00210                                 {
00211                                         // Forward to destination
00212                                         len=0;
00213                                         sockaddr_in saOut;
00214                                         saOut.sin_port = htons( feSource->srcAndDest.destination.port ); // User port
00215                                         saOut.sin_addr.s_addr = feSource->srcAndDest.destination.binaryAddress;
00216                                         saOut.sin_family = AF_INET;
00217                                         do
00218                                         {
00219                                                 len = sendto( feSource->writeSocket, data, receivedDataLen, 0, ( const sockaddr* ) & saOut, sizeof( saOut ) );
00220                                         }
00221                                         while ( len == 0 );
00222 
00223                                         feSource->timeLastDatagramForwarded=curTime;
00224                                 }
00225                         }
00226                 }
00227         }
00228 }
00229 void UDPForwarder::SetMaxForwardEntries(unsigned short maxEntries)
00230 {
00231         RakAssert(maxEntries>0 && maxEntries<65535/2);
00232         maxForwardEntries=maxEntries;
00233 }
00234 int UDPForwarder::GetMaxForwardEntries(void) const
00235 {
00236         return maxForwardEntries;
00237 }
00238 int UDPForwarder::GetUsedForwardEntries(void) const
00239 {
00240         return (int) forwardList.GetSize();
00241 }
00242 unsigned short UDPForwarder::AddForwardingEntry(SrcAndDest srcAndDest, RakNetTimeMS timeoutOnNoDataMS, const char *forceHostAddress)
00243 {
00244         DataStructures::DefaultIndexType insertionIndex;
00245         insertionIndex = forwardList.GetInsertionIndex(srcAndDest);
00246         if (insertionIndex!=(DataStructures::DefaultIndexType)-1)
00247         {
00248                 int sock_opt;
00249                 sockaddr_in listenerSocketAddress;
00250                 listenerSocketAddress.sin_port = 0;
00251                 ForwardEntry *fe = RakNet::OP_NEW<UDPForwarder::ForwardEntry>(__FILE__,__LINE__);
00252                 fe->srcAndDest=srcAndDest;
00253                 fe->timeoutOnNoDataMS=timeoutOnNoDataMS;
00254                 fe->readSocket = socket( AF_INET, SOCK_DGRAM, 0 );
00255                 fe->updatedSourceAddress=false;
00256 
00257                 //printf("Made socket %i\n", fe->readSocket);
00258 
00259                 // This doubles the max throughput rate
00260                 sock_opt=1024*256;
00261                 setsockopt(fe->readSocket, SOL_SOCKET, SO_RCVBUF, ( char * ) & sock_opt, sizeof ( sock_opt ) );
00262 
00263                 // Immediate hard close. Don't linger the readSocket, or recreating the readSocket quickly on Vista fails.
00264                 sock_opt=0;
00265                 setsockopt(fe->readSocket, SOL_SOCKET, SO_LINGER, ( char * ) & sock_opt, sizeof ( sock_opt ) );
00266 
00267                 listenerSocketAddress.sin_family = AF_INET;
00268 
00269                 if (forceHostAddress && forceHostAddress[0])
00270                 {
00271                         listenerSocketAddress.sin_addr.s_addr = inet_addr( forceHostAddress );
00272                 }
00273                 else
00274                 {
00275                         listenerSocketAddress.sin_addr.s_addr = INADDR_ANY;
00276                 }
00277 
00278                 int ret = bind( fe->readSocket, ( struct sockaddr * ) & listenerSocketAddress, sizeof( listenerSocketAddress ) );
00279                 if (ret==-1)
00280                 {
00281                         RakNet::OP_DELETE(fe,__FILE__,__LINE__);
00282                         return 0;
00283                 }
00284 
00285                 DataStructures::DefaultIndexType oldSize = forwardList.GetSize();
00286                 forwardList.InsertAtIndex(fe,insertionIndex,__FILE__,__LINE__);
00287                 RakAssert(forwardList.GetIndexOf(fe->srcAndDest)!=(unsigned int) -1);
00288                 RakAssert(forwardList.GetSize()==oldSize+1);
00289                 return SocketLayer::GetLocalPort ( fe->readSocket );
00290         }
00291         else
00292         {
00293                 // Takeover existing
00294                 DataStructures::DefaultIndexType existingSrcIndex,existingDstIndex;
00295                 SrcAndDest dest;
00296                 ForwardEntry *feSrc, *feDst;
00297                 dest.destination=srcAndDest.source;
00298                 dest.source=srcAndDest.destination;
00299                 existingSrcIndex = forwardList.GetIndexOf(srcAndDest);
00300                 feSrc=forwardList[existingSrcIndex];
00301                 existingDstIndex = forwardList.GetIndexOf(dest);
00302                 feSrc->timeLastDatagramForwarded=RakNet::GetTimeMS();
00303                 if (existingDstIndex==(DataStructures::DefaultIndexType)-1)
00304                 {
00305                         feDst=0;
00306                 }
00307                 else
00308                 {
00309                         // Refresh destination so communication isn't unidirectional
00310                         feDst=forwardList[existingDstIndex];
00311                         feDst->timeLastDatagramForwarded=feSrc->timeLastDatagramForwarded;
00312                 }
00313 
00314 
00315                 return SocketLayer::GetLocalPort ( feSrc->readSocket );
00316         }
00317 }
00318 UDPForwarderResult UDPForwarder::StartForwarding(SystemAddress source, SystemAddress destination, RakNetTimeMS timeoutOnNoDataMS, const char *forceHostAddress,
00319                                                                    unsigned short *srcToDestPort, unsigned short *destToSourcePort, SOCKET *srcToDestSocket, SOCKET *destToSourceSocket)
00320 {
00321         // Invalid parameters?
00322         if (timeoutOnNoDataMS == 0 || timeoutOnNoDataMS > UDP_FORWARDER_MAXIMUM_TIMEOUT || source==UNASSIGNED_SYSTEM_ADDRESS || destination==UNASSIGNED_SYSTEM_ADDRESS)
00323                 return UDPFORWARDER_INVALID_PARAMETERS;
00324 
00325 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00326         ThreadOperation threadOperation;
00327         threadOperation.source=source;
00328         threadOperation.destination=destination;
00329         threadOperation.timeoutOnNoDataMS=timeoutOnNoDataMS;
00330         threadOperation.forceHostAddress=forceHostAddress;
00331         threadOperation.operation=ThreadOperation::TO_START_FORWARDING;
00332         threadOperationIncomingMutex.Lock();
00333         threadOperationIncomingQueue.Push(threadOperation, __FILE__, __LINE__ );
00334         threadOperationIncomingMutex.Unlock();
00335 
00336         while (1)
00337         {
00338                 RakSleep(0);
00339                 threadOperationOutgoingMutex.Lock();
00340                 if (threadOperationOutgoingQueue.Size()!=0)
00341                 {
00342                         threadOperation=threadOperationOutgoingQueue.Pop();
00343                         threadOperationOutgoingMutex.Unlock();
00344                         if (srcToDestPort)
00345                                 *srcToDestPort=threadOperation.srcToDestPort;
00346                         if (destToSourcePort)
00347                                 *destToSourcePort=threadOperation.destToSourcePort;
00348                         if (srcToDestSocket)
00349                                 *srcToDestSocket=threadOperation.srcToDestSocket;
00350                         if (destToSourceSocket)
00351                                 *destToSourceSocket=threadOperation.destToSourceSocket;
00352                         return threadOperation.result;
00353                 }
00354                 threadOperationOutgoingMutex.Unlock();
00355 
00356         }
00357 #else
00358         return StartForwardingThreaded(source, destination, timeoutOnNoDataMS, forceHostAddress, srcToDestPort, destToSourcePort, srcToDestSocket, destToSourceSocket);
00359 #endif
00360 
00361 }
00362 UDPForwarderResult UDPForwarder::StartForwardingThreaded(SystemAddress source, SystemAddress destination, RakNetTimeMS timeoutOnNoDataMS, const char *forceHostAddress,
00363                                                                    unsigned short *srcToDestPort, unsigned short *destToSourcePort, SOCKET *srcToDestSocket, SOCKET *destToSourceSocket)
00364 {
00365         SrcAndDest srcAndDest;
00366         srcAndDest.destination=destination;
00367         srcAndDest.source=source;
00368         SrcAndDest destAndSrc;
00369         destAndSrc.destination=source;
00370         destAndSrc.source=destination;
00371 
00372 // Just takeover existing if this happens
00373 //      if (forwardList.GetIndexOf(srcAndDest) != (DataStructures::DefaultIndexType) -1)
00374 //              return UDPFORWARDER_FORWARDING_ALREADY_EXISTS;
00375 
00376         *srcToDestPort = AddForwardingEntry(srcAndDest, timeoutOnNoDataMS, forceHostAddress);
00377 
00378         if (*srcToDestPort==0)
00379                 return UDPFORWARDER_NO_SOCKETS;
00380 
00381         *destToSourcePort = AddForwardingEntry(destAndSrc, timeoutOnNoDataMS, forceHostAddress);
00382 
00383         if (*destToSourcePort==0)
00384                 return UDPFORWARDER_NO_SOCKETS;
00385 
00386         DataStructures::DefaultIndexType idxSrcAndDest, idxDestAndSrc;
00387         idxSrcAndDest = forwardList.GetIndexOf(srcAndDest);
00388         idxDestAndSrc = forwardList.GetIndexOf(destAndSrc);
00389         forwardList[idxSrcAndDest]->writeSocket=forwardList[idxDestAndSrc]->readSocket;
00390         forwardList[idxDestAndSrc]->writeSocket=forwardList[idxSrcAndDest]->readSocket;
00391         if (*srcToDestSocket)
00392                 *srcToDestSocket=forwardList[idxSrcAndDest]->writeSocket;
00393         if (*destToSourceSocket)
00394                 *destToSourceSocket=forwardList[idxSrcAndDest]->readSocket;
00395 
00396         return UDPFORWARDER_SUCCESS;
00397 }
00398 void UDPForwarder::StopForwarding(SystemAddress source, SystemAddress destination)
00399 {
00400 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00401         ThreadOperation threadOperation;
00402         threadOperation.source=source;
00403         threadOperation.destination=destination;
00404         threadOperation.operation=ThreadOperation::TO_STOP_FORWARDING;
00405         threadOperationIncomingMutex.Lock();
00406         threadOperationIncomingQueue.Push(threadOperation, __FILE__, __LINE__ );
00407         threadOperationIncomingMutex.Unlock();
00408 #else
00409         StopForwardingThreaded(source, destination);
00410 #endif
00411 }
00412 void UDPForwarder::StopForwardingThreaded(SystemAddress source, SystemAddress destination)
00413 {
00414 
00415         SrcAndDest srcAndDest;
00416         srcAndDest.destination=destination;
00417         srcAndDest.source=source;
00418         DataStructures::DefaultIndexType idx = forwardList.GetIndexOf(srcAndDest);
00419         if (idx!=(DataStructures::DefaultIndexType)-1)
00420         {
00421                 RakNet::OP_DELETE(forwardList[idx],__FILE__,__LINE__);
00422                 forwardList.RemoveAtIndex(idx,__FILE__,__LINE__);
00423         }
00424 
00425         srcAndDest.destination=source;
00426         srcAndDest.source=destination;
00427 
00428         idx = forwardList.GetIndexOf(srcAndDest);
00429         if (idx!=(DataStructures::DefaultIndexType)-1)
00430         {
00431                 RakNet::OP_DELETE(forwardList[idx],__FILE__,__LINE__);
00432                 forwardList.RemoveAtIndex(idx,__FILE__,__LINE__);
00433         }
00434 }
#ifdef UDP_FORWARDER_EXECUTE_THREADED
// Worker-thread entry point; 'arguments' is the UDPForwarder that created the thread.
// Sets threadRunning on entry and clears it on exit. While isRunning stays true, each
// iteration drains the incoming operation queue (start/stop requests), runs one forwarding
// pass and then yields.
RAK_THREAD_DECLARATION(UpdateUDPForwarder)
{
	UDPForwarder * udpForwarder = ( UDPForwarder * ) arguments;
	udpForwarder->threadRunning=true;
	UDPForwarder::ThreadOperation threadOperation;
	while (udpForwarder->isRunning)
	{
		// Drain pending requests. The incoming mutex is held only while touching the queue,
		// never while a request is being executed.
		for (;;)
		{
			udpForwarder->threadOperationIncomingMutex.Lock();
			if (udpForwarder->threadOperationIncomingQueue.Size()==0)
			{
				udpForwarder->threadOperationIncomingMutex.Unlock();
				break;
			}
			threadOperation=udpForwarder->threadOperationIncomingQueue.Pop();
			udpForwarder->threadOperationIncomingMutex.Unlock();

			if (threadOperation.operation!=UDPForwarder::ThreadOperation::TO_START_FORWARDING)
			{
				// Stop requests produce no reply
				udpForwarder->StopForwardingThreaded(threadOperation.source, threadOperation.destination);
				continue;
			}

			threadOperation.result=udpForwarder->StartForwardingThreaded(threadOperation.source, threadOperation.destination, threadOperation.timeoutOnNoDataMS,
				threadOperation.forceHostAddress, &threadOperation.srcToDestPort, &threadOperation.destToSourcePort, &threadOperation.srcToDestSocket, &threadOperation.destToSourceSocket);

			// Post the filled-in operation back; StartForwarding() is polling the outgoing queue for it
			udpForwarder->threadOperationOutgoingMutex.Lock();
			udpForwarder->threadOperationOutgoingQueue.Push(threadOperation, __FILE__, __LINE__ );
			udpForwarder->threadOperationOutgoingMutex.Unlock();
		}

		udpForwarder->UpdateThreaded();
		RakSleep(0);
	}
	udpForwarder->threadRunning=false;
	return 0;
}
#endif

Generated on Thu Sep 30 2010 01:27:29 for RakNet by  doxygen 1.7.1