BFE的HTTP/HTTP2/SPDY/WebSocket/TLS等网络协议基于Go语言官方开源协议库。为更好满足反向代理的需求场景,在BFE中进行了二次定制开发,包括性能优化、防攻击机制完善、兼容性改进、增加探针等。

本章重点介绍HTTP/HTTP2协议的实现。SPDY的实现与HTTP2的实现非常相似,这里不再赘述。其它协议的实现可参考BFE的代码组织中的说明查阅对应源码,也可在BFE开源社区提问交流。

HTTP协议

HTTP代码的组织

在bfe_http目录下包含如下代码:

$ ls bfe/bfe_http
chunked.go       cookie.go       header_test.go  readrequest_test.go   response.go            sniff.go     transfer_test.go
chunked_test.go  cookie_test.go  httputil        request.go            response_test.go       state.go     transport.go
client.go        eof_reader.go   lex.go          request_test.go       response_writer.go     status.go
common.go        header.go       lex_test.go     requestwrite_test.go  responsewrite_test.go  transfer.go

各文件的功能说明如下:

类别 文件名或目录 说明
基础类型 common.go HTTP基础数据类型定义
state.go HTTP协议内部状态指标
eof_reader.go EofReader类型定义,实现了io.ReadCloser接口,并永远返回EOF
协议消息 request.go HTTP请求类型的定义、读取及发送
response.go HTTP响应类型的定义、读取及发送
header.go HTTP头部类型定义及相关操作
cookie.go HTTP Cookie字段的处理
status.go HTTP响应状态码定义
lex.go HTTP合法字符表
消息收发 client.go RoundTrpper接口定义,支持并发的发送请求并获取响应
transport.go HTTP连接池管理,实现了RoundTrpper接口,在反向代理场景用于管理与后端的HTTP通信
transfer.go transferWriter/transfterReader类型定义,在反向代理场景用于向后端流式发送请求及读取响应
response_writer.go ResponseWriter类型定义,在反向代理场景用于构造响应并发送
辅助工具 httputil HTTP相关辅助函数
chunked.go HTTP Chunked编码处理
sniff.go HTTP MIME检测算法实现(https://mimesniff.spec.whatwg.org)

从用户读取HTTP请求

在 bfe_http/request.go文件中实现了从HTTP连接上读取一个HTTP请求,包括以下步骤:

  • 读取HTTP请求行并解析请求方法、URI及协议版本号
  • 读取HTTP请求头部并解析
  • 读取HTTP请求主体
// bfe_http/request.go

// ReadRequest reads and parses a request from b.
func ReadRequest(b *bfe_bufio.Reader, maxUriBytes int) (req *Request, err error) {
      tp := newTextprotoReader(b)
      req = new(Request)
      req.State = new(RequestState)

      // Read first line (eg. GET /index.html HTTP/1.0)
      var s string
      if s, err = tp.ReadLine(); err != nil {
            return nil, err
      }
    ...

      // Parse request method, uri, proto
      var ok bool
      req.Method, req.RequestURI, req.Proto, ok = parseRequestLine(s)
      if !ok {
            return nil, &badStringError{"malformed HTTP request", s}
       }
      rawurl := req.RequestURI
      if req.ProtoMajor, req.ProtoMinor, ok = ParseHTTPVersion(req.Proto); !ok {
            return nil, &badStringError{"malformed HTTP version", req.Proto}
      }
      if req.URL, err = url.ParseRequestURI(rawurl); err != nil {
            return nil, err
      }
    ...

      // Read and parser request header
      mimeHeader, headerKeys, err := tp.ReadMIMEHeaderAndKeys()
      if err != nil {
            return nil, err
      }
      req.Header = Header(mimeHeader)
      req.HeaderKeys = headerKeys
    ...

    // Read request body
      err = readTransfer(req, b)
      if err != nil {
            return nil, err
      }

      return req, nil
}

注意在最后一个步骤中,readTransfer(req, b)并未直接将请求内容立即读取到内存中。如果这样做,会大大增加反向代理的内存开销,同时也会增加请求转发延迟。

在readTransfer函数中,根据请求方法、传输编码、请求主体长度,返回满足io.ReadCloser接口类型的不同实现,用于按需读取请求内容。

// bfe_http/transfer.go

// Prepare body reader. ContentLength < 0 means chunked encoding
// or close connection when finished, since multipart is not supported yet
switch {
case chunked(t.TransferEncoding):
        if noBodyExpected(t.RequestMethod) {
              t.Body = EofReader
        } else {
              t.Body = &body{src: newChunkedReader(r), hdr: msg, r: r, closing: t.Close}
        }

case realLength == 0:
        t.Body = EofReader

case realLength > 0:
        // set r for peek data from body
        t.Body = &body{src: io.LimitReader(r, realLength), r: r, closing: t.Close}

default:
        // realLength < 0, i.e. "Content-Length" not mentioned in header
        if t.Close {
              // Close semantics (i.e. HTTP/1.0)
              t.Body = &body{src: r, closing: t.Close}
        } else {
              // Persistent connection (i.e. HTTP/1.1)
              t.Body = EofReader
        }
}

向后端转发请求并获取响应

在 bfe_http/transport.go中Transport类型实现了RoundTripper接口,支持发送请求并获取响应。主要包括以下步骤:

  • 检查请求的合法性
  • 从连接池获取到目地后端的闲置连接,或新建连接(如无闲置连接)
  • 使用该连接发送请求,并读取响应

连接的数据类型是persistConn,包含的核心成员如下:

// bfe_http/transport.go

// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
    t        *Transport
    cacheKey string // its connectMethod.String()
    conn     net.Conn
    closed   bool                // whether conn has been closed

    reqch    chan requestAndChan // written by roundTrip; read by readLoop
    writech  chan writeRequest   // written by roundTrip; read by writeLoop
    closech  chan struct{}       // broadcast close when readLoop (TCP connection) closes
    ...
}

