view net.c @ 75:0083b2f7b3c7

Partially working implementation of List. Modified build scripts to allow use of other compilers. Fixed some bugs involving method implementations on different types returning different numbers of outputs. Added Fold to the 'builtins' in the compiler.
author Mike Pavone <pavone@retrodev.com>
date Tue, 06 Jul 2010 07:52:59 -0400
parents 76568becd6d6
children
line wrap: on
line source

#ifdef WIN32
	#include <winsock.h>
#else
	#include <sys/types.h>
	#include <sys/socket.h>
	#include <netinet/in.h>
	#include <netdb.h>
	#include <arpa/inet.h>
#endif
#include <string.h>
#include <stdlib.h>
#include "datum.h"
#include "structs.h"
#include "vis_threading.h"
#include "interp.h"

/* Closing a socket maps to closesocket() when WIN32 is defined and to close() otherwise. */
#ifdef WIN32
	#define VIS_CloseSocket(sock)	closesocket(sock)
#else
	#define VIS_CloseSocket(sock)	close(sock)
#endif

/* Number of bytes stored in each netbuffer node. */
#define NET_SEARCH_BUFFER_SIZE 512

/* One node in the singly linked chain of receive buffers used by net_client_get_dstring(). */
typedef struct netbuffer {
	/* Bytes received into this node. */
	char buffer[NET_SEARCH_BUFFER_SIZE];
	/* Position of this node in the chain; the first node is 0 and each new node gets the previous index + 1. */
	int index;
	/* Next node in the chain, or NULL for the last node. */
	struct netbuffer * next;	
} netbuffer;
/*
typedef struct 
{
	int sockfd;
	netbuffer * buffers;
	VIS_CRITICAL_SECTION(lock)
} net_client_data;*/

/*
 * Keeps calling send() until len bytes from data have been written to sockfd.
 *
 * Returns len when everything was sent. If send() reports an error, returns
 * the negated count of bytes sent before the error (0 when nothing was sent).
 */
int send_all(int sockfd, char * data, int len)
{
	int total = 0;
	while(total < len)
	{
		int chunk = send(sockfd, data + total, len - total, 0);
		if(chunk < 0)
		{
			DEBUGPUTS("send_all() failed\n");
			return -total;
		}
		total += chunk;
	}
	return total;
}

/*
 * Keeps calling recv() until len bytes have been stored in data.
 *
 * Each chunk is appended after the bytes already received. Returns len on
 * success. If recv() reports an error, or the peer closes the connection
 * before len bytes arrive, returns the negated count of bytes received so
 * far (0 when nothing was received), so callers can treat any result <= 0
 * as a failure.
 */
int recv_all(int sockfd, char * data, int len)
{
	int temp;
	int received = 0;
	while(received < len)
	{
		temp = recv(sockfd, data + received, len - received, 0);
		//recv() returns 0 once the peer has shut down; without this check the loop would never end
		if(temp <= 0)
		{
			return 0 - received;
		}
		received += temp;
	}
	return received;
}

/*
 * Worker: opens a TCP connection to a host and port.
 *
 * params[0] (in)  host name or dotted-quad address (string datum)
 * params[1] (in)  port number (num_a of an integer datum)
 *
 * On success params[0] receives a new BUILTIN_TYPE_NETCLIENT datum whose
 * num_a is the connected socket descriptor and params[1] is NULL. On any
 * failure (name lookup, socket() or connect()) params[0] is NULL and
 * params[1] receives a BUILTIN_TYPE_YESNO datum with num_a set to 0. Both
 * inputs are released in every case. Always returns 0.
 */
int net_client_new(datum ** params, queue_entry * worker_entry)
{
	int sockfd, i;
	int connected = 0;
	struct sockaddr_in dest;
	struct hostent * h;
	char * host = params[0]->c.generic.data;
	//String lengths in this file count the terminator (see the len-1 uses in the other workers),
	//so the last byte is skipped; otherwise every address would look like a host name
	int hostlen = params[0]->c.generic.len - 1;
	for(i = 0; i < hostlen; ++i)
	{
		//Anything other than digits and dots means this is a name that needs resolving
		if((host[i] < '0' || host[i] > '9') && host[i] != '.')
		{
			h = gethostbyname(host);
			if(!h)
			{
				release_ref(params[0]);
				release_ref(params[1]);
				params[0] = NULL;
				params[1] = new_datum(BUILTIN_TYPE_YESNO, 2, 0, worker_entry->instance->def->program);
				params[1]->c.integers.num_a = 0;
				return 0;
			}
			host = inet_ntoa(*((struct in_addr *)h->h_addr));
			break;
		}
	}
	sockfd = socket(PF_INET, SOCK_STREAM, 0);
	if(sockfd == -1)
	{
		DEBUGPUTS("socket() failed\n");
	}
	else
	{
		//host may still point into params[0], so the address is built before the inputs are released
		dest.sin_family = AF_INET;
		dest.sin_port = htons(params[1]->c.integers.num_a);
		dest.sin_addr.s_addr = inet_addr(host);
		memset(&(dest.sin_zero), '\0', sizeof(dest.sin_zero));

		//connect() returns 0 on success and -1 on failure
		if(connect(sockfd, (struct sockaddr *)&dest, sizeof(dest)) == 0)
		{
			connected = 1;
		}
		else
		{
			DEBUGPUTS("connect() failed\n");
			//Don't leak the descriptor when the connection attempt fails
			VIS_CloseSocket(sockfd);
		}
	}

	release_ref(params[0]);
	release_ref(params[1]);
	params[0] = params[1] = NULL;

	if(connected)
	{
		params[0] = new_datum(BUILTIN_TYPE_NETCLIENT, 2, 0, worker_entry->instance->def->program);
		params[0]->c.integers.num_a = sockfd;
		DEBUGPUTS("Connection established\n");
	}
	else
	{
		params[1] = new_datum(BUILTIN_TYPE_YESNO, 2, 0, worker_entry->instance->def->program);
		params[1]->c.integers.num_a = 0;
	}
	return 0;
}

/*
 * Worker: writes a string to a Net Client socket.
 *
 * params[0] (in/out) Net Client datum; its num_a is the socket descriptor
 * params[1] (in)     string datum; len-1 bytes of its data are sent
 *
 * On success params[0] is left in place and params[1] is released and set to
 * NULL. If send_all() does not report the full length, both inputs are
 * released, params[0] becomes NULL and params[1] receives a
 * BUILTIN_TYPE_WHOLE datum holding the negated send_all() result.
 * Always returns 0.
 */
int net_client_put_string(datum ** params, queue_entry * worker_entry)
{
	int sockfd = params[0]->c.integers.num_a;
	char * text = params[1]->c.generic.data;
	int length = params[1]->c.generic.len - 1;
	int result;

	if(length != 0)
	{
		result = send_all(sockfd, text, length);
		if(result != length)
		{
			release_ref(params[0]);
			release_ref(params[1]);
			params[0] = NULL;
			params[1] = new_datum(BUILTIN_TYPE_WHOLE, 2, 0, worker_entry->instance->def->program);
			params[1]->c.integers.num_a = -result;
			DEBUGPUTS("send() failed\n");
			return 0;
		}
	}

	DEBUGPRINTF("Sent %s\n", text);
	release_ref(params[1]);
	params[1] = NULL;
	return 0;
}

/*
 * Worker: reads a fixed number of bytes from a Net Client socket into a new
 * string datum.
 *
 * params[0] (in/out) Net Client datum; its num_a is the socket descriptor
 * params[1] (in)     integer datum whose num_a is the number of bytes to read
 *
 * On success params[1] receives the new string and params[2] is NULL. If
 * recv() fails or the peer closes the connection before all bytes arrive,
 * params[0] is released, params[0] and params[1] become NULL and params[2]
 * receives the string holding whatever was read. Always returns 0.
 */
int net_client_get_fstring(datum ** params, queue_entry * worker_entry)
{
	char * buf;
	int sockfd = params[0]->c.integers.num_a;
	int to_receive = params[1]->c.integers.num_a;
	int temp, received = 0;
	//One extra byte is allocated for the terminator
	datum * output = new_datum(BUILTIN_TYPE_STRING, 1, to_receive+1, worker_entry->instance->def->program);
	release_ref(params[1]);
	buf = output->c.generic.data;
	while(received < to_receive)
	{
		temp = recv(sockfd, buf+received, to_receive-received, 0);
		//recv() returns 0 once the peer has shut down; treating that like an error avoids spinning forever
		if(temp <= 0)
		{
			buf[received] = '\0';
			release_ref(params[0]);
			params[0] = params[1] = NULL;
			params[2] = output;
			return 0;
		}
		received += temp;
	}
	//Terminate the string explicitly; skipped for a negative count since the buffer size is then not positive
	if(to_receive >= 0)
	{
		buf[received] = '\0';
	}
	params[1] = output;
	params[2] = NULL;
	return 0;
}

/*
 * Worker: reads from a Net Client socket one byte at a time until a delimiter
 * is seen or recv() stops returning exactly one byte.
 *
 * inputlist[0] (in/out) Net Client datum; its num_a is used as the socket
 *                       descriptor. Released and set to NULL when no
 *                       delimiter was found.
 * inputlist[1] (in/out) delimiter: a whole number (compared against a single
 *                       received byte), a string, or a list whose entries are
 *                       whole numbers and/or strings. Replaced by a new string
 *                       datum holding the bytes that precede the match (or
 *                       the bytes buffered up to the current scan position
 *                       when nothing matched).
 * inputlist[2] (out)    the delimiter that matched (for a list, the matching
 *                       entry), or NULL when nothing matched.
 * inputlist[3] (out)    NULL on a match; otherwise a Yes/No datum set to 0
 *                       via datum_set_yesno().
 *
 * Received bytes are kept in a chain of netbuffer nodes: the first node lives
 * on the stack, further nodes are malloc'd each time the current one fills up
 * and are freed before returning. Always returns 0.
 */
int net_client_get_dstring(datum ** inputlist, queue_entry * worker_entry)
{
	BOOL found = FALSE;
	netbuffer buf;
	netbuffer * current, *first, *temp,*temp2;
	int i,j,k,startk;
	int found_entry;
	int string_offset;
	int search_offset;
	netbuffer * search_start;
	int search_start_offset;
	int *search_offsets;
	netbuffer ** search_starts;
	int *search_start_offsets;
	int read_bytes;
	int buf_pos;
	int sockfd = inputlist[0]->c.integers.num_a;
	//net_client_data * net = inputlist[0]->c.generic.data;
	
	list_data * list;
	//For a list delimiter each entry gets its own match state:
	//how many characters have matched, and the buffer node/offset where that partial match began
	if(inputlist[1]->company->type_id == BUILTIN_TYPE_LIST)
	{
		
		list = ((list_data *)inputlist[1]->c.generic.data);
		DEBUGPRINTF("Delimeter input is a list with %d entries.\n", list->num_entries);
		search_offsets = malloc(sizeof(int) * (list->num_entries));
		DEBUGPRINTF("Allocated %d bytes.\n", sizeof(int) * (list->num_entries));
		search_starts = malloc(sizeof(netbuffer *) * (list->num_entries));
		DEBUGPRINTF("Allocated %d bytes.\n", sizeof(netbuffer *) * (list->num_entries));
		search_start_offsets = malloc(sizeof(int) * (list->num_entries));
		DEBUGPRINTF("Allocated %d bytes.\n", sizeof(int) * (list->num_entries));
		for(i = 0; i < list->num_entries; ++i)
		{
			DEBUGPRINTF("Setting search_offsets[%d] = 0.\n", i);
			search_offsets[i] = 0;
		}
	}
	search_offset = 0;
		//The chain starts with the stack-allocated node
		first = &buf;
		first->next = NULL;
		current = first;
		current->index = 0;
		first = current;
		read_bytes = 1;
		//Each pass of this loop scans one buffer node
		while(!found && read_bytes == 1)
		{
			buf_pos = 0;
			for(i = 0; i < NET_SEARCH_BUFFER_SIZE && !found; ++i)
			{
				//Only fetch a new byte once the scan position has reached the bytes received so far in this pass;
				//the string case below can rewind i, in which case already-received bytes are rescanned
				if(i >= buf_pos)
				{
					read_bytes = recv(sockfd, current->buffer+i, 1, 0);
					if(read_bytes != 1)
						break;
					++buf_pos;
				}
				DEBUGPRINTF("Checking character #%d (%c)\n", i, current->buffer[i]);
				switch(inputlist[1]->company->type_id)
				{
				case BUILTIN_TYPE_WHOLE:
					//Single byte delimiter
					if((int)current->buffer[i] == inputlist[1]->c.integers.num_a)
					{
						found = TRUE;
						search_offset = 1;
						search_start = current;
						search_start_offset = i;
					}
					break;
				case BUILTIN_TYPE_STRING:
					DEBUGPRINTF("Comparing with character %d of delim: %c (%X)\n", search_offset, ((char *)inputlist[1]->c.generic.data)[search_offset], ((char *)inputlist[1]->c.generic.data)[search_offset]);
					if(current->buffer[i] == ((char *)inputlist[1]->c.generic.data)[search_offset])
					{
						//Remember where this candidate match began
						if(search_offset == 0)
						{
							search_start = current;
							search_start_offset = i;
						}
						++search_offset;
						DEBUGPRINTF("Search offset is: %d, delim len is: %d\n", search_offset, (inputlist[1]->c.generic.len-1));
						//len-1 characters are compared, i.e. the last byte of the string datum is not part of the delimiter
						if(search_offset == (inputlist[1]->c.generic.len-1))
						{
							found = TRUE;
							DEBUGPUTS("Matched delim\n");
						}
					}
					else
					{
						//Mismatch after a partial match: go back to where the candidate began;
						//the loop increment then resumes scanning at the following byte
						if(search_offset > 0)
						{
							current = search_start;
							i = search_start_offset;
						}
						search_offset = 0;
					}
					break;
				case BUILTIN_TYPE_LIST:
					//Test the current byte against every entry; the first entry to complete a match wins
					for(j = 0; j < list->num_entries; ++j)
					{
						DEBUGPRINTF("Testing list entry %d against character %d in buffer %d\n", j, i, current->index);
						if(list->entries[j]->company->type_id == BUILTIN_TYPE_WHOLE && (int)current->buffer[i] == list->entries[j]->c.integers.num_a)
						{
							DEBUGPUTS("Matched whole number entry.\n");
							found = TRUE;
							found_entry = j;
							search_offset = 1;
							search_start = current;
							search_start_offset = i;
							break;
						}
						else if(list->entries[j]->company->type_id == BUILTIN_TYPE_STRING)
						{
							DEBUGPUTS("String entry.\n");
							if(current->buffer[i] == ((char *)list->entries[j]->c.generic.data)[search_offsets[j]])
							{
								DEBUGPRINTF("%c in buffer matches character #%d in entry.\n", current->buffer[i], search_offsets[j]);
								if(search_offsets[j] == 0)
								{
									search_starts[j] = current;
									search_start_offsets[j] = i;
								}
								++search_offsets[j];
								if(search_offsets[j] == (list->entries[j]->c.generic.len-1))
								{
									DEBUGPUTS("Entire string matched.\n");
									found = TRUE;
									found_entry = j;
									search_offset = search_offsets[j];
									search_start = search_starts[j];
									search_start_offset = search_start_offsets[j];
									break;
								}
							}
							else if(search_offsets[j] > 0)
							{
								//Mismatch after a partial match for this entry: rescan the bytes from where the
								//partial match began up to (not including) the current byte, without moving the main scan position
								DEBUGPRINTF("%c in bufer does not match character #%d in entry.\n", current->buffer[i], search_offsets[j]);
								temp = search_starts[j];
								search_offsets[j] = 0;
								startk = search_start_offsets[j];
								while(temp && !found)
								{
									DEBUGPRINTF("Scanning block %d for possible missed match from %d to %d.\n", temp->index, startk, (temp == current ? i : NET_SEARCH_BUFFER_SIZE)-1);
									for(k = startk; k < (temp == current ? i : NET_SEARCH_BUFFER_SIZE); ++k)
									{
										if(temp->buffer[k] == ((char *)list->entries[j]->c.generic.data)[search_offsets[j]])
										{
											if(!search_offsets[j])
											{
												search_starts[j] = temp;
												search_start_offsets[j] = k;
											}
											++search_offsets[j];
											//NOTE(review): this tests search_offset, not search_offsets[j] as the equivalent check above does - possibly a typo, confirm
											if(search_offset == (list->entries[j]->c.generic.len-1))
											{
												found = TRUE;
												found_entry = j;
												search_start = search_starts[j];
												search_start_offset = search_start_offsets[j];
											}
										}
										else
										{
											if(search_offsets[j] > 0)
											{
												temp = search_starts[j];
												k = search_start_offsets[j];
											}
											search_offsets[j] = 0;
										}
									}
									startk = 0;
									temp = temp->next;
								}
								
							}
							else
								search_offsets[j] = 0;
							
						}
					}
					break;
				}
			}
			//Buffer node exhausted without a match and the socket is still delivering: append a fresh node
			//NOTE(review): the malloc result is used without a NULL check
			if(!found && read_bytes == 1)
			{
				current->next = malloc(sizeof(netbuffer));
				current->next->index = current->index+1;
				//current->next->offset = current->next->filled = 0;
				current->next->next = NULL;
				current = current->next;
			}
		}
	if(inputlist[1]->company->type_id == BUILTIN_TYPE_LIST)
	{
		VIS_FREE(search_offsets, "Get DString@Net Clinet, search offsets");
		VIS_FREE(search_starts, "Get DString@Net Clinet, search starts");
		VIS_FREE(search_start_offsets, "Get DString@Net Clinet, search start offsets");
	}
	if(found)
	{
		//Output 2 is the delimiter that matched; for a list that is the matching entry rather than the list itself
		if(inputlist[1]->company->type_id == BUILTIN_TYPE_LIST)
		{
			inputlist[2] = add_ref(list->entries[found_entry]);
			release_ref(inputlist[1]);
		}
		else
			inputlist[2] = inputlist[1];
		inputlist[3] = NULL;
	}
	else
	{
		//No delimiter: drop the delimiter and the client, and report the failure on output 3
		release_ref(inputlist[1]);
		inputlist[3] = new_datum(BUILTIN_TYPE_YESNO, 2, 0, worker_entry->instance->def->program);
		datum_set_yesno(inputlist[3], 0);
		inputlist[2] = NULL;
		release_ref(inputlist[0]);
		inputlist[0] = NULL;
	}
	//Does this need to be here still or was it just working around another bug?
	//NOTE(review): search_start_offset appears to be read here without having been assigned when no candidate match ever started - confirm
	if(search_start_offset < 0)
		search_start_offset = 0;
	//Without a match the result ends at the current scan position
	if(!found) {
		search_start = current;
		search_start_offset = i;
	}
	DEBUGPRINTF("Allocating string of length: %d = %d * %d + %d + 1\n", NET_SEARCH_BUFFER_SIZE * search_start->index + search_start_offset+1, NET_SEARCH_BUFFER_SIZE , search_start->index, search_start_offset);
	//Result size: one full buffer for every node before search_start, plus the bytes before the offset, plus a terminator
	inputlist[1] = new_datum(BUILTIN_TYPE_STRING, 1, NET_SEARCH_BUFFER_SIZE * search_start->index + search_start_offset+1, worker_entry->instance->def->program);
	temp = first;
	string_offset = 0;
	//Copy the chain into the result, freeing heap nodes as they are consumed; stops (without freeing) at search_start
	while(temp)
	{
		DEBUGPRINTF("Copying from index %d to offset %X\n", temp->index, string_offset);
		if(temp == search_start)
		{
			//if(found)
			//{
				temp->buffer[search_start_offset] = '\0';
				memcpy(((char *)inputlist[1]->c.generic.data)+string_offset, temp->buffer, search_start_offset);
				string_offset += search_start_offset;
			/*}
			else
			{
				memcpy(((char *)inputlist[1]->c.generic.data)+string_offset, temp->buffer, i);
				string_offset += i;
			}*/
			break;
		}
		else
		{
			memcpy(((char *)inputlist[1]->c.generic.data)+string_offset, temp->buffer, NET_SEARCH_BUFFER_SIZE);
			string_offset += NET_SEARCH_BUFFER_SIZE;
		}
		//The first node is on the stack and must not be freed
		if(temp != first)
		{
			temp2 = temp->next;
			VIS_FREE(temp, "Get DString@Net Client, buffer node");
			temp = temp2;
		}
		else
			temp = temp->next;
	}
	//Free whatever is left of the chain, starting with search_start itself
	while(temp)
	{
		if(temp != first)
		{
			temp2 = temp->next;
			VIS_FREE(temp, "Get DString@Net Client, buffer node");
			temp = temp2;
		}
		else
			temp = temp->next;
	}
	((char *)inputlist[1]->c.generic.data)[string_offset] = '\0';
	DEBUGPRINTF("Retrieved: %s\ninputlist[0] = %X\n", inputlist[1]->c.generic.data, inputlist[0]);
	return 0;
}

