00001 #include "UDPForwarder.h"
00002 #include "GetTime.h"
00003 #include "MTUSize.h"
00004 #include "SocketLayer.h"
00005 #include "WSAStartupSingleton.h"
00006 #include "RakSleep.h"
00007 #include "DS_OrderedList.h"
00008
00009 using namespace RakNet;
00010 static const unsigned short DEFAULT_MAX_FORWARD_ENTRIES=64;
00011
00012 RAK_THREAD_DECLARATION(UpdateUDPForwarder);
00013
00014 bool operator<( const DataStructures::MLKeyRef<UDPForwarder::SrcAndDest> &inputKey, const UDPForwarder::ForwardEntry *cls )
00015 {
00016 return inputKey.Get().source < cls->srcAndDest.source ||
00017 (inputKey.Get().source == cls->srcAndDest.source && inputKey.Get().destination < cls->srcAndDest.destination);
00018 }
00019 bool operator>( const DataStructures::MLKeyRef<UDPForwarder::SrcAndDest> &inputKey, const UDPForwarder::ForwardEntry *cls )
00020 {
00021 return inputKey.Get().source > cls->srcAndDest.source ||
00022 (inputKey.Get().source == cls->srcAndDest.source && inputKey.Get().destination > cls->srcAndDest.destination);
00023 }
00024 bool operator==( const DataStructures::MLKeyRef<UDPForwarder::SrcAndDest> &inputKey, const UDPForwarder::ForwardEntry *cls )
00025 {
00026 return inputKey.Get().source == cls->srcAndDest.source && inputKey.Get().destination == cls->srcAndDest.destination;
00027 }
00028
00029
00030 UDPForwarder::ForwardEntry::ForwardEntry() {readSocket=INVALID_SOCKET; timeLastDatagramForwarded=RakNet::GetTimeMS();}
00031 UDPForwarder::ForwardEntry::~ForwardEntry() {
00032 if (readSocket!=INVALID_SOCKET)
00033 closesocket(readSocket);
00034 }
00035
00036 UDPForwarder::UDPForwarder()
00037 {
00038 #ifdef _WIN32
00039 WSAStartupSingleton::AddRef();
00040 #endif
00041
00042 maxForwardEntries=DEFAULT_MAX_FORWARD_ENTRIES;
00043 isRunning=false;
00044 threadRunning=false;
00045 }
00046 UDPForwarder::~UDPForwarder()
00047 {
00048 Shutdown();
00049
00050 #ifdef _WIN32
00051 WSAStartupSingleton::Deref();
00052 #endif
00053 }
00054 void UDPForwarder::Startup(void)
00055 {
00056 if (isRunning==true)
00057 return;
00058
00059 isRunning=true;
00060 threadRunning=false;
00061
00062 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00063 int errorCode = RakNet::RakThread::Create(UpdateUDPForwarder, this);
00064 if ( errorCode != 0 )
00065 {
00066 RakAssert(0);
00067 return;
00068 }
00069
00070 while (threadRunning==false)
00071 RakSleep(30);
00072 #endif
00073 }
00074 void UDPForwarder::Shutdown(void)
00075 {
00076 if (isRunning==false)
00077 return;
00078
00079 isRunning=false;
00080
00081 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00082 while (threadRunning==true)
00083 RakSleep(30);
00084 #endif
00085
00086 forwardList.ClearPointers(true,__FILE__,__LINE__);
00087 }
00088 void UDPForwarder::Update(void)
00089 {
00090 #ifndef UDP_FORWARDER_EXECUTE_THREADED
00091 UpdateThreaded();
00092 #endif
00093
00094 }
00095 void UDPForwarder::UpdateThreaded(void)
00096 {
00097 fd_set readFD;
00098
00099 FD_ZERO(&readFD);
00100
00101 timeval tv;
00102 int selectResult;
00103 tv.tv_sec=0;
00104 tv.tv_usec=0;
00105
00106 RakNetTimeMS curTime = RakNet::GetTimeMS();
00107
00108 SOCKET largestDescriptor=0;
00109 DataStructures::DefaultIndexType i;
00110
00111
00112 i=0;
00113 while (i < forwardList.GetSize())
00114 {
00115 if (curTime > forwardList[i]->timeLastDatagramForwarded &&
00116 curTime > forwardList[i]->timeLastDatagramForwarded+forwardList[i]->timeoutOnNoDataMS)
00117 {
00118 RakNet::OP_DELETE(forwardList[i],__FILE__,__LINE__);
00119 forwardList.RemoveAtIndex(i,__FILE__,__LINE__);
00120 }
00121 else
00122 i++;
00123 }
00124
00125 if (forwardList.GetSize()==0)
00126 return;
00127
00128 for (i=0; i < forwardList.GetSize(); i++)
00129 {
00130 #ifdef _MSC_VER
00131 #pragma warning( disable : 4127 ) // warning C4127: conditional expression is constant
00132 #endif
00133 FD_SET(forwardList[i]->readSocket, &readFD);
00134
00135
00136 if (forwardList[i]->readSocket > largestDescriptor)
00137 largestDescriptor = forwardList[i]->readSocket;
00138 }
00139
00140 #if defined(_PS3) || defined(__PS3__) || defined(SN_TARGET_PS3)
00141
00142 #else
00143 selectResult=(int) select((int) largestDescriptor+1, &readFD, 0, 0, &tv);
00144 #endif
00145
00146 char data[ MAXIMUM_MTU_SIZE ];
00147 sockaddr_in sa;
00148 socklen_t len2;
00149
00150 if (selectResult > 0)
00151 {
00152 DataStructures::Queue<ForwardEntry*> entriesToRead;
00153 ForwardEntry *feSource;
00154
00155 for (i=0; i < forwardList.GetSize(); i++)
00156 {
00157 feSource = forwardList[i];
00158
00159 if (FD_ISSET(feSource->readSocket, &readFD))
00160 entriesToRead.Push(feSource,__FILE__,__LINE__);
00161 }
00162
00163 while (entriesToRead.IsEmpty()==false)
00164 {
00165 feSource=entriesToRead.Pop();
00166
00167 const int flag=0;
00168 int receivedDataLen, len=0;
00169 unsigned short portnum=0;
00170 len2 = sizeof( sa );
00171 sa.sin_family = AF_INET;
00172 receivedDataLen = recvfrom( feSource->readSocket, data, MAXIMUM_MTU_SIZE, flag, ( sockaddr* ) & sa, ( socklen_t* ) & len2 );
00173 portnum = ntohs( sa.sin_port );
00174
00175 if (feSource->srcAndDest.source.binaryAddress==sa.sin_addr.s_addr)
00176 {
00177 if (feSource->updatedSourceAddress==false)
00178 {
00179 feSource->updatedSourceAddress=true;
00180
00181 if (feSource->srcAndDest.source.port!=portnum)
00182 {
00183
00184 DataStructures::DefaultIndexType sourceIndex, destIndex;
00185 SrcAndDest srcAndDest;
00186 srcAndDest.source=feSource->srcAndDest.destination;
00187 srcAndDest.destination=feSource->srcAndDest.source;
00188 destIndex=forwardList.GetIndexOf(srcAndDest);
00189 ForwardEntry *feDest = forwardList[destIndex];
00190
00191 forwardList.RemoveAtIndex(destIndex,__FILE__,__LINE__);
00192 srcAndDest.source=feSource->srcAndDest.source;
00193 srcAndDest.destination=feSource->srcAndDest.destination;
00194 sourceIndex=forwardList.GetIndexOf(srcAndDest);
00195 forwardList.RemoveAtIndex(sourceIndex,__FILE__,__LINE__);
00196
00197 feSource->srcAndDest.source.port=portnum;
00198 feDest->srcAndDest.destination.port=portnum;
00199
00200 feSource->timeLastDatagramForwarded=curTime;
00201 feDest->timeLastDatagramForwarded=curTime;
00202
00203 forwardList.Push(feSource,feSource->srcAndDest,__FILE__,__LINE__);
00204 forwardList.Push(feDest,feDest->srcAndDest,__FILE__,__LINE__);
00205
00206 }
00207 }
00208
00209 if (feSource->srcAndDest.source.port==portnum)
00210 {
00211
00212 len=0;
00213 sockaddr_in saOut;
00214 saOut.sin_port = htons( feSource->srcAndDest.destination.port );
00215 saOut.sin_addr.s_addr = feSource->srcAndDest.destination.binaryAddress;
00216 saOut.sin_family = AF_INET;
00217 do
00218 {
00219 len = sendto( feSource->writeSocket, data, receivedDataLen, 0, ( const sockaddr* ) & saOut, sizeof( saOut ) );
00220 }
00221 while ( len == 0 );
00222
00223 feSource->timeLastDatagramForwarded=curTime;
00224 }
00225 }
00226 }
00227 }
00228 }
00229 void UDPForwarder::SetMaxForwardEntries(unsigned short maxEntries)
00230 {
00231 RakAssert(maxEntries>0 && maxEntries<65535/2);
00232 maxForwardEntries=maxEntries;
00233 }
00234 int UDPForwarder::GetMaxForwardEntries(void) const
00235 {
00236 return maxForwardEntries;
00237 }
00238 int UDPForwarder::GetUsedForwardEntries(void) const
00239 {
00240 return (int) forwardList.GetSize();
00241 }
00242 unsigned short UDPForwarder::AddForwardingEntry(SrcAndDest srcAndDest, RakNetTimeMS timeoutOnNoDataMS, const char *forceHostAddress)
00243 {
00244 DataStructures::DefaultIndexType insertionIndex;
00245 insertionIndex = forwardList.GetInsertionIndex(srcAndDest);
00246 if (insertionIndex!=(DataStructures::DefaultIndexType)-1)
00247 {
00248 int sock_opt;
00249 sockaddr_in listenerSocketAddress;
00250 listenerSocketAddress.sin_port = 0;
00251 ForwardEntry *fe = RakNet::OP_NEW<UDPForwarder::ForwardEntry>(__FILE__,__LINE__);
00252 fe->srcAndDest=srcAndDest;
00253 fe->timeoutOnNoDataMS=timeoutOnNoDataMS;
00254 fe->readSocket = socket( AF_INET, SOCK_DGRAM, 0 );
00255 fe->updatedSourceAddress=false;
00256
00257
00258
00259
00260 sock_opt=1024*256;
00261 setsockopt(fe->readSocket, SOL_SOCKET, SO_RCVBUF, ( char * ) & sock_opt, sizeof ( sock_opt ) );
00262
00263
00264 sock_opt=0;
00265 setsockopt(fe->readSocket, SOL_SOCKET, SO_LINGER, ( char * ) & sock_opt, sizeof ( sock_opt ) );
00266
00267 listenerSocketAddress.sin_family = AF_INET;
00268
00269 if (forceHostAddress && forceHostAddress[0])
00270 {
00271 listenerSocketAddress.sin_addr.s_addr = inet_addr( forceHostAddress );
00272 }
00273 else
00274 {
00275 listenerSocketAddress.sin_addr.s_addr = INADDR_ANY;
00276 }
00277
00278 int ret = bind( fe->readSocket, ( struct sockaddr * ) & listenerSocketAddress, sizeof( listenerSocketAddress ) );
00279 if (ret==-1)
00280 {
00281 RakNet::OP_DELETE(fe,__FILE__,__LINE__);
00282 return 0;
00283 }
00284
00285 DataStructures::DefaultIndexType oldSize = forwardList.GetSize();
00286 forwardList.InsertAtIndex(fe,insertionIndex,__FILE__,__LINE__);
00287 RakAssert(forwardList.GetIndexOf(fe->srcAndDest)!=(unsigned int) -1);
00288 RakAssert(forwardList.GetSize()==oldSize+1);
00289 return SocketLayer::GetLocalPort ( fe->readSocket );
00290 }
00291 else
00292 {
00293
00294 DataStructures::DefaultIndexType existingSrcIndex,existingDstIndex;
00295 SrcAndDest dest;
00296 ForwardEntry *feSrc, *feDst;
00297 dest.destination=srcAndDest.source;
00298 dest.source=srcAndDest.destination;
00299 existingSrcIndex = forwardList.GetIndexOf(srcAndDest);
00300 feSrc=forwardList[existingSrcIndex];
00301 existingDstIndex = forwardList.GetIndexOf(dest);
00302 feSrc->timeLastDatagramForwarded=RakNet::GetTimeMS();
00303 if (existingDstIndex==(DataStructures::DefaultIndexType)-1)
00304 {
00305 feDst=0;
00306 }
00307 else
00308 {
00309
00310 feDst=forwardList[existingDstIndex];
00311 feDst->timeLastDatagramForwarded=feSrc->timeLastDatagramForwarded;
00312 }
00313
00314
00315 return SocketLayer::GetLocalPort ( feSrc->readSocket );
00316 }
00317 }
00318 UDPForwarderResult UDPForwarder::StartForwarding(SystemAddress source, SystemAddress destination, RakNetTimeMS timeoutOnNoDataMS, const char *forceHostAddress,
00319 unsigned short *srcToDestPort, unsigned short *destToSourcePort, SOCKET *srcToDestSocket, SOCKET *destToSourceSocket)
00320 {
00321
00322 if (timeoutOnNoDataMS == 0 || timeoutOnNoDataMS > UDP_FORWARDER_MAXIMUM_TIMEOUT || source==UNASSIGNED_SYSTEM_ADDRESS || destination==UNASSIGNED_SYSTEM_ADDRESS)
00323 return UDPFORWARDER_INVALID_PARAMETERS;
00324
00325 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00326 ThreadOperation threadOperation;
00327 threadOperation.source=source;
00328 threadOperation.destination=destination;
00329 threadOperation.timeoutOnNoDataMS=timeoutOnNoDataMS;
00330 threadOperation.forceHostAddress=forceHostAddress;
00331 threadOperation.operation=ThreadOperation::TO_START_FORWARDING;
00332 threadOperationIncomingMutex.Lock();
00333 threadOperationIncomingQueue.Push(threadOperation, __FILE__, __LINE__ );
00334 threadOperationIncomingMutex.Unlock();
00335
00336 while (1)
00337 {
00338 RakSleep(0);
00339 threadOperationOutgoingMutex.Lock();
00340 if (threadOperationOutgoingQueue.Size()!=0)
00341 {
00342 threadOperation=threadOperationOutgoingQueue.Pop();
00343 threadOperationOutgoingMutex.Unlock();
00344 if (srcToDestPort)
00345 *srcToDestPort=threadOperation.srcToDestPort;
00346 if (destToSourcePort)
00347 *destToSourcePort=threadOperation.destToSourcePort;
00348 if (srcToDestSocket)
00349 *srcToDestSocket=threadOperation.srcToDestSocket;
00350 if (destToSourceSocket)
00351 *destToSourceSocket=threadOperation.destToSourceSocket;
00352 return threadOperation.result;
00353 }
00354 threadOperationOutgoingMutex.Unlock();
00355
00356 }
00357 #else
00358 return StartForwardingThreaded(source, destination, timeoutOnNoDataMS, forceHostAddress, srcToDestPort, destToSourcePort, srcToDestSocket, destToSourceSocket);
00359 #endif
00360
00361 }
00362 UDPForwarderResult UDPForwarder::StartForwardingThreaded(SystemAddress source, SystemAddress destination, RakNetTimeMS timeoutOnNoDataMS, const char *forceHostAddress,
00363 unsigned short *srcToDestPort, unsigned short *destToSourcePort, SOCKET *srcToDestSocket, SOCKET *destToSourceSocket)
00364 {
00365 SrcAndDest srcAndDest;
00366 srcAndDest.destination=destination;
00367 srcAndDest.source=source;
00368 SrcAndDest destAndSrc;
00369 destAndSrc.destination=source;
00370 destAndSrc.source=destination;
00371
00372
00373
00374
00375
00376 *srcToDestPort = AddForwardingEntry(srcAndDest, timeoutOnNoDataMS, forceHostAddress);
00377
00378 if (*srcToDestPort==0)
00379 return UDPFORWARDER_NO_SOCKETS;
00380
00381 *destToSourcePort = AddForwardingEntry(destAndSrc, timeoutOnNoDataMS, forceHostAddress);
00382
00383 if (*destToSourcePort==0)
00384 return UDPFORWARDER_NO_SOCKETS;
00385
00386 DataStructures::DefaultIndexType idxSrcAndDest, idxDestAndSrc;
00387 idxSrcAndDest = forwardList.GetIndexOf(srcAndDest);
00388 idxDestAndSrc = forwardList.GetIndexOf(destAndSrc);
00389 forwardList[idxSrcAndDest]->writeSocket=forwardList[idxDestAndSrc]->readSocket;
00390 forwardList[idxDestAndSrc]->writeSocket=forwardList[idxSrcAndDest]->readSocket;
00391 if (*srcToDestSocket)
00392 *srcToDestSocket=forwardList[idxSrcAndDest]->writeSocket;
00393 if (*destToSourceSocket)
00394 *destToSourceSocket=forwardList[idxSrcAndDest]->readSocket;
00395
00396 return UDPFORWARDER_SUCCESS;
00397 }
00398 void UDPForwarder::StopForwarding(SystemAddress source, SystemAddress destination)
00399 {
00400 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00401 ThreadOperation threadOperation;
00402 threadOperation.source=source;
00403 threadOperation.destination=destination;
00404 threadOperation.operation=ThreadOperation::TO_STOP_FORWARDING;
00405 threadOperationIncomingMutex.Lock();
00406 threadOperationIncomingQueue.Push(threadOperation, __FILE__, __LINE__ );
00407 threadOperationIncomingMutex.Unlock();
00408 #else
00409 StopForwardingThreaded(source, destination);
00410 #endif
00411 }
00412 void UDPForwarder::StopForwardingThreaded(SystemAddress source, SystemAddress destination)
00413 {
00414
00415 SrcAndDest srcAndDest;
00416 srcAndDest.destination=destination;
00417 srcAndDest.source=source;
00418 DataStructures::DefaultIndexType idx = forwardList.GetIndexOf(srcAndDest);
00419 if (idx!=(DataStructures::DefaultIndexType)-1)
00420 {
00421 RakNet::OP_DELETE(forwardList[idx],__FILE__,__LINE__);
00422 forwardList.RemoveAtIndex(idx,__FILE__,__LINE__);
00423 }
00424
00425 srcAndDest.destination=source;
00426 srcAndDest.source=destination;
00427
00428 idx = forwardList.GetIndexOf(srcAndDest);
00429 if (idx!=(DataStructures::DefaultIndexType)-1)
00430 {
00431 RakNet::OP_DELETE(forwardList[idx],__FILE__,__LINE__);
00432 forwardList.RemoveAtIndex(idx,__FILE__,__LINE__);
00433 }
00434 }
00435 #ifdef UDP_FORWARDER_EXECUTE_THREADED
00436 RAK_THREAD_DECLARATION(UpdateUDPForwarder)
00437 {
00438 UDPForwarder * udpForwarder = ( UDPForwarder * ) arguments;
00439 udpForwarder->threadRunning=true;
00440 UDPForwarder::ThreadOperation threadOperation;
00441 while (udpForwarder->isRunning)
00442 {
00443 udpForwarder->threadOperationIncomingMutex.Lock();
00444 while (udpForwarder->threadOperationIncomingQueue.Size())
00445 {
00446 threadOperation=udpForwarder->threadOperationIncomingQueue.Pop();
00447 udpForwarder->threadOperationIncomingMutex.Unlock();
00448 if (threadOperation.operation==UDPForwarder::ThreadOperation::TO_START_FORWARDING)
00449 {
00450 threadOperation.result=udpForwarder->StartForwardingThreaded(threadOperation.source, threadOperation.destination, threadOperation.timeoutOnNoDataMS,
00451 threadOperation.forceHostAddress, &threadOperation.srcToDestPort, &threadOperation.destToSourcePort, &threadOperation.srcToDestSocket, &threadOperation.destToSourceSocket);
00452 udpForwarder->threadOperationOutgoingMutex.Lock();
00453 udpForwarder->threadOperationOutgoingQueue.Push(threadOperation, __FILE__, __LINE__ );
00454 udpForwarder->threadOperationOutgoingMutex.Unlock();
00455 }
00456 else
00457 {
00458 udpForwarder->StopForwardingThreaded(threadOperation.source, threadOperation.destination);
00459 }
00460
00461
00462 udpForwarder->threadOperationIncomingMutex.Lock();
00463 }
00464 udpForwarder->threadOperationIncomingMutex.Unlock();
00465
00466 udpForwarder->UpdateThreaded();
00467 RakSleep(0);
00468 }
00469 udpForwarder->threadRunning=false;
00470 return 0;
00471 }
00472 #endif