mirror of
https://github.com/Ragora/T2-CPP.git
synced 2026-03-06 14:00:22 +00:00
Non-blocking I/O
This commit is contained in:
parent
f5b0ff6a07
commit
120fd5a19b
3 changed files with 169 additions and 82 deletions
|
|
@ -8,7 +8,7 @@ namespace DX
|
|||
SimObject(unsigned int obj);
|
||||
|
||||
void deleteObject(void);
|
||||
const char *TSCall(const char *name, unsigned int argc, ...);
|
||||
const char *CallMethod(const char *name, unsigned int argc, ...);
|
||||
|
||||
const unsigned int &identifier;
|
||||
const unsigned int base_pointer_value;
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ namespace DX
|
|||
}
|
||||
}
|
||||
|
||||
const char *SimObject::TSCall(const char *name, unsigned int argc, ...)
|
||||
const char *SimObject::CallMethod(const char *name, unsigned int argc, ...)
|
||||
{
|
||||
char **argv = (char**)malloc(sizeof(char*) * (2 + argc));
|
||||
argv[0]= (char*)name;
|
||||
|
|
|
|||
|
|
@ -11,6 +11,32 @@
|
|||
static unsigned int TSEXTENSION_RUNNINGTCPOBJECTCOUNT = 0;
|
||||
static DX::TCPObject *TSEXTENSION_RUNNINGTCPOBJECTS[TCPOBJECT_MAXCOUNT];
|
||||
|
||||
// Since wants TS function call arguments to be of type char*, we use this
|
||||
// helper function to painlessly pass in unsigned int arguments for things
|
||||
// such as the return value from WSAGetLastError().
|
||||
__forceinline static char *S32ToCharPtr(unsigned int in)
|
||||
{
|
||||
char out[256];
|
||||
memset(out, NULL, 256);
|
||||
sprintf_s<256>(out, "%u", in);
|
||||
return out;
|
||||
}
|
||||
|
||||
// Also a helper function to return the status of a socket
|
||||
static bool DXTCP_GetSocketStatus(SOCKET sock)
|
||||
{
|
||||
fd_set sockets;
|
||||
sockets.fd_array[0] = sock;
|
||||
sockets.fd_count = 1;
|
||||
|
||||
// We don't want to do any waiting at all
|
||||
timeval wait_time;
|
||||
wait_time.tv_sec = 0;
|
||||
wait_time.tv_usec = 0;
|
||||
|
||||
return select(sock, &sockets, &sockets, NULL, &wait_time) != SOCKET_ERROR;
|
||||
}
|
||||
|
||||
inline DX::TCPObject *TCPObject_Find(unsigned int identifier)
|
||||
{
|
||||
// Make sure it's in our list of objects
|
||||
|
|
@ -29,8 +55,8 @@ typedef struct
|
|||
unsigned int buffer_length;
|
||||
char *buffer;
|
||||
|
||||
unsigned int send_queue_count;
|
||||
char *send_queue[TCPOBJECT_SENDQUEUELENGTH];
|
||||
unsigned int message_count;
|
||||
char *message_queue[TCPOBJECT_SENDQUEUELENGTH];
|
||||
|
||||
bool is_connected;
|
||||
|
||||
|
|
@ -63,7 +89,7 @@ inline bool TCPObject_Disconnect(unsigned int identifier)
|
|||
TSEXTENSION_RUNNINGTCPOBJECTS[iteration] = TSEXTENSION_RUNNINGTCPOBJECTS[iteration + 1];
|
||||
TSEXTENSION_RUNNINGTCPOBJECTCOUNT--;
|
||||
|
||||
obj->TSCall("onDisconnect", 0);
|
||||
obj->CallMethod("onDisconnect", 0);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -88,6 +114,7 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar
|
|||
connection->buffer = 0x00;
|
||||
connection->buffer_length = 0;
|
||||
connection->is_connected = false;
|
||||
connection->message_count = 0;
|
||||
connection->socket = 0;
|
||||
|
||||
// Hack: Store the Ptr to our connection information struct in the old unused state value
|
||||
|
|
@ -97,11 +124,8 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar
|
|||
char *target_hostname = strlwr(connection->target_hostname);
|
||||
|
||||
// Is it an IP we got?
|
||||
bool needs_dns_translation = false;
|
||||
if (strstr(target_hostname, "ip:"))
|
||||
target_hostname += 3; // Chop off the 'ip:' segment
|
||||
else
|
||||
needs_dns_translation = true;
|
||||
|
||||
// Did we get a port #?
|
||||
unsigned int desired_port = 0;
|
||||
|
|
@ -115,32 +139,28 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar
|
|||
else
|
||||
{
|
||||
Con::errorf(0, "No Port");
|
||||
operand->TSCall("onConnectFailed", 0);
|
||||
operand->CallMethod("onConnectFailed", 1, S32ToCharPtr(1));
|
||||
return "NO_PORT";
|
||||
}
|
||||
|
||||
// Perform a DNS Lookup if we need to
|
||||
if (needs_dns_translation)
|
||||
// Perform a DNS Lookup
|
||||
wchar_t hostname_dns[128];
|
||||
std::mbstowcs(hostname_dns, target_hostname, strlen(target_hostname) + 1);
|
||||
|
||||
PDNS_RECORD dns_record;
|
||||
if (DnsQuery(hostname_dns, DNS_TYPE_A, DNS_QUERY_BYPASS_CACHE, NULL, &dns_record, NULL))
|
||||
{
|
||||
wchar_t hostname_dns[128];
|
||||
std::mbstowcs(hostname_dns, target_hostname, strlen(target_hostname) + 1);
|
||||
|
||||
PDNS_RECORD dns_record;
|
||||
if (DnsQuery(hostname_dns, DNS_TYPE_A, DNS_QUERY_BYPASS_CACHE, NULL, &dns_record, NULL))
|
||||
{
|
||||
Con::errorf(0, "DNS Resolution Failed");
|
||||
operand->TSCall("onDNSFailed", 0);
|
||||
return "FAILED_DNS";
|
||||
}
|
||||
IN_ADDR result_address;
|
||||
result_address.S_un.S_addr = dns_record->Data.A.IpAddress;
|
||||
|
||||
// Free the DNS List
|
||||
DNS_FREE_TYPE freetype;
|
||||
DnsRecordListFree(dns_record, freetype);
|
||||
|
||||
target_hostname = inet_ntoa(result_address);
|
||||
Con::errorf(0, "DNS Resolution Failed");
|
||||
operand->CallMethod("onDNSFailed", 0);
|
||||
return "FAILED_DNS";
|
||||
}
|
||||
IN_ADDR result_address;
|
||||
result_address.S_un.S_addr = dns_record->Data.A.IpAddress;
|
||||
|
||||
DNS_FREE_TYPE freetype;
|
||||
DnsRecordListFree(dns_record, freetype);
|
||||
|
||||
target_hostname = inet_ntoa(result_address);
|
||||
|
||||
SOCKADDR_IN target_host;
|
||||
target_host.sin_family = AF_INET;
|
||||
|
|
@ -154,36 +174,33 @@ const char* conTCPObjectConnect(Linker::SimObject *obj, S32 argc, const char *ar
|
|||
if (connection->socket == INVALID_SOCKET)
|
||||
{
|
||||
Con::errorf(0, "Failed to create Socket!");
|
||||
operand->TSCall("onSocketCreationFailed", 0);
|
||||
operand->CallMethod("onConnectFailed", 2, S32ToCharPtr(2), S32ToCharPtr(WSAGetLastError()));
|
||||
return "FAILED_SOCKET_CREATION";
|
||||
}
|
||||
|
||||
// Set Blocking Mode (a non-zero for imode = non-blocking)
|
||||
u_long imode = 1;
|
||||
ioctlsocket(connection->socket, FIONBIO, &imode);
|
||||
|
||||
// Stick us in the TCPObject array
|
||||
TSEXTENSION_RUNNINGTCPOBJECTS[TSEXTENSION_RUNNINGTCPOBJECTCOUNT] = operand;
|
||||
TSEXTENSION_RUNNINGTCPOBJECTCOUNT++;
|
||||
|
||||
// Attempt the Connection
|
||||
if(connect(connection->socket, (SOCKADDR*)&target_host, sizeof(target_host)) == SOCKET_ERROR)
|
||||
connect(connection->socket, (SOCKADDR*)&target_host, sizeof(target_host));
|
||||
if (DXTCP_GetSocketStatus(connection->socket) == SOCKET_ERROR)
|
||||
{
|
||||
Con::errorf(0, "Failed to connect!");
|
||||
operand->TSCall("onConnectFailed", 0);
|
||||
operand->CallMethod("onConnectFailed", 2, S32ToCharPtr(3), S32ToCharPtr(WSAGetLastError()));
|
||||
return "CANNOT_CONNECT";
|
||||
}
|
||||
else
|
||||
{
|
||||
// Set Blocking Mode
|
||||
u_long imode = 1;
|
||||
ioctlsocket(connection->socket, FIONBIO, &imode);
|
||||
|
||||
// Connected
|
||||
connection->is_connected = true;
|
||||
|
||||
// Notify TS
|
||||
operand->TSCall("onConnected", 0);
|
||||
operand->CallMethod("onConnected", 0);
|
||||
return "SUCCESS";
|
||||
}
|
||||
|
||||
|
||||
return "unknown_error";
|
||||
return "UNKNOWN_ERROR";
|
||||
}
|
||||
|
||||
bool conTCPObjectSend(Linker::SimObject *obj, S32 argc, const char *argv[])
|
||||
|
|
@ -193,18 +210,24 @@ bool conTCPObjectSend(Linker::SimObject *obj, S32 argc, const char *argv[])
|
|||
if (!TCPObject_Find(atoi(argv[1])))
|
||||
return false;
|
||||
|
||||
Con::errorf(0, "Queued Data");
|
||||
DX::TCPObject operand((unsigned int)obj);
|
||||
ConnectionInformation *connection = (ConnectionInformation*)operand.state;
|
||||
|
||||
// Since we can be attempting to send data before we're connected, we'll just queue
|
||||
// the data here and send it all in our update function next call
|
||||
if (!connection->is_connected)
|
||||
Con::errorf(0, "Attempted to send before connected.");
|
||||
|
||||
if (send(connection->socket, argv[2], strlen(argv[2]), 0) == SOCKET_ERROR)
|
||||
{
|
||||
Con::errorf(0, "Unable to send data!");
|
||||
}
|
||||
// Tribes 2 probably deallocates the memory associated with the arguments at some point
|
||||
// so we'll copy the send payload into an independent chunk of memory
|
||||
char *send_payload = new char[strlen(argv[2]) + 1];
|
||||
memset(send_payload, 0x00, strlen(argv[2]) + 1);
|
||||
memcpy(send_payload, argv[2], strlen(argv[2]) + 1);
|
||||
|
||||
connection->message_queue[connection->message_count] = send_payload;
|
||||
connection->message_count++;
|
||||
|
||||
Con::errorf(0,"Queued data: %s", argv[2]);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -216,23 +239,71 @@ bool conTCPObjectDisconnect(Linker::SimObject *obj, S32 argc, const char *argv[]
|
|||
bool conTSExtensionUpdate(Linker::SimObject *obj, S32 argc, const char *argv[])
|
||||
{
|
||||
// Iterate through any active sockets
|
||||
static char *character_buffer = (char*)malloc(TCPOBJECT_BUFFERSIZE);
|
||||
static char *incoming_buffer = new char[TCPOBJECT_BUFFERSIZE];
|
||||
|
||||
// List of objects to D/C
|
||||
//TCPObject_Disconnect
|
||||
unsigned int disconnected_object_count = 0;
|
||||
static DX::TCPObject **disconnected_objects = (DX::TCPObject**)malloc(sizeof(DX::TCPObject*) * TCPOBJECT_MAXCOUNT);
|
||||
|
||||
for (unsigned int iteration = 0; iteration < TSEXTENSION_RUNNINGTCPOBJECTCOUNT; iteration++)
|
||||
{
|
||||
// Zero out the incoming buffer per iteration
|
||||
memset(incoming_buffer, 0x00, TCPOBJECT_BUFFERSIZE);
|
||||
|
||||
DX::TCPObject *current_connection = TSEXTENSION_RUNNINGTCPOBJECTS[iteration];
|
||||
ConnectionInformation *connection_information = (ConnectionInformation*)current_connection->state;
|
||||
|
||||
unsigned int data_length = recv(connection_information->socket, character_buffer, TCPOBJECT_BUFFERSIZE, 0);
|
||||
// FIXME: ::onConnect is never called if is where we finally realize we're connected
|
||||
// Check if we're ready to be performing network operations
|
||||
if (DXTCP_GetSocketStatus(connection_information->socket))
|
||||
connection_information->is_connected = true;
|
||||
else
|
||||
{
|
||||
Con::errorf(0,"Socket status error!");
|
||||
disconnected_objects[disconnected_object_count] = current_connection;
|
||||
disconnected_object_count++;
|
||||
break;
|
||||
}
|
||||
|
||||
// Process the send queue first
|
||||
bool connection_is_ready = true;
|
||||
if (connection_information->is_connected && connection_information->message_count != 0)
|
||||
for (unsigned int queue_iteration = 0; queue_iteration < connection_information->message_count; queue_iteration++)
|
||||
if (send(connection_information->socket, connection_information->message_queue[queue_iteration], strlen(connection_information->message_queue[queue_iteration]), 0) == SOCKET_ERROR)
|
||||
{
|
||||
int wsa_error = WSAGetLastError();
|
||||
// We're not ready yet, just break and we should eventually be ready
|
||||
if (wsa_error == WSAEWOULDBLOCK)
|
||||
{
|
||||
connection_is_ready = false;
|
||||
break;
|
||||
}
|
||||
|
||||
connection_information->is_connected = false;
|
||||
disconnected_objects[disconnected_object_count] = current_connection;
|
||||
disconnected_object_count++;
|
||||
|
||||
Con::errorf(0,"Got a send error! SimID: %u - Error %u", current_connection->identifier, wsa_error);
|
||||
break;
|
||||
}
|
||||
else
|
||||
delete[] connection_information->message_queue[queue_iteration];
|
||||
|
||||
// We can break if the connection was never made yet or if there was an error processing the message queue
|
||||
if (!connection_information->is_connected || !connection_is_ready)
|
||||
break;
|
||||
|
||||
// FIXME: Under send() error conditions we can't deallocate all of the associated memory
|
||||
connection_information->message_count = 0;
|
||||
unsigned int data_length = recv(connection_information->socket, incoming_buffer, TCPOBJECT_BUFFERSIZE, 0);
|
||||
|
||||
int currentError = WSAGetLastError();
|
||||
if (currentError != WSAEWOULDBLOCK && currentError != 0)
|
||||
Con::errorf(0, "Got an error! %u", currentError);
|
||||
{
|
||||
Con::errorf(0, "Got an error! %u - SimID %u", currentError, current_connection->identifier);
|
||||
disconnected_objects[disconnected_object_count] = current_connection;
|
||||
disconnected_object_count++;
|
||||
}
|
||||
else if (data_length == 0)
|
||||
{
|
||||
Con::errorf(0, "Finished receiving?");
|
||||
|
|
@ -241,36 +312,53 @@ bool conTSExtensionUpdate(Linker::SimObject *obj, S32 argc, const char *argv[])
|
|||
disconnected_objects[disconnected_object_count] = current_connection;
|
||||
disconnected_object_count++;
|
||||
|
||||
// Our actual buffer is +1 bytes in length, so will set the extra byte to 0x00 to ensure NULL termination
|
||||
connection_information->buffer[connection_information->buffer_length] = 0x00;
|
||||
|
||||
Con::errorf(0, "Stream Len: %u Bytes", connection_information->buffer_length);
|
||||
// Stream the data into ::onLine
|
||||
unsigned int current_start = 0;
|
||||
for (unsigned int split_iteration = 0; split_iteration < connection_information->buffer_length; split_iteration++)
|
||||
if (connection_information->buffer[split_iteration] == '\n' || split_iteration == connection_information->buffer_length - 1)
|
||||
{
|
||||
bool streaming_line = false;
|
||||
if (connection_information->buffer[split_iteration] == '\n') // || split_iteration == connection_information->buffer_length - 1
|
||||
{
|
||||
unsigned int desired_length = (split_iteration - current_start);
|
||||
connection_information->buffer[connection_information->buffer_length - 1] = 0x00;
|
||||
|
||||
if (desired_length == data_length)
|
||||
current_connection->TSCall("onLine", 1, connection_information->buffer);
|
||||
else
|
||||
{
|
||||
char *current_line = (char*)malloc(desired_length + 1);
|
||||
memset(current_line, 0x00, desired_length + 1);
|
||||
memcpy(current_line, &connection_information->buffer[current_start], desired_length);
|
||||
current_line[desired_length + 1] = 0x00;
|
||||
|
||||
// Is it some newline?
|
||||
if (strlen(current_line) == 1 && current_line[0] == 0xD) // Carriage Return
|
||||
current_line[0] = 0x20; // Space
|
||||
|
||||
current_start = split_iteration + 1;
|
||||
current_connection->TSCall("onLine", 1, current_line);
|
||||
free(current_line);
|
||||
}
|
||||
connection_information->buffer[split_iteration] = 0x00;
|
||||
streaming_line = true;
|
||||
}
|
||||
else if (split_iteration == connection_information->buffer_length - 1)
|
||||
streaming_line = true;
|
||||
|
||||
//unsigned int desired_length = (split_iteration - current_start);
|
||||
|
||||
//if (desired_length == data_length)
|
||||
// current_connection->TSCall("onLine", 1, connection_information->buffer);
|
||||
//else
|
||||
//{
|
||||
//if(split_iteration != connection_information->buffer_length - 1)
|
||||
// connection_information->buffer[split_iteration] = 0x00;
|
||||
|
||||
if (streaming_line)
|
||||
{
|
||||
// Time to be clever: Since T2 doesn't care what happens to the string after it's passed in, I'm not
|
||||
// Bothering to allocate more memory for the results. I'm just going to manipulate it to appear as
|
||||
// different lines but in reality they're all sourced from the same memory.
|
||||
char *current_line = &connection_information->buffer[current_start];
|
||||
|
||||
Con::errorf(0, "Streamed: %s", current_line);
|
||||
|
||||
// If we just have a blank line (a carriage return), replace it with the space character
|
||||
if (strlen(current_line) == 1 && current_line[0] == 0xD)
|
||||
current_line[0] = 0x20;
|
||||
|
||||
current_start = split_iteration + 1;
|
||||
current_connection->CallMethod("onLine", 1, current_line);
|
||||
}
|
||||
}
|
||||
|
||||
closesocket(connection_information->socket);
|
||||
connection_information->socket = 0;
|
||||
free(connection_information->buffer);
|
||||
delete[] connection_information->buffer;
|
||||
}
|
||||
else if (data_length <= TCPOBJECT_BUFFERSIZE)
|
||||
{
|
||||
|
|
@ -279,27 +367,26 @@ bool conTSExtensionUpdate(Linker::SimObject *obj, S32 argc, const char *argv[])
|
|||
// If our connection hasn't buffered anything yet
|
||||
if (connection_information->buffer == 0x00)
|
||||
{
|
||||
connection_information->buffer = (char*)malloc(data_length);
|
||||
memset(connection_information->buffer, 0x00, data_length);
|
||||
// Allocate our memory with a +1 Byte Size (to ensure it's properly NULL terminated when we stream to ::onLine)
|
||||
connection_information->buffer = new char[data_length + 1];
|
||||
memset(connection_information->buffer, 0x00, data_length + 1);
|
||||
|
||||
connection_information->buffer_length = data_length;
|
||||
memcpy(connection_information->buffer, character_buffer, data_length);
|
||||
memcpy(connection_information->buffer, incoming_buffer, data_length);
|
||||
}
|
||||
else
|
||||
{
|
||||
unsigned int new_buffer_length = data_length + connection_information->buffer_length;
|
||||
char *new_buffer = (char*)malloc(new_buffer_length);
|
||||
memset(new_buffer, 0x00, new_buffer_length);
|
||||
char *new_buffer = new char[new_buffer_length + 1];
|
||||
memset(new_buffer, 0x00, new_buffer_length + 1);
|
||||
|
||||
// Copy the two halves
|
||||
memcpy(new_buffer, connection_information->buffer, connection_information->buffer_length);
|
||||
memcpy(&new_buffer[connection_information->buffer_length], character_buffer, data_length);
|
||||
memcpy(&new_buffer[connection_information->buffer_length], incoming_buffer, data_length);
|
||||
|
||||
connection_information->buffer = new_buffer;
|
||||
connection_information->buffer_length = new_buffer_length;
|
||||
}
|
||||
|
||||
memset(character_buffer, 0x00, TCPOBJECT_BUFFERSIZE);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue