Mercurial > repos > rhope
view net.rhope @ 150:50c97b448f44
Add Previous method to List
author | Mike Pavone <pavone@retrodev.com> |
---|---|
date | Wed, 24 Nov 2010 02:28:01 +0000 |
parents | f3686f60985d |
children | e9a8269384bb |
line wrap: on
line source
//Bindings for the C library calls used by the socket listener and by
//TCP Connection. Parameter and return types are declared inline.
Foreign C:libc
{
	setsockopt[sockfd(Int32,Naked),level(Int32,Naked),optname(Int32,Naked),optval(Int32,Raw Pointer),optlen(Int32,Naked):out(Int32,Naked)]
	listen[sockfd(Int32,Naked),backlog(Int32,Naked):out(Int32,Naked)]
	send[sockfd(Int32,Naked),data(Array,Raw Pointer),datalen(Int64,Naked):out(Int32,Naked)]
	recv[sockfd(Int32,Naked),data(Array,Raw Pointer,Mutable),maxlen(Int64,Naked):out(Int32,Naked),data]
	fcntl[fd(Int32,Naked),command(Int32,Naked),value(Int64,Naked):out(Int32,Naked)]
	pipe[descriptors(Array,Raw Pointer,Mutable):out(Int32,Naked),descriptors]
	epoll_create[size(Int32,Naked):status(Int32,Naked)]
	epoll_ctl[epfd(Int32,Naked),operation(Int32,Naked),fd(Int32,Naked),event(epoll_event,Raw Pointer):status(Int32,Naked)]
	epoll_wait[epfd(Int32,Naked),events(Array,Raw Pointer,Mutable),maxevents(Int32,Naked),timeout(Int32,Naked):out(Int32,Naked),events]
}

//Bindings for helpers provided by the Rhope runtime
//Note: port numbers would more properly be UInt16, think about changing later
Foreign C:runtime
{
	_internal_bindnewsocket[port(Int32,Naked),setreuse(Int32,Naked):socket(Int32,Naked)]
	_internal_accept[sockfd(Int32,Naked),addrbuf(Array,Raw Pointer,Mutable),buflen(Int32,Naked):consock(Int32,Naked),addrbuf]
}

//Record handed to epoll_ctl and filled in by epoll_wait.
//This file stores the watched file descriptor in the data field.
Blueprint epoll_event
{
	events(UInt32,Naked)
	data(Int64,Naked)
}

//Numeric values of the epoll constants used as literals below
//EPOLLIN 1
//EPOLLOUT 4
//EPOLLPRI 2

//EPOLL_CTL_ADD 1
//EPOLL_CTL_DEL 2
//EPOLL_CTL_MOD 3

//Drains registration requests from the listener pipe.
//Each request is 8 bytes read as two Int32 values: element 0 is used as the
//fd to watch and element 1 as the event mask (see _Wait for IO for the writer).
//Recurses after each successful epoll_ctl until a read returns something
//other than 8 bytes; out is populated at that point.
_Add New FDs[epfd,pipefd:out]
{
	count,rdata <- read[pipefd, _internal_array_allocnaked[2,Int32()], 8i64]
	,out <- If[[count]=[8i64]]
	{
		data <- [rdata]Length <<[2]
		//1i32 is EPOLL_CTL_ADD; the fd is stored in the event's data field so
		//_Handle Events can recover it later
		epoll_ctl[epfd, 1i32, [data]Index[0], [[Build[epoll_event()]]events <<[Abs UInt[[data]Index[1]]]]data <<[Int64[[data]Index[0]]]]
		{
			out <- _Add New FDs[epfd,pipefd]
		}
	}{
		//TODO: Properly deal with the case when count > 0 but < 8
		If[[count]>[0i64]]
		{
			Print[["read of listener pipe returned unexpected number of bytes: "]Append[String[count]]]
		}
	}
}

//Looks up the context stored for fd in socklisten::fdlookup.
//notfound is populated instead of out when there is no entry.
_Get IO Context[fd:out,notfound] uses socklisten
{
	out,notfound <- [socklisten::fdlookup]Index[fd]
}

