使用golang net/http库发送http请求,最后都是调用 transport的 RoundTrip方法中。

type RoundTripper interface {
    RoundTrip(*Request) (*Response, error)
}

RoundTrip代表一个http事务,给一个请求返回一个响应。RoundTripper必须是并发安全的。RoundTripper接口的实现Transport结构体在源码包net/http/transport.go 中。

type Transport struct {
    idleMu     sync.Mutex
    wantIdle   bool                                // user has requested to close all idle conns  用户是否已关闭所有的空闲连接
    idleConn   map[connectMethodKey][]*persistConn // most recently used at end,保存从connectMethodKey(代表着不同的协议,不同的host,也就是不同的请求)到persistConn的映射
    /*
    idleConnCh 用来在并发http请求的时候在多个 goroutine 里面相互发送持久连接,也就是说,
    这些持久连接是可以重复利用的, 你的http请求用某个persistConn用完了,
    通过这个channel发送给其他http请求使用这个persistConn
    */
    idleConnCh map[connectMethodKey]chan *persistConn
    idleLRU    connLRU

    reqMu       sync.Mutex
    reqCanceler map[*Request]func(error)   //请求取消器

    altMu    sync.Mutex   // guards changing altProto only
    altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme  为空或者map[string]RoundTripper,key为URI  的scheme,用于自定义的协议及对应的处理请求的RoundTripper

    // Proxy specifies a function to return a proxy for a given
    // Request. If the function returns a non-nil error, the
    // request is aborted with the provided error.
    //
    // The proxy type is determined by the URL scheme. "http"
    // and "socks5" are supported. If the scheme is empty,
    // "http" is assumed.
    //
    // If Proxy is nil or returns a nil *URL, no proxy is used.
    Proxy func(*Request) (*url.URL, error)   //根据给定的Request返回一个代理,如果返回一个不为空的error,请求会终止

    // DialContext specifies the dial function for creating unencrypted TCP connections.
    // If DialContext is nil (and the deprecated Dial below is also nil),
    // then the transport dials using package net.
    /*
    DialContext用于指定创建未加密的TCP连接的dial功能,如果该函数为空,则使用net包下的dial函数
    */
    DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

    // Dial specifies the dial function for creating unencrypted TCP connections.
    //
    // Deprecated: Use DialContext instead, which allows the transport
    // to cancel dials as soon as they are no longer needed.
    // If both are set, DialContext takes priority.
    /*
    Dial获取一个tcp连接,也就是net.Conn结构,然后就可以写入request,从而获取到response
    DialContext比Dial函数的优先级高
    */
    Dial func(network, addr string) (net.Conn, error)

    // DialTLS specifies an optional dial function for creating
    // TLS connections for non-proxied HTTPS requests.
    //
    // If DialTLS is nil, Dial and TLSClientConfig are used.
    //
    // If DialTLS is set, the Dial hook is not used for HTTPS
    // requests and the TLSClientConfig and TLSHandshakeTimeout
    // are ignored. The returned net.Conn is assumed to already be
    // past the TLS handshake.
    /*
    DialTLS  为创建非代理的HTTPS请求的TLS连接提供一个可选的dial功能
    如果DialTLS为空,则使用Dial和TLSClientConfig
    如果设置了DialTLS,则HTTPS的请求不使用Dial的钩子,并且TLSClientConfig 和 TLSHandshakeTimeout会被忽略
    返回的net.Conn假设已经通过了TLS握手
    */
    DialTLS func(network, addr string) (net.Conn, error)

    // TLSClientConfig specifies the TLS configuration to use with
    // tls.Client.
    // If nil, the default configuration is used.
    // If non-nil, HTTP/2 support may not be enabled by default.
    /*
      TLSClientConfig指定tls.Client使用的TLS配置信息
    如果为空,则使用默认配置
    如果不为空,默认情况下未启动HTTP/2支持
    */
    TLSClientConfig *tls.Config

    // TLSHandshakeTimeout specifies the maximum amount of time waiting to
    // wait for a TLS handshake. Zero means no timeout.
    /*
    指定TLS握手的超时时间
    */
    TLSHandshakeTimeout time.Duration

    // DisableKeepAlives, if true, prevents re-use of TCP connections
    // between different HTTP requests.
    DisableKeepAlives bool   //如果为true,则阻止在不同http请求之间重用TCP连接

    // DisableCompression, if true, prevents the Transport from
    // requesting compression with an "Accept-Encoding: gzip"
    // request header when the Request contains no existing
    // Accept-Encoding value. If the Transport requests gzip on
    // its own and gets a gzipped response, it's transparently
    // decoded in the Response.Body. However, if the user
    // explicitly requested gzip it is not automatically
    // uncompressed.
    DisableCompression bool   //如果为true,则进制传输使用 Accept-Encoding: gzip

    // MaxIdleConns controls the maximum number of idle (keep-alive)
    // connections across all hosts. Zero means no limit.
    MaxIdleConns int   //指定最大的空闲连接数

    // MaxIdleConnsPerHost, if non-zero, controls the maximum idle
    // (keep-alive) connections to keep per-host. If zero,
    // DefaultMaxIdleConnsPerHost is used.
    MaxIdleConnsPerHost int  //用于控制某一个主机的连接的最大空闲数

    // IdleConnTimeout is the maximum amount of time an idle
    // (keep-alive) connection will remain idle before closing
    // itself.
    // Zero means no limit.
    IdleConnTimeout time.Duration   //指定空闲连接保持的最长时间,如果为0,则不受限制

    // ResponseHeaderTimeout, if non-zero, specifies the amount of
    // time to wait for a server's response headers after fully
    // writing the request (including its body, if any). This
    // time does not include the time to read the response body.
    /*
    ResponseHeaderTimeout,如果非零,则指定在完全写入请求(包括其正文,如果有)之后等待服务器响应头的最长时间。
    此时间不包括读响应体的时间。
    */
    ResponseHeaderTimeout time.Duration

    // ExpectContinueTimeout, if non-zero, specifies the amount of
    // time to wait for a server's first response headers after fully
    // writing the request headers if the request has an
    // "Expect: 100-continue" header. Zero means no timeout and
    // causes the body to be sent immediately, without
    // waiting for the server to approve.
    // This time does not include the time to send the request header.
    /*
   如果请求头是"Expect:100-continue",ExpectContinueTimeout  如果不为0,它表示等待服务器第一次响应头的最大时间
    零表示没有超时并导致正文立即发送,无需等待服务器批准。
    此时间不包括发送请求标头的时间。
    */
    ExpectContinueTimeout time.Duration

    // TLSNextProto specifies how the Transport switches to an
    // alternate protocol (such as HTTP/2) after a TLS NPN/ALPN
    // protocol negotiation. If Transport dials an TLS connection
    // with a non-empty protocol name and TLSNextProto contains a
    // map entry for that key (such as "h2"), then the func is
    // called with the request's authority (such as "example.com"
    // or "example.com:1234") and the TLS connection. The function
    // must return a RoundTripper that then handles the request.
    // If TLSNextProto is not nil, HTTP/2 support is not enabled
    // automatically.
    /*
TLSNextProto指定在TLS NPN / ALPN协议协商之后传输如何切换到备用协议(例如HTTP / 2)。
    如果传输使用非空协议名称拨打TLS连接并且TLSNextProto包含该密钥的映射条目(例如“h2”),则使用请求的权限调用func(例如“example.com”或“example” .com:1234“)和TLS连接。
    该函数必须返回一个RoundTripper,然后处理该请求。 如果TLSNextProto不是nil,则不会自动启用HTTP / 2支持。
    */
    TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

    // ProxyConnectHeader optionally specifies headers to send to
    // proxies during CONNECT requests.
    /*
    ProxyConnectHeader可选地指定在CONNECT请求期间发送给代理的标头。
    */
    ProxyConnectHeader Header

    // MaxResponseHeaderBytes specifies a limit on how many
    // response bytes are allowed in the server's response
    // header.
    //
    // Zero means to use a default limit.
    /*
    指定服务器返回的响应头的最大字节数
    为0则使用默认的限制
    */
    MaxResponseHeaderBytes int64

    // nextProtoOnce guards initialization of TLSNextProto and
    // h2transport (via onceSetNextProtoDefaults)
    //nextProtoOnce保护  TLSNextProto和 h2transport 的初始化
    nextProtoOnce sync.Once
    h2transport   *http2Transport // non-nil if http2 wired up,如果是http2连通,则不为nil

    // TODO: tunable on max per-host TCP dials in flight (Issue 13957)
}

以上是Transport结构体及每个字段的主要功能,从中可以看到

idleConn 可以理解成 空闲的连接池,用于存放空闲的连接,从而使连接可以复用。

idleConnCh用来在并发http请求的时候在多个goroutine里相互发送persistConn,可以使persistConn持久化连接得到重复使用。

RoundTrip方法
Transport的核心方法时RoundTrip方法,该方法的工作流程基本如下:

RoundTrip方法源码如下:

//RoundTrip实现了RoundTripper接口
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
    //初始化TLSNextProto  http2使用
    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
    //获取请求的上下文
    ctx := req.Context()
    trace := httptrace.ContextClientTrace(ctx)

    //错误处理
    if req.URL == nil {
        req.closeBody()
        return nil, errors.New("http: nil Request.URL")
    }
    if req.Header == nil {
        req.closeBody()
        return nil, errors.New("http: nil Request.Header")
    }
    scheme := req.URL.Scheme
    isHTTP := scheme == "http" || scheme == "https"
    //如果是http或https请求,对Header中的数据进行校验
    if isHTTP {
        for k, vv := range req.Header {
            if !httplex.ValidHeaderFieldName(k) {
                return nil, fmt.Errorf("net/http: invalid header field name %q", k)
            }
            for _, v := range vv {
                if !httplex.ValidHeaderFieldValue(v) {
                    return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
                }
            }
        }
    }

    //如果该scheme有自定义的RoundTrip,则使用自定义的RoundTrip处理request,并返回response
    altProto, _ := t.altProto.Load().(map[string]RoundTripper)
    if altRT := altProto[scheme]; altRT != nil {
        if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
            return resp, err
        }
    }

    //如果不是http请求,则关闭并退出
    if !isHTTP {
        req.closeBody()
        return nil, &badStringError{"unsupported protocol scheme", scheme}
    }

    //对请求的Method进行校验
    if req.Method != "" && !validMethod(req.Method) {
        return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
    }

    //请求的host为空,则返回
    if req.URL.Host == "" {
        req.closeBody()
        return nil, errors.New("http: no Host in request URL")
    }

    for {
        // treq gets modified by roundTrip, so we need to recreate for each retry.
        //初始化transportRequest,transportRequest是request的包装器
        treq := &transportRequest{Request: req, trace: trace}
        //根据用户的请求信息获取connectMethod  cm
        cm, err := t.connectMethodForRequest(treq)
        if err != nil {
            req.closeBody()
            return nil, err
        }

        // Get the cached or newly-created connection to either the
        // host (for http or https), the http proxy, or the http proxy
        // pre-CONNECTed to https server. In any case, we'll be ready
        // to send it requests.
        //从缓存中获取一个连接,或者新建一个连接
        pconn, err := t.getConn(treq, cm)
        if err != nil {
            t.setReqCanceler(req, nil)
            req.closeBody()
            return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
            // HTTP/2 path.
            t.setReqCanceler(req, nil) // not cancelable with CancelRequest
            resp, err = pconn.alt.RoundTrip(req)
        } else {
            resp, err = pconn.roundTrip(treq)
        }
        if err == nil {
            return resp, nil
        }
        if !pconn.shouldRetryRequest(req, err) {
            // Issue 16465: return underlying net.Conn.Read error from peek,
            // as we've historically done.
            if e, ok := err.(transportReadFromServerError); ok {
                err = e.err
            }
            return nil, err
        }
        testHookRoundTripRetried()

        // Rewind the body if we're able to.  (HTTP/2 does this itself so we only
        // need to do it for HTTP/1.1 connections.)
        if req.GetBody != nil && pconn.alt == nil {
            newReq := *req
            var err error
            newReq.Body, err = req.GetBody()
            if err != nil {
                return nil, err
            }
            req = &newReq
        }
    }
}

该方法首先会进行一些校验,如果客户端请求的scheme有自定的RoundTrip,则使用自定义的RoundTrip处理request,并返回response。

该方法主要的请求处理逻辑在for循环里,首先会根据请求从空闲的连接池中获取一个连接或新建一个连接pconn。忽略HTTP/2请求的处理,其他常用的HTTP请求会调用roundTrip方法将客户端发送给服务器,并等待返回response。

getConn获取或新建连接
 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
    req := treq.Request
    trace := treq.trace
    ctx := req.Context()

    //GetConn是钩子函数在获取连接前调用
    if trace != nil && trace.GetConn != nil {
        trace.GetConn(cm.addr())
    }
    //如果可以获取到空闲的连接
    if pc, idleSince := t.getIdleConn(cm); pc != nil {
        if trace != nil && trace.GotConn != nil {  //GotConn是钩子函数,成功获取连接后调用
            trace.GotConn(pc.gotIdleConnTrace(idleSince))
        }
        // set request canceler to some non-nil function so we
        // can detect whether it was cleared between now and when
        // we enter roundTrip
        /*
        将请求的canceler设置为某些非零函数,以便我们可以检测它是否在现在和我们进入roundTrip之间被清除
        */
        t.setReqCanceler(req, func(error) {})
        return pc, nil
    }

    type dialRes struct {
        pc  *persistConn
        err error
    }
    dialc := make(chan dialRes)

    // Copy these hooks so we don't race on the postPendingDial in
    // the goroutine we launch. Issue 11136.
    testHookPrePendingDial := testHookPrePendingDial
    testHookPostPendingDial := testHookPostPendingDial

    //该内部函数handlePendingDial的主要作用是,新开启一个协程,当新建连接完成后但没有被使用,将其放到连接池(缓存)中或将其关闭
    handlePendingDial := func() {
        testHookPrePendingDial()
        go func() {
            if v := <-dialc; v.err == nil {
                t.putOrCloseIdleConn(v.pc)
            }
            testHookPostPendingDial()
        }()
    }

    cancelc := make(chan error, 1)
    t.setReqCanceler(req, func(err error) { cancelc <- err })

    go func() {//开启一个协程新建一个连接
        pc, err := t.dialConn(ctx, cm)
        dialc <- dialRes{pc, err}
    }()

    idleConnCh := t.getIdleConnCh(cm)
    select {
    case v := <-dialc: //获取新建的连接
        // Our dial finished.
        if v.pc != nil { //如果新建的连接不为nil,则返回新建的连接
            if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
                trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
            }
            return v.pc, nil
        }
        // Our dial failed. See why to return a nicer error
        // value.
        select {
        case <-req.Cancel:
            // It was an error due to cancelation, so prioritize that
            // error value. (Issue 16049)
            return nil, errRequestCanceledConn
        case <-req.Context().Done():
            return nil, req.Context().Err()
        case err := <-cancelc:
            if err == errRequestCanceled {
                err = errRequestCanceledConn
            }
            return nil, err
        default:
            // It wasn't an error due to cancelation, so
            // return the original error message:
            return nil, v.err
        }
    case pc := <-idleConnCh:  //如果在新建连接的过程中,有空闲的连接,则返回该空闲的连接
        // Another request finished first and its net.Conn
        // became available before our dial. Or somebody
        // else's dial that they didn't use.
        // But our dial is still going, so give it away
        // when it finishes:
        //如果在dial连接的时候,有空闲的连接,但是这个时候我们仍然正在新建连接,所以当它新建完成后将其放到连接池或丢弃
        handlePendingDial()
        if trace != nil && trace.GotConn != nil {
            trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
        }
        return pc, nil
    case <-req.Cancel:
        handlePendingDial()
        return nil, errRequestCanceledConn
    case <-req.Context().Done():
        handlePendingDial()
        return nil, req.Context().Err()
    case err := <-cancelc:
        handlePendingDial()
        if err == errRequestCanceled {
            err = errRequestCanceledConn
        }
        return nil, err
    }
}

第一步:尝试从空闲的连接池中获取空闲连接(通过getIdleConn方法)

如果缓存中有空闲的连接,则获取空闲的连接,并从idleConn和idleLRU中删除该连接,getIdleConn方法的源码如下:

func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {
    key := cm.key()
    t.idleMu.Lock()
    defer t.idleMu.Unlock()
    for {
        pconns, ok := t.idleConn[key]
        if !ok {
            return nil, time.Time{}
        }
        if len(pconns) == 1 {
            pconn = pconns[0]
            delete(t.idleConn, key)
        } else {
            // 2 or more cached connections; use the most
            // recently used one at the end.
            pconn = pconns[len(pconns)-1]
            t.idleConn[key] = pconns[:len(pconns)-1]
        }
        t.idleLRU.remove(pconn)
        if pconn.isBroken() {  //如果该连接被关闭,则继续从缓存中查找
            // There is a tiny window where this is
            // possible, between the connecting dying and
            // the persistConn readLoop calling
            // Transport.removeIdleConn. Just skip it and
            // carry on.
            continue
        }
        if pconn.idleTimer != nil && !pconn.idleTimer.Stop() {
            // We picked this conn at the ~same time it
            // was expiring and it's trying to close
            // itself in another goroutine. Don't use it.
            continue
        }
        return pconn, pconn.idleAt
    }
}

前面我们提到过空闲的连接存放在idleConn字段中,该字段是map结构,可以通过客户端的请求信息(proxy,scheme,addr)来获取持久连接persistConn。getIdleConn方法尝试从空闲的连接池idleConn中获取空闲连接,如果获取到空闲的连接,则从该idleConn map中删除,并判断该连接是否被关闭,如果该连接已经被关闭则继续获取。如果可以获取到空闲连接则返回该连接,将使用该连接与服务器进行通讯,如果获取不到则新建连接。

第二步:如果空闲的连接池中没有可用的连接,则会调用dialConn方法新建连接

当我们无法从空闲的连接池中获取连接,就要新建连接。新建连接的大致过程如下:

首先 初始化dialc channel, 该channel用于等待新建的连接,如果连接创建成功则将创建的连接放入到dialc中

go func() {//开启一个协程新建一个连接
        pc, err := t.dialConn(ctx, cm)
        dialc <- dialRes{pc, err}
}()

handlePendingDial函数,该内部函数的主要作用用于开启一个协程,当新建连接成功但没有被使用,则通过该函数将其放到连接池中或将其关闭。

handlePendingDial := func() {
        testHookPrePendingDial()
        go func() {
            if v := <-dialc; v.err == nil {
                t.putOrCloseIdleConn(v.pc)
            }
            testHookPostPendingDial()
        }()
}

其次 用select case 监听事件

1.监听连接是否重建成功,如果连接创建成功则返回该新建的连接

2.通过idleConnCh channel监听是否在创建连接的时候有空闲的连接,如果有空闲的连接则返回空闲连接,并调用handlePendingDial函数,处理新建的连接,将新建的连接放入到空闲的连接池中或将其关闭。

idleConnCh是为了多个http请求之间复用,当一个客户端的请求处理完成之后,首先会尝试将该连接写入到idleConnCh中,如果有其他http在监听等待该idleConnCh,则会写入成功。从而降低客户端请求的等待时间。如果该连接无法放入到idleConnCh中,则会尝试将该连接放入到idleConn中

3.在等待创建连接的过程中也在监听是否有取消客户端请求的消息,如果有也会调用handlePendingDial函数,并返回错误信息。

新建连接的过程

前面我们看到新建连接时,会开启一个协程来执行dialConn方法来创建连接,dialConn方法源码如下:

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
    //初始化persistConn结构体pconn
    pconn := &persistConn{
        t:             t,
        cacheKey:      cm.key(),
        reqch:         make(chan requestAndChan, 1),
        writech:       make(chan writeRequest, 1),
        closech:       make(chan struct{}),
        writeErrCh:    make(chan error, 1),
        writeLoopDone: make(chan struct{}),
    }
    trace := httptrace.ContextClientTrace(ctx)
    wrapErr := func(err error) error {
        if cm.proxyURL != nil {
            // Return a typed error, per Issue 16997
            return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
        }
        return err
    }
    //如果scheme为https,并且DialTLS函数不为nil
    if cm.scheme() == "https" && t.DialTLS != nil {
        var err error
        pconn.conn, err = t.DialTLS("tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        if pconn.conn == nil {
            return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
        }
        if tc, ok := pconn.conn.(*tls.Conn); ok {
            // Handshake here, in case DialTLS didn't. TLSNextProto below
            // depends on it for knowing the connection state.
            if trace != nil && trace.TLSHandshakeStart != nil {
                trace.TLSHandshakeStart()
            }
            if err := tc.Handshake(); err != nil {
                go pconn.conn.Close()
                if trace != nil && trace.TLSHandshakeDone != nil {
                    trace.TLSHandshakeDone(tls.ConnectionState{}, err)
                }
                return nil, err
            }
            cs := tc.ConnectionState()
            if trace != nil && trace.TLSHandshakeDone != nil {
                trace.TLSHandshakeDone(cs, nil)
            }
            pconn.tlsState = &cs
        }
    } else {
        //创建tcp连接
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
            var firstTLSHost string
            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
                return nil, wrapErr(err)
            }
            if err = pconn.addTLS(firstTLSHost, trace); err != nil {
                return nil, wrapErr(err)
            }
        }
    }

    ....... 处理代理等消息省略

    //初始化br和bw
    pconn.br = bufio.NewReader(pconn)
    pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
    go pconn.readLoop()
    go pconn.writeLoop()
    return pconn, nil
}

新建连接的过程主要在dialConn方法中,新建连接的大致过程如下:

  1. 首先初始化persistConn结构体

  2. 创建连接,创建连接时区分https和http

  3. 连接创建成功后,会开启两个协程,一个用于处理输入流writeLoop,一个用于处理输出流readLoop

从中我们看到当客户端和服务端每建立一个连接,都会开启两个协程,一个处理输入流writeLoop,一个处理输出流readLoop。

readLoop方法

/*
从网络连接中读取消息并解析成Response
*/
func (pc *persistConn) readLoop() {
    closeErr := errReadLoopExiting // default value, if not changed below
    defer func() {
        pc.close(closeErr)
        pc.t.removeIdleConn(pc)
    }()

    //函数作用:尝试将pc放入空闲连接池中
    tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
        if err := pc.t.tryPutIdleConn(pc); err != nil {
            closeErr = err
            if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
                trace.PutIdleConn(err)
            }
            return false
        }
        if trace != nil && trace.PutIdleConn != nil {
            trace.PutIdleConn(nil)
        }
        return true
    }

    // eofc is used to block caller goroutines reading from Response.Body
    // at EOF until this goroutines has (potentially) added the connection
    // back to the idle pool.
    eofc := make(chan struct{})
    defer close(eofc) // unblock reader on errors

    // Read this once, before loop starts. (to avoid races in tests)
    testHookMu.Lock()
    testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
    testHookMu.Unlock()

    alive := true
    for alive {
        pc.readLimit = pc.maxHeaderResponseSize()
        _, err := pc.br.Peek(1)

        pc.mu.Lock()
        if pc.numExpectedResponses == 0 {
            pc.readLoopPeekFailLocked(err)
            pc.mu.Unlock()
            return
        }
        pc.mu.Unlock()


        //从reqch通道中获取请求数据和等待返回的response的channel
        rc := <-pc.reqch
        trace := httptrace.ContextClientTrace(rc.req.Context())  //从请求的上下文中获取trace

        var resp *Response
        if err == nil {
            resp, err = pc.readResponse(rc, trace)   //从网络连接中读取http的响应信息Response
        } else {
            err = transportReadFromServerError{err}
            closeErr = err
        }

        if err != nil {  //错误处理
            if pc.readLimit <= 0 {
                err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
            }

            select {
            case rc.ch <- responseAndError{err: err}:
            case <-rc.callerGone:
                return
            }
            return
        }
        pc.readLimit = maxInt64 // effictively no limit for response bodies

        pc.mu.Lock()
        pc.numExpectedResponses--
        pc.mu.Unlock()

        hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0  //判断是否响应的消息有body

        if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
            // Don't do keep-alive on error if either party requested a close
            // or we get an unexpected informational (1xx) response.
            // StatusCode 100 is already handled above.
            alive = false
        }

        if !hasBody {  //如果响应体没有body
            pc.t.setReqCanceler(rc.req, nil)  //将reqCanceler取消函数中对应的req删除

            // Put the idle conn back into the pool before we send the response
            // so if they process it quickly and make another request, they'll
            // get this same conn. But we use the unbuffered channel 'rc'
            // to guarantee that persistConn.roundTrip got out of its select
            // potentially waiting for this persistConn to close.
            // but after
            /*
            在返回response之前,将空闲的conn放到连接池中
            */
            alive = alive &&
                !pc.sawEOF &&
                pc.wroteRequest() &&
                tryPutIdleConn(trace)

            select {
            case rc.ch <- responseAndError{res: resp}:  //将响应信息返回到roundTrip中
            case <-rc.callerGone:
                return
            }

            // Now that they've read from the unbuffered channel, they're safely
            // out of the select that also waits on this goroutine to die, so
            // we're allowed to exit now if needed (if alive is false)
            testHookReadLoopBeforeNextRead()
            continue
        }

        waitForBodyRead := make(chan bool, 2)
        body := &bodyEOFSignal{
            body: resp.Body,
            earlyCloseFn: func() error {
                waitForBodyRead <- false
                <-eofc // will be closed by deferred call at the end of the function
                return nil

            },
            fn: func(err error) error {
                isEOF := err == io.EOF
                waitForBodyRead <- isEOF
                if isEOF {
                    <-eofc // see comment above eofc declaration
                } else if err != nil {
                    if cerr := pc.canceled(); cerr != nil {
                        return cerr
                    }
                }
                return err
            },
        }

        resp.Body = body
        if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
            resp.Body = &gzipReader{body: body}
            resp.Header.Del("Content-Encoding")
            resp.Header.Del("Content-Length")
            resp.ContentLength = -1
            resp.Uncompressed = true
        }

        //将response返回到到roundTrip中
        select {
        case rc.ch <- responseAndError{res: resp}:
        case <-rc.callerGone:
            return
        }

        // Before looping back to the top of this function and peeking on
        // the bufio.Reader, wait for the caller goroutine to finish
        // reading the response body. (or for cancelation or death)
        /*
        等待返回的response中的body被读完后,才会将连接放入到连接池中,等待再次使用该连接
        */
        select {
        case bodyEOF := <-waitForBodyRead:
            pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
            alive = alive &&
                bodyEOF &&
                !pc.sawEOF &&
                pc.wroteRequest() &&
                tryPutIdleConn(trace)
            if bodyEOF {
                eofc <- struct{}{}
            }
        case <-rc.req.Cancel:
            alive = false
            pc.t.CancelRequest(rc.req)
        case <-rc.req.Context().Done():
            alive = false
            pc.t.cancelRequest(rc.req, rc.req.Context().Err())
        case <-pc.closech:
            alive = false
        }

        testHookReadLoopBeforeNextRead()
    }
}