/*
 * Worker: writes the raw contents of a datum to a Net Client socket.
 *
 * params[0] (in/out) Net Client datum; its num_a is the socket descriptor
 * params[1] (in)     datum to send; data is only sent when its union_type is
 *                    1 and its generic length is non-zero
 *
 * On success (or when there was nothing to send) params[1] is released and
 * set to NULL. If send_all() returns a value <= 0, both inputs are released,
 * params[0] becomes NULL and params[1] receives a BUILTIN_TYPE_WHOLE datum
 * holding the negated send_all() result. Always returns 0.
 */
int net_client_put_raw(datum ** params, queue_entry * entry)
{
	int sockfd = params[0]->c.integers.num_a;
	datum * payload = params[1];
	int result;

	if(payload->union_type == 1 && payload->c.generic.len)
	{
		result = send_all(sockfd, payload->c.generic.data, payload->c.generic.len);
		if(result <= 0)
		{
			release_ref(params[0]);
			release_ref(params[1]);
			params[0] = NULL;
			params[1] = new_datum(BUILTIN_TYPE_WHOLE, 2, 0, entry->instance->def->program);
			params[1]->c.integers.num_a = -result;
			return 0;
		}
	}

	release_ref(params[1]);
	params[1] = NULL;
	return 0;
}