同时,persistConn包含两个相关协程 writeLoop()和readLoop(),分别用于向后端连接发送请求及读取响应。

// bfe_http/transport.go

func (pc *persistConn) writeLoop() {
    defer close(pc.closech)
    ...
    for {
        select {
        case wr := <-pc.writech:
            ...
            // Write the HTTP request and flush buffer
            err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
            if err == nil {
                err = pc.bw.Flush()
            }
            if err != nil {
                err = WriteRequestError{Err: err}
                pc.markBroken()
            }
            // Return the write result
            wr.ch <- err
        case <-pc.closech:
            return
        }
    }
}

func (pc *persistConn) readLoop() {
    defer close(pc.closech)
    ...
    alive := true
    for alive {
        ...
        rc := <-pc.reqch
        var resp *Response
        if err == nil {
            // Read the HTTP response
            resp, err = ReadResponse(pc.br, rc.req)
            ...
        }
        ...
        if err != nil {
            pc.close()
        } else {
            ...
            // Wrapper the HTTP Body 
            resp.Body = &bodyEOFSignal{body: resp.Body}
        }
        ...

        // Return the read result
        if err != nil {
            err = ReadRespHeaderError{Err: err}
        }
        rc.ch <- responseAndError{resp, err}
        ...
    }
}

向用户回复HTTP响应

反向代理通过ResponseWriter接口来构造及发送响应,包括以下接口:

  • Header(): 通过该方法设置响应头部
  • WriteHeader(): 通过该方法设置响应状态码并发送响应头部
  • Write(): 通过该方法发送响应主体数据
// bfe_http/response_writer.go

// A ResponseWriter interface is used by an HTTP handler to
// construct an HTTP response.
type ResponseWriter interface {
    // Header returns the header map that will be sent by WriteHeader.
    // Changing the header after a call to WriteHeader (or Write) has
    // no effect.
    Header() Header

    // Write writes the data to the connection as part of an HTTP reply.
    // If WriteHeader has not yet been called, Write calls   
    // WriteHeader(http.StatusOK) before writing the data.  
    // If the Header does not contain a Content-Type line, Write adds a  
    // Content-Type set to the result of passing the initial 512 bytes of 
    // written data to DetectContentType.
    Write([]byte) (int, error)

    // WriteHeader sends an HTTP response header with status code.
    // If WriteHeader is not called explicitly, the first call to Write
    // will trigger an implicit WriteHeader(http.StatusOK).
    // Thus explicit calls to WriteHeader are mainly used to
    // send error codes.
    WriteHeader(int)
}

在bfe_server/response.go文件中实现了ResponseWriter接口,用于发送HTTP/HTTPS响应。

HTTP2协议

HTTP2代码的组织

在bfe_http2目录下包含如下代码:

$ls bfe/bfe_http2
errors.go       flow_test.go   headermap.go  http2_test.go     server_test.go  transport.go   z_spec_test.go
errors_test.go  frame.go       hpack         priority_test.go  state.go        write.go
flow.go         frame_test.go  http2.go      server.go         testdata        writesched.go

各文件的功能说明如下:

类别 文件名或子目录 说明
流处理层 server.go HTTP2协议连接核心处理逻辑
flow.go HTTP2流量控制窗口
writesched.go HTTP2协议帧发送优先级队列
帧处理层 frame.go HTTP2协议帧定义及解析
write.go HTTP2协议帧发送方法
hpack/ HTTP2协议头部压缩算法HPACK
基础数据类型 headermap.go HTTP2常见请求头部定义
errors.go HTTP2协议错误定义
state.go HTTP2协议内部状态指标
辅助工具 transport.go 封装了HTTP2客户端;仅用于与后端实例通信

HTTP2连接处理模块

BFE在接收到一个HTTP2连接后,除了创建连接处理主协程, 还会创建多个子协程配合完成协议逻辑的处理。单个HTTP2协议连接处理模块结构如图所示。

模块内部结构自底向上划分为三个层级:

帧处理层

  • 帧处理层实现HTTP2协议帧序列化、压缩及传输
  • 帧处理层包含两个独立收发协程,分别负责协议帧的接收与发送
  • 帧处理层与流处理层通过管道通信 (RecvChan/SendChan/WroteChan)

