// No longer used as I no longer support IO Completion ports
/*

#ifdef __USE_IO_COMPLETION_PORTS

#include "AsynchronousFileIO.h"
#include "ClientContextStruct.h"
#include <process.h>
#include "ExtendedOverlappedPool.h"
#include <stdio.h>
#include "RakAssert.h"

// All these are used for the Read callback.  For general Asynch file IO you would change these
#include "RakNetTypes.h"

class RakPeer;

#ifdef _WIN32
extern void __stdcall ProcessNetworkPacket( unsigned int binaryAddress, unsigned short port, const char *data, int length, RakPeer *rakPeer );
#else
extern void ProcessNetworkPacket( unsigned int binaryAddress, unsigned short port, const char *data, int length, RakPeer *rakPeer );
#endif

AsynchronousFileIO AsynchronousFileIO::I;

// Starts with no users, no worker threads and no completion port.
AsynchronousFileIO::AsynchronousFileIO()
{
	userCount = 0;
	threadCount = 0;
	completionPort = NULL;
	killThreads = false;

	// Determine how many processors are on the system.
	GetSystemInfo( &systemInfo );
}

// Registers one more user.  The first user creates the completion port and
// the worker threads (two per processor) and waits until they are running.
// If the port or every worker fails to start, userCount is reset to 0.
void AsynchronousFileIO::IncreaseUserCount()
{
	userCountMutex.Lock();
	++userCount;

	if ( userCount == 1 )
	{
		// Shutdown() leaves killThreads set.  Clear it so that workers started
		// for a new session do not behave as if they were being torn down.
		killThreads = false;

		// Create the completion port that will be used by all the worker
		// threads.
		completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, systemInfo.dwNumberOfProcessors * 2 );

		if ( completionPort == NULL )
		{
			userCount = 0;
			userCountMutex.Unlock();
			return ;
		}

		UINT nThreadID;
		HANDLE workerHandle;
		DWORD startedThreads = 0;

		// Create worker threads

		// Two worker threads per processor

		for ( DWORD i = 0; i < systemInfo.dwNumberOfProcessors * 2; i++ )
		// In debug just make one worker thread so it's easier to trace
		//for ( i = 0; i < systemInfo.dwNumberOfProcessors * 1; i++ )
		{
			workerHandle = ( HANDLE ) _beginthreadex( NULL, // Security
				0, // Stack size - use default
				ThreadPoolFunc, // Thread fn entry point
				( void* ) completionPort, // Param for thread
				0, // Init flag
				&nThreadID ); // Thread address

			// _beginthreadex returns 0 on failure.  Such a thread never
			// increments threadCount, so it must not be waited for below.
			if ( workerHandle == 0 )
				continue;

			++startedThreads;

			// Feel free to comment this out for regular thread priority
			SetThreadPriority( workerHandle, THREAD_PRIORITY_HIGHEST );

			CloseHandle( workerHandle );
		}

		if ( startedThreads == 0 )
		{
			// No worker could be started: undo, same as a failed port creation.
			CloseHandle( completionPort );
			completionPort = NULL;
			userCount = 0;
			userCountMutex.Unlock();
			return ;
		}

		// Wait for the threads that were actually created to start
		while ( threadCount < startedThreads )
			Sleep( 0 );
	}

	userCountMutex.Unlock();
}

// Unregisters one user.  The last user to leave shuts the worker pool down.
void AsynchronousFileIO::DecreaseUserCount()
{
	userCountMutex.Lock();

	assert( userCount > 0 );

	if ( userCount == 0 )
	{
		// Release the lock on the early-out path too, otherwise every later
		// IncreaseUserCount / DecreaseUserCount call would block forever.
		userCountMutex.Unlock();
		return ;
	}

	userCount--;

	if ( userCount == 0 )
		Shutdown();

	userCountMutex.Unlock();
}

// Asks every worker to exit by posting empty completion packets, waits until
// threadCount reaches zero, then closes the completion port.
void AsynchronousFileIO::Shutdown( void )
{
	killThreads = true;

	if ( completionPort != NULL )
		for ( DWORD i = 0; i < systemInfo.dwNumberOfProcessors * 2; i++ )
			PostQueuedCompletionStatus( completionPort, 0, 0 , 0 );

	// Kill worker threads
	while ( threadCount > 0 )
		Sleep( 0 );

	if ( completionPort != NULL )
	{
		CloseHandle( completionPort );
		// Do not keep a closed handle around
		completionPort = NULL;
	}
}

// Returns the current number of registered users.
int AsynchronousFileIO::GetUserCount( void )
{
	return userCount;
}

// Stops the worker threads if any are still running.
AsynchronousFileIO::~AsynchronousFileIO()
{
	if ( threadCount > 0 )
		Shutdown();
}

// Attaches socket to the shared completion port.  dwCompletionKey is handed
// back by GetQueuedCompletionStatus for every completion on that socket.
// Returns true if the association succeeded.
bool AsynchronousFileIO::AssociateSocketWithCompletionPort( SOCKET socket, DWORD dwCompletionKey )
{
	HANDLE h = CreateIoCompletionPort( ( HANDLE ) socket, completionPort, dwCompletionKey, 0 );
	return h == completionPort;
}

// Starts an overlapped read into extended->data.
// Returns TRUE if the read completed or is pending, FALSE on any other error.
BOOL ReadAsynch( HANDLE handle, ExtendedOverlappedStruct *extended )
{
	BOOL success;
	extended->read = true;
	success = ReadFile( handle, extended->data, extended->length, 0, ( LPOVERLAPPED ) extended );

	if ( !success )
	{
		DWORD dwErrCode = GetLastError();

		// ERROR_IO_PENDING only means the request was queued
		if ( dwErrCode != ERROR_IO_PENDING )
		{
#if defined(_WIN32) && !defined(_XBOX) && defined(_DEBUG)
			LPVOID messageBuffer;
			FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
				NULL, dwErrCode, MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),  // Default language
				( LPTSTR ) & messageBuffer, 0, NULL );
			// something has gone wrong here...
			RAKNET_DEBUG_PRINTF( "ReadFile failed:Error code - %d\n%s", dwErrCode, messageBuffer );
			//Free the buffer.
			LocalFree( messageBuffer );
#endif

			return FALSE;
		}
	}

	return TRUE;
}

// Starts an overlapped write of extended->data.  Failures other than
// ERROR_IO_PENDING are only reported in debug builds; nothing is returned.
void WriteAsynch( HANDLE handle, ExtendedOverlappedStruct *extended )
{
	//RAKNET_DEBUG_PRINTF("Beginning asynch write of %i bytes.\n",extended->length);
	//for (int i=0; i < extended->length && i < 10; i++)
	// RAKNET_DEBUG_PRINTF("%i ", extended->data[i]);
	//RAKNET_DEBUG_PRINTF("\n\n");
	BOOL success;
	extended->read = false;
	success = WriteFile( handle, extended->data, extended->length, 0, ( LPOVERLAPPED ) extended );

	if ( !success )
	{
		DWORD dwErrCode = GetLastError();

		if ( dwErrCode != ERROR_IO_PENDING )
		{
#if defined(_WIN32) && !defined(_XBOX) && defined(_DEBUG)
			LPVOID messageBuffer;
			FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
				NULL, dwErrCode, MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),  // Default language
				( LPTSTR ) & messageBuffer, 0, NULL );
			// something has gone wrong here...
			RAKNET_DEBUG_PRINTF( "WriteFile failed:Error code - %d\n%s", dwErrCode, messageBuffer );

			//Free the buffer.
			LocalFree( messageBuffer );
#endif

		}
	}
}

// Worker thread entry point.  arguments is the completion port HANDLE itself
// (see the _beginthreadex call in IncreaseUserCount).  The thread dequeues
// completions until it receives a packet with no OVERLAPPED pointer.
unsigned __stdcall ThreadPoolFunc( LPVOID arguments )
{
	DWORD dwIoSize = 0;
	ClientContextStruct* lpClientContext = 0;
	ExtendedOverlappedStruct* lpOverlapped = 0;
	LPOVERLAPPED temp = 0;

	// The port handle is passed by value, so cast to HANDLE and not HANDLE*.
	HANDLE completionPort = ( HANDLE ) arguments;

	// NOTE(review): threadCount is modified from several worker threads without
	// visible synchronisation.  Its declaration is not in this file - confirm
	// the type and switch to an interlocked increment / decrement.
	AsynchronousFileIO::Instance()->threadCount++;

	while ( 1 )
	{
		// Get a completed IO request.
		// The completion key is pointer sized, so receive it through PULONG_PTR.
		BOOL returnValue = GetQueuedCompletionStatus(
			completionPort,
			&dwIoSize,
			( PULONG_PTR ) & lpClientContext,
			&temp, INFINITE );

		lpOverlapped = ( ExtendedOverlappedStruct* ) temp;

		DWORD dwIOError = GetLastError();

		if ( lpOverlapped == 0 )
			break; // Cancelled thread

		// Set when this completion, or the follow-up read, could not be serviced
		bool failed = ( !returnValue && dwIOError != WAIT_TIMEOUT );

		if ( failed )
		{
			if ( dwIOError != ERROR_OPERATION_ABORTED )
			{
				// Print all but this very common error message
#if defined(_WIN32) && !defined(_XBOX) && defined(_DEBUG)
				LPVOID messageBuffer;
				FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
					NULL, dwIOError, MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),  // Default language
					( LPTSTR ) & messageBuffer, 0, NULL );
				// something has gone wrong here...
				RAKNET_DEBUG_PRINTF( "GetQueuedCompletionStatus failed:Error code - %d\n%s", dwIOError, messageBuffer );

				//Free the buffer.
				LocalFree( messageBuffer );
#endif

			}
		}
		else if ( returnValue && NULL != lpOverlapped && NULL != lpClientContext )
		{
			if ( lpOverlapped->read == true )
			{
				assert( dwIoSize > 0 );

				ProcessNetworkPacket( lpOverlapped->binaryAddress, lpOverlapped->port, lpOverlapped->data, dwIoSize, lpOverlapped->rakPeer );

				// Issue a new read so we always have one outstanding read per socket
				// Finished a read.  Reuse the overlapped pointer
				if ( !ReadAsynch( lpClientContext->handle, lpOverlapped ) )
					failed = true; // Windows is super unreliable!
			}
			else
			{
				// AsynchronousFileIO::Instance()->Write(lpClientContext);
				// Finished a write
				ExtendedOverlappedPool::Instance()->ReleasePointer( lpOverlapped );
			}
		}
		else
			assert( 0 );

		if ( failed )
		{
			// Some kind of error.  Erase the data for this call
			// This socket is no longer used

			if ( lpOverlapped )
				RakNet::OP_DELETE(lpOverlapped, __FILE__, __LINE__);

			if ( lpClientContext )
				RakNet::OP_DELETE(lpClientContext, __FILE__, __LINE__);

			// If we are killing the threads, then we keep posting fake completion statuses until we get a fake one through the queue (i.e. lpOverlapped==0 as above)
			// This way we delete all the data from the real calls before exiting the thread
			if ( AsynchronousFileIO::Instance()->killThreads )
			{
				PostQueuedCompletionStatus( completionPort, 0, 0, 0 );
			}
		}
	}

	AsynchronousFileIO::Instance()->threadCount--;
	return 0;
}

#endif
*/