/*
 * Worker: fills a datum with raw bytes read from a Net Client socket.
 *
 * params[0] (in/out) Net Client datum; its num_a is the socket descriptor
 * params[1] (in/out) datum to fill; replaced by the result of copy_datum(),
 *                    and filled only when its union_type is 1 and its generic
 *                    length is non-zero
 *
 * On success params[1] holds the filled copy and params[2] is NULL. If
 * recv_all() returns a value <= 0, both datums are released, params[0] and
 * params[1] become NULL and params[2] receives a BUILTIN_TYPE_WHOLE datum
 * holding the negated recv_all() result. Always returns 0.
 */
int net_client_get_raw(datum ** params, queue_entry * entry)
{
	int sockfd = params[0]->c.integers.num_a;
	datum * target;
	int result;

	target = copy_datum(params[1], 0);
	params[1] = target;
	if(target->union_type == 1 && target->c.generic.len)
	{
		result = recv_all(sockfd, target->c.generic.data, target->c.generic.len);
		if(result <= 0)
		{
			release_ref(params[0]);
			release_ref(params[1]);
			params[0] = params[1] = NULL;
			params[2] = new_datum(BUILTIN_TYPE_WHOLE, 2, 0, entry->instance->def->program);
			params[2]->c.integers.num_a = -result;
			return 0;
		}
	}

	params[2] = NULL;
	return 0;
}

