
golang版本1.12.9;操作系统:
golang的底层使用epoll来实现IO复用。netPoll通过pollDesc结构体将文件描述符与底层进行了绑定。netpoll实现了用户层面的与底层网络IO相关的goroutine的阻塞/非阻塞管理。
对netpoll的介绍按照这篇文章的思路按照tcp建链中的listen/accept/read/write/close动作详解过程。
下面以TCP为例完整解析TCP的建链/断链以及读写过程
listen流程:
ListenTCP--listenTCP--internetSocket--socket--listenStream
unix的listen函数用于将一个socket转换为监听socket。golang中同时结合了创建socket的步骤。
//src/net/(networkstring,laddr*TCPAddr)(*TCPListener,error){switchnetwork{//支持tcp协议为”tcp4“和“tcp6”,当使用"tcp"时可以通过地址格式进行判断case"tcp","tcp4","tcp6":default:returnnil,OpError{Op:"listen",Net:network,Source:nil,Addr:(),Err:UnknownNetworkError(network)}}//对laddr进行初始化(非nil),用于在socket函数中进入监听处理流程(见下文)ifladdr==nil{laddr=TCPAddr{}}sl:=sysListener{network:network,address:()}ln,err:=((),laddr)iferr!=nil{returnnil,OpError{Op:"listen",Net:network,Source:nil,Addr:(),Err:err}}returnln,nil}func(sl*sysListener)listenTCP(,laddr*TCPAddr)(*TCPListener,error){//此处mode为"listen",可以表示一个tcp服务端;此外还有一个mode为"dial",表示连接的发起端,可以表示一个tcp客户端fd,err:=internetSocket(ctx,,laddr,nil,_STREAM,0,"listen",)iferr!=nil{returnnil,err}returnTCPListener{fd},nil}funcinternetSocket(,netstring,laddr,raddrsockaddr,sotype,protoint,modestring,ctrlFnfunc(string,string,)error)(fd*netFD,errerror){//此处判断mode为"dial"的场景。如果"dial"mode下的远端IP为通配符,则将远端IP转换为本地IP(127.0.0.1或::1),即默认连接本地server。if(=="aix"||=="windows"||=="openbsd"||=="nacl")mode=="dial"(){raddr=(net)}//favoriteAddrFamily函数用于判断地址类型为IPv4还是IPv6,主要用于net为"tcp"时的场景。family,ipv6only:=favoriteAddrFamily(net,laddr,raddr,mode)returnsocket(ctx,net,family,sotype,proto,ipv6only,laddr,raddr,ctrlFn)}//看下这个用于判断地址类型的函数funcfavoriteAddrFamily(networkstring,laddr,raddrsockaddr,modestring)(familyint,ipv6onlybool){//可以看到,如果直接写明"tcp4"或"tcp6"时会直接返回对应的地址类型switchnetwork[len(network)-1]{case'4':_INET,falsecase'6':_INET6,true}//下面用于处理network为"tcp的场景",可以看出直接指定"tcp4"或"tcp6"时可以提高一些执行效率,但可能影响扩展性//如果使用监听模式,且本地没有指定监听地址,需要通过对系统本地地址进行测试来判定使用的IP类型ifmode=="listen"(laddr==nil||()){//supportsIPv4map函数用于测试系统是否支持ipv4MappedIPv6功能。如果系统支持该功能,或者不支持IPv4,则使用IPv6ifsupportsIPv4map()||!supportsIPv4(){_INET6,false}//如果没有指定监听地址,同时不支持ipv4MappedIPv6功能,且支持IPv4,则使用IPv4ifladdr==nil{_INET,false}//通过判断IP地址(长度和格式)来判断所使用的IP类型。实现函数定义在src/net/tcpsock_中(),false}//如果未指定本端和远端地址或明确指定了本端和远端需要的地址类型为IPv4,则使用IPv4。可用于处理"listen"和"dial"mode。//"listen"mode下已经处理了laddr==nil的情况,如果laddr非nil,调用family()函数判断IP类型即可获得IP类型if(laddr==nil||()==_INET)(raddr==nil||()==_INET){_INET,false}//其他情况使用IPv6,如仅支持IPv6的场景_INET6,false}从下面注释中可以看出socket用于返回一个网络描述符。listen场景下,需要使用socket-setsocketopt-bind-listen这几个系统调用来创建一个监听socket
1//socketreturnsanetworkfiledescriptorthatisreadyfor2//asynchronousI/Ousingthenetworkpoller.--此处应该是同步IO吧3funcsocket(,netstring,family,sotype,protoint,ipv6onlybool,laddr,raddrsockaddr,ctrlFnfunc(string,string,)error)(fd*netFD,errerror){4//创建一个socket5s,err:=sysSocket(family,sotype,proto)6iferr!=nil{7returnnil,err8}9//设置socket选项,处理IPv6并设置允许广播10iferr=setDefaultSockopts(s,family,sotype,ipv6only);err!=nil{11(s)12returnnil,err13}14//初始化一个newFD,下面讲解15iffd,err=newFD(s,family,sotype,net);err!=nil{16(s)17returnnil,err18}1920//用于处理TCP或UDP服务端。服务端需要确保本地监听地址非nil(但可以为""),否则会被认为是一个客户端socket。ListenTCP中已经对laddr赋初值21ifladdr!=nilraddr==nil{22switchsotype{23//处理流协议,_STREAM,_SEQPACKET:25iferr:=(laddr,listenerBacklog(),ctrlFn);err!=nil{26()27returnnil,err28}29returnfd,nil30//处理数据报协议,_DGRAM:32iferr:=(laddr,ctrlFn);err!=nil{33()34returnnil,err35}36returnfd,nil37}38}39//处理非监听socket场景,即客户端发起连接40iferr:=(ctx,laddr,raddr,ctrlFn);err!=nil{41()42returnnil,err43}44returnfd,nil45}golang的系统调用
以上述第5行代码中创建一个socket为例,看golang如何通过系统调用产生一个socket。sysSocket中过的调用链如下:
。socket函数内容如下,实际通过调用RawSyscall来运行系统调用,其系统调用ID为SYS_SOCKET,值为41
funcsocket(domainint,typint,protoint)(fdint,errerror){r0,_,e1:=RawSyscall(SYS_SOCKET,uintptr(domain),uintptr(typ),uintptr(proto))fd=int(r0)ife1!=0{err=errnoErr(e1)}return}golang中的系统调用定义在src/syscall目录下,redhat系统对应的文件为zsysnum_linux_。
操作系统中的系统调用ID定义在/usr/include/asm/中(内容如下),其中32位系统使用asm/unistd_32.h,64位系统使用asm/unistd_64.h。
如golang中的SYS_SOCKET系统调用ID为41,对应redhat系统的定义为ifndef_ASM_X86_UNISTD_Hdefine__X32_SYSCALL_BIT0x40000000includeasm/unistd_32.hincludeasm/unistd_/unistd_64.hif/*_ASM_X86_UNISTD_H*/
golang可以通过如下4个函数来执行系统调用,后缀带"6"的表示有6个入参,不带"6"的表示有4个入参。(redhat系统的实现定义在src/syscall/asm_linux_中)
golang的系统调用可以简单分为:阻塞系统调用,非阻塞系统调用和wrapped系统调用。wrapped系统调用就是自己封装的,如下文的epoll相关的系统调用就属于这一类。
funcSyscall(trap,a1,a2,a3uintptr)(r1,r2uintptr,errErrno)funcSyscall6(trap,a1,a2,a3,a4,a5,a6uintptr)(r1,r2uintptr,errErrno)funcRawSyscall(trap,a1,a2,a3uintptr)(r1,r2uintptr,errErrno)funcRawSyscall6(trap,a1,a2,a3,a4,a5,a6uintptr)(r1,r2uintptr,errErrno)
Syscall与RawSyscall的有如下区别(以下内容来自golangsyscall原理),通常系统调用使用Syscall,防止阻塞同一个P中的其他goroutine的执行
Syscall在进入系统调用的时候,调用了runtime·entersyscall(SB)函数,在结束系统调用的时候调用了runtime·exitsyscall(SB)。做到进入和退出syscall的时候通知runtime。
这两个函数runtime·entersyscall和runtime·exitsyscall的实现在文件里面。其实在runtime·entersyscall函数里面,通知系统调用时候,是会将g的M的P解绑,P可以去继续获取M执行其余的g,这样提升效率。所以如果用户代码使用了RawSyscall来做一些阻塞的系统调用,是有可能阻塞其它的g的。RawSyscall只是为了在执行那些一定不会阻塞的系统调用时,能节省两次对runtime的函数调用消耗
RawSyscall的源码如下,第一个参数表示系统调用ID,其余为该系统调用所需的参数。整体上比较简单,3~8行将参数传到寄存器,10行执行系统调用,生成socket,后续就是用于处理错误码和返回值
1//funcRawSyscall(trap,a1,a2,a3uintptr)(r1,r2,erruintptr)2TEXT·RawSyscall(SB),NOSPLIT,$0-563MOVQa1+8(FP),DI4MOVQa2+16(FP),SI5MOVQa3+24(FP),DX6MOVQ$0,R107MOVQ$0,R88MOVQ$0,R99MOVQtrap+0(FP),AX//syscallentry10SYSCALL11CMPQAX,$0xfffffffffffff00112JLSok113MOVQ$-1,r1+32(FP)14MOVQ$0,r2+40(FP)15NEGQAX16MOVQAX,err+48(FP)17RET18ok1:19MOVQAX,r1+32(FP)20MOVQDX,r2+40(FP)21MOVQ$0,err+48(FP)22RET
socket代码就是调用了RawSyscall函数来执行SYS_SOCKET类型的系统调用,由于socket系统调用不会阻塞,因此可以使用RawSyscall。
-----------------------------------------------------------------------------------------------------------------------
继续listen流程,在创建完socket并设置socket选项后,进入流处理环节。需要注意的是,此处用到了newFD函数初始化的变量fd。
func(fd*netFD)listenStream(laddrsockaddr,backlogint,ctrlFnfunc(string,string,)error)error{varerrerror//此处设置了监听socket所使用的选项,允许地址socket重用,实际中并不推荐使用socket地址重用。iferr=setDefaultListenerSockopts();err!=nil{returnerr}//构建一个实现了结构的结构体,如//,err=();err!=nil{returnerr}//ctrlFn在bind前调用,可以用于设置socket选项。此处为nil。用法可以参考net/listen_中的TestListenConfigControl函数ifctrlFn!=nil{c,err:=newRawConn(fd)iferr!=nil{returnerr}iferr:=ctrlFn((),(),c);err!=nil{returnerr}}//为socket绑定地址iferr=(,lsa);err!=nil{("bind",err)}//使用系统调用SYS_LISTEN,第二个参数表示监听队列大小,来自"/proc/sys/net/core/somaxconn"或(参见src/net/sock_)//该函数等同于系统调用://define__NR_epoll_create1291defineSYS_epoll_ctl233defineSYS_epoll_create1291
TEXTruntime·epollcreate1(SB),NOSPLIT,$0MOVLflags+0(FP),DIMOVL$SYS_epoll_create1,AXSYSCALLMOVLAX,ret+8(FP)RET
在创建完epoll之后,注册epoll事件
funcpoll_runtime_pollOpen(fduintptr)(*pollDesc,int){//获取一个pollDesc。pollcache为全局pd链表,可以为listen和accept过程提供pd。pd:=()lock()//pd全局链表中的节点都应该是可用或初始状态,0为初始化状态!=0!=pdReady{throw("runtime:blockedwriteonfreepolldesc")}!=0!=pdReady{throw("runtime:blockedreadonfreepolldesc")}//pollDesc中保存了文件描述符=fd//此处将pd的状态初始化为false,表示该pd可用=++=0=0++=0=0unlock()varerrnoint32//注册文件描述符到epoll句柄中errno=netpollopen(fd,pd)returnpd,int(errno)}poll节点的申请流程如下
//poll其实存储在一个链表中,指向下一个未使用的节点。在非空时,可以直接返回该节点
//+-----++-----++-----++-----++-----+
//|pd3+--+pd2+---+pd1+--+pd0+---+nil|
//+-----++--^--++-----++-----++-----+
//|
//+---------+|
//|+--+
//+---------+
func(c*pollCache)alloc()*pollDesc{lock()==nil{constpdSize=(pollDesc{})//首次会创建1024个节点n:=pollBlockSize/pdSizeifn==0{n=1}//Mustbeinnon-GCmemorybecausecanbereferenced//onlyfromepoll/kqueueinternals.//调用malloc申请内存mem:=persistentalloc(n*pdSize,0,_sys)//通过指针移动来获取节点并将节点串成链表。需要注意的是,末节点的link指向nilfori:=uintptr(0);in;i++{pd:=(*pollDesc)(add(mem,i*pdSize))==pd}}//此处有2个作用:如果非空,则返回指向的节点,并将指向下一个可用节点;初始化链表后,指向的是首节点,将其指向首节点之后//的节点并返回首节点使用。如果链表节点全部被使用,会重新创建1024个节点//当删除注册的event事件时,会回收该节点.pd:==()returnpd}netpollopen将文件描述符注册到epoll中,此处可以看到pollDesc的作用是作为epollctl的参数成员,即用户数据部分。runtime/netpoll_的netpoll函数中会处理此处注册的事件和数据
funcnetpollopen(fduintptr,pd*pollDesc)int32{varevepollevent//设置触发事件对应描述符上有可读数据|对应描述符上有可写数据|描述符被挂起|设置为边缘触发模式(仅在状态变更时上报一次事件)//epoll的事件可以参考epoll文档=_EPOLLIN|_EPOLLOUT|_EPOLLRDHUP|_EPOLLET//构造epoll的用户数据*(**pollDesc)(())=pd//传给epollctl的最后一个参数ev有2个数据,events和data,对应系统调用函数epoll_ctl的最后一个入参//structepoll_event{//__uint32_tevents;/*Epollevents*///epoll_data_tdata;/*Userdatavariable*///};return-epollctl(epfd,_EPOLL_CTL_ADD,int32(fd),ev)}runtime_pollUnblock的实现函数为poll_runtime_pollUnblock。获取pd读/写阻塞的goroutine并将其状态切换为runnable,poll_runtime_pollUnblock函数一般在关闭连接时使用。
pollDesc结构体中的rg和wg比较难理解,它们与netpoll相关,将底层缓存区的读写情况反映为当前读写对应的goroutine的状态。当读缓存区没有数据时,会导致rg阻塞(非pdReady),此时调用netpollunblock返回的为读操作所在的goroutine;而当执行write操作时,如果缓存区没有空间,此时会导致wg阻塞,此时调用netpollunblock返回的为写操作所在的goroutine。在非阻塞时,rg/wg表示当前读/写goroutine状态,pdReady表示可以进行读/写操作,pdWait表示当前goroutine将会被park(调用gopark)住。
注意:用户读写操作可以使用同一个goroutine。
funcpoll_runtime_pollUnblock(pd*pollDesc){lock(){throw("runtime:unblockonclosingpolldesc")}//将pd状态置为closing,poll_runtime_pollClose会停止并回收=++++varrg,wg*(noescape((rg)),nil)//fullmemorybarrierbetweenstoretoclosingandreadofrg/wginnetpollunblock//获取读写对应的goroutine。因此此处ioready设置为false,表示非底层IO的操作,底层epoll上报事件后,会通过调用//netpollunblock函数,此时netpollunblock的第三个参数会变为true,用于将对应事件的goroutine变为非阻塞,处理epoll的读写事件rg=netpollunblock(pd,'r',false)wg=netpollunblock(pd,'w',false)//和都是定时器超时后执行的函数,如果这些函数非空,则清除定时器并置为初始值nil(后续pd需要回收)。//定时器用于设置读写连接的deadline时间点,参见本文末的内容!=nil{deltimer()=nil}!=nil{deltimer()=nil}unlock()//此处才是真正unblock阻塞的goroutine,netpollgoready会调用goready函数将阻塞的goroutine变为runnable状态,继续执行。//此处的unblock操作对应调用netpollblock函数gopark的goroutine,如等待Accept,等待Read等。当Accept有连接到达或Read有数据读取//时,这时需要unblock对应的goroutine继续处理(当然也包括处理错误场景)ifrg!=nil{netpollgoready(rg,3)}ifwg!=nil{netpollgoready(wg,3)}}netpollunblock等待返回读/写操作block的goroutine,如果读/写状态为pdReady(即非阻塞)或初始化状态则返回一个空指针,表示该goroutine没有被阻塞,可直接使用;反之返回一个阻塞的goroutine,golang调用netpollready函数可将其变为runnable。ioready表示是否由底层发起的调用,如果是则需要置为true。只有通过底层epoll事件通知的场景下才会置为true。
ps:这个函数的名字有点奇怪,它并不能主动unblockgoroutine
funcnetpollunblock(pd*pollDesc,modeint32,ioreadybool)*g{gpp:==='w'{gpp=}//使用for循环用于执行原子操作。一个pd对于一条连接,而一条连接可能被多个goroutine操作。for{old:=*gpp//如果gpp为pdReady,则对应的goroutine为unblock状态,返回即可ifold==pdReady{returnnil}//此处用于初始状态时的场景,此时并没有调用netpollblock阻塞goroutine,直接返回即可ifold==0!ioready{//_pollWait//willcheckfortimeout/}varnewuintptrifioready{new=pdReady}//的实现在runtime/internal/atomic/asm_${plantform}.s中,如下处理逻辑为//if(*val==*old){//*val=new;//return1;//}else{//return0;//}//该函数的实现了自旋锁,for循环执行该原子操作。这里的原子循环应该是多goroutine操作同一个pd场景下等待一个相对稳定的状态,因为按照本//函数代码逻辑来看,*gpp一定等于*(gpp,old,new){//old==pdReady的判断应该不会被执行,如果old为pdReady,上面代码已经直接返回了ifold==pdReady||old==pdWait{old=0}//返回阻塞在/上的goroutine地址return(*g)((old))}}}runtime_pollClose实际调用的函数为poll_runtime_pollClose,用于删除注册的事件并回收pd节点
funcpoll_runtime_pollClose(pd*pollDesc){if!{throw("runtime:closepolldescw/ounblock")}//执行本函数前需要调用netpollunblock将goroutine变为非阻塞状态,以便回收pd节点!=0!=pdReady{throw("runtime:blockedwriteonclosingpolldesc")}!=0!=pdReady{throw("runtime:blockedreadonclosingpolldesc")}//调用_EPOLL_CTL_DEL删除注册的epoll事件netpollclose()//回收fd节点(pd)}golang中使用epoll的方式比较巧妙,也比较奇怪。从上面流程可以看出,创建epoll和注册epoll事件时,通过对API层层调用可以看到其运行了系统调用和,但没有直接用到,对Accept,Read等的阻塞是通过poll_runtime_pollWait-netpollblock-gopark阻塞goroutine来实现的,即通过gopark阻塞对应的协程。
是在中调用的,而的是在单独的线程中运行的。
funcnetpoll(blockbool)gList{ifepfd==-1{returngList{}}waitms:=int32(-1)if!block{waitms=0}varevents[128]epolleventretry://这里运行系统调用阻塞等待epfd上发生的事件n:=epollwait(epfd,events[0],int32(len(events)),waitms)ifn0{ifn!=-_EINTR{println("runtime:epollwaitonfd",epfd,"failedwith",-n)throw("runtime:netpollfailed")}gotoretry}vartoRungList//epoll可能一次性上报多个事件fori:=int32(0);in;i++{ev:=events[i]==0{continue}varmodeint32//底层可读事件,其他事件(如_EPOLLHUP)同时涉及到读写,因此读写的goroutine都需要通知(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR)!=0{mode+='r'}//底层可读事件(_EPOLLOUT|_EPOLLHUP|_EPOLLERR)!=0{mode+='w'}ifmode!=0{pd:=*(**pollDesc)(())//这里才是底层通知读写事件来unblock(Accept/Read等)协程的地方。netpollready会返回pd对应的读写goroutine链表(),//最终在函数退出后返回给函数调度。此处golangruntime会将/wg设置为pdreadynetpollready(toRun,pd,mode)}}(){gotoretry}returntoRun}这也是为什么使用用户API执行的协程需要使用pdReady和pdWait来表示协程状态的原因,因为其无法直接获得epoll_wait的事件信息。
-----------------------------------------------------------------------------------------------------------------------
accept流程
上面基本已经分析了epoll的所有底层流程,后续的就比较简单了。
可以看到Accept使用了listen阶段生成的netFD(TCPListener),接收监听socket的TCP连接。accept返回一个TCP连接,就可以在该TCP上进行读写操作,连接类型为TCPConn。每accept一个连接会创建一个新的goroutine,并调用internal/_pollWait来等待读事件
typeTCPConnstruct{conn}typeTCPListenerstruct{fd*netFD}func(l*TCPListener)Accept()(Conn,error){//如果监听socket已经释放,则无法继续执行accept,返回错误if!(){returnnil,}c,err:=()iferr!=nil{returnnil,OpError{Op:"accept",Net:,Source:nil,Addr:,Err:err}}returnc,nil}中会返回建立的TCP连接,后续可以在该连接上执行读写操作
func(ln*TCPListener)accept()(*TCPConn,error){fd,err:=()iferr!=nil{returnnil,err}//封装并返回建立好的tcp连接。newTCPConn同时会设置TCP_NODELAY选项来禁止Nagle算法returnnewTCPConn(fd),nil}func(fd*netFD)accept()(netfd*netFD,errerror){//核心处理函数就是,返回tcp连接的文件描述符。具体见下文d,rsa,errcall,err:=()iferr!=nil{iferrcall!=""{err=wrapSyscallError(errcall,err)}returnnil,err}//封装返回的tcp连接参数,netfd后续会被newTCPConn封装。ps:此处的返回值判断应该无用,newFD不会返回非nil的错误码ifnetfd,err=newFD(d,,,);err!=nil{(d)returnnil,err}//初始化一个pollDesc并注册epoll事件通知,与listen不同,此处用于注册用户连接上的IO读写事件iferr=();err!=nil{//ps:这个地方应该closeaccept的连接,提了个issue,官方已经更新()returnnil,err}//设置netfd中表示本/对端变量的地址lsa,_:=()(()(lsa),()(rsa))returnnetfd,nil}func(fd*FD)Accept()(int,,string,error){//注意此处的(),该函数会增加的引用计数。此处的fd即监听socket,如果监听socket关闭将无法进行readLock操作,//会直接返回错误,即无法创建新的连接。//在accept的连接上进行读操作会增加该连接的引用计数iferr:=();err!=nil{return-1,nil,"",err}//此处减少的引用计数。accept建立后的连接并不受监听socket控制,即使close监听socket,已有的连接会不会被关闭。//此处的readUnlock还有一个作用,在accept失败后,会在引用计数为0且状态为mutexClosed时会调用()函数close该连接。//从代码层面看,只能通过(net/fd_)将网络描述符的设置为mutexClose,该函数为对外API.//如果未关闭,则此处仅仅减少引用计数。()//初始化监听socket的pd,因为监听socket只有读操作,仅初始化。//初始化的原因是当前goroutine没有被阻塞,清除pd上的标记,进入poll_runtime_pollWait等待epoll事件唤醒goroutine,//防止因为残留标记导致虚假唤醒。//需要注意,prepare类函数,如prepareRead/prepareWrite会在其prepare-runtime_pollReset-poll_runtime_pollReset-netpollcheckerr中//对deadline(参见下文)时间进行检验,如果deadline的时间早于当前时间会导致read/write/accept等操作无法阻塞,直接返回timeouterror//funcnetpollcheckerr(pd*pollDesc,modeint32)int{//{//return1//errClosing//}//if(mode=='r')||(mode=='w'){//return2//errTimeout//}//return0//}iferr:=();err!=nil{return-1,nil,"",err}for{//调用系统函数返回accept的连接s,rsa,errcall,err:=accept()iferr==nil{returns,rsa,"",err}switcherr{//如果没有连接到来且使用了epoll,则调用runtime_pollWaitd等待新的连接。:(){iferr=();err==nil{continue}}//此处用于处理连接中客户端断开情况,建链过程中客户端发送RST报文://Thismeansthatasocketonthelisten//queuewasclosedbeforeweAccept()edit;//it'sasillyerror,}return-1,nil,errcall,err}}如上所述,runtime_pollWaitd并没有运行epollwait系统调用,它通过判断并循环等待goroutine变为pdReady。
funcpoll_runtime_pollWait(pd*pollDesc,modeint)int{//判断连接是否已经超时或关闭err:=netpollcheckerr(pd,int32(mode))iferr!=0{returnerr}//=="solaris"||GOOS=="aix"{netpollarm(pd,mode)}for!netpollblock(pd,int32(mode),false){//在超时和关闭情况下无需等待,返回错误err=netpollcheckerr(pd,int32(mode))iferr!=0{returnerr}//Canhappeniftimeouthasfiredandunblockedus,//butbeforewehadachancetorun,timeouthasbeenreset.//Pretithasnothappenedandretry.}return0}netpollblock与netpollunblock对应,前者调用gopark函数阻塞goroutine,后者结合goread函数unparkgoroutine。netpollblock的返回值用于判断处理的goroutine是否为pdReady。从代码实现来看,netpollblock的目的是park一个非pdReady的goroutine,而非直接pack一个goroutine。park一个pdReady的goroutine是不合理的,有可能该goroutine正在进行读写操作。
netpollblock首先将pd中对应mode(读/写)的goroutine状态设置为pdWait,然后park该goroutine,用法与pdWait的定义一致
funcnetpollblock(pd*pollDesc,modeint32,waitiobool)bool{gpp:==='w'{gpp=}//设置gpp状态为pdWaitfor{old:=*gppifold==pdReady{*gpp=0returntrue}ifold!=0{throw("runtime:doublewait")}(gpp,0,pdWait){break}}//needtorecheckerrorstatesaftersettinggpptoWAIT//thisisnecessarybecauseruntime_pollUnblock/runtime_pollSetDeadline/deadlineimpl//dotheopposite:storetoclosing/rd/wd,membarrier,loadofrg/wg//waitio为true可用于等待ioReadyifwaitio||netpollcheckerr(pd,mode)==0{//此处调用gopark阻塞goroutine。gopark返回可能是goroutine变为非阻塞,也可能由于其他原因(如close,timeout等)返回gopark(netpollblockcommit,(gpp),waitReasonIOWait,traceEvGoBlockNet,5)}//becarefultonotloseconcurrentREADYnotificationold:=(gpp,0)ifoldpdWait{throw("runtime:corruptedpolldesc")}returnold==pdReady}read流程
有了上面的基础,read和write就非常简单了。
func(c*conn)Read(b[]byte)(int,error){if!(){return0,}n,err:=(b)iferr!=nilerr!={err=OpError{Op:"read",Net:,Source:,Addr:,Err:err}}//用户需要处理返回错误的情况,如关闭连接returnn,err}func(fd*netFD)Read(p[]byte)(nint,errerror){n,err=(p)//与GC相关(fd)returnn,wrapSyscallError("read",err)}可以看到read和accept的代码逻辑基本一致
func(fd*FD)Read(p[]byte)(int,error){//增加fd的引用计数iferr:=();err!=nil{return0,err}//与accept类似,此处通常仅用于减少fd的引用计数,在read失败后需要手动close连接()iflen(p)==0{//Ifthecallerwantedazerobyteread,returnimmediately//withouttrying(butafteracquiringthereadLock).//,nilwhichlookslike////TODO(bradfitz):makeitwaitforreadability?(Issue15735)return0,nil}iferr:=();err!=nil{return0,err}(p)maxRW{p=p[:maxRW]}for{//如果读取到数据,则直接返回n,err:=(,p)iferr!=nil{n=0//如果暂时没有数据,且使用epoll,则阻塞等待epoll的读事件通知iferr==(){iferr=();err==nil{continue}}//OnMacOSwecanseeEINTRhereiftheuser//pressed^#22838.=="darwin"err=={continue}}err=(n,err)returnn,err}}write流程
write与read类似,在遇到IO阻塞时都需要调用runtime_pollWait等待epoll事件。不同点在于,read在有数据时会一次性读完,而write则需要判断底层是否有足够的空间来写入数据
func(c*conn)Write(b[]byte)(int,error){if!(){return0,}n,err:=(b)iferr!=nil{err=OpError{Op:"write",Net:,Source:,Addr:,Err:err}}returnn,err}func(fd*netFD)Write(p[]byte)(nnint,errerror){nn,err=(p)(fd)returnnn,wrapSyscallError("write",err)}func(fd*FD)Write(p[]byte)(int,error){iferr:=();err!=nil{return0,err}()iferr:=();err!=nil{return0,err}varnnint//循环写入数据,发送数据的大小受发送缓存区限制,如果发送数据过大,则需要分多次发送for{max:=len(p)//从maxRW的注释中可以看到Darwin和FreeBSD不允许一次性读写超过2G的数据。此处表示当写数据超过2G时,仅写入前2G数据//read函数中没有此限制,原因是在网络上读取数据时,socket读缓存区大小远远小于2G。{max=nn+maxRW}//返回发送成功的字节数n,err:=(,p[nn:max])ifn0{nn+=n}//只有发送失败或所有数据发送成功才算write结束ifnn==len(p){returnnn,err}//遇到写缓存区满且使用epoll时,等待epoll上报缓存区有空间事件iferr==(){iferr=();err==nil{continue}}iferr!=nil{returnnn,err}ifn==0{returnnn,}}}设置连接的deadline
可以通过如下参数设置不同的连接终止时间,底层都调用了函数。设置t为0表示永不超时。具体用法可以参见(net/)接口注释。
func(fd*FD)SetDeadline()errorfunc(fd*FD)SetReadDeadline()errorfunc(fd*FD)SetWriteDeadline()error
以上函数调用setDeadlineImpl实现
funcsetDeadlineImpl(fd*FD,,modeint)error{vardint64//如果设置了连接deadline时间,计算到deadline的时间差值。此处主要做一个预处理if!(){d=int64((t))//这里表示deadline时间点为当前时间,则设置为-1ifd==0{d=-1//don'tconfusedeadlinerightnowwithnodeadline}}iferr:=();err!=nil{returnerr}()==0{returnErrNoDeadline}//此处调用函数对定时器进行处理runtime_pollSetDeadline(,d,mode)returnnil}poll_runtime_pollSetDeadline中当到达deadline后执行如下函数,netpolldeadlineimpl实际执行的就是在deadline到期后运行netpollunblock+netpollgoready将阻塞的goroutine变为非阻塞,这会导致返回timeoutio错误
funcpoll_runtime_pollSetDeadline(pd*pollDesc,dint64,modeint){lock(){unlock()return}rd0,wd0:=,:=rd00rd0==wd0ifd0{//获取deadline时间d+=nanotime()//从注释看,这种情况表示deadline时间小于等于当前时间。将deadline时间设置为64bit的最大值ifd=0{//Iftheuserhasadeadlineinthefuture,butthedelaycalculation//overflows,=163-1}}//按照不同mode设置读写对应的deadline时间ifmode=='r'||mode=='r'+'w'{=d}ifmode=='w'||mode=='r'+'w'{=d}//combo用于表示仅设置了读deadline还是同时设置了读写deadlinecombo:===//读场景下deadline时间点执行的函数rtf:=netpollReadDeadline//读场景下deadline时间点执行的函数ifcombo{rtf=netpollDeadline}//如果没有设置读deadline时间点运行的函数,则使用默认的函数==nil{{//设置deadline时间点,以及到时后运行的函数,函数参数等==//Copycurrentseqintothetimerarg.//Timerfuncwillchecktheseqagainstcurrentdescriptorseq,//==//添加时间并启动定时任务addtimer()}//此处用于处理仅读deadline的情况}!=rd0||combo!=combo0{++//{//由于使用了自定义的deadline处理函数,此处调用modtimer重新赋值并启动定时任务modtimer(,,0,rtf,pd,)}else{//此处表示入参t为0的情况,即永不超时,删除定时器和定时器函数deltimer()=nil}}//此处处理写deadline相关的情况,与读类似==nil{!combo{====()}}!=wd0||combo!=combo0{++//!combo{modtimer(,,0,netpollWriteDeadline,pd,)}else{deltimer()=nil}}//Ifwesetthenewdeadlineinthepast,,wg*g//如果deadline时间点早于当前时间,则unblockpd的所有IO,返回timeoutIO错误||{(noescape((wg)),nil)//fullmemorybarrierbetweenstorestord/wdandloadofrg/{rg=netpollunblock(pd,'r',false)}{wg=netpollunblock(pd,'w',false)}}unlock()ifrg!=nil{netpollgoready(rg,3)}ifwg!=nil{netpollgoready(wg,3)}}setDeadLine主要是防止服务端阻塞等待导致的大量冗余连接(长连接),参见Gonet/http超时机制完全手册
TIPS:
epoll的用法可以参见这里

参考:
Golangnetpoll
netpoll
goroutine的状态切换
TheGonetpoller
一个EOF引发的探索之路之三(golang锁源码探索篇)
epoll官方文档
作者:charlieroro
出处: