diff net.rhope @ 142:7bbdc034e347

Fix some bugs. Get basic network code working (epoll listener + accept connections). Start porting webserver.
author Mike Pavone <pavone@retrodev.com>
date Sun, 21 Nov 2010 16:33:17 -0500
parents
children ff00538cd818
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/net.rhope	Sun Nov 21 16:33:17 2010 -0500
@@ -0,0 +1,253 @@
+
+Foreign C:libc
+{
+	setsockopt[sockfd(Int32,Naked),level(Int32,Naked),optname(Int32,Naked),optval(Int32,Raw Pointer),optlen(Int32,Naked):out(Int32,Naked)]
+	listen[sockfd(Int32,Naked),backlog(Int32,Naked):out(Int32,Naked)]
+	send[sockfd(Int32,Naked),data(Array,Raw Pointer),datalen(Int64,Naked):out(Int32,Naked)]
+	recv[sockfd(Int32,Naked),data(Array,Raw Pointer,Mutable),maxlen(Int64,Naked):out(Int32,Naked),data]
+	fcntl[fd(Int32,Naked),command(Int32,Naked),value(Int64,Naked):out(Int32,Naked)]
+	pipe[descriptors(Array,Raw Pointer,Mutable):out(Int32,Naked),descriptors]
+	epoll_create[size(Int32,Naked):status(Int32,Naked)]
+	epoll_ctl[epfd(Int32,Naked),operation(Int32,Naked),fd(Int32,Naked),event(epoll_event,Raw Pointer):status(Int32,Naked)]
+	epoll_wait[epfd(Int32,Naked),events(Array,Raw Pointer,Mutable),maxevents(Int32,Naked),timeout(Int32,Naked):out(Int32,Naked),events]
+}
+
+
+//Note: port numbers would more properly be UInt16, think about changing later
+Foreign C:runtime
+{
+	_internal_bindnewsocket[port(Int32,Naked),setreuse(Int32,Naked):socket(Int32,Naked)]
+	_internal_accept[sockfd(Int32,Naked),addrbuf(Array,Raw Pointer,Mutable),buflen(Int32,Naked):consock(Int32,Naked),addrbuf]
+}
+
+Blueprint epoll_event
+{
+	events(UInt32,Naked)
+	data(Int64,Naked)
+}
+
+//EPOLLIN 1
+//EPOLLOUT 4
+//EPOLLPRI 2
+//EPOLL_CTL_ADD 1
+//EPOLL_CTL_DEL 2
+//EPOLL_CTL_MOD 3
+
+_Add New FDs[epfd,pipefd:out]
+{
+	count,rdata <- read[pipefd, _internal_array_allocnaked[2,Int32()], 8i64]
+	,out <- If[[count]=[8i64]]
+	{
+		data <- [rdata]Length <<[2]
+		epoll_ctl[epfd, 1i32, [data]Index[0], [[Build[epoll_event()]]events <<[Abs UInt[[data]Index[1]]]]data <<[Int64[[data]Index[0]]]]
+		{ out <- _Add New FDs[epfd,pipefd] }
+	}{
+		//TODO: Properly deal with the case when count > 0 but < 8
+		If[[count]>[0i64]]
+		{
+			Print[["read of listener pipe returned unexpected number of bytes: "]Append[String[count]]]
+		}
+	}
+}
+
+_Get IO Context[fd:out,notfound] uses socklisten
+{
+	out,notfound <- [socklisten::fdlookup]Index[fd]
+}
+
+_Handle Events[epfd,pipefd,buf,cur:out]
+{
+	event,out <- [buf]Index[cur]
+	{
+		If[[event]data >>]
+		{
+			activefd <- Trunc Int32[[event]data >>]
+			epoll_ctl[epfd, 2, activefd, Build[epoll_event()]]
+			{	
+				If[[[event]events >>]&[8216u32]]
+				{ res <- No }
+				{ res <- Yes }
+				ct,cont <- _Get IO Context[activefd]
+				{
+				}{ Print["Could not find context for IO event"] }
+				,cont <- Resume[ct,res]
+				{
+					cont <- Yield[]
+				}{
+					Print["could not resume context for IO event"]
+				}
+			}
+		}{
+			cont <- _Add New FDs[epfd,pipefd]
+		}
+		Val[cont]
+		{
+			out <- _Handle Events[epfd,pipefd,buf,[cur]+[1]]
+		}
+	}
+}
+
+_Wait Active[epfd,pipefd,buf]
+{
+	workaround <- Yield[]
+	Val[workaround]
+	{
+		count,newbuf <- epoll_wait[epfd,buf,[buf]Storage >>,-1]
+		If[[count]=[-1]]
+		{
+			Print["epoll_wait returned error"]
+		}{
+			If[[count]=[0]]
+			{
+				//Shouldn't happen normally, but perhaps if there was a signal
+				_Wait Active[epfd,pipefd,buf]
+			}{
+				_Handle Events[epfd,pipefd,[newbuf]Length <<[count],0]
+				{ _Wait Active[epfd,pipefd,buf] }
+			}
+		}
+	}
+}
+
+_Sock Listener[pipefd]
+{
+	epfd <- epoll_create[16]
+	If[[epfd]=[-1]]
+	{
+		Print["Error creating epoll file descriptor"]
+	}{
+		If[[epoll_ctl[epfd,1i32,pipefd,[[Build[epoll_event()]]events <<[1u32]]data <<[0i64]]]=[-1]]
+		{
+			Print["Error adding pipe to epoll fd"]
+		}{
+			_Wait Active[epfd, pipefd, _internal_array_allocnaked[8,epoll_event()]]
+		}
+	}
+}
+
+Globals socklisten
+{
+	listener started <- No
+	fdlookup <- ()
+	pipefd <- -1
+}
+
+_Add FD to Listener[fd,context:pipefd,err] uses socklisten
+{
+	If[socklisten::listener started]
+	{
+		pipefd <- socklisten::pipefd
+		do add <- Yes
+	}{
+		//Calling a function with side effects inside a transaction is BAD
+		//Need to do something about this
+		ret,des <- pipe[[Array[]]Set[1, 0i32]]
+		If[[ret]=[-1]]
+		{
+			err <- "Error creating pipe for waking up socket listener"
+		}{
+			socklisten::pipefd <- [des]Index[1]
+			pipefd <- [des]Index[1]
+			//fcntl[fd, F_SETFL, O_NONBLOCK]
+			//Set both ends of the pipe to non blocking
+			fcntl[[des]Index[0], 4i32, 2048i64]
+			fcntl[[des]Index[1], 4i32, 2048i64]
+			socklisten::listener started <- Yes
+			Call Async[_Sock Listener[[des]Index[0],?]]
+			do add <- Yes
+		}
+	}
+	Val[do add]
+	{
+		socklisten::fdlookup <- [socklisten::fdlookup]Set[fd, context]
+	}
+}
+
+_Write to Listener Pipe[pipefd,data]
+{
+	res <- write[pipefd, data, 8i64]
+	If[[res]!=[8i32]]
+	{
+		workaround <- Yield[]
+		Val[workaround]
+		{ _Write to Listener Pipe[pipefd,data] }
+	}
+}
+
+_Wait for IO[fd,type,context]
+{
+	_Add FD to Listener[fd,context]
+	{
+		_Write to Listener Pipe[~,[[Array[]]Append[fd]]Append[type]]
+	}{
+		Print[~]
+		{ Resume[context,No] }
+	}
+}
+
+Wait for IO[fd,type:out]
+{
+	out <- Pause[_Wait for IO[fd,type,?]]
+}
+
+_Do Con Call[newfd,address,tocall]
+{
+	[tocall]Call[newfd,address]
+}
+
+_Null Term[raw str,cur:out]
+{
+	[raw str]Index[cur]
+	{
+		If[[~]=[0u8]]
+		{
+			out <- String[[raw str]Length <<[cur]]
+		}{
+			out <- _Null Term[raw str, [cur]+[1]]
+		}
+	}{
+		out <- String[raw str]
+	}
+}
+
+_Port Wait[fd,tocall:out]
+{
+	con <- _internal_accept[fd,_internal_array_allocnaked[40,UInt8()],40] {}
+	{ address <- _Null Term[[~]Length <<[40], 0i32] }
+	If[[con]=[-1]]
+	{
+		If[Wait for IO[fd, 1i32]]
+		{
+			out <- _Port Wait[fd,tocall]
+		}{
+			Print["Error waiting for connection"]
+		}
+	}{
+		Call Async[_Do Con Call[con,address,tocall,?]]
+		{
+			out <- _Port Wait[fd,tocall]
+		}
+	}
+}
+
+Listen on Port[port(Int32),tocall:out]
+{
+	fd <- _internal_bindnewsocket[port,1]
+	//fcntl[fd, F_SETFL, O_NONBLOCK]
+	//Set listen socket to non blocking
+	If[[fd]=[-1]]
+	{ out <- No }
+	{
+		fcntl[fd, 4i32, 2048i64]
+		{ listen[fd,8]
+		{ out <- Call Async[_Port Wait[fd,tocall,?]] }}
+	}
+}
+
+//This effectively leaks a context and thus any data on the stack of that context
+//Need either handle cleanup of contexts or find a better way to accomplish this
+Wait Forever[]
+{
+	Pause[Val[?]]
+}
+