/* State handed to listen_thread() by vis_net_listenport(); the thread frees it when it exits. */
typedef struct
{
	/* Listening socket that accept() is called on. */
	int sockfd;
	/* Worker datum that is executed for every accepted connection. */
	datum * callback;
	/* Program used when creating datums and looking up the worker definition for the thread's instance. */
	program * program;
} net_listen_data;

/*
 * Thread body started by vis_net_listenport().
 *
 * While execute_active is set, accepts connections on listen->sockfd and runs
 * the callback worker once per connection. The worker's stored parameters are
 * passed through (with an added reference); the first unset parameter slot
 * receives a new BUILTIN_TYPE_NETCLIENT datum holding the accepted socket and
 * any further unset slots are passed as NULL. On exit the callback reference
 * is released, the listening socket is closed and the listen object is freed.
 * Always returns 0.
 */
DWORD WINAPI listen_thread(net_listen_data * listen)
{
	worker_datum * callback_worker = listen->callback->c.generic.data;
	struct sockaddr_in peer_addr;
	int peer_len;
	int client_sock;
	int slot;
	queue_entry entry;
	datum * args[32];
	BOOL socket_assigned;
	worker_instance fake_instance;

	//The callback runs under a stand-in instance built on this thread's stack
	entry.worker_num = 0;
	entry.instance = &fake_instance;
	fake_instance.def = listen->program->defs->deflist;//Use Main
	fake_instance.caller_instance = NULL;
	fake_instance.trans = NULL;
	fake_instance.num_workers = fake_instance.num_wires = 0;
	VIS_InitializeCriticalSection(fake_instance.counter_lock);

	while(execute_active)
	{
		peer_len = sizeof(peer_addr);
		client_sock = accept(listen->sockfd, (struct sockaddr *)&peer_addr, &peer_len);
		if(client_sock == -1)
		{
			continue;
		}

		socket_assigned = FALSE;
		for(slot = 0; slot < callback_worker->def->num_inputs; ++slot)
		{
			if(callback_worker->params[slot])
			{
				args[slot] = add_ref(callback_worker->params[slot]);
			}
			else if(socket_assigned)
			{
				args[slot] = NULL;
			}
			else
			{
				args[slot] = new_datum(BUILTIN_TYPE_NETCLIENT, 2, 0, listen->program);
				args[slot]->c.integers.num_a = client_sock;
				socket_assigned = TRUE;
			}
		}
		//Make sure that our fake instance is never cleaned up
		fake_instance.in_progress_count = fake_instance.in_queue_count = 1000;
		execute_def(callback_worker->def, entry, args, NULL);
	}

	release_ref(listen->callback);
	VIS_CloseSocket(listen->sockfd);
	VIS_FREE(listen, "Net listener object");
	return 0;
}

