view net.rhope @ 142:7bbdc034e347

Fix some bugs. Get basic network code working (epoll listener + accept connections). Start porting webserver.
author Mike Pavone <pavone@retrodev.com>
date Sun, 21 Nov 2010 16:33:17 -0500
parents
children ff00538cd818
line wrap: on
line source


//Bindings to the libc functions used by the networking code.
//Each entry has the form name[inputs:outputs]; the parenthesised annotations give the
//Rhope type followed by passing modifiers (Naked, Raw Pointer, Mutable).
//recv, pipe and epoll_wait name their buffer argument again after the return value,
//so callers receive the buffer back as a second output.
//NOTE(review): read and write are called further down but are not declared in this block;
//presumably they are declared elsewhere - confirm.
Foreign C:libc
{
	setsockopt[sockfd(Int32,Naked),level(Int32,Naked),optname(Int32,Naked),optval(Int32,Raw Pointer),optlen(Int32,Naked):out(Int32,Naked)]
	listen[sockfd(Int32,Naked),backlog(Int32,Naked):out(Int32,Naked)]
	send[sockfd(Int32,Naked),data(Array,Raw Pointer),datalen(Int64,Naked):out(Int32,Naked)]
	recv[sockfd(Int32,Naked),data(Array,Raw Pointer,Mutable),maxlen(Int64,Naked):out(Int32,Naked),data]
	fcntl[fd(Int32,Naked),command(Int32,Naked),value(Int64,Naked):out(Int32,Naked)]
	pipe[descriptors(Array,Raw Pointer,Mutable):out(Int32,Naked),descriptors]
	epoll_create[size(Int32,Naked):status(Int32,Naked)]
	epoll_ctl[epfd(Int32,Naked),operation(Int32,Naked),fd(Int32,Naked),event(epoll_event,Raw Pointer):status(Int32,Naked)]
	epoll_wait[epfd(Int32,Naked),events(Array,Raw Pointer,Mutable),maxevents(Int32,Naked),timeout(Int32,Naked):out(Int32,Naked),events]
}


//Socket helpers bound from "runtime" rather than libc.
//_internal_bindnewsocket yields a socket descriptor for the given port; Listen on Port treats -1 as failure.
//_internal_accept yields a connection descriptor plus the address buffer; _Port Wait treats -1 as
//"nothing to accept yet" and waits for the socket to become readable.
//Note: port numbers would more properly be UInt16, think about changing later
Foreign C:runtime
{
	_internal_bindnewsocket[port(Int32,Naked),setreuse(Int32,Naked):socket(Int32,Naked)]
	_internal_accept[sockfd(Int32,Naked),addrbuf(Array,Raw Pointer,Mutable),buflen(Int32,Naked):consock(Int32,Naked),addrbuf]
}

//Rhope-side description of the event record passed to epoll_ctl and filled in by epoll_wait.
//events holds the epoll event bit mask.
//data holds the watched fd widened to Int64; the wake-up pipe is registered with data 0.
//NOTE(review): on x86-64 Linux the C struct epoll_event is packed (12 bytes) - confirm that the
//layout generated for this Blueprint matches it.
Blueprint epoll_event
{
	events(UInt32,Naked)
	data(Int64,Naked)
}

//Linux epoll constants; the code below uses these values as numeric literals
//EPOLLIN 1
//EPOLLOUT 4
//EPOLLPRI 2
//EPOLL_CTL_ADD 1
//EPOLL_CTL_DEL 2
//EPOLL_CTL_MOD 3

//Drains pending registration requests from the listener pipe and adds each fd to epoll.
//Each request is 8 bytes holding two Int32 values: the fd to watch and the epoll event mask.
//epfd is the epoll descriptor, pipefd the pipe descriptor to read requests from.
//out is fed by the second output of If, so it is presumably populated once a read returns
//something other than a full 8 byte request.
_Add New FDs[epfd,pipefd:out]
{
	count,rdata <- read[pipefd, _internal_array_allocnaked[2,Int32()], 8i64]
	,out <- If[[count]=[8i64]]
	{
		//Set the length of the freshly read buffer to its two Int32 entries
		data <- [rdata]Length <<[2]
		//1i32 is EPOLL_CTL_ADD. Entry 0 is the fd and entry 1 the event mask; the fd is also
		//stored in the event's data field so _Handle Events can tell which descriptor fired.
		//Once the fd is registered, try to read another request.
		epoll_ctl[epfd, 1i32, [data]Index[0], [[Build[epoll_event()]]events <<[Abs UInt[[data]Index[1]]]]data <<[Int64[[data]Index[0]]]]
		{ out <- _Add New FDs[epfd,pipefd] }
	}{
		//TODO: Properly deal with the case when count > 0 but < 8
		If[[count]>[0i64]]
		{
			Print[["read of listener pipe returned unexpected number of bytes: "]Append[String[count]]]
		}
	}
}

//Looks up the context registered for fd in socklisten::fdlookup.
//out and notfound are the first and second outputs of Index respectively.
_Get IO Context[fd:out,notfound] uses socklisten
{
	out,notfound <- [socklisten::fdlookup]Index[fd]
}

//Processes the events in buf one at a time, starting at index cur.
//An event whose data field is non-zero belongs to a watched fd; data 0 marks the wake-up pipe.
//out is the second output of Index, so it is presumably populated once cur runs past the last event.
_Handle Events[epfd,pipefd,buf,cur:out]
{
	event,out <- [buf]Index[cur]
	{
		If[[event]data >>]
		{
			activefd <- Trunc Int32[[event]data >>]
			//2 is EPOLL_CTL_DEL: stop watching the fd before waking up its waiter
			epoll_ctl[epfd, 2, activefd, Build[epoll_event()]]
			{	
				//8216 is 0x2018, which on Linux is EPOLLERR (0x8) | EPOLLHUP (0x10) | EPOLLRDHUP (0x2000).
				//The waiter is resumed with No if any of those bits is set and with Yes otherwise.
				If[[[event]events >>]&[8216u32]]
				{ res <- No }
				{ res <- Yes }
				//A failed lookup feeds cont so the remaining events are still processed
				ct,cont <- _Get IO Context[activefd]
				{
				}{ Print["Could not find context for IO event"] }
				//NOTE(review): nothing visible here removes the context from socklisten::fdlookup
				//after it has been resumed - confirm whether that happens elsewhere
				,cont <- Resume[ct,res]
				{
					cont <- Yield[]
				}{
					Print["could not resume context for IO event"]
				}
			}
		}{
			//The wake-up pipe is readable: pick up newly requested fds
			cont <- _Add New FDs[epfd,pipefd]
		}
		//Move on to the next event once cont has a value
		Val[cont]
		{
			out <- _Handle Events[epfd,pipefd,buf,[cur]+[1]]
		}
	}
}

//Main loop of the socket listener: waits in epoll_wait and dispatches whatever becomes ready.
//epfd is the epoll descriptor, pipefd the wake-up pipe descriptor and buf the event buffer
//handed to epoll_wait (its storage size is passed as maxevents).
_Wait Active[epfd,pipefd,buf]
{
	//Yield first and only continue once the yield has produced a value
	workaround <- Yield[]
	Val[workaround]
	{
		//A timeout of -1 asks epoll_wait to wait without a time limit
		count,newbuf <- epoll_wait[epfd,buf,[buf]Storage >>,-1]
		If[[count]=[-1]]
		{
			//NOTE(review): the loop is not restarted on this path, so the listener stops after any
			//epoll_wait error. epoll_wait(2) reports interruption by a signal as -1 with EINTR -
			//confirm whether that case should retry instead.
			Print["epoll_wait returned error"]
		}{
			If[[count]=[0]]
			{
				//Shouldn't happen normally, but perhaps if there was a signal
				_Wait Active[epfd,pipefd,buf]
			}{
				//Limit the buffer to the number of events reported, handle them, then wait again
				_Handle Events[epfd,pipefd,[newbuf]Length <<[count],0]
				{ _Wait Active[epfd,pipefd,buf] }
			}
		}
	}
}