readLoop方法的主要作用是从网络中读取消息并解析成Response返回

定义内部函数tryPutIdleConn,该函数的主要作用是将持久化连接persistConn放入到空闲连接池中

persistConn持久化连接中的reqch chan requestAndChan该channel是用于获取客户端的请求信息并等待返回的response。requestAndChan结构体如下:

type requestAndChan struct {
    req *Request 
    ch  chan responseAndError // unbuffered; always send in select on callerGone

    // whether the Transport (as opposed to the user client code)
    // added the Accept-Encoding gzip header. If the Transport
    // set it, only then do we transparently decode the gzip.
    addedGzip bool

    // Optional blocking chan for Expect: 100-continue (for send).
    // If the request has an "Expect: 100-continue" header and
    // the server responds 100 Continue, readLoop send a value
    // to writeLoop via this chan.
    continueCh chan<- struct{}

    callerGone <-chan struct{} // closed when roundTrip caller has returned
}

其中req是用户请求信息。ch是response和error相关信息,用于等待从服务端读取响应信息。continueCh用于判断100-continue的情况。

其中req是用户请求信息。ch是response和error相关信息,用于等待从服务端读取响应信息。continueCh用于判断100-continue的情况。

readLoop的大致流程如下:

1.如果连接正常则轮询读取要发送到服务端的客户端请求信息rc

2.调用readResponse方法从网络连接中读取http的响应信息并封装成Response

3.如果读取时有错误信息,则将error信息发送通过rc中的ch字段将错误信息返回

4.如果读取到的响应信息没有响应体(即Body),如果连接正常则尝试将连接放入到连接池中。并将响应信息通过rc中的ch channel发送到roundTrip中,从而响应给http客户端请求。

5.如果读取到的响应信息有响应体(即Body),则会将响应体进行封装,封装成bodyEOFSignal结构体,目的是为了当客户端读取响应体之后,才会将该连接放入到连接池中,等待再次被使用。

tryPutIdleConn将连接放入到空闲连接池中

func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
    ........

    waitingDialer := t.idleConnCh[key]
    select {
    case waitingDialer <- pconn:
        // We're done with this pconn and somebody else is
        // currently waiting for a conn of this type (they're
        // actively dialing, but this conn is ready
        // first). Chrome calls this socket late binding. See
        // https://insouciant.org/tech/connection-management-in-chromium/
        return nil
    default:
        if waitingDialer != nil {
            // They had populated this, but their dial won
            // first, so we can clean up this map entry.
            delete(t.idleConnCh, key)
        }
    }
    if t.wantIdle {
        return errWantIdle
    }
    if t.idleConn == nil {
        t.idleConn = make(map[connectMethodKey][]*persistConn)
    }
    idles := t.idleConn[key]
    if len(idles) >= t.maxIdleConnsPerHost() {
        return errTooManyIdleHost
    }
    for _, exist := range idles {
        if exist == pconn {
            log.Fatalf("dup idle pconn %p in freelist", pconn)
        }
    }
    t.idleConn[key] = append(idles, pconn)
    t.idleLRU.add(pconn)
    if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
        oldest := t.idleLRU.removeOldest()
        oldest.close(errTooManyIdle)
        t.removeIdleConnLocked(oldest)
    }
    if t.IdleConnTimeout > 0 {
        if pconn.idleTimer != nil {
            pconn.idleTimer.Reset(t.IdleConnTimeout)
        } else {
            pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
        }
    }
    pconn.idleAt = time.Now()
    return nil
}

tryPutIdleConn将持久化连接放入到空闲连接池的过程大致如下:

1.如果有其他http请求正在等待该连接,则将该连接写入到waitingDialer,从而使其他http请求复用

2.尝试将该连接放入到空闲的连接池中idleConn

writeLoop方法