/* Fills in the failure outputs of vis_net_listenport(): params[0] = NULL, params[1] = Yes/No datum with num_a 0. Returns 0. */
static int net_listenport_failed(datum ** params, queue_entry * entry)
{
	params[1] = new_datum(BUILTIN_TYPE_YESNO, 2, 0, entry->instance->def->program);
	params[1]->c.integers.num_a = 0;
	params[0] = NULL;
	return 0;
}

/*
 * Worker: starts listening for TCP connections on a port and runs a callback
 * worker for each accepted connection on a new thread.
 *
 * params[0] (in)  integer datum whose num_a is the port to listen on
 * params[1] (in)  callback worker datum; ownership passes to the listener
 *                 thread on success
 *
 * On success params[0] receives a BUILTIN_TYPE_YESNO datum with num_a 0 and
 * params[1] is NULL. If allocation, socket(), bind() or listen() fails, the
 * callback is released, params[0] is NULL and params[1] receives a
 * BUILTIN_TYPE_YESNO datum with num_a 0. Always returns 0.
 */
int vis_net_listenport(datum ** params, queue_entry * entry)
{
	int bind_result;
	struct sockaddr_in my_address;
	int port = params[0]->c.integers.num_a;
	net_listen_data * listener = malloc(sizeof(net_listen_data));
	release_ref(params[0]);

	//malloc can fail; report it like any other setup failure instead of dereferencing NULL
	if(!listener)
	{
		release_ref(params[1]);
		return net_listenport_failed(params, entry);
	}

	DEBUGPUTS("calling socket\n");
	listener->callback = params[1];
	listener->sockfd = socket(PF_INET, SOCK_STREAM, 0);
	listener->program = entry->instance->def->program;
	if(listener->sockfd == -1)
	{
		release_ref(params[1]);
		VIS_FREE(listener, "net listener object");
		return net_listenport_failed(params, entry);
	}
	DEBUGPRINTF("Socket: %X\n", listener->sockfd);

	my_address.sin_family = AF_INET;
	my_address.sin_port = htons(port);
	my_address.sin_addr.s_addr = INADDR_ANY;
	memset(&(my_address.sin_zero), '\0', 8);

	DEBUGPRINTF("Calling bind on port: %d\n", port);
	bind_result = bind(listener->sockfd, (struct sockaddr *)&my_address, sizeof(struct sockaddr));
	DEBUGPRINTF("Bind returned: %d\n", bind_result);
	if(bind_result == -1)
	{
		perror("bind");
		DEBUGPUTS("bind failed, releasing ref to callback worker\n");
		release_ref(params[1]);
		DEBUGPUTS("Closing socket\n");
		DEBUGPRINTF("Socket: %X\n", listener->sockfd);
		VIS_CloseSocket(listener->sockfd);
		DEBUGPUTS("Freeing listener data\n");
		VIS_FREE(listener, "net listener object");
		DEBUGPUTS("Allocating error output\n");
		return net_listenport_failed(params, entry);
	}

	DEBUGPUTS("Calling listen\n");
	if(listen(listener->sockfd, 10) == -1)
	{
		release_ref(params[1]);
		VIS_CloseSocket(listener->sockfd);
		VIS_FREE(listener, "net listener object");
		return net_listenport_failed(params, entry);
	}
	DEBUGPUTS("Creating new thread\n");
	//From here on the thread owns listener and the callback reference
	VIS_NewThread(listen_thread, listener);
	params[0] = new_datum(BUILTIN_TYPE_YESNO, 2, 0, entry->instance->def->program);
	params[0]->c.integers.num_a = 0;
	params[1] = NULL;
	return 0;
}