//Entry point of the socket listener.
//Creates an epoll descriptor, registers pipefd with it and then enters the _Wait Active loop
//with a buffer allocated for 8 epoll_event entries.
//Failures are reported with Print and the listener does not start.
_Sock Listener[pipefd]
{
	epfd <- epoll_create[16]
	If[[epfd]=[-1]]
	{
		Print["Error creating epoll file descriptor"]
	}{
		//1i32 is EPOLL_CTL_ADD and 1u32 is EPOLLIN. The pipe is registered with data 0,
		//which is how _Handle Events tells it apart from watched fds.
		If[[epoll_ctl[epfd,1i32,pipefd,[[Build[epoll_event()]]events <<[1u32]]data <<[0i64]]]=[-1]]
		{
			Print["Error adding pipe to epoll fd"]
		}{
			_Wait Active[epfd, pipefd, _internal_array_allocnaked[8,epoll_event()]]
		}
	}
}

//Shared state of the socket listener, accessed by workers declared with "uses socklisten".
//listener started: set to Yes by _Add FD to Listener once the wake-up pipe has been created
//fdlookup: maps a watched fd to the context waiting on it
//pipefd: descriptor used to send requests to the listener, -1 until the listener is started
Globals socklisten
{
	listener started <- No
	fdlookup <- ()
	pipefd <- -1
}

//Records context as the waiter for fd and makes sure the socket listener is running.
//On first use this creates the wake-up pipe, switches both ends to non blocking mode and
//starts _Sock Listener asynchronously on descriptor 0 of the pipe.
//Outputs: pipefd is descriptor 1 of the wake-up pipe (the end requests are written to);
//err is an error message, populated only when the pipe could not be created.
_Add FD to Listener[fd,context:pipefd,err] uses socklisten
{
	If[socklisten::listener started]
	{
		pipefd <- socklisten::pipefd
		do add <- Yes
	}{
		//Calling a function with side effects inside a transaction is BAD
		//Need to do something about this
		//NOTE(review): [Array[]]Set[1, 0i32] presumably yields an Int32 array with room for the
		//two descriptors that pipe fills in - confirm
		ret,des <- pipe[[Array[]]Set[1, 0i32]]
		If[[ret]=[-1]]
		{
			err <- "Error creating pipe for waking up socket listener"
		}{
			socklisten::pipefd <- [des]Index[1]
			pipefd <- [des]Index[1]
			//fcntl[fd, F_SETFL, O_NONBLOCK]
			//Set both ends of the pipe to non blocking
			//(4i32 stands for F_SETFL and 2048i64 for O_NONBLOCK)
			fcntl[[des]Index[0], 4i32, 2048i64]
			fcntl[[des]Index[1], 4i32, 2048i64]
			socklisten::listener started <- Yes
			Call Async[_Sock Listener[[des]Index[0],?]]
			do add <- Yes
		}
	}
	//Only record the waiting context when a listener pipe is available
	Val[do add]
	{
		socklisten::fdlookup <- [socklisten::fdlookup]Set[fd, context]
	}
}

//Writes an 8 byte request to the listener pipe.
//If write does not report 8 bytes, yields and then retries the whole request.
//NOTE(review): res is compared against 8i32 here while _Add New FDs compares the count returned
//by read against 8i64; the declarations of read and write are not visible - confirm the types agree.
_Write to Listener Pipe[pipefd,data]
{
	res <- write[pipefd, data, 8i64]
	If[[res]!=[8i32]]
	{
		workaround <- Yield[]
		Val[workaround]
		{ _Write to Listener Pipe[pipefd,data] }
	}
}

//Registers context as the waiter for fd and sends the (fd, type) request to the listener.
//Called through Pause by Wait for IO, which supplies context.
//If registration fails, the error message is printed and context is resumed with No.
_Wait for IO[fd,type,context]
{
	_Add FD to Listener[fd,context]
	{
		//~ is the pipefd output here; the request is the two values fd and type
		_Write to Listener Pipe[~,[[Array[]]Append[fd]]Append[type]]
	}{
		//~ is the err output here
		Print[~]
		{ Resume[context,No] }
	}
}

