view net.rhope @ 164:429afd920a23

Allow sending and receiving arrays of arbitrary objects rather than just UInt8s
author Mike Pavone <pavone@retrodev.com>
date Mon, 10 Jan 2011 00:25:35 -0500
parents e9a8269384bb
children 1bfc19076f1b
line wrap: on
line source


Foreign C:libc
{
	setsockopt[sockfd(Int32,Naked),level(Int32,Naked),optname(Int32,Naked),optval(Int32,Raw Pointer),optlen(Int32,Naked):out(Int32,Naked)]
	listen[sockfd(Int32,Naked),backlog(Int32,Naked):out(Int32,Naked)]
	send[sockfd(Int32,Naked),data(Array,Raw Pointer),datalen(Int64,Naked):out(Int32,Naked)]
	recv[sockfd(Int32,Naked),data(Array,Raw Pointer,Mutable),maxlen(Int64,Naked):out(Int32,Naked),data]
	fcntl[fd(Int32,Naked),command(Int32,Naked),value(Int64,Naked):out(Int32,Naked)]
	pipe[descriptors(Array,Raw Pointer,Mutable):out(Int32,Naked),descriptors]
	epoll_create[size(Int32,Naked):status(Int32,Naked)]
	epoll_ctl[epfd(Int32,Naked),operation(Int32,Naked),fd(Int32,Naked),event(epoll_event,Raw Pointer):status(Int32,Naked)]
	epoll_wait[epfd(Int32,Naked),events(Array,Raw Pointer,Mutable),maxevents(Int32,Naked),timeout(Int32,Naked):out(Int32,Naked),events]
}


//Note: port numbers would more properly be UInt16, think about changing later
Foreign C:runtime
{
	_internal_bindnewsocket[port(Int32,Naked),setreuse(Int32,Naked):socket(Int32,Naked)]
	_internal_accept[sockfd(Int32,Naked),addrbuf(Array,Raw Pointer,Mutable),buflen(Int32,Naked):consock(Int32,Naked),addrbuf]
	_internal_connectnewsocket[addr(Array,Raw Pointer),port(Int32,Naked):sockfd(Int32,Naked)]
}

Blueprint epoll_event
{
	events(UInt32,Naked)
	data(Int64,Naked)
}

//EPOLLIN 1
//EPOLLOUT 4
//EPOLLPRI 2
//EPOLL_CTL_ADD 1
//EPOLL_CTL_DEL 2
//EPOLL_CTL_MOD 3

_Add New FDs[epfd,pipefd:out]
{
	count,rdata <- read[pipefd, _internal_array_allocnaked[2,Int32()], 8i64]
	,out <- If[[count]=[8i64]]
	{
		data <- [rdata]Length <<[2]
		epoll_ctl[epfd, 1i32, [data]Index[0], [[Build[epoll_event()]]events <<[Abs UInt[[data]Index[1]]]]data <<[Int64[[data]Index[0]]]]
		{ out <- _Add New FDs[epfd,pipefd] }
	}{
		//TODO: Properly deal with the case when count > 0 but < 8
		If[[count]>[0i64]]
		{
			Print[["read of listener pipe returned unexpected number of bytes: "]Append[String[count]]]
		}
	}
}

_Get IO Context[fd:out,notfound] uses socklisten
{
	out,notfound <- [socklisten::fdlookup]Index[fd]
}

_Handle Events[epfd,pipefd,buf,cur:out]
{
	event,out <- [buf]Index[cur]
	{
		If[[event]data >>]
		{
			activefd <- Trunc Int32[[event]data >>]
			epoll_ctl[epfd, 2, activefd, Build[epoll_event()]]
			{	
				If[[[event]events >>]&[8216u32]]
				{ res <- No }
				{ res <- Yes }
				ct,cont <- _Get IO Context[activefd]
				{
				}{ Print["Could not find context for IO event"] }
				,cont <- Resume[ct,res]
				{
					cont <- Yield[]
				}{
					Print["could not resume context for IO event"]
				}
			}
		}{
			cont <- _Add New FDs[epfd,pipefd]
		}
		Val[cont]
		{
			out <- _Handle Events[epfd,pipefd,buf,[cur]+[1]]
		}
	}
}

