近些年来,各种类型的产品得到充足的发展,交互性和复杂度都在迅速提高,都需要在极短的时间内将数据同 时投递给大量用户,因此传输技术自然变为未来制约发展的一个重要因素。在此之前对于通信协议首选TCP, 而今因为TCP的种种限制,UDP得到了很多开发人员的青睐,并在UDP的基础上开发出了众多的可靠算法,如 QUIC、KCP等,在此基础上对于UDP的关注面跨越了可靠性,进一步考虑放大UDP的通信能力,典型如何在短 时间内快速接收和处理超大量的数据包。在此之前,我曾研究UDP许多时日,试图在Go中最大化UDP通信能力;本文内容源自于对于对高数量级UDP通 信能力的优化经验,不同于形容TCP通信能力的单位,针对UDP的特性选择以“每秒多少个包”(PPS)来作为通 信能力单位更具有现实意义。
实现最简epoll
对于UDP而言,严格来讲并不需要自己额外再实现epoll,但为了利用多核性能配合端口重用做到“多线程”绑定 统一地址端口,实现简化epoll是很必要的。与 *unix 相关的内容在 golang.org/x/sys/unix 包中,采用系统调用方式简化epoll。对于epoll的使用,主要 在三个API中:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int max_events, int
timeout);
那么只需要在Go中调用到这三个API即可。
func PollerInit() (*Poller, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, os.NewSyscallError("epoll_create1", err)
}
poller := &Poller{
fd: fd,
}
return poller, nil
}
正常情况下使用epoll的流程是拿到fd之后可添加、修改或删除触发实现,为了实现方便,将 epoll_ctl 的调用 改成了如下实现:
func (poller *Poller) Add(fd int, ev string) error {
e := &unix.EpollEvent{
Fd: int32(fd),
}
switch ev {
case "r":
e.Events = unix.EPOLLIN
case "w":
e.Events = unix.EPOLLOUT
case "rw":
e.Events = unix.EPOLLIN | unix.EPOLLOUT
default:
return fmt.Errorf("unknow epoll event type")
}
return os.NewSyscallError("epoll_ctl add", unix.EpollCtl(poller.fd,
unix.EPOLL_CTL_ADD, fd, e))
}
func (poller *Poller) Mod(fd int, ev string) error {
e := &unix.EpollEvent{
Fd: int32(fd),
}
switch ev {
case "r":
e.Events = unix.EPOLLIN
case "w":
e.Events = unix.EPOLLOUT
case "rw":
e.Events = unix.EPOLLIN | unix.EPOLLOUT
default:
return fmt.Errorf("unknow epoll event type")
}
return os.NewSyscallError("epoll_ctl mod", unix.EpollCtl(poller.fd,
unix.EPOLL_CTL_MOD, fd, e))
}
func (poller *Poller) Del(fd int) error {
return os.NewSyscallError("epoll_ctl del", unix.EpollCtl(poller.fd,
unix.EPOLL_CTL_DEL, fd, nil))
}
接下来是epoll_wait的调用,对应的事件通过回调函数返回给上层:
func (poller *Poller) Polling(eventHandler func(fd int32, ev uint32)) error {
evs := make([]unix.EpollEvent, EPollEventSize)
for {
n, err := unix.EpollWait(poller.fd, evs, 0)
if err != nil {
log.Printf("epoll_wait err: %v", err)
}
if n < 0 && err == unix.EINTR {
continue
}
if err != nil {
return os.NewSyscallError("epoll_wait", err)
}
for i := 0; i < n; i++ {
eventHandler(evs[i].Fd, evs[i].Events)
}
}
}
端口重用:SO_REUSEPORT
SO_REUSEPORT是linux 3.9版本新添加的,支持多个进程或线程绑定到同一地址端口。有了该选项之后,每个进 程或者线程都有属于自己的server socket,避免锁的竞争,可以充分利用到CPU多核资源。有了该选项之后, 可以考虑这样一种结构:每个goroutine拥有一个server socket,对应的拥有epoll fd,实际上就是epoll-per-goroutine结构,最大化利用CPU多核资源:
代码上实现起来相当简单,遵照SO_REUSEPORT的使用规则即可:
func NewUDPSocket(network, addr string, reusePort bool) (int, unix.Sockaddr,
error) {
var sa unix.Sockaddr
udpAddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
return 0, nil, fmt.Errorf("resolve addr err: %v", err)
}
netFamily := unix.AF_INET
if udpAddr.IP.To4() == nil {
netFamily = unix.AF_INET6
}
// listen socket fd
syscall.ForkLock.Lock()
fd, err := unix.Socket(netFamily,
unix.SOCK_DGRAM|unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC, unix.IPPROTO_UDP)
if err == nil {
unix.CloseOnExec(fd)
}
syscall.ForkLock.Unlock()
defer func() {
if err != nil {
_ = unix.Close(fd)
}
}()
switch network {
case "udp":
sockaddr := &unix.SockaddrInet4{}
sockaddr.Port = udpAddr.Port
sa = sockaddr
case "udp4":
sockaddr := &unix.SockaddrInet4{}
sockaddr.Port = udpAddr.Port
copy(sockaddr.Addr[:], udpAddr.IP.To4())
sa = sockaddr
case "udp6":
// IPv6 zone
sockaddr := &unix.SockaddrInet6{}
copy(sockaddr.Addr[:], udpAddr.IP.To16())
if udpAddr.Zone != "" {
var iface *net.Interface
iface, err = net.InterfaceByName(udpAddr.Zone)
if err != nil {
return 0, nil, fmt.Errorf("parse UDPAddr.Zone err: %v", err)
}
sockaddr.ZoneId = uint32(iface.Index)
}
sockaddr.Port = udpAddr.Port
netFamily = unix.AF_INET6
default:
return 0, nil, fmt.Errorf("not support network")
}
if reusePort {
if err = os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd,
unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)); err != nil {
return 0, nil, err
}
}
if err = os.NewSyscallError("bind", unix.Bind(fd, sa)); err != nil {
return 0, nil, err
}
return fd, sa, nil
}
使用recvmmsg代替recvmsg
我们知道在调用recvmsg时会将收到的数据从内核空间拷贝至用户空间,每调用一次就会产生一次内核开销, 短时间内接收超大量的数据包累积起来的内核开销也很可观了,所以从linux 2.6.33开始,新增了 recvmmsg ,允许用户一次性接收多个数据包,对于 recvmmsg 的说明可以参考:recvmmsg document,这里主要说下如何在Go中调用 recvmmsg。 recvmmsg 依赖以下几个结构:
#include <sys/socket.h>
struct mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of received bytes for header */
};
struct iovec { /* Scatter/gather array items */
void *iov_base; /* Starting address */
size_t iov_len; /* Number of bytes to transfer */
};
struct msghdr {
void *msg_name; /* Optional address */
socklen_t msg_namelen; /* Size of address */
struct iovec *msg_iov; /* Scatter/gather array */
size_t msg_iovlen; /* # elements in msg_iov */
void *msg_control; /* Ancillary data, see below */
size_t msg_controllen; /* Ancillary data buffer len */
int msg_flags; /* Flags on received message */
};
带外数据不在本文考虑范围内,因此 msg_control 和 msg_controllen 可以忽略,其中 iovec 为接收数据缓冲 区,本质上 recvmmsg 的传入参数就是mmsghdr数组,数据长度即为期望收到多少个数据包,理解这个结构之 后就好办了,我们可以构建在Go中构建对应的数据结构通过 unix.syscall6 实现调用 recvmmsg 。调用 recvmmsg 之前
func prepare(n, mtu int) ([]mmsghdr, [][]byte, [][]byte) {
mms := make([]mmsghdr, n)
buffers := make([][]byte, n)
names := make([][]byte, n)
for i := range mms {
buffers[i] = make([]byte, mtu)
names[i] = make([]byte, sizeofSockaddrInet6)
v := []iovec{
{Base: (*byte)(unsafe.Pointer(&buffers[i][0])), Len:
uint64(len(buffers[i]))},
}
mms[i].Hdr.Iov = &v[0]
mms[i].Hdr.Iovlen = uint64(len(v))
mms[i].Hdr.Name = (*byte)(unsafe.Pointer(&names[i][0]))
mms[i].Hdr.Namelen = uint32(len(names[i]))
// ignore mms[i].Hdr.Control and mms[i].Hdr.Controllen
}
return mms, buffers, names
}
调用 recvmmsg :
func (rw *ReaderWriter) read() (int, error) {
n, _, err := unix.Syscall6(unix.SYS_RECVMMSG, uintptr(rw.fd),
uintptr(unsafe.Pointer(&rw.msgs[0])), uintptr(len(rw.msgs)),
unix.MSG_WAITFORONE,
0, 0,
)
if err != 0 {
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
return 0, nil
}
return 0, os.NewSyscallError("recvmmsg", fmt.Errorf("%v",
unix.ErrnoName(err)))
}
return int(n), nil
}
至于 mmsghr 结构,可以参考:
golang.org/x/net/internal/socket/zsys_linux_amd64.go 。至于从 mmsghdr 结构中解析到远端地址和数据,可翻阅linux文档。
网卡多队列绑定CPU核心优化
我们知道,如果一个socket的所有操作都固定在某个CPU核心上是能获得一定的性能提升,如果网卡支持多队 列可以尝试这样一种优化方案:将网卡多队列均匀绑定到CPU多核心上,同时设置 SO_INCOMING_CPU 属性,将 socket的处理与某个CPU核心绑定,同时逻辑线程与某个CPU核心进行亲和性绑定,最终的结果是:某个逻辑线 程上总是处理特定的socket操作,简单来说就是路宽了,每条路上都井然有序,拥挤程度降级,性能得到提 升。那么在Go中能否实现这种优化方案?很可惜我没有找到明确的方法实施这种方案,主要原因是Go刻意弱化了线 程概念和操作,在Go中无法直接设置线程和CPU核心的亲和性以实现上述目的,有线索的同学可以指点一下。即便如此,但将网络多队列均匀到CPU多核心上是具有意义的。在实际测试中发现,偶尔会出现吞吐量下降, 重现率不高,偶然发现是某个CPU核心压力过高,查了网卡队列数据流向之后发现某些核心比较繁忙,开启网 卡多队列绑定到各个CPU核心上之后再次测试各个核心的压力都比较均匀,不至于会出现某个核心压力过高影 响runtime调度。关于这部分内容,这篇文章说的比较好可以作为优化参考:TCP加速技术解决方案
考虑CPU Cache
在我们的代码结构上实现了类似事件循环(event-loop),在这个event-loop中调用 epoll_wait ;前面我们说 过 epoll_wait 的事件是通过回调函数回调到event-loop中,也就是说会event-loop会被频繁的读写访问,此时 就有可能会出现event-loop在CPU中的访问命中率下降,其原理:单个CPU核在读取一个变量时,以cache line 的方式将后续的变量也读取进来,缓存在自己这个核的cache中,而后续的变量也可能被其他CPU核并行缓存。当前面的CPU对前面的变量进行写入时,该变量同样是以cache line为单位写回内存。此时在其他核上,尽管缓 存的是该变量之后的变量,但是由于没法区分自身变量是否被修改,所以它只能认为自己的缓存失效,重新从 内存中读取。为了能够让CPU尽快从高速缓冲中访问到event-loop变量,有必要让event-loop结构恰好填满一个cache line, 避免重复写回,至于手段上比较简单,即在结构中按照cache line大小填充无意义数组变量。
系统调用分离
在压测过程中,我们发现当PPS达到70w/s之后数据再也上不去了,通过pprof看到是系统调用开销,我们所涉 及到的几个系统调用均为阻塞的,阻塞调用在一定程度上会影响吞吐量,解决办法是再独立出goroutine专门负 责系统调用,避免阻塞event-loop。通过上述的优化内容,我实现了最简代码,在12核(E5-2640 2.5GHz)24G机器上跑出了128w PPS的数据,还 有一些比较细化的优化点,如降低GC频率在此不表,做此类优化的难点在于扣细节,结合pprof和实际测试数 据逐点分析哪部分可能会影响吞吐量,哪种优化方案能有效应对,需要反复对比测试数据,有时候还需要考虑 到代码结构上的实现,文中内容略浅显,表述不当的地方请指正。
本文测试代码在这里:
https://github.com/shaoyuan1943/fastudp
作者:shaoyuan1943
来源:微信公众号:GoCN