Mercurial > repos > rhope
diff net.rhope @ 142:7bbdc034e347
Fix some bugs. Get basic network code working (epoll listener + accept connections). Start porting webserver.
author | Mike Pavone <pavone@retrodev.com> |
---|---|
date | Sun, 21 Nov 2010 16:33:17 -0500 |
parents | |
children | ff00538cd818 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/net.rhope Sun Nov 21 16:33:17 2010 -0500 @@ -0,0 +1,253 @@ + +Foreign C:libc +{ + setsockopt[sockfd(Int32,Naked),level(Int32,Naked),optname(Int32,Naked),optval(Int32,Raw Pointer),optlen(Int32,Naked):out(Int32,Naked)] + listen[sockfd(Int32,Naked),backlog(Int32,Naked):out(Int32,Naked)] + send[sockfd(Int32,Naked),data(Array,Raw Pointer),datalen(Int64,Naked):out(Int32,Naked)] + recv[sockfd(Int32,Naked),data(Array,Raw Pointer,Mutable),maxlen(Int64,Naked):out(Int32,Naked),data] + fcntl[fd(Int32,Naked),command(Int32,Naked),value(Int64,Naked):out(Int32,Naked)] + pipe[descriptors(Array,Raw Pointer,Mutable):out(Int32,Naked),descriptors] + epoll_create[size(Int32,Naked):status(Int32,Naked)] + epoll_ctl[epfd(Int32,Naked),operation(Int32,Naked),fd(Int32,Naked),event(epoll_event,Raw Pointer):status(Int32,Naked)] + epoll_wait[epfd(Int32,Naked),events(Array,Raw Pointer,Mutable),maxevents(Int32,Naked),timeout(Int32,Naked):out(Int32,Naked),events] +} + + +//Note: port numbers would more properly be UInt16, think about changing later +Foreign C:runtime +{ + _internal_bindnewsocket[port(Int32,Naked),setreuse(Int32,Naked):socket(Int32,Naked)] + _internal_accept[sockfd(Int32,Naked),addrbuf(Array,Raw Pointer,Mutable),buflen(Int32,Naked):consock(Int32,Naked),addrbuf] +} + +Blueprint epoll_event +{ + events(UInt32,Naked) + data(Int64,Naked) +} + +//EPOLLIN 1 +//EPOLLOUT 4 +//EPOLLPRI 2 +//EPOLL_CTL_ADD 1 +//EPOLL_CTL_DEL 2 +//EPOLL_CTL_MOD 3 + +_Add New FDs[epfd,pipefd:out] +{ + count,rdata <- read[pipefd, _internal_array_allocnaked[2,Int32()], 8i64] + ,out <- If[[count]=[8i64]] + { + data <- [rdata]Length <<[2] + epoll_ctl[epfd, 1i32, [data]Index[0], [[Build[epoll_event()]]events <<[Abs UInt[[data]Index[1]]]]data <<[Int64[[data]Index[0]]]] + { out <- _Add New FDs[epfd,pipefd] } + }{ + //TODO: Properly deal with the case when count > 0 but < 8 + If[[count]>[0i64]] + { + Print[["read of listener pipe returned unexpected number of bytes: "]Append[String[count]]] + } + } +} + +_Get IO Context[fd:out,notfound] uses socklisten +{ + out,notfound <- [socklisten::fdlookup]Index[fd] +} + +_Handle Events[epfd,pipefd,buf,cur:out] +{ + event,out <- [buf]Index[cur] + { + If[[event]data >>] + { + activefd <- Trunc Int32[[event]data >>] + epoll_ctl[epfd, 2, activefd, Build[epoll_event()]] + { + If[[[event]events >>]&[8216u32]] + { res <- No } + { res <- Yes } + ct,cont <- _Get IO Context[activefd] + { + }{ Print["Could not find context for IO event"] } + ,cont <- Resume[ct,res] + { + cont <- Yield[] + }{ + Print["could not resume context for IO event"] + } + } + }{ + cont <- _Add New FDs[epfd,pipefd] + } + Val[cont] + { + out <- _Handle Events[epfd,pipefd,buf,[cur]+[1]] + } + } +} + +_Wait Active[epfd,pipefd,buf] +{ + workaround <- Yield[] + Val[workaround] + { + count,newbuf <- epoll_wait[epfd,buf,[buf]Storage >>,-1] + If[[count]=[-1]] + { + Print["epoll_wait returned error"] + }{ + If[[count]=[0]] + { + //Shouldn't happen normally, but perhaps if there was a signal + _Wait Active[epfd,pipefd,buf] + }{ + _Handle Events[epfd,pipefd,[newbuf]Length <<[count],0] + { _Wait Active[epfd,pipefd,buf] } + } + } + } +} + +_Sock Listener[pipefd] +{ + epfd <- epoll_create[16] + If[[epfd]=[-1]] + { + Print["Error creating epoll file descriptor"] + }{ + If[[epoll_ctl[epfd,1i32,pipefd,[[Build[epoll_event()]]events <<[1u32]]data <<[0i64]]]=[-1]] + { + Print["Error adding pipe to epoll fd"] + }{ + _Wait Active[epfd, pipefd, _internal_array_allocnaked[8,epoll_event()]] + } + } +} + +Globals socklisten +{ + listener started <- No + fdlookup <- () + pipefd <- -1 +} + +_Add FD to Listener[fd,context:pipefd,err] uses socklisten +{ + If[socklisten::listener started] + { + pipefd <- socklisten::pipefd + do add <- Yes + }{ + //Calling a function with side effects inside a transaction is BAD + //Need to do something about this + ret,des <- pipe[[Array[]]Set[1, 0i32]] + If[[ret]=[-1]] + { + err <- "Error creating pipe for waking up socket listener" + }{ + socklisten::pipefd <- [des]Index[1] + pipefd <- [des]Index[1] + //fcntl[fd, F_SETFL, O_NONBLOCK] + //Set both ends of the pipe to non blocking + fcntl[[des]Index[0], 4i32, 2048i64] + fcntl[[des]Index[1], 4i32, 2048i64] + socklisten::listener started <- Yes + Call Async[_Sock Listener[[des]Index[0],?]] + do add <- Yes + } + } + Val[do add] + { + socklisten::fdlookup <- [socklisten::fdlookup]Set[fd, context] + } +} + +_Write to Listener Pipe[pipefd,data] +{ + res <- write[pipefd, data, 8i64] + If[[res]!=[8i32]] + { + workaround <- Yield[] + Val[workaround] + { _Write to Listener Pipe[pipefd,data] } + } +} + +_Wait for IO[fd,type,context] +{ + _Add FD to Listener[fd,context] + { + _Write to Listener Pipe[~,[[Array[]]Append[fd]]Append[type]] + }{ + Print[~] + { Resume[context,No] } + } +} + +Wait for IO[fd,type:out] +{ + out <- Pause[_Wait for IO[fd,type,?]] +} + +_Do Con Call[newfd,address,tocall] +{ + [tocall]Call[newfd,address] +} + +_Null Term[raw str,cur:out] +{ + [raw str]Index[cur] + { + If[[~]=[0u8]] + { + out <- String[[raw str]Length <<[cur]] + }{ + out <- _Null Term[raw str, [cur]+[1]] + } + }{ + out <- String[raw str] + } +} + +_Port Wait[fd,tocall:out] +{ + con <- _internal_accept[fd,_internal_array_allocnaked[40,UInt8()],40] {} + { address <- _Null Term[[~]Length <<[40], 0i32] } + If[[con]=[-1]] + { + If[Wait for IO[fd, 1i32]] + { + out <- _Port Wait[fd,tocall] + }{ + Print["Error waiting for connection"] + } + }{ + Call Async[_Do Con Call[con,address,tocall,?]] + { + out <- _Port Wait[fd,tocall] + } + } +} + +Listen on Port[port(Int32),tocall:out] +{ + fd <- _internal_bindnewsocket[port,1] + //fcntl[fd, F_SETFL, O_NONBLOCK] + //Set listen socket to non blocking + If[[fd]=[-1]] + { out <- No } + { + fcntl[fd, 4i32, 2048i64] + { listen[fd,8] + { out <- Call Async[_Port Wait[fd,tocall,?]] }} + } +} + +//This effectively leaks a context and thus any data on the stack of that context +//Need either handle cleanup of contexts or find a better way to accomplish this +Wait Forever[] +{ + Pause[Val[?]] +} +