_Wait Active[epfd,pipefd,buf]
{
	workaround <- Yield[]
	Val[workaround]
	{
		count,newbuf <- epoll_wait[epfd,buf,[buf]Storage >>,-1]
		If[[count]=[-1]]
		{
			Print["epoll_wait returned error"]
		}{
			If[[count]=[0]]
			{
				//Shouldn't happen normally, but perhaps if there was a signal
				_Wait Active[epfd,pipefd,buf]
			}{
				_Handle Events[epfd,pipefd,[newbuf]Length <<[count],0]
				{ _Wait Active[epfd,pipefd,buf] }
			}
		}
	}
}

_Sock Listener[pipefd]
{
	epfd <- epoll_create[16]
	If[[epfd]=[-1]]
	{
		Print["Error creating epoll file descriptor"]
	}{
		If[[epoll_ctl[epfd,1i32,pipefd,[[Build[epoll_event()]]events <<[1u32]]data <<[0i64]]]=[-1]]
		{
			Print["Error adding pipe to epoll fd"]
		}{
			_Wait Active[epfd, pipefd, _internal_array_allocnaked[8,epoll_event()]]
		}
	}
}

Globals socklisten
{
	listener started <- No
	fdlookup <- ()
	pipefd <- -1
}

_Add FD to Listener[fd,context:pipefd,err] uses socklisten
{
	If[socklisten::listener started]
	{
		pipefd <- socklisten::pipefd
		do add <- Yes
	}{
		//Calling a function with side effects inside a transaction is BAD
		//Need to do something about this
		ret,des <- pipe[[Array[]]Set[1, 0i32]]
		If[[ret]=[-1]]
		{
			err <- "Error creating pipe for waking up socket listener"
		}{
			socklisten::pipefd <- [des]Index[1]
			pipefd <- [des]Index[1]
			//fcntl[fd, F_SETFL, O_NONBLOCK]
			//Set both ends of the pipe to non blocking
			fcntl[[des]Index[0], 4i32, 2048i64]
			fcntl[[des]Index[1], 4i32, 2048i64]
			socklisten::listener started <- Yes
			Call Async[_Sock Listener[[des]Index[0],?]]
			do add <- Yes
		}
	}
	Val[do add]
	{
		socklisten::fdlookup <- [socklisten::fdlookup]Set[fd, context]
	}
}

_Write to Listener Pipe[pipefd,data]
{
	res <- write[pipefd, data, 8i64]
	If[[res]!=[8i32]]
	{
		workaround <- Yield[]
		Val[workaround]
		{ _Write to Listener Pipe[pipefd,data] }
	}
}

_Wait for IO[fd,type,context]
{
	_Add FD to Listener[fd,context]
	{
		_Write to Listener Pipe[~,[[Array[]]Append[fd]]Append[type]]
	}{
		Print[~]
		{ Resume[context,No] }
	}
}

Wait for IO[fd,type:out]
{
	out <- Pause[_Wait for IO[fd,type,?]]
}

_Do Con Call[newfd,address,tocall]
{
	[tocall]Call[newfd,address]
}

_Null Term[raw str,cur:out]
{
	[raw str]Index[cur]
	{
		If[[~]=[0u8]]
		{
			out <- String[[raw str]Length <<[cur]]
		}{
			out <- _Null Term[raw str, [cur]+[1]]
		}
	}{
		out <- String[raw str]
	}
}

_Port Wait[fd,tocall:out]
{
	con <- _internal_accept[fd,_internal_array_allocnaked[40,UInt8()],40] {}
	{ address <- _Null Term[[~]Length <<[40], 0i32] }
	If[[con]=[-1]]
	{
		If[Wait for IO[fd, 1i32]]
		{
			out <- _Port Wait[fd,tocall]
		}{
			Print["Error waiting for connection"]
		}
	}{
		fcntl[con, 4i32, 2048i64]
		{ Call Async[_Do Con Call[TCP Connection[con],address,tocall,?]]
		{ out <- _Port Wait[fd,tocall] }}
	}
}