//Pauses the calling context until the socket listener reports activity on fd.
//type is the epoll event mask to wait for (_Port Wait passes 1i32, i.e. EPOLLIN).
//out is presumably the value the context is resumed with: Yes for a normal event, No when the
//event carried an error/hangup bit or the fd could not be registered.
Wait for IO[fd,type:out]
{
	out <- Pause[_Wait for IO[fd,type,?]]
}

//Invokes the connection handler tocall with the new connection's fd and address.
//_Port Wait runs this through Call Async.
_Do Con Call[newfd,address,tocall]
{
	[tocall]Call[newfd,address]
}

//Builds a String from a byte buffer that may contain a zero terminator.
//Scans raw str from index cur; when a zero byte is found the buffer is cut off at that
//index and converted. If the scan runs past the end without finding one, the whole
//buffer is converted instead.
_Null Term[raw str,cur:out]
{
	ch <- [raw str]Index[cur]
	{
		is term <- [ch]=[0u8]
		If[is term]
		{
			//Everything before the terminator is the string
			trimmed <- [raw str]Length <<[cur]
			out <- String[trimmed]
		}{
			//Not a terminator, keep scanning
			next <- [cur]+[1]
			out <- _Null Term[raw str, next]
		}
	}{
		//No terminator anywhere in the buffer
		out <- String[raw str]
	}
}

//Accept loop for the listening socket fd.
//Tries to accept a connection. If none is available (-1) it waits for the socket to become
//readable and tries again; otherwise it hands the connection to tocall asynchronously and
//goes back to accepting. The loop ends only when Wait for IO reports failure.
_Port Wait[fd,tocall:out]
{
	//The empty first block belongs to the connection output; the second block receives the
	//40 byte address buffer filled in by _internal_accept and converts it to a String
	con <- _internal_accept[fd,_internal_array_allocnaked[40,UInt8()],40] {}
	{ address <- _Null Term[[~]Length <<[40], 0i32] }
	If[[con]=[-1]]
	{
		//1i32 is EPOLLIN: wait until the listening socket is readable
		If[Wait for IO[fd, 1i32]]
		{
			out <- _Port Wait[fd,tocall]
		}{
			Print["Error waiting for connection"]
		}
	}{
		Call Async[_Do Con Call[con,address,tocall,?]]
		{
			out <- _Port Wait[fd,tocall]
		}
	}
}

//Starts accepting connections on port and calls tocall for each one.
//port: port number to bind the listening socket to
//tocall: worker invoked (asynchronously) with the connection fd and the peer address
//out: No if the socket could not be bound, switched to non blocking mode or put into the
//listening state; otherwise the output of Call Async for the accept loop.
//Note: no close binding is declared in this file, so the descriptor is left open when
//fcntl or listen fails.
Listen on Port[port(Int32),tocall:out]
{
	fd <- _internal_bindnewsocket[port,1]
	If[[fd]=[-1]]
	{ out <- No }
	{
		//fcntl[fd, F_SETFL, O_NONBLOCK]
		//Set listen socket to non blocking
		//The accept loop relies on accept returning -1 instead of blocking, so give up
		//if the socket cannot be switched to non blocking mode
		If[[fcntl[fd, 4i32, 2048i64]]=[-1]]
		{ out <- No }
		{
			//Do not start the accept loop on a socket that is not listening
			If[[listen[fd,8]]=[-1]]
			{ out <- No }
			{ out <- Call Async[_Port Wait[fd,tocall,?]] }
		}
	}
}

//Pauses the calling context; nothing in this worker arranges for it to be resumed.
//This effectively leaks a context and thus any data on the stack of that context
//Need to either handle cleanup of contexts or find a better way to accomplish this
Wait Forever[]
{
	Pause[Val[?]]
}