流处理层

  • 流处理层实现协议核心逻辑,例如:流创建、流数据传输、流关闭、多路复用、流优先级、流量控制等
  • 流处理层为每流创建Request/ResponseWriter实例,并在独立协程中运行应用逻辑

接口层

  • 为HTTP应用Handler提供标准Request/ResponseWriter实现, 屏蔽HTTP2协议数据传输细节
  • HTTP应用Handler运行在Stream Goroutine协程中
  • HTTP应用Handler通过Request实例获取HTTP 请求(读取自特定HTTP2流)
  • HTTP应用Handler通过ResponseWriter实例发送HTTP响应(发往特定HTTP2流)

HTTP2连接相关协程及关系

每个HTTP2连接的各协程,基于CSP(Communicating Sequential Processes)模型协作,具体如下:

帧处理层的协程

每个HTTP2连接包含两个读写协程,分别负责读取或发送HTTP2协议帧,其中:

  • 帧接收协程(Frame Recv Goroutine) 从连接上读取HTTP2协议帧并放入帧接收队列
// bfe_http2/server.go

// readFrames is the loop that reads incoming frames.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
    gate := make(gate)
    gateDone := gate.Done
    for {
        f, err := sc.framer.ReadFrame()
        ...

        // Send the frame to readFrameCh
        select {
        case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
        case <-sc.doneServing:
            return
        }

        // Waiting for the frame to be processed
        select {
            case <-gate:
            case <-sc.doneServing:
                return
        }
        ...
    }
}
  • 帧发送协程(Frame Send Goroutine) 从帧发送队列获取帧,并写入连接,同时将写结果放入写结果队列WroteChan
// bfe_http2/server.go

// writeFrames runs in its own goroutine and writes frame
// and then reports when it's done.
func (sc *serverConn) writeFrames() {
    var wm frameWriteMsg
    var err error

    for {
        // get frame from sendChan
        select {
        case wm = <-sc.writeFrameCh:
        case <-sc.doneServing:
            return
        }

        // write frame
        err = wm.write.writeFrame(sc)
        log.Logger.Debug("http2: write Frame: %v, %v", wm, err)

        // report write result
        select {
        case sc.wroteFrameCh <- frameWriteResult{wm, err}:
        case <-sc.doneServing:
            return
        }
    }
}

流处理层的协程

主协程与其它协程通过管道(golang Chan)进行通信, 例如:

  • BodyReadChan:请求处理协程读取请求Body后,通过BodyReadChan向主协程发送读结果消息,主协议接收到消息执行流量控制操作并更新流量控制窗口
  • WriteMsgChan: 请求处理协程发送响应后,通过WriteMsgChan向主协程发送写申请消息,主协议接收到消息后,转换为HTTP2数据帧并放入流发送队列。在合适到时机
  • ReadChan/SendChan/WroteChan:从连接上获取或发送HTTP2协议帧
// bfe_http2/server.go

func (sc *serverConn) serve() {
    ...

      // Write HTTP2 Settings frame and read preface.
    sc.writeFrame(frameWriteMsg{write: writeSettings{...}})
    err := sc.readPreface()
    ...

    // Start readFrames/writeFrames goroutines.
    go sc.readFrames()
    go sc.writeFrames()

    for {
        select {
        case wm := <-sc.wantWriteFrameCh:
            sc.writeFrame(wm)
        case res := <-sc.wroteFrameCh:
            sc.wroteFrame(res)
        case res := <-sc.readFrameCh:
            if !sc.processFrameFromReader(res) {
            return
            }
            ...
        case m := <-sc.bodyReadCh:
            sc.noteBodyRead(m.st, m.n)
        case <-sc.closeNotifyCh: // graceful shutdown
            sc.goAway(ErrCodeNo)
            sc.closeNotifyCh = nil
        ...
        }
    }
}

接口层的协程

每个HTTP2连接为应用层封装了Request对象及ResponseWriter对象,并创建独立的请求处理协程(Stream Goroutine)处理请求并返回响应

  • Stream Goroutine 从Request对象中获取请求
  • Stream Goroutine 向ResponseWriter对象发送响应
// bfe_http2/server.go

func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
    sc.serveG.Check()
    id := f.Header().StreamID
    ...

    // Create a new stream
    st = &stream{
        sc:    sc,
        id:    id,
        state: stateOpen,
        isw:   sc.srv.initialStreamRecvWindowSize(sc.rule),
    }
    ...

    // Create the Reqeust and ResponseWriter
    rw, req, err := sc.newWriterAndRequest(st, f)
    if err != nil {
        return err
    }
    st.body = req.Body.(*RequestBody).pipe // may be nil
    st.declBodyBytes = req.ContentLength
    ...

    // Process the request in a new goroutine
    handler := sc.handler.ServeHTTP
    go sc.runHandler(rw, req, handler)
    return nil
}
最后编辑: kuteng  文档更新时间: 2021-05-09 11:40   作者:kuteng