func (pc *persistConn) writeLoop() {
    defer close(pc.writeLoopDone)
    for {
        select {
        case wr := <-pc.writech:  //获取到要写入到输入流中的request相关数据
            startBytesWritten := pc.nwrite
            err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))  //向网络连接中写入数据
            if bre, ok := err.(requestBodyReadError); ok { //错误处理
                err = bre.error
                // Errors reading from the user's
                // Request.Body are high priority.
                // Set it here before sending on the
                // channels below or calling
                // pc.close() which tears town
                // connections and causes other
                // errors.
                wr.req.setError(err)
            }
            if err == nil { //写入时没有错误,则刷新缓存
                err = pc.bw.Flush()
            }
            if err != nil { //错误处理
                wr.req.Request.closeBody()
                if pc.nwrite == startBytesWritten {
                    err = nothingWrittenError{err}
                }
            }
            pc.writeErrCh <- err // to the body reader, which might recycle us
            wr.ch <- err         // to the roundTrip function,将err发送到roundTrip,roundTrip根据是否为nil,来判断是否请求发送成功
            if err != nil {
                pc.close(err)
                return
            }
        case <-pc.closech:
            return
        }
    }
}

writeLoop相对比较简单,主要是向输入流中写入数据,监听pc.writech获取要写入到输入流的request相关数据,并写入到网络连接中。

到此新建连接的大致过程已经讲解完成。

RoundTrip方法源码中的通过getConn方法获取或新建连接我们已经了解,建立连接之后就可以数据的读写了,后面http事务主要在roundTrip方法中完成

roundTrip方法处理请求并等待响应
从上面新建连接我们知道每一个持久连接persistConn,都会有两个协程,一个处理输入流,一个处理输出流。输出流readLoop主要读取 reqch chan requestAndChan中的数据,读取到数据后会等待从网络连接中读取响应数据。输入流writeLoop主要处理writech chan writeRequest中的消息,并将该消息写入到网络连接中。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    testHookEnterRoundTrip()
        ......

    /*
    将请求消息写入到输入流
    */
    startBytesWritten := pc.nwrite
    writeErrCh := make(chan error, 1)
    pc.writech <- writeRequest{req, writeErrCh, continueCh}

    //将请求消息发送到输出流,并等待返回
    resc := make(chan responseAndError)
    pc.reqch <- requestAndChan{
        req:        req.Request,
        ch:         resc,
        addedGzip:  requestedGzip,
        continueCh: continueCh,
        callerGone: gone,
    }

    var respHeaderTimer <-chan time.Time
    cancelChan := req.Request.Cancel
    ctxDoneChan := req.Context().Done()
    for {
        testHookWaitResLoop()
        select {
        case err := <-writeErrCh:
            if debugRoundTrip {
                req.logf("writeErrCh resv: %T/%#v", err, err)
            }
            if err != nil {
                pc.close(fmt.Errorf("write error: %v", err))
                return nil, pc.mapRoundTripError(req, startBytesWritten, err)
            }
            if d := pc.t.ResponseHeaderTimeout; d > 0 {
                if debugRoundTrip {
                    req.logf("starting timer for %v", d)
                }
                timer := time.NewTimer(d)
                defer timer.Stop() // prevent leaks
                respHeaderTimer = timer.C
            }
        case <-pc.closech:
            if debugRoundTrip {
                req.logf("closech recv: %T %#v", pc.closed, pc.closed)
            }
            return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
        case <-respHeaderTimer:
            if debugRoundTrip {
                req.logf("timeout waiting for response headers.")
            }
            pc.close(errTimeout)
            return nil, errTimeout
        case re := <-resc:  //等待获取response信息
            if (re.res == nil) == (re.err == nil) {
                panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
            }
            if debugRoundTrip {
                req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
            }
            if re.err != nil {
                return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
            }
            return re.res, nil
        case <-cancelChan:
            pc.t.CancelRequest(req.Request)
            cancelChan = nil
        case <-ctxDoneChan:
            pc.t.cancelRequest(req.Request, req.Context().Err())
            cancelChan = nil
            ctxDoneChan = nil
        }
    }
}

roundTrip方法的大致流程:
1.将请求消息写入到writech,将请求信息发送给输入流

2.将请求消息写入到reqch,等待服务端响应的消息

3.resc chan就是为了等待从服务端响应的消息。

4.返回从服务端响应的消息或错误信息

以上就是RoundTrip方法的主要流程,如果有理解不足或错误的地方还请指正。
————————————————
版权声明:本文为CSDN博主「思维的深度」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/skh2015java/article/details/89340215