Listen on Port[port(Int32),tocall:out]
{
	fd <- _internal_bindnewsocket[port,1]
	//fcntl[fd, F_SETFL, O_NONBLOCK]
	//Set listen socket to non blocking
	If[[fd]=[-1]]
	{ out <- No }
	{
		fcntl[fd, 4i32, 2048i64]
		{ listen[fd,8]
		{ out <- Call Async[_Port Wait[fd,tocall,?]] }}
	}
}

Blueprint TCP Connection
{
	Filedes
	Read Buffer
}

TCP Connection[fd:out]
{
	out <- [[Build[TCP Connection()] 
		]Filedes <<[fd]
		]Read Buffer <<[Array[]]
}

TCP Connect[address,port:out,err]
{
	addrbuf <- [[Flatten[address]]Buffer >>]Append[0u8]
	fd <- _internal_connectnewsocket[addrbuf, port]
	err <- If[[fd]=[-1]] {}
	{
		fcntl[fd, 4i32, 2048i64]
		out <- TCP Connection[fd]
	}
}

_Write@TCP Connection[con,buffer,wrote:out,err]
{
	
	new wrote <- write[[con]Filedes >>, buffer, Int64[[buffer]Length]]
	If[[new wrote]=[-1]]
	{
		If[Wait for IO[[con]Filedes >>, 4]]
		{
			out,err <- [con]_Write[buffer,wrote]
		}{
			err <- wrote
		}
	}{
		If[[new wrote]=[[buffer]Length]]
		{
			out <- con
		}{
			remaining <- [[buffer]Length]-[new wrote]
			next buf <- [_internal_array_copychunk[buffer, new wrote, _internal_array_allocnaked[remaining,UInt8()], 0, remaining]]Length <<[remaining]
			out,err <- [con]_Write[next buf,[wrote]+[new wrote]]
		}
	}
}

Write@TCP Connection[con,buffer(Array):out,err]
{
	If[[buffer]Length]
	{
		If[[Raw Size[[buffer]Eltype >>]] > [1]]
		{
			bytelen <- [[buffer]Length]*[Raw Size[[buffer]Eltype >>]]
			//Copy to a byte buffer until we have a better way of writing from an arbitrary byte offset
			bbuffer <- [_internal_array_copychunk[buffer, 0, _internal_array_allocnaked[bytelen, UInt8()], 0, [buffer]Length]]Length <<[bytelen]
		}{
			bbuffer <- Val[buffer]
		}
		out,err <- [con]_Write[bbuffer,0]
	}{
		out <- con
	}
}

_Read@TCP Connection[con,toread,buflist:data,out con,err]
{
	read[[con]Filedes >>, _internal_array_allocnaked[toread, UInt8()], Int64[toread]]
	{ numread <- Trunc Int32[~] }
	{ outbuf <- [~]Length <<[numread] }
	If[[numread]=[-1]]
	{
		If[Wait for IO[[con]Filedes >>, 1]]
		{
			data,out con,err <- [con]_Read[toread,buflist]
		}{
			err <- Fold[_Add Len[?], 0, buflist]
		}
	}{
		If[[numread]=[toread]]
		{
			data <- [buflist]Append[outbuf]
			out con <- con
		}{
			data,out con,err <- [con]_Read[[toread]-[numread], [buflist]Append[outbuf]]
		}
	}
}

_Add Len[len,buf:out]
{
	out <- [len]+[[buf]Length]
}

_Merge One Buf[inprog,src:out]
{
	oldlen <- [inprog]Length
	srclen <- [src]Length
	If[srclen]
	{
		out <- [_internal_array_copychunk[src, 0, inprog, oldlen, srclen]]Length <<[[oldlen]+[srclen]]
	}{
		out <- inprog
	}
}