//Processes the events in buf starting at index cur, one per recursion.
//An event whose data field is zero is treated as the listener pipe and
//triggers _Add New FDs; any other event resumes the context registered for
//that fd. out is populated by Index's second output once cur is past the
//last element of buf.
//NOTE(review): because zero marks the pipe, a watched fd of 0 would be
//mistaken for it - confirm that fd 0 is never registered.
_Handle Events[epfd,pipefd,buf,cur:out]
{
	event,out <- [buf]Index[cur]
	{
		If[[event]data >>]
		{
			activefd <- Trunc Int32[[event]data >>]
			//2 is EPOLL_CTL_DEL: each registration is good for one event only
			epoll_ctl[epfd, 2, activefd, Build[epoll_event()]]
			{
				//8216 is 0x2018, which matches EPOLLERR|EPOLLHUP|EPOLLRDHUP on
				//Linux - TODO confirm. Those events resume the waiter with No.
				If[[[event]events >>]&[8216u32]]
				{
					res <- No
				}{
					res <- Yes
				}
				ct,cont <- _Get IO Context[activefd] { }{ Print["Could not find context for IO event"] }
				,cont <- Resume[ct,res]
				{
					cont <- Yield[]
				}{
					Print["could not resume context for IO event"]
				}
			}
		}{
			cont <- _Add New FDs[epfd,pipefd]
		}
		//Only move on to the next event once this one has been dealt with
		Val[cont]
		{
			out <- _Handle Events[epfd,pipefd,buf,[cur]+[1]]
		}
	}
}

//Main loop of the listener: waits in epoll_wait with no timeout, hands the
//returned events to _Handle Events and then waits again.
//The loop ends (after printing a message) if epoll_wait returns -1.
_Wait Active[epfd,pipefd,buf]
{
	workaround <- Yield[]
	Val[workaround]
	{
		count,newbuf <- epoll_wait[epfd,buf,[buf]Storage >>,-1]
		If[[count]=[-1]]
		{
			Print["epoll_wait returned error"]
		}{
			If[[count]=[0]]
			{
				//Shouldn't happen normally, but perhaps if there was a signal
				_Wait Active[epfd,pipefd,buf]
			}{
				_Handle Events[epfd,pipefd,[newbuf]Length <<[count],0]
				{
					_Wait Active[epfd,pipefd,buf]
				}
			}
		}
	}
}

//Entry point of the asynchronous listener. Creates the epoll descriptor,
//registers the read end of the wakeup pipe with it and enters _Wait Active
//with room for 8 events per epoll_wait call.
_Sock Listener[pipefd]
{
	epfd <- epoll_create[16]
	If[[epfd]=[-1]]
	{
		Print["Error creating epoll file descriptor"]
	}{
		//1i32 is EPOLL_CTL_ADD, 1u32 is EPOLLIN. The data field is left at zero,
		//which is how _Handle Events recognises events for the pipe.
		If[[epoll_ctl[epfd,1i32,pipefd,[[Build[epoll_event()]]events <<[1u32]]data <<[0i64]]]=[-1]]
		{
			Print["Error adding pipe to epoll fd"]
		}{
			_Wait Active[epfd, pipefd, _internal_array_allocnaked[8,epoll_event()]]
		}
	}
}

//Shared state of the listener:
//listener started - whether the pipe and listener have been created
//fdlookup - map from fd to the context waiting on it
//pipefd - write end of the wakeup pipe (-1 until the listener is started)
Globals socklisten
{
	listener started <- No
	fdlookup <- ()
	pipefd <- -1
}

//Records context as the waiter for fd and returns the write end of the
//listener pipe in pipefd. On first use it also creates the pipe, makes both
//ends non-blocking and starts _Sock Listener asynchronously.
//err is populated instead of pipefd if the pipe cannot be created.
_Add FD to Listener[fd,context:pipefd,err] uses socklisten
{
	If[socklisten::listener started]
	{
		pipefd <- socklisten::pipefd
		do add <- Yes
	}{
		//Calling a function with side effects inside a transaction is BAD
		//Need to do something about this
		ret,des <- pipe[[Array[]]Set[1, 0i32]]
		If[[ret]=[-1]]
		{
			err <- "Error creating pipe for waking up socket listener"
		}{
			socklisten::pipefd <- [des]Index[1]
			pipefd <- [des]Index[1]
			//fcntl[fd, F_SETFL, O_NONBLOCK]
			//Set both ends of the pipe to non blocking
			fcntl[[des]Index[0], 4i32, 2048i64]
			fcntl[[des]Index[1], 4i32, 2048i64]
			socklisten::listener started <- Yes
			Call Async[_Sock Listener[[des]Index[0],?]]
			do add <- Yes
		}
	}
	Val[do add]
	{
		socklisten::fdlookup <- [socklisten::fdlookup]Set[fd, context]
	}
}

//Writes one 8 byte registration request to the listener pipe.
//If write does not report 8 bytes, yields and then retries the whole request.
_Write to Listener Pipe[pipefd,data]
{
	res <- write[pipefd, data, 8i64]
	If[[res]!=[8i32]]
	{
		workaround <- Yield[]
		Val[workaround]
		{
			_Write to Listener Pipe[pipefd,data]
		}
	}
}

//Registers context as waiting for events of the given type on fd and tells
//the listener about it through the pipe. If registration fails the error is
//printed and the context is resumed with No.
_Wait for IO[fd,type,context]
{
	_Add FD to Listener[fd,context]
	{
		//The request is the pair (fd, type), read back by _Add New FDs
		_Write to Listener Pipe[~,[[Array[]]Append[fd]]Append[type]]
	}{
		Print[~]
		{
			Resume[context,No]
		}
	}
}

//Pauses the calling context until the listener resumes it.
//type is an epoll event mask (1 and 4 are used in this file).
//out is the value passed to Resume: Yes for a normal event, No for an
//error/hangup event or a failed registration.
Wait for IO[fd,type:out]
{
	out <- Pause[_Wait for IO[fd,type,?]]
}

//Invokes the user supplied handler for a freshly accepted connection
_Do Con Call[newfd,address,tocall]
{
	[tocall]Call[newfd,address]
}

//Converts raw str to a String that stops at the first zero byte at or after
//index cur. If no zero byte is found the whole array is converted.
_Null Term[raw str,cur:out]
{
	[raw str]Index[cur]
	{
		If[[~]=[0u8]]
		{
			out <- String[[raw str]Length <<[cur]]
		}{
			out <- _Null Term[raw str, [cur]+[1]]
		}
	}{
		out <- String[raw str]
	}
}

//Accept loop for a listening socket. Every accepted connection is made
//non-blocking, wrapped in a TCP Connection and passed to tocall
//asynchronously together with the peer address string. When accept returns
//-1 the worker waits for the socket to become readable and tries again.
_Port Wait[fd,tocall:out]
{
	//40 bytes are provided for the address text filled in by the runtime
	con <- _internal_accept[fd,_internal_array_allocnaked[40,UInt8()],40] {}
	{
		address <- _Null Term[[~]Length <<[40], 0i32]
	}
	If[[con]=[-1]]
	{
		//1i32 is EPOLLIN
		If[Wait for IO[fd, 1i32]]
		{
			out <- _Port Wait[fd,tocall]
		}{
			Print["Error waiting for connection"]
		}
	}{
		//Same F_SETFL/O_NONBLOCK call as in Listen on Port
		fcntl[con, 4i32, 2048i64]
		{
			Call Async[_Do Con Call[TCP Connection[con],address,tocall,?]]
			{
				out <- _Port Wait[fd,tocall]
			}
		}
	}
}

//Binds a new socket to port, switches it to non-blocking mode, starts
//listening with a backlog of 8 and launches the accept loop asynchronously.
//tocall is called with (TCP Connection, address) for each new connection.
//out is No if the socket could not be bound.
Listen on Port[port(Int32),tocall:out]
{
	fd <- _internal_bindnewsocket[port,1]
	If[[fd]=[-1]]
	{
		out <- No
	}{
		//fcntl[fd, F_SETFL, O_NONBLOCK]
		//Set listen socket to non blocking
		fcntl[fd, 4i32, 2048i64]
		{
			listen[fd,8]
			{
				out <- Call Async[_Port Wait[fd,tocall,?]]
			}
		}
	}
}

//A connected socket plus the bytes that have been read from it but not yet
//handed to the caller
Blueprint TCP Connection
{
	Filedes
	Read Buffer
}

//Wraps file descriptor fd in a TCP Connection with an empty read buffer
TCP Connection[fd:out]
{
	out <- [[Build[TCP Connection()] ]Filedes <<[fd] ]Read Buffer <<[Array[]]
}

//Writes buffer to the connection, retrying with the unsent tail after a
//short write and waiting for writability when write returns -1.
//wrote is the number of bytes sent by earlier attempts.
//out is the connection on success; err is the byte count sent so far if the
//wait for writability fails.
_Write@TCP Connection[con,buffer,wrote:out,err]
{
	new wrote <- write[[con]Filedes >>, buffer, Int64[[buffer]Length]]
	If[[new wrote]=[-1]]
	{
		//4 is EPOLLOUT
		If[Wait for IO[[con]Filedes >>, 4]]
		{
			out,err <- [con]_Write[buffer,wrote]
		}{
			err <- wrote
		}
	}{
		If[[new wrote]=[[buffer]Length]]
		{
			out <- con
		}{
			//Short write: copy the unsent tail into a new array and send that
			remaining <- [[buffer]Length]-[new wrote]
			next buf <- [_internal_array_copychunk[buffer, new wrote, _internal_array_allocnaked[remaining,UInt8()], 0, remaining]]Length <<[remaining]
			out,err <- [con]_Write[next buf,[wrote]+[new wrote]]
		}
	}
}

