From 120fd5a19b27990cfeec125102fd7787fe5d0282 Mon Sep 17 00:00:00 2001 From: Robert MacGregor Date: Wed, 3 Sep 2014 12:57:25 -0400 Subject: [PATCH] Non-blocking I/O --- .../TSExtension/include/DXAPI/SimObject.h | 2 +- .../TSExtension/source/DXAPI/SimObject.cpp | 2 +- .../TSExtension/source/DXTCPObjects.cpp | 247 ++++++++++++------ 3 files changed, 169 insertions(+), 82 deletions(-) diff --git a/Mod Sources/TSExtension/TSExtension/include/DXAPI/SimObject.h b/Mod Sources/TSExtension/TSExtension/include/DXAPI/SimObject.h index cbaa611..9f1a02b 100644 --- a/Mod Sources/TSExtension/TSExtension/include/DXAPI/SimObject.h +++ b/Mod Sources/TSExtension/TSExtension/include/DXAPI/SimObject.h @@ -8,7 +8,7 @@ namespace DX SimObject(unsigned int obj); void deleteObject(void); - const char *TSCall(const char *name, unsigned int argc, ...); + const char *CallMethod(const char *name, unsigned int argc, ...); const unsigned int &identifier; const unsigned int base_pointer_value; diff --git a/Mod Sources/TSExtension/TSExtension/source/DXAPI/SimObject.cpp b/Mod Sources/TSExtension/TSExtension/source/DXAPI/SimObject.cpp index a9e9c7f..89c54f8 100644 --- a/Mod Sources/TSExtension/TSExtension/source/DXAPI/SimObject.cpp +++ b/Mod Sources/TSExtension/TSExtension/source/DXAPI/SimObject.cpp @@ -27,7 +27,7 @@ namespace DX } } - const char *SimObject::TSCall(const char *name, unsigned int argc, ...) + const char *SimObject::CallMethod(const char *name, unsigned int argc, ...) { char **argv = (char**)malloc(sizeof(char*) * (2 + argc)); argv[0]= (char*)name; diff --git a/Mod Sources/TSExtension/TSExtension/source/DXTCPObjects.cpp b/Mod Sources/TSExtension/TSExtension/source/DXTCPObjects.cpp index 2d49eb6..36ea35e 100644 --- a/Mod Sources/TSExtension/TSExtension/source/DXTCPObjects.cpp +++ b/Mod Sources/TSExtension/TSExtension/source/DXTCPObjects.cpp @@ -11,6 +11,32 @@ static unsigned int TSEXTENSION_RUNNINGTCPOBJECTCOUNT = 0; static DX::TCPObject *TSEXTENSION_RUNNINGTCPOBJECTS[TCPOBJECT_MAXCOUNT]; +// Since wants TS function call arguments to be of type char*, we use this +// helper function to painlessly pass in unsigned int arguments for things +// such as the return value from WSAGetLastError(). +__forceinline static char *S32ToCharPtr(unsigned int in) +{ + char out[256]; + memset(out, NULL, 256); + sprintf_s<256>(out, "%u", in); + return out; +} + +// Also a helper function to return the status of a socket +static bool DXTCP_GetSocketStatus(SOCKET sock) +{ + fd_set sockets; + sockets.fd_array[0] = sock; + sockets.fd_count = 1; + + // We don't want to do any waiting at all + timeval wait_time; + wait_time.tv_sec = 0; + wait_time.tv_usec = 0; + + return select(sock, &sockets, &sockets, NULL, &wait_time) != SOCKET_ERROR; +} + inline DX::TCPObject *TCPObject_Find(unsigned int identifier) { // Make sure it's in our list of objects @@ -29,8 +55,8 @@ typedef struct unsigned int buffer_length; char *buffer; - unsigned int send_queue_count; - char *send_queue[TCPOBJECT_SENDQUEUELENGTH]; + unsigned int message_count; + char *message_queue[TCPOBJECT_SENDQUEUELENGTH]; bool is_connected; @@ -63,7 +89,7 @@ inline bool TCPObject_Disconnect(unsigned int identifier) TSEXTENSION_RUNNINGTCPOBJECTS[iteration] = TSEXTENSION_RUNNINGTCPOBJECTS[iteration + 1]; TSEXTENSION_RUNNINGTCPOBJECTCOUNT--; - obj->TSCall("onDisconnect", 0); + obj->CallMethod("onDisconnect", 0); return true; } @@ -88,6 +114,7 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar connection->buffer = 0x00; connection->buffer_length = 0; connection->is_connected = false; + connection->message_count = 0; connection->socket = 0; // Hack: Store the Ptr to our connection information struct in the old unused state value @@ -97,11 +124,8 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar char *target_hostname = strlwr(connection->target_hostname); // Is it an IP we got? - bool needs_dns_translation = false; if (strstr(target_hostname, "ip:")) target_hostname += 3; // Chop off the 'ip:' segment - else - needs_dns_translation = true; // Did we get a port #? unsigned int desired_port = 0; @@ -115,32 +139,28 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar else { Con::errorf(0, "No Port"); - operand->TSCall("onConnectFailed", 0); + operand->CallMethod("onConnectFailed", 1, S32ToCharPtr(1)); return "NO_PORT"; } - // Perform a DNS Lookup if we need to - if (needs_dns_translation) + // Perform a DNS Lookup + wchar_t hostname_dns[128]; + std::mbstowcs(hostname_dns, target_hostname, strlen(target_hostname) + 1); + + PDNS_RECORD dns_record; + if (DnsQuery(hostname_dns, DNS_TYPE_A, DNS_QUERY_BYPASS_CACHE, NULL, &dns_record, NULL)) { - wchar_t hostname_dns[128]; - std::mbstowcs(hostname_dns, target_hostname, strlen(target_hostname) + 1); - - PDNS_RECORD dns_record; - if (DnsQuery(hostname_dns, DNS_TYPE_A, DNS_QUERY_BYPASS_CACHE, NULL, &dns_record, NULL)) - { - Con::errorf(0, "DNS Resolution Failed"); - operand->TSCall("onDNSFailed", 0); - return "FAILED_DNS"; - } - IN_ADDR result_address; - result_address.S_un.S_addr = dns_record->Data.A.IpAddress; - - // Free the DNS List - DNS_FREE_TYPE freetype; - DnsRecordListFree(dns_record, freetype); - - target_hostname = inet_ntoa(result_address); + Con::errorf(0, "DNS Resolution Failed"); + operand->CallMethod("onDNSFailed", 0); + return "FAILED_DNS"; } + IN_ADDR result_address; + result_address.S_un.S_addr = dns_record->Data.A.IpAddress; + + DNS_FREE_TYPE freetype; + DnsRecordListFree(dns_record, freetype); + + target_hostname = inet_ntoa(result_address); SOCKADDR_IN target_host; target_host.sin_family = AF_INET; @@ -154,36 +174,33 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar if (connection->socket == INVALID_SOCKET) { Con::errorf(0, "Failed to create Socket!"); - operand->TSCall("onSocketCreationFailed", 0); + operand->CallMethod("onConnectFailed", 2, S32ToCharPtr(2), S32ToCharPtr(WSAGetLastError())); return "FAILED_SOCKET_CREATION"; } + // Set Blocking Mode (a non-zero for imode = non-blocking) + u_long imode = 1; + ioctlsocket(connection->socket, FIONBIO, &imode); + // Stick us in the TCPObject array TSEXTENSION_RUNNINGTCPOBJECTS[TSEXTENSION_RUNNINGTCPOBJECTCOUNT] = operand; TSEXTENSION_RUNNINGTCPOBJECTCOUNT++; // Attempt the Connection - if(connect(connection->socket, (SOCKADDR*)&target_host, sizeof(target_host)) == SOCKET_ERROR) + connect(connection->socket, (SOCKADDR*)&target_host, sizeof(target_host)); + if (DXTCP_GetSocketStatus(connection->socket) == SOCKET_ERROR) { - Con::errorf(0, "Failed to connect!"); - operand->TSCall("onConnectFailed", 0); + operand->CallMethod("onConnectFailed", 2, S32ToCharPtr(3), S32ToCharPtr(WSAGetLastError())); return "CANNOT_CONNECT"; } else { - // Set Blocking Mode - u_long imode = 1; - ioctlsocket(connection->socket, FIONBIO, &imode); - - // Connected connection->is_connected = true; - - // Notify TS - operand->TSCall("onConnected", 0); + operand->CallMethod("onConnected", 0); + return "SUCCESS"; } - - return "unknown_error"; + return "UNKNOWN_ERROR"; } bool conTCPObjectSend(Linker::SimObject *obj, S32 argc, const char *argv[]) @@ -193,18 +210,24 @@ bool conTCPObjectSend(Linker::SimObject *obj, S32 argc, const char *argv[]) if (!TCPObject_Find(atoi(argv[1]))) return false; - Con::errorf(0, "Queued Data"); DX::TCPObject operand((unsigned int)obj); ConnectionInformation *connection = (ConnectionInformation*)operand.state; + // Since we can be attempting to send data before we're connected, we'll just queue + // the data here and send it all in our update function next call if (!connection->is_connected) Con::errorf(0, "Attempted to send before connected."); - if (send(connection->socket, argv[2], strlen(argv[2]), 0) == SOCKET_ERROR) - { - Con::errorf(0, "Unable to send data!"); - } + // Tribes 2 probably deallocates the memory associated with the arguments at some point + // so we'll copy the send payload into an independent chunk of memory + char *send_payload = new char[strlen(argv[2]) + 1]; + memset(send_payload, 0x00, strlen(argv[2]) + 1); + memcpy(send_payload, argv[2], strlen(argv[2]) + 1); + connection->message_queue[connection->message_count] = send_payload; + connection->message_count++; + + Con::errorf(0,"Queued data: %s", argv[2]); return true; } @@ -216,23 +239,71 @@ bool conTCPObjectDisconnect(Linker::SimObject *obj, S32 argc, const char *argv[] bool conTSExtensionUpdate(Linker::SimObject *obj, S32 argc, const char *argv[]) { // Iterate through any active sockets - static char *character_buffer = (char*)malloc(TCPOBJECT_BUFFERSIZE); + static char *incoming_buffer = new char[TCPOBJECT_BUFFERSIZE]; // List of objects to D/C - //TCPObject_Disconnect unsigned int disconnected_object_count = 0; static DX::TCPObject **disconnected_objects = (DX::TCPObject**)malloc(sizeof(DX::TCPObject*) * TCPOBJECT_MAXCOUNT); for (unsigned int iteration = 0; iteration < TSEXTENSION_RUNNINGTCPOBJECTCOUNT; iteration++) { + // Zero out the incoming buffer per iteration + memset(incoming_buffer, 0x00, TCPOBJECT_BUFFERSIZE); + DX::TCPObject *current_connection = TSEXTENSION_RUNNINGTCPOBJECTS[iteration]; ConnectionInformation *connection_information = (ConnectionInformation*)current_connection->state; - unsigned int data_length = recv(connection_information->socket, character_buffer, TCPOBJECT_BUFFERSIZE, 0); + // FIXME: ::onConnect is never called if is where we finally realize we're connected + // Check if we're ready to be performing network operations + if (DXTCP_GetSocketStatus(connection_information->socket)) + connection_information->is_connected = true; + else + { + Con::errorf(0,"Socket status error!"); + disconnected_objects[disconnected_object_count] = current_connection; + disconnected_object_count++; + break; + } + + // Process the send queue first + bool connection_is_ready = true; + if (connection_information->is_connected && connection_information->message_count != 0) + for (unsigned int queue_iteration = 0; queue_iteration < connection_information->message_count; queue_iteration++) + if (send(connection_information->socket, connection_information->message_queue[queue_iteration], strlen(connection_information->message_queue[queue_iteration]), 0) == SOCKET_ERROR) + { + int wsa_error = WSAGetLastError(); + // We're not ready yet, just break and we should eventually be ready + if (wsa_error == WSAEWOULDBLOCK) + { + connection_is_ready = false; + break; + } + + connection_information->is_connected = false; + disconnected_objects[disconnected_object_count] = current_connection; + disconnected_object_count++; + + Con::errorf(0,"Got a send error! SimID: %u - Error %u", current_connection->identifier, wsa_error); + break; + } + else + delete[] connection_information->message_queue[queue_iteration]; + + // We can break if the connection was never made yet or if there was an error processing the message queue + if (!connection_information->is_connected || !connection_is_ready) + break; + + // FIXME: Under send() error conditions we can't deallocate all of the associated memory + connection_information->message_count = 0; + unsigned int data_length = recv(connection_information->socket, incoming_buffer, TCPOBJECT_BUFFERSIZE, 0); int currentError = WSAGetLastError(); if (currentError != WSAEWOULDBLOCK && currentError != 0) - Con::errorf(0, "Got an error! %u", currentError); + { + Con::errorf(0, "Got an error! %u - SimID %u", currentError, current_connection->identifier); + disconnected_objects[disconnected_object_count] = current_connection; + disconnected_object_count++; + } else if (data_length == 0) { Con::errorf(0, "Finished receiving?"); @@ -241,36 +312,53 @@ bool conTSExtensionUpdate(Linker::SimObject *obj, S32 argc, const char *argv[]) disconnected_objects[disconnected_object_count] = current_connection; disconnected_object_count++; + // Our actual buffer is +1 bytes in length, so will set the extra byte to 0x00 to ensure NULL termination + connection_information->buffer[connection_information->buffer_length] = 0x00; + + Con::errorf(0, "Stream Len: %u Bytes", connection_information->buffer_length); // Stream the data into ::onLine unsigned int current_start = 0; for (unsigned int split_iteration = 0; split_iteration < connection_information->buffer_length; split_iteration++) - if (connection_information->buffer[split_iteration] == '\n' || split_iteration == connection_information->buffer_length - 1) + { + bool streaming_line = false; + if (connection_information->buffer[split_iteration] == '\n') // || split_iteration == connection_information->buffer_length - 1 { - unsigned int desired_length = (split_iteration - current_start); - connection_information->buffer[connection_information->buffer_length - 1] = 0x00; - - if (desired_length == data_length) - current_connection->TSCall("onLine", 1, connection_information->buffer); - else - { - char *current_line = (char*)malloc(desired_length + 1); - memset(current_line, 0x00, desired_length + 1); - memcpy(current_line, &connection_information->buffer[current_start], desired_length); - current_line[desired_length + 1] = 0x00; - - // Is it some newline? - if (strlen(current_line) == 1 && current_line[0] == 0xD) // Carriage Return - current_line[0] = 0x20; // Space - - current_start = split_iteration + 1; - current_connection->TSCall("onLine", 1, current_line); - free(current_line); - } + connection_information->buffer[split_iteration] = 0x00; + streaming_line = true; } + else if (split_iteration == connection_information->buffer_length - 1) + streaming_line = true; + + //unsigned int desired_length = (split_iteration - current_start); + + //if (desired_length == data_length) + // current_connection->TSCall("onLine", 1, connection_information->buffer); + //else + //{ + //if(split_iteration != connection_information->buffer_length - 1) + // connection_information->buffer[split_iteration] = 0x00; + + if (streaming_line) + { + // Time to be clever: Since T2 doesn't care what happens to the string after it's passed in, I'm not + // Bothering to allocate more memory for the results. I'm just going to manipulate it to appear as + // different lines but in reality they're all sourced from the same memory. + char *current_line = &connection_information->buffer[current_start]; + + Con::errorf(0, "Streamed: %s", current_line); + + // If we just have a blank line (a carriage return), replace it with the space character + if (strlen(current_line) == 1 && current_line[0] == 0xD) + current_line[0] = 0x20; + + current_start = split_iteration + 1; + current_connection->CallMethod("onLine", 1, current_line); + } + } closesocket(connection_information->socket); connection_information->socket = 0; - free(connection_information->buffer); + delete[] connection_information->buffer; } else if (data_length <= TCPOBJECT_BUFFERSIZE) { @@ -279,27 +367,26 @@ bool conTSExtensionUpdate(Linker::SimObject *obj, S32 argc, const char *argv[]) // If our connection hasn't buffered anything yet if (connection_information->buffer == 0x00) { - connection_information->buffer = (char*)malloc(data_length); - memset(connection_information->buffer, 0x00, data_length); + // Allocate our memory with a +1 Byte Size (to ensure it's properly NULL terminated when we stream to ::onLine) + connection_information->buffer = new char[data_length + 1]; + memset(connection_information->buffer, 0x00, data_length + 1); connection_information->buffer_length = data_length; - memcpy(connection_information->buffer, character_buffer, data_length); + memcpy(connection_information->buffer, incoming_buffer, data_length); } else { unsigned int new_buffer_length = data_length + connection_information->buffer_length; - char *new_buffer = (char*)malloc(new_buffer_length); - memset(new_buffer, 0x00, new_buffer_length); + char *new_buffer = new char[new_buffer_length + 1]; + memset(new_buffer, 0x00, new_buffer_length + 1); // Copy the two halves memcpy(new_buffer, connection_information->buffer, connection_information->buffer_length); - memcpy(&new_buffer[connection_information->buffer_length], character_buffer, data_length); + memcpy(&new_buffer[connection_information->buffer_length], incoming_buffer, data_length); connection_information->buffer = new_buffer; connection_information->buffer_length = new_buffer_length; } - - memset(character_buffer, 0x00, TCPOBJECT_BUFFERSIZE); } }