_Merge Buffers[buflist:out]
{
	If[[[buflist]Length]=[1]]
	{
		out <- [buflist]Index[0]{}
		{ Print["This shouldn't happen!!!!"] }
	}{
		len <- Fold[_Add Len[?], 0, buflist]
		out <- Fold[_Merge One Buf[?], _internal_array_allocnaked[len, UInt8()], buflist]
	}
}

Read@TCP Connection[con,toread(Int32):data,out con,err]
{
	buflen <- [[con]Read Buffer >>]Length
	If[[toread]=[buflen]]
	{
		data <- [con]Read Buffer >>
		out con <- [con]Read Buffer <<[Array[]]
	}{
		If[[toread]<[buflen]]
		{
			data <- [[con]Read Buffer >>]Slice[toread] {}
			{ out con <- [con]Read Buffer <<[~] }
		}{
			If[buflen]
			{
				buflist <- [()]Append[[con]Read Buffer >>]
				ntoread <- [toread]-[[[con]Read Buffer >>]Length]
			}{
				ntoread <- Val[toread]
				buflist <- ()
			}
			,out con,err <- [[con]Read Buffer <<[Array[]]]_Read[ntoread,buflist]
			{ data <- _Merge Buffers[~] }
		}
	}
}

Read Type@TCP Connection[con,toread(Int32),type(Blueprint):data,out con,err]
{
	toreadbytes <- [toread]*[Raw Size[type]]
	,out con,err <- [con]Read[toreadbytes]
	{
		data <- [_internal_array_copychunk[~, 0, _internal_array_allocnaked[toread, type], 0, toreadbytes]]Length <<[toread] 
	}
}

_Check Partial[buffer,delim,offset:partial,none]
{
	none <- If[[offset]=[[buffer]Length]] {}
	{
		left <- [[buffer]Length]-[offset]
		If[_internal_memcmp_offset[buffer, offset, delim, 0, left]]
		{
			partial,none <- _Check Partial[buffer,delim,[offset]+[1]]
		}{
			partial <- offset
		}
	}
}

_Check Buffer[buffer,delim,offset:out,partial,none]
{
	dlen <- [delim]Length
	blen <- [buffer]Length
	If[[[offset]+[dlen]]<=[blen]]
	{
		If[_internal_memcmp_offset[buffer, offset, delim, 0, dlen]]
		{
			out,partial,none <- _Check Buffer[buffer,delim,[offset]+[1]]
		}{
			out <- offset
		}
	}{
		partial,none <- _Check Partial[buffer,delim,offset]
	}
}

_Check Split[old,new,delim,offset:out,not split]
{
	,not split <- If[[offset]<[[old]Length]]
	{
		left <- [[old]Length]-[offset]
		If[_internal_memcmp_offset[old, offset, delim, 0, left]]
		{
			next <- [offset]+[1]
		}{
			If[_internal_memcmp_offset[new, 0, delim, left, [[delim]Length]-[left]]]
			{
				next <- [offset]+[1]
			}{
				out <- offset
			}
		}
		out,not split <- _Check Split[old,new,delim,next]
	}
}