//Writes the whole of buffer to the connection.
//out is the connection on success, err the number of bytes written before
//the failure. An empty buffer succeeds without touching the socket.
Write@TCP Connection[con,buffer(Array):out,err]
{
	If[[buffer]Length]
	{
		out,err <- [con]_Write[buffer,0]
	}{
		out <- con
	}
}

//Reads exactly toread more bytes from the socket, collecting the pieces in
//buflist. data is the list of pieces (merged by the caller), out con the
//connection. err is the number of bytes collected so far and is populated
//when waiting for readability fails or the read returns zero bytes.
_Read@TCP Connection[con,toread,buflist:data,out con,err]
{
	read[[con]Filedes >>, _internal_array_allocnaked[toread, UInt8()], Int64[toread]]
	{
		numread <- Trunc Int32[~]
	}{
		outbuf <- [~]Length <<[numread]
	}
	If[[numread]=[-1]]
	{
		//1 is EPOLLIN
		If[Wait for IO[[con]Filedes >>, 1]]
		{
			data,out con,err <- [con]_Read[toread,buflist]
		}{
			err <- Fold[_Add Len[?], 0, buflist]
		}
	}{
		If[[numread]=[0]]
		{
			//A zero byte result makes no progress, so retrying would recurse
			//forever. Assuming read follows POSIX read(2), zero means the peer
			//closed the connection; report it the same way as a failed wait.
			err <- Fold[_Add Len[?], 0, buflist]
		}{
			If[[numread]=[toread]]
			{
				data <- [buflist]Append[outbuf]
				out con <- con
			}{
				data,out con,err <- [con]_Read[[toread]-[numread], [buflist]Append[outbuf]]
			}
		}
	}
}

//Fold helper: adds the length of buf to the running total len
_Add Len[len,buf:out]
{
	out <- [len]+[[buf]Length]
}

//Fold helper: copies src onto the end of inprog and extends inprog's length
//to match. inprog must already have storage for the extra bytes.
_Merge One Buf[inprog,src:out]
{
	oldlen <- [inprog]Length
	srclen <- [src]Length
	If[srclen]
	{
		out <- [_internal_array_copychunk[src, 0, inprog, oldlen, srclen]]Length <<[[oldlen]+[srclen]]
	}{
		out <- inprog
	}
}

//Concatenates the byte arrays in buflist into a single array.
//A list with exactly one element returns that element without copying.
_Merge Buffers[buflist:out]
{
	If[[[buflist]Length]=[1]]
	{
		out <- [buflist]Index[0]{}
		{
			Print["This shouldn't happen!!!!"]
		}
	}{
		len <- Fold[_Add Len[?], 0, buflist]
		out <- Fold[_Merge One Buf[?], _internal_array_allocnaked[len, UInt8()], buflist]
	}
}

//Reads exactly toread bytes from the connection, using buffered bytes first.
//data is the bytes read, out con the updated connection. err is populated
//instead when the underlying read fails (see _Read for its value).
Read@TCP Connection[con,toread(Int32):data,out con,err]
{
	buflen <- [[con]Read Buffer >>]Length
	If[[toread]=[buflen]]
	{
		//The buffer holds exactly what was asked for
		data <- [con]Read Buffer >>
		out con <- [con]Read Buffer <<[Array[]]
	}{
		If[[toread]<[buflen]]
		{
			//The buffer holds more than enough: the second output of Slice
			//becomes the new read buffer
			data <- [[con]Read Buffer >>]Slice[toread] {}
			{
				out con <- [con]Read Buffer <<[~]
			}
		}{
			//Not enough buffered: take what there is and read the rest
			If[buflen]
			{
				buflist <- [()]Append[[con]Read Buffer >>]
				ntoread <- [toread]-[[[con]Read Buffer >>]Length]
			}{
				ntoread <- Val[toread]
				buflist <- ()
			}
			,out con,err <- [[con]Read Buffer <<[Array[]]]_Read[ntoread,buflist]
			{
				data <- _Merge Buffers[~]
			}
		}
	}
}

