使用golang net/http库发送http请求,最后都是调用 transport的 RoundTrip方法中。
type RoundTripper interface {
RoundTrip(*Request) (*Response, error)
}
RoundTrip代表一个http事务,给一个请求返回一个响应。RoundTripper必须是并发安全的。RoundTripper接口的实现Transport结构体在源码包net/http/transport.go 中。
type Transport struct {
idleMu sync.Mutex
wantIdle bool // user has requested to close all idle conns 用户是否已关闭所有的空闲连接
idleConn map[connectMethodKey][]*persistConn // most recently used at end,保存从connectMethodKey(代表着不同的协议,不同的host,也就是不同的请求)到persistConn的映射
/*
idleConnCh 用来在并发http请求的时候在多个 goroutine 里面相互发送持久连接,也就是说,
这些持久连接是可以重复利用的, 你的http请求用某个persistConn用完了,
通过这个channel发送给其他http请求使用这个persistConn
*/
idleConnCh map[connectMethodKey]chan *persistConn
idleLRU connLRU
reqMu sync.Mutex
reqCanceler map[*Request]func(error) //请求取消器
altMu sync.Mutex // guards changing altProto only
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme 为空或者map[string]RoundTripper,key为URI 的scheme,用于自定义的协议及对应的处理请求的RoundTripper
// Proxy specifies a function to return a proxy for a given
// Request. If the function returns a non-nil error, the
// request is aborted with the provided error.
//
// The proxy type is determined by the URL scheme. "http"
// and "socks5" are supported. If the scheme is empty,
// "http" is assumed.
//
// If Proxy is nil or returns a nil *URL, no proxy is used.
Proxy func(*Request) (*url.URL, error) //根据给定的Request返回一个代理,如果返回一个不为空的error,请求会终止
// DialContext specifies the dial function for creating unencrypted TCP connections.
// If DialContext is nil (and the deprecated Dial below is also nil),
// then the transport dials using package net.
/*
DialContext用于指定创建未加密的TCP连接的dial功能,如果该函数为空,则使用net包下的dial函数
*/
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
// Dial specifies the dial function for creating unencrypted TCP connections.
//
// Deprecated: Use DialContext instead, which allows the transport
// to cancel dials as soon as they are no longer needed.
// If both are set, DialContext takes priority.
/*
Dial获取一个tcp连接,也就是net.Conn结构,然后就可以写入request,从而获取到response
DialContext比Dial函数的优先级高
*/
Dial func(network, addr string) (net.Conn, error)
// DialTLS specifies an optional dial function for creating
// TLS connections for non-proxied HTTPS requests.
//
// If DialTLS is nil, Dial and TLSClientConfig are used.
//
// If DialTLS is set, the Dial hook is not used for HTTPS
// requests and the TLSClientConfig and TLSHandshakeTimeout
// are ignored. The returned net.Conn is assumed to already be
// past the TLS handshake.
/*
DialTLS 为创建非代理的HTTPS请求的TLS连接提供一个可选的dial功能
如果DialTLS为空,则使用Dial和TLSClientConfig
如果设置了DialTLS,则HTTPS的请求不使用Dial的钩子,并且TLSClientConfig 和 TLSHandshakeTimeout会被忽略
返回的net.Conn假设已经通过了TLS握手
*/
DialTLS func(network, addr string) (net.Conn, error)
// TLSClientConfig specifies the TLS configuration to use with
// tls.Client.
// If nil, the default configuration is used.
// If non-nil, HTTP/2 support may not be enabled by default.
/*
TLSClientConfig指定tls.Client使用的TLS配置信息
如果为空,则使用默认配置
如果不为空,默认情况下未启动HTTP/2支持
*/
TLSClientConfig *tls.Config
// TLSHandshakeTimeout specifies the maximum amount of time waiting to
// wait for a TLS handshake. Zero means no timeout.
/*
指定TLS握手的超时时间
*/
TLSHandshakeTimeout time.Duration
// DisableKeepAlives, if true, prevents re-use of TCP connections
// between different HTTP requests.
DisableKeepAlives bool //如果为true,则阻止在不同http请求之间重用TCP连接
// DisableCompression, if true, prevents the Transport from
// requesting compression with an "Accept-Encoding: gzip"
// request header when the Request contains no existing
// Accept-Encoding value. If the Transport requests gzip on
// its own and gets a gzipped response, it's transparently
// decoded in the Response.Body. However, if the user
// explicitly requested gzip it is not automatically
// uncompressed.
DisableCompression bool //如果为true,则进制传输使用 Accept-Encoding: gzip
// MaxIdleConns controls the maximum number of idle (keep-alive)
// connections across all hosts. Zero means no limit.
MaxIdleConns int //指定最大的空闲连接数
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
// (keep-alive) connections to keep per-host. If zero,
// DefaultMaxIdleConnsPerHost is used.
MaxIdleConnsPerHost int //用于控制某一个主机的连接的最大空闲数
// IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
// Zero means no limit.
IdleConnTimeout time.Duration //指定空闲连接保持的最长时间,如果为0,则不受限制
// ResponseHeaderTimeout, if non-zero, specifies the amount of
// time to wait for a server's response headers after fully
// writing the request (including its body, if any). This
// time does not include the time to read the response body.
/*
ResponseHeaderTimeout,如果非零,则指定在完全写入请求(包括其正文,如果有)之后等待服务器响应头的最长时间。
此时间不包括读响应体的时间。
*/
ResponseHeaderTimeout time.Duration
// ExpectContinueTimeout, if non-zero, specifies the amount of
// time to wait for a server's first response headers after fully
// writing the request headers if the request has an
// "Expect: 100-continue" header. Zero means no timeout and
// causes the body to be sent immediately, without
// waiting for the server to approve.
// This time does not include the time to send the request header.
/*
如果请求头是"Expect:100-continue",ExpectContinueTimeout 如果不为0,它表示等待服务器第一次响应头的最大时间
零表示没有超时并导致正文立即发送,无需等待服务器批准。
此时间不包括发送请求标头的时间。
*/
ExpectContinueTimeout time.Duration
// TLSNextProto specifies how the Transport switches to an
// alternate protocol (such as HTTP/2) after a TLS NPN/ALPN
// protocol negotiation. If Transport dials an TLS connection
// with a non-empty protocol name and TLSNextProto contains a
// map entry for that key (such as "h2"), then the func is
// called with the request's authority (such as "example.com"
// or "example.com:1234") and the TLS connection. The function
// must return a RoundTripper that then handles the request.
// If TLSNextProto is not nil, HTTP/2 support is not enabled
// automatically.
/*
TLSNextProto指定在TLS NPN / ALPN协议协商之后传输如何切换到备用协议(例如HTTP / 2)。
如果传输使用非空协议名称拨打TLS连接并且TLSNextProto包含该密钥的映射条目(例如“h2”),则使用请求的权限调用func(例如“example.com”或“example” .com:1234“)和TLS连接。
该函数必须返回一个RoundTripper,然后处理该请求。 如果TLSNextProto不是nil,则不会自动启用HTTP / 2支持。
*/
TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
// ProxyConnectHeader optionally specifies headers to send to
// proxies during CONNECT requests.
/*
ProxyConnectHeader可选地指定在CONNECT请求期间发送给代理的标头。
*/
ProxyConnectHeader Header
// MaxResponseHeaderBytes specifies a limit on how many
// response bytes are allowed in the server's response
// header.
//
// Zero means to use a default limit.
/*
指定服务器返回的响应头的最大字节数
为0则使用默认的限制
*/
MaxResponseHeaderBytes int64
// nextProtoOnce guards initialization of TLSNextProto and
// h2transport (via onceSetNextProtoDefaults)
//nextProtoOnce保护 TLSNextProto和 h2transport 的初始化
nextProtoOnce sync.Once
h2transport *http2Transport // non-nil if http2 wired up,如果是http2连通,则不为nil
// TODO: tunable on max per-host TCP dials in flight (Issue 13957)
}
以上是Transport结构体及每个字段的主要功能,从中可以看到
idleConn 可以理解成 空闲的连接池,用于存放空闲的连接,从而使连接可以复用。
idleConnCh用来在并发http请求的时候在多个goroutine里相互发送persistConn,可以使persistConn持久化连接得到重复使用。
RoundTrip方法
Transport的核心方法时RoundTrip方法,该方法的工作流程基本如下:
RoundTrip方法源码如下:
//RoundTrip实现了RoundTripper接口
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
//初始化TLSNextProto http2使用
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
//获取请求的上下文
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
//错误处理
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
//如果是http或https请求,对Header中的数据进行校验
if isHTTP {
for k, vv := range req.Header {
if !httplex.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httplex.ValidHeaderFieldValue(v) {
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
//如果该scheme有自定义的RoundTrip,则使用自定义的RoundTrip处理request,并返回response
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
if altRT := altProto[scheme]; altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
}
//如果不是http请求,则关闭并退出
if !isHTTP {
req.closeBody()
return nil, &badStringError{"unsupported protocol scheme", scheme}
}
//对请求的Method进行校验
if req.Method != "" && !validMethod(req.Method) {
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
//请求的host为空,则返回
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}
for {
// treq gets modified by roundTrip, so we need to recreate for each retry.
//初始化transportRequest,transportRequest是request的包装器
treq := &transportRequest{Request: req, trace: trace}
//根据用户的请求信息获取connectMethod cm
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
//从缓存中获取一个连接,或者新建一个连接
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
return resp, nil
}
if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()
// Rewind the body if we're able to. (HTTP/2 does this itself so we only
// need to do it for HTTP/1.1 connections.)
if req.GetBody != nil && pconn.alt == nil {
newReq := *req
var err error
newReq.Body, err = req.GetBody()
if err != nil {
return nil, err
}
req = &newReq
}
}
}
该方法首先会进行一些校验,如果客户端请求的scheme有自定的RoundTrip,则使用自定义的RoundTrip处理request,并返回response。
该方法主要的请求处理逻辑在for循环里,首先会根据请求从空闲的连接池中获取一个连接或新建一个连接pconn。忽略HTTP/2请求的处理,其他常用的HTTP请求会调用roundTrip方法将客户端发送给服务器,并等待返回response。
getConn获取或新建连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
//GetConn是钩子函数在获取连接前调用
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
//如果可以获取到空闲的连接
if pc, idleSince := t.getIdleConn(cm); pc != nil {
if trace != nil && trace.GotConn != nil { //GotConn是钩子函数,成功获取连接后调用
trace.GotConn(pc.gotIdleConnTrace(idleSince))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
/*
将请求的canceler设置为某些非零函数,以便我们可以检测它是否在现在和我们进入roundTrip之间被清除
*/
t.setReqCanceler(req, func(error) {})
return pc, nil
}
type dialRes struct {
pc *persistConn
err error
}
dialc := make(chan dialRes)
// Copy these hooks so we don't race on the postPendingDial in
// the goroutine we launch. Issue 11136.
testHookPrePendingDial := testHookPrePendingDial
testHookPostPendingDial := testHookPostPendingDial
//该内部函数handlePendingDial的主要作用是,新开启一个协程,当新建连接完成后但没有被使用,将其放到连接池(缓存)中或将其关闭
handlePendingDial := func() {
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putOrCloseIdleConn(v.pc)
}
testHookPostPendingDial()
}()
}
cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err })
go func() {//开启一个协程新建一个连接
pc, err := t.dialConn(ctx, cm)
dialc <- dialRes{pc, err}
}()
idleConnCh := t.getIdleConnCh(cm)
select {
case v := <-dialc: //获取新建的连接
// Our dial finished.
if v.pc != nil { //如果新建的连接不为nil,则返回新建的连接
if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
}
return v.pc, nil
}
// Our dial failed. See why to return a nicer error
// value.
select {
case <-req.Cancel:
// It was an error due to cancelation, so prioritize that
// error value. (Issue 16049)
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// It wasn't an error due to cancelation, so
// return the original error message:
return nil, v.err
}
case pc := <-idleConnCh: //如果在新建连接的过程中,有空闲的连接,则返回该空闲的连接
// Another request finished first and its net.Conn
// became available before our dial. Or somebody
// else's dial that they didn't use.
// But our dial is still going, so give it away
// when it finishes:
//如果在dial连接的时候,有空闲的连接,但是这个时候我们仍然正在新建连接,所以当它新建完成后将其放到连接池或丢弃
handlePendingDial()
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
}
return pc, nil
case <-req.Cancel:
handlePendingDial()
return nil, errRequestCanceledConn
case <-req.Context().Done():
handlePendingDial()
return nil, req.Context().Err()
case err := <-cancelc:
handlePendingDial()
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
第一步:尝试从空闲的连接池中获取空闲连接(通过getIdleConn方法)
如果缓存中有空闲的连接,则获取空闲的连接,并从idleConn和idleLRU中删除该连接,getIdleConn方法的源码如下:
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {
key := cm.key()
t.idleMu.Lock()
defer t.idleMu.Unlock()
for {
pconns, ok := t.idleConn[key]
if !ok {
return nil, time.Time{}
}
if len(pconns) == 1 {
pconn = pconns[0]
delete(t.idleConn, key)
} else {
// 2 or more cached connections; use the most
// recently used one at the end.
pconn = pconns[len(pconns)-1]
t.idleConn[key] = pconns[:len(pconns)-1]
}
t.idleLRU.remove(pconn)
if pconn.isBroken() { //如果该连接被关闭,则继续从缓存中查找
// There is a tiny window where this is
// possible, between the connecting dying and
// the persistConn readLoop calling
// Transport.removeIdleConn. Just skip it and
// carry on.
continue
}
if pconn.idleTimer != nil && !pconn.idleTimer.Stop() {
// We picked this conn at the ~same time it
// was expiring and it's trying to close
// itself in another goroutine. Don't use it.
continue
}
return pconn, pconn.idleAt
}
}
前面我们提到过空闲的连接存放在idleConn字段中,该字段是map结构,可以通过客户端的请求信息(proxy,scheme,addr)来获取持久连接persistConn。getIdleConn方法尝试从空闲的连接池idleConn中获取空闲连接,如果获取到空闲的连接,则从该idleConn map中删除,并判断该连接是否被关闭,如果该连接已经被关闭则继续获取。如果可以获取到空闲连接则返回该连接,将使用该连接与服务器进行通讯,如果获取不到则新建连接。
第二步:如果空闲的连接池中没有可用的连接,则会调用dialConn方法新建连接
当我们无法从空闲的连接池中获取连接,就要新建连接。新建连接的大致过程如下:
首先 初始化dialc channel, 该channel用于等待新建的连接,如果连接创建成功则将创建的连接放入到dialc中
go func() {//开启一个协程新建一个连接
pc, err := t.dialConn(ctx, cm)
dialc <- dialRes{pc, err}
}()
handlePendingDial函数,该内部函数的主要作用用于开启一个协程,当新建连接成功但没有被使用,则通过该函数将其放到连接池中或将其关闭。
handlePendingDial := func() {
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putOrCloseIdleConn(v.pc)
}
testHookPostPendingDial()
}()
}
其次 用select case 监听事件
1.监听连接是否重建成功,如果连接创建成功则返回该新建的连接
2.通过idleConnCh channel监听是否在创建连接的时候有空闲的连接,如果有空闲的连接则返回空闲连接,并调用handlePendingDial函数,处理新建的连接,将新建的连接放入到空闲的连接池中或将其关闭。
idleConnCh是为了多个http请求之间复用,当一个客户端的请求处理完成之后,首先会尝试将该连接写入到idleConnCh中,如果有其他http在监听等待该idleConnCh,则会写入成功。从而降低客户端请求的等待时间。如果该连接无法放入到idleConnCh中,则会尝试将该连接放入到idleConn中
3.在等待创建连接的过程中也在监听是否有取消客户端请求的消息,如果有也会调用handlePendingDial函数,并返回错误信息。
新建连接的过程
前面我们看到新建连接时,会开启一个协程来执行dialConn方法来创建连接,dialConn方法源码如下:
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
//初始化persistConn结构体pconn
pconn := &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
//如果scheme为https,并且DialTLS函数不为nil
if cm.scheme() == "https" && t.DialTLS != nil {
var err error
pconn.conn, err = t.DialTLS("tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
if pconn.conn == nil {
return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
}
if tc, ok := pconn.conn.(*tls.Conn); ok {
// Handshake here, in case DialTLS didn't. TLSNextProto below
// depends on it for knowing the connection state.
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
if err := tc.Handshake(); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
}
} else {
//创建tcp连接
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
....... 处理代理等消息省略
//初始化br和bw
pconn.br = bufio.NewReader(pconn)
pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
新建连接的过程主要在dialConn方法中,新建连接的大致过程如下:
首先初始化persistConn结构体
创建连接,创建连接时区分https和http
连接创建成功后,会开启两个协程,一个用于处理输入流writeLoop,一个用于处理输出流readLoop
从中我们看到当客户端和服务端每建立一个连接,都会开启两个协程,一个处理输入流writeLoop,一个处理输出流readLoop。
readLoop方法
/*
从网络连接中读取消息并解析成Response
*/
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
//函数作用:尝试将pc放入空闲连接池中
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
eofc := make(chan struct{})
defer close(eofc) // unblock reader on errors
// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)
pc.mu.Lock()
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
//从reqch通道中获取请求数据和等待返回的response的channel
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context()) //从请求的上下文中获取trace
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace) //从网络连接中读取http的响应信息Response
} else {
err = transportReadFromServerError{err}
closeErr = err
}
if err != nil { //错误处理
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64 // effictively no limit for response bodies
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 //判断是否响应的消息有body
if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
if !hasBody { //如果响应体没有body
pc.t.setReqCanceler(rc.req, nil) //将reqCanceler取消函数中对应的req删除
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
/*
在返回response之前,将空闲的conn放到连接池中
*/
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
select {
case rc.ch <- responseAndError{res: resp}: //将响应信息返回到roundTrip中
case <-rc.callerGone:
return
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
testHookReadLoopBeforeNextRead()
continue
}
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
resp.Body = body
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}
//将response返回到到roundTrip中
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancelation or death)
/*
等待返回的response中的body被读完后,才会将连接放入到连接池中,等待再次使用该连接
*/
select {
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.req, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
}
readLoop方法的主要作用是从网络中读取消息并解析成Response返回
定义内部函数tryPutIdleConn,该函数的主要作用是将持久化连接persistConn放入到空闲连接池中
persistConn持久化连接中的reqch chan requestAndChan该channel是用于获取客户端的请求信息并等待返回的response。requestAndChan结构体如下:
type requestAndChan struct {
req *Request
ch chan responseAndError // unbuffered; always send in select on callerGone
// whether the Transport (as opposed to the user client code)
// added the Accept-Encoding gzip header. If the Transport
// set it, only then do we transparently decode the gzip.
addedGzip bool
// Optional blocking chan for Expect: 100-continue (for send).
// If the request has an "Expect: 100-continue" header and
// the server responds 100 Continue, readLoop send a value
// to writeLoop via this chan.
continueCh chan<- struct{}
callerGone <-chan struct{} // closed when roundTrip caller has returned
}
其中req是用户请求信息。ch是response和error相关信息,用于等待从服务端读取响应信息。continueCh用于判断100-continue的情况。
其中req是用户请求信息。ch是response和error相关信息,用于等待从服务端读取响应信息。continueCh用于判断100-continue的情况。
readLoop的大致流程如下:
1.如果连接正常则轮询读取要发送到服务端的客户端请求信息rc
2.调用readResponse方法从网络连接中读取http的响应信息并封装成Response
3.如果读取时有错误信息,则将error信息发送通过rc中的ch字段将错误信息返回
4.如果读取到的响应信息没有响应体(即Body),如果连接正常则尝试将连接放入到连接池中。并将响应信息通过rc中的ch channel发送到roundTrip中,从而响应给http客户端请求。
5.如果读取到的响应信息有响应体(即Body),则会将响应体进行封装,封装成bodyEOFSignal结构体,目的是为了当客户端读取响应体之后,才会将该连接放入到连接池中,等待再次被使用。
tryPutIdleConn将连接放入到空闲连接池中
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
........
waitingDialer := t.idleConnCh[key]
select {
case waitingDialer <- pconn:
// We're done with this pconn and somebody else is
// currently waiting for a conn of this type (they're
// actively dialing, but this conn is ready
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
return nil
default:
if waitingDialer != nil {
// They had populated this, but their dial won
// first, so we can clean up this map entry.
delete(t.idleConnCh, key)
}
}
if t.wantIdle {
return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
idles := t.idleConn[key]
if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdleHost
}
for _, exist := range idles {
if exist == pconn {
log.Fatalf("dup idle pconn %p in freelist", pconn)
}
}
t.idleConn[key] = append(idles, pconn)
t.idleLRU.add(pconn)
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
oldest := t.idleLRU.removeOldest()
oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest)
}
if t.IdleConnTimeout > 0 {
if pconn.idleTimer != nil {
pconn.idleTimer.Reset(t.IdleConnTimeout)
} else {
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
}
}
pconn.idleAt = time.Now()
return nil
}
tryPutIdleConn将持久化连接放入到空闲连接池的过程大致如下:
1.如果有其他http请求正在等待该连接,则将该连接写入到waitingDialer,从而使其他http请求复用
2.尝试将该连接放入到空闲的连接池中idleConn
writeLoop方法
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech: //获取到要写入到输入流中的request相关数据
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) //向网络连接中写入数据
if bre, ok := err.(requestBodyReadError); ok { //错误处理
err = bre.error
// Errors reading from the user's
// Request.Body are high priority.
// Set it here before sending on the
// channels below or calling
// pc.close() which tears town
// connections and causes other
// errors.
wr.req.setError(err)
}
if err == nil { //写入时没有错误,则刷新缓存
err = pc.bw.Flush()
}
if err != nil { //错误处理
wr.req.Request.closeBody()
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
pc.writeErrCh <- err // to the body reader, which might recycle us
wr.ch <- err // to the roundTrip function,将err发送到roundTrip,roundTrip根据是否为nil,来判断是否请求发送成功
if err != nil {
pc.close(err)
return
}
case <-pc.closech:
return
}
}
}
writeLoop相对比较简单,主要是向输入流中写入数据,监听pc.writech获取要写入到输入流的request相关数据,并写入到网络连接中。
到此新建连接的大致过程已经讲解完成。
RoundTrip方法源码中的通过getConn方法获取或新建连接我们已经了解,建立连接之后就可以数据的读写了,后面http事务主要在roundTrip方法中完成
roundTrip方法处理请求并等待响应
从上面新建连接我们知道每一个持久连接persistConn,都会有两个协程,一个处理输入流,一个处理输出流。输出流readLoop主要读取 reqch chan requestAndChan中的数据,读取到数据后会等待从网络连接中读取响应数据。输入流writeLoop主要处理writech chan writeRequest中的消息,并将该消息写入到网络连接中。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
......
/*
将请求消息写入到输入流
*/
startBytesWritten := pc.nwrite
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
//将请求消息发送到输出流,并等待返回
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
for {
testHookWaitResLoop()
select {
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
case <-pc.closech:
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
case re := <-resc: //等待获取response信息
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
case <-cancelChan:
pc.t.CancelRequest(req.Request)
cancelChan = nil
case <-ctxDoneChan:
pc.t.cancelRequest(req.Request, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}
roundTrip方法的大致流程:
1.将请求消息写入到writech,将请求信息发送给输入流
2.将请求消息写入到reqch,等待服务端响应的消息
3.resc chan就是为了等待从服务端响应的消息。
4.返回从服务端响应的消息或错误信息
以上就是RoundTrip方法的主要流程,如果有理解不足或错误的地方还请指正。
————————————————
版权声明:本文为CSDN博主「思维的深度」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/skh2015java/article/details/89340215