_Read Delim@TCP Connection[con,delim,poffset,buflist:data,out con,err]
{
	read[[con]Filedes >>, _internal_array_allocnaked[512i32, UInt8()], 512i64]
	{ numread <- Trunc Int32[~] }
	{ outbuf <- [~]Length <<[numread] }
	
	If[[numread]=[-1]]
	{
		If[Wait for IO[[con]Filedes >>, 1]]
		{
			data,out con,err <- [con]_Read Delim[delim,poffset,buflist]
		}{
			err <- [Fold[_Add Len[?], 0, buflist]]+[Length[[con]Read Buffer >>]]
		}
	}{
		,checknew <- If[[poffset]>=[0]]
		{
			If[[[delim]Length]>[[outbuf]Length]]
			{
				//Avoid possibility of having to check across more than 2 buffers
				data,out con,err <-[[con]Read Buffer <<[
						_Merge Buffers[[[buflist]Append[[con]Read Buffer >>]]Append[outbuf]]
					]
				]Read Delim[delim]
			}{
				,checknew <- _Check Split[[con]Read Buffer >>, outbuf, delim ,poffset]
				{
					before <- [_internal_array_copychunk[[con]Read Buffer >>, 0, _internal_array_allocnaked[~, UInt8()], 0, ~]
					]Length <<[~]
					
					data <- _Merge Buffers[[buflist]Append[before]]
					after off <- [[delim]Length]- [[[[con]Read Buffer >>]Length]-[~]]
					rbufferlen <- [[outbuf]Length]-[after off]
					out con <- [con]Read Buffer <<[
						[_internal_array_copychunk[outbuf, after off, 
							_internal_array_allocnaked[rbufferlen, UInt8()], 0, rbufferlen]
						]Length <<[rbufferlen]
					]
				}
			}
		}
		Val[checknew]
		{
			,npoffset <- _Check Buffer[outbuf,delim,0]
			{
				before <- [_internal_array_copychunk[outbuf, 0, _internal_array_allocnaked[~, UInt8()], 0, ~]
						]Length <<[~]
				If[[[con]Read Buffer >>]Length]
				{
					
					tomerge <- [[buflist]Append[[con]Read Buffer >>]]Append[before]
				}{
					tomerge <- [buflist]Append[before]
				}
				data <- _Merge Buffers[tomerge]
				rbufferlen <- [[outbuf]Length]-[[~]+[[delim]Length]]
				out con <- [con]Read Buffer <<[
					[_internal_array_copychunk[outbuf, [~]+[[delim]Length], 
						_internal_array_allocnaked[rbufferlen, UInt8()], 0, rbufferlen]
					]Length <<[rbufferlen]
				]
			}{
				If[[[con]Read Buffer >>]Length]
				{
					nbuflist <- [buflist]Append[[con]Read Buffer >>]
				}{
					nbuflist <- Val[buflist]
				}
				ncon <- [con]Read Buffer <<[outbuf]
			}{
				npoffset <- -1
				If[[[con]Read Buffer >>]Length]
				{
					nbuflist <- [[buflist]Append[[con]Read Buffer >>]]Append[outbuf]
					ncon <- [con]Read Buffer <<[Array[]]
				}{
					nbuflist <- [buflist]Append[outbuf]
					ncon <- Val[con]
				}
			}
		}
		data,out con,err <- [ncon]_Read Delim[delim,npoffset,nbuflist]
	}
}

Read Delim@TCP Connection[con,delim(Array):data,out con,err]
{
	,poffset <-_Check Buffer[[con]Read Buffer >>, delim,0]
	{
		data <- [_internal_array_copychunk[[con]Read Buffer >>, 0, _internal_array_allocnaked[~, UInt8()], 0, ~]]Length <<[~]
		rbufferlen <- [[[con]Read Buffer >>]Length]-[[~]+[[delim]Length]]
		out con <- [con]Read Buffer <<[
			[_internal_array_copychunk[[con]Read Buffer >>, [~]+[[delim]Length], 
				_internal_array_allocnaked[rbufferlen, UInt8()], 0, rbufferlen]
			]Length <<[rbufferlen]
		]
	}{  
		buflist <- ()
		ncon <- Val[con]
	}{ 
		If[[[con]Read Buffer >>]Length]
		{
			ncon <- [con]Read Buffer <<[Array[]]
			buflist <- [()]Append[[con]Read Buffer >>] 
		}{
			ncon <- Val[con]
			buflist <- ()
		}
		poffset <- -1
		
	}
	data,out con,err <- [ncon]_Read Delim[delim,poffset,buflist]
}

Close@TCP Connection[con:out]
{
	out <- close[[con]Filedes >>]
}

Globals Forever
{
	Wait <- No
}

_Wait Forever[context:out] uses Forever
{
	Forever::Wait <- context
	out <- Yes
}

//This effectively leaks a context and thus any data on the stack of that context
//Need either handle cleanup of contexts or find a better way to accomplish this
Wait Forever[]
{
	Pause[_Wait Forever[?]]
}

Stop Waiting[] uses Forever
{
	Resume[Forever::Wait, Yes]
}