//Looks for a proper prefix of delim that runs up to the end of buffer,
//starting the search at offset.
//partial is the offset where such a prefix starts; none is populated when
//offset reaches the end of buffer without finding one.
//Like the other callers in this file, this treats a true (non-zero) result
//from _internal_memcmp_offset as "the ranges differ".
_Check Partial[buffer,delim,offset:partial,none]
{
	none <- If[[offset]=[[buffer]Length]] {}
	{
		left <- [[buffer]Length]-[offset]
		If[_internal_memcmp_offset[buffer, offset, delim, 0, left]]
		{
			partial,none <- _Check Partial[buffer,delim,[offset]+[1]]
		}{
			partial <- offset
		}
	}
}

//Searches buffer for delim starting at offset.
//out is the offset of the first complete match; partial is the offset of a
//delimiter prefix that runs off the end of buffer; none is populated when
//neither is found.
_Check Buffer[buffer,delim,offset:out,partial,none]
{
	dlen <- [delim]Length
	blen <- [buffer]Length
	If[[[offset]+[dlen]]<=[blen]]
	{
		If[_internal_memcmp_offset[buffer, offset, delim, 0, dlen]]
		{
			out,partial,none <- _Check Buffer[buffer,delim,[offset]+[1]]
		}{
			out <- offset
		}
	}{
		//Too close to the end for a full match, look for a partial one
		partial,none <- _Check Partial[buffer,delim,offset]
	}
}

//Checks whether delim straddles the boundary between old and new, i.e.
//starts in old at or after offset and finishes at the start of new.
//out is the offset in old where the delimiter starts; not split is
//populated when offset reaches the end of old without a match.
_Check Split[old,new,delim,offset:out,not split]
{
	,not split <- If[[offset]<[[old]Length]]
	{
		left <- [[old]Length]-[offset]
		If[_internal_memcmp_offset[old, offset, delim, 0, left]]
		{
			next <- [offset]+[1]
		}{
			If[_internal_memcmp_offset[new, 0, delim, left, [[delim]Length]-[left]]]
			{
				next <- [offset]+[1]
			}{
				out <- offset
			}
		}
		out,not split <- _Check Split[old,new,delim,next]
	}
}

//Reads from the socket in chunks of up to 512 bytes until delim is seen.
//poffset is the offset of a partial delimiter match at the end of the
//connection's read buffer, or a negative value if there is none.
//buflist holds the chunks already known not to contain the delimiter.
//data is everything before the delimiter; out con carries whatever followed
//the delimiter in its read buffer. err is the number of bytes collected so
//far and is populated when waiting for readability fails or the read
//returns zero bytes.
_Read Delim@TCP Connection[con,delim,poffset,buflist:data,out con,err]
{
	read[[con]Filedes >>, _internal_array_allocnaked[512i32, UInt8()], 512i64]
	{
		numread <- Trunc Int32[~]
	}{
		outbuf <- [~]Length <<[numread]
	}
	If[[numread]=[-1]]
	{
		//1 is EPOLLIN
		If[Wait for IO[[con]Filedes >>, 1]]
		{
			data,out con,err <- [con]_Read Delim[delim,poffset,buflist]
		}{
			err <- [Fold[_Add Len[?], 0, buflist]]+[Length[[con]Read Buffer >>]]
		}
	}{
		If[[numread]=[0]]
		{
			//A zero byte result makes no progress, so retrying would recurse
			//forever. Assuming read follows POSIX read(2), zero means the peer
			//closed the connection; report it the same way as a failed wait.
			err <- [Fold[_Add Len[?], 0, buflist]]+[Length[[con]Read Buffer >>]]
		}{
			//checknew is populated when the delimiter does not straddle the
			//old read buffer and the new chunk, so the chunk is searched alone
			,checknew <- If[[poffset]>=[0]]
			{
				If[[[delim]Length]>[[outbuf]Length]]
				{
					//Avoid possibility of having to check across more than 2 buffers
					data,out con,err <-[[con]Read Buffer <<[ _Merge Buffers[[[buflist]Append[[con]Read Buffer >>]]Append[outbuf]] ] ]Read Delim[delim]
				}{
					,checknew <- _Check Split[[con]Read Buffer >>, outbuf, delim ,poffset]
					{
						//Delimiter starts at ~ in the old buffer and ends in outbuf
						before <- [_internal_array_copychunk[[con]Read Buffer >>, 0, _internal_array_allocnaked[~, UInt8()], 0, ~] ]Length <<[~]
						data <- _Merge Buffers[[buflist]Append[before]]
						//Number of delimiter bytes that landed in outbuf
						after off <- [[delim]Length]- [[[[con]Read Buffer >>]Length]-[~]]
						rbufferlen <- [[outbuf]Length]-[after off]
						out con <- [con]Read Buffer <<[ [_internal_array_copychunk[outbuf, after off, _internal_array_allocnaked[rbufferlen, UInt8()], 0, rbufferlen] ]Length <<[rbufferlen] ]
					}
				}
			}

			Val[checknew]
			{
				,npoffset <- _Check Buffer[outbuf,delim,0]
				{
					//Complete delimiter found at ~ in the new chunk
					before <- [_internal_array_copychunk[outbuf, 0, _internal_array_allocnaked[~, UInt8()], 0, ~] ]Length <<[~]
					If[[[con]Read Buffer >>]Length]
					{
						tomerge <- [[buflist]Append[[con]Read Buffer >>]]Append[before]
					}{
						tomerge <- [buflist]Append[before]
					}
					data <- _Merge Buffers[tomerge]
					rbufferlen <- [[outbuf]Length]-[[~]+[[delim]Length]]
					out con <- [con]Read Buffer <<[ [_internal_array_copychunk[outbuf, [~]+[[delim]Length], _internal_array_allocnaked[rbufferlen, UInt8()], 0, rbufferlen] ]Length <<[rbufferlen] ]
				}{
					//Partial match at the end of the chunk: keep the chunk as
					//the read buffer so the next pass can check the boundary
					If[[[con]Read Buffer >>]Length]
					{
						nbuflist <- [buflist]Append[[con]Read Buffer >>]
					}{
						nbuflist <- Val[buflist]
					}
					ncon <- [con]Read Buffer <<[outbuf]
				}{
					//No match at all: the whole chunk is payload
					If[[[con]Read Buffer >>]Length]
					{
						nbuflist <- [[buflist]Append[[con]Read Buffer >>]]Append[outbuf]
						ncon <- [con]Read Buffer <<[Array[]]
					}{
						nbuflist <- [buflist]Append[outbuf]
						ncon <- Val[con]
					}
					//_Check Buffer does not populate its partial output in this
					//case, so supply the same "no partial match" value that
					//Read Delim uses; the recursive call below needs it to run
					npoffset <- -1
				}
			}
			data,out con,err <- [ncon]_Read Delim[delim,npoffset,nbuflist]
		}
	}
}

//Reads from the connection up to the first occurrence of delim.
//data is the bytes before the delimiter; the delimiter itself is dropped and
//anything after it stays in the read buffer of out con.
//err is populated instead when the underlying read fails (see _Read Delim).
Read Delim@TCP Connection[con,delim(Array):data,out con,err]
{
	,poffset <-_Check Buffer[[con]Read Buffer >>, delim,0]
	{
		//The delimiter is already in the read buffer at offset ~
		data <- [_internal_array_copychunk[[con]Read Buffer >>, 0, _internal_array_allocnaked[~, UInt8()], 0, ~]]Length <<[~]
		rbufferlen <- [[[con]Read Buffer >>]Length]-[[~]+[[delim]Length]]
		out con <- [con]Read Buffer <<[ [_internal_array_copychunk[[con]Read Buffer >>, [~]+[[delim]Length], _internal_array_allocnaked[rbufferlen, UInt8()], 0, rbufferlen] ]Length <<[rbufferlen] ]
	}{
		//Partial match at the end of the read buffer: leave the buffer in
		//place so _Read Delim can check across the boundary
		buflist <- ()
		ncon <- Val[con]
	}{
		//No match: everything buffered so far is payload
		If[[[con]Read Buffer >>]Length]
		{
			ncon <- [con]Read Buffer <<[Array[]]
			buflist <- [()]Append[[con]Read Buffer >>]
		}{
			ncon <- Val[con]
			buflist <- ()
		}
		poffset <- -1
	}
	data,out con,err <- [ncon]_Read Delim[delim,poffset,buflist]
}

//Closes the connection's file descriptor; out is the result of close
Close@TCP Connection[con:out]
{
	out <- close[[con]Filedes >>]
}

//This effectively leaks a context and thus any data on the stack of that context
//Need either handle cleanup of contexts or find a better way to accomplish this
Wait Forever[]
{
	Pause[Val[?]]
}