golang 可以通过 ReverseProxy 实现了服务器代理相关的功能
ReverseProxy 位于 net.http.httputil 包下
实现功能:
- 支持自定义修改响应内容
- 支持连接池
- 支持错误信息自定义处理
- 支持 websocket 服务
- 支持自定义负载均衡策略
- 支持 https 代理
- 支持 url 重写
源码分析
首先我们通过 ReverseProxy 实现一个简单的反向代理
var (
openAddr = "127.0.0.1:2002"
proxyAddr = "http://127.0.0.1:2003/base"
)
/**
* @Author: yang
* @Description:通过 Reverse_Proxy 实现简单的代理
* @Date: 2021/3/31 14:19
*/
func main() {
u, err := url.Parse(proxyAddr)
if err != nil {
log.Println(err)
}
proxy := httputil.NewSingleHostReverseProxy(u)
log.Println("Server is starting :" + openAddr)
log.Fatalln(http.ListenAndServe(openAddr, proxy))
}
我们监听本机 2002 端口,将请求代理到 http://127.0.0.1:2003/base 中
其实我们可以发现代理的实现是通过 httputil.NewSingleHostReverseProxy 返回对应的 handler;
我们可以进去看看这个方法
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
targetQuery := target.RawQuery
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
// explicitly disable User-Agent so it's not set to default value
req.Header.Set("User-Agent", "")
}
}
return &ReverseProxy{Director: director}
}
最后我们可以看到核心返回的的是 ReverseProxy 结构体
type ReverseProxy struct {
// 控制器 是一个函数,函数内容可以对请求进行修改
Director func(*http.Request)
// 连接池,如果为 nil,则使用 http.DefaultTransport
Transport http.RoundTripper
// 刷新内容到客户端的时间间隔
FlushInterval time.Duration
// 错误记录器
ErrorLog *log.Logger
// 缓冲池,在复制 http 响应时使用,用以提高请求效率
BufferPool BufferPool
// 可自定义修改响应的函数
ModifyResponse func(*http.Response) error
// 错误处理回调函数,如果为 nil,则遇到错误会显示 502
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
我们可以看 ReverseProxy 可以传入 http.ListenAndServe 是因为他实现了 ServeHTTP 方法;而这个方法就是具体的代理流程;下面我们一步步拆分看下。。
step 1 连接池设置
如果没有配置,则使用 http 的默认连接池
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}
step 2 验证是否请求终止
从请求中获取上下文, 然后再 rw.(http.CloseNotifier) 转型下,获取到 CloseNotifier 通过 CloseNotifier 监听 请求的channel;
其中http.CloseNotifier是一个接口,只有一个方法CloseNotify() <-chan bool,作用是检测连接是否断开 (比如:关闭浏览器,网络断开等中断请求)
// 获取请求的上下文
ctx := req.Context()
if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
// 获取到请求信号channel
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
step 3 设置请求的ctx信息
获取上游的 request 用于代理时访问下游的 request
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
step 4 如果上下文中 header 为 nil,则使用 http 的 header 给该 ctx
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
step 5 修改req
Director 为 ReverseProxy 的字段之一,可以自定义修改,在 NewSingleHostReverseProxy 方法中有实现案例
outreq.Close = false:将请求头的 Close 字段置为 false,也就是说客户端请求触发到下游的时候,会产生一条链接,保证这条链接是可复用的
p.Director(outreq)
outreq.Close = false
step 6 Upgrade 头的特殊处理
// 先判断请求头 Connection 字段中是否包含 Upgrade 单词,有的话取出返回,没有返回空字符串
reqUpType := upgradeType(outreq.Header)
删除 http.header['Connection']中列出的 hop-by-hop 头信息,所有 Connection 中设置的 key 都删除掉。
removeConnectionHeaders(outreq.Header)
step 7 处理 hop-by-hop 的 header,除了 Te 和 trailers 都删除掉
逐段消息头是客户端和第一层代理之间的消息头,与是否往下传递的 header 信息没有联系,往下游传递的信息里不应该包含这些逐段消息头。
for _, h := range hopHeaders {
hv := outreq.Header.Get(h)
if hv == "" {
continue
}
if h == "Te" && hv == "trailers" {
// Issue 21096: tell backend applications that
// care about trailer support that we support
// trailers. (We do, but we don't go out of
// our way to advertise that unless the
// incoming client request thought it was
// worth mentioning)
continue
}
outreq.Header.Del(h)
}
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
step 8 追加clientIp
其实就是往 X-Forwarded-For header头中添加IP链
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
if prior, ok := outreq.Header["X-Forwarded-For"]; ok {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
outreq.Header.Set("X-Forwarded-For", clientIP)
}
step 9 向下游请求数据
直接通过连接池获取下游数据
res, err := transport.RoundTrip(outreq)
if err != nil {
p.getErrorHandler()(rw, outreq, err)
return
}
step 10 处理升级协议请求
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
step 11 移除逐段头部
首先是删除 Connection中的消息头,然后是删除 hopHeaders定义的消息头
removeConnectionHeaders(res.Header)
for _, h := range hopHeaders {
res.Header.Del(h)
}
step 12 修改返回内容
modifyResponse 底层是调用 ReverseProxy结构体中的ModifyResponse函数,函数内容由开发者自定义
if !p.modifyResponse(rw, res, outreq) {
return
}
step 13 拷贝头部的数据
其实就是代理返回了什么结果,就将内容返回客户端
rw.Header():上游(客户端)的头部数据
res.Header:下游(代理层)返回的头部数据
copyHeader(rw.Header(), res.Header)
step 14 写入状态码
将下游响应的状态码写入上游(客户端)的状态码中
res.StatusCode: 下游(代理层)返回的状态码数据
rw.WriteHeader(res.StatusCode)
step 14 周期刷新内容到 response
周期性把缓冲池里面的内容拷贝数据 (下游–》上游)
err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
if err != nil {
defer res.Body.Close()
// Since we're streaming the response, if we run into an error all we can do
// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
// on read error while copying body.
if !shouldPanicOnCopyError(req) {
p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
return
}
panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer