前言

首先来看看一个最常出错的地方,net/http包中,Response结构体Body的处理方法不当所导致的内存泄露问题

为什么要Close?

首先来看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"time"
)

func PrintLocalDial(network, addr string) (net.Conn, error) {
dial := net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}

conn, err := dial.Dial(network, addr)
if err != nil {
return conn, err
}

fmt.Println("connect done, use", conn.LocalAddr().String())

return conn, err
}

func doGet(client *http.Client, url string, id int) {
resp, err := client.Get(url)
if err != nil {
fmt.Println(err)
return
}
// io.Copy(ioutil.Discard, resp.Body)
// fmt.Println("copy")
buf, err := ioutil.ReadAll(resp.Body)
fmt.Printf("%d: %s -- %v\n", id, string(buf[0:1]), err)
if err := resp.Body.Close(); err == nil {
fmt.Println("close")
}
}

// main builds one shared Client (its Transport caches TCP connections)
// and fires two concurrent GETs against the same host every two
// seconds, forever, so connection reuse can be watched via
// PrintLocalDial's output.
func main() {
	client := &http.Client{
		Transport: &http.Transport{
			Dial: PrintLocalDial,
		},
	}

	const url = "https://www.baidu.com/"
	for {
		go doGet(client, url, 1)
		go doGet(client, url, 2)
		time.Sleep(2 * time.Second)
	}
}

这段代码是标准的写法,首先发出Get请求,然后读Body中的数据,最后将Body用Close方法关闭

相信有不少人在首次编写类似程序的时候,一定有人告诉过你,一定要注意将Body关闭,不然会导致内存泄露的问题,那么,事实是否真的是这样呢?

我们来看看源码(Go 1.13)

首先我们来看看Do方法,只挑重点的代码段:

1
2
3
// Do sends the request; all redirect handling and body-close semantics
// live in the unexported do method it delegates to.
func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}

可以看到直接调用了私有方法do,再看看这个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (c *Client) do(req *Request) (retres *Response, reterr error) {
if testHookClientDoResult != nil {
defer func() { testHookClientDoResult(retres, reterr) }()
}
if req.URL == nil {
req.closeBody()
return nil, &url.Error{
Op: urlErrorOp(req.Method),
Err: errors.New("http: nil Request.URL"),
}
}

var (
deadline = c.deadline()
reqs []*Request
resp *Response
copyHeaders = c.makeHeadersCopier(req)
reqBodyClosed = false // have we closed the current req.Body?

// Redirect behavior:
redirectMethod string
includeBody bool
)
uerr := func(err error) error {
// the body may have been closed already by c.send()
if !reqBodyClosed {
req.closeBody()
}
var urlStr string
if resp != nil && resp.Request != nil {
urlStr = stripPassword(resp.Request.URL)
} else {
urlStr = stripPassword(req.URL)
}
return &url.Error{
Op: urlErrorOp(reqs[0].Method),
URL: urlStr,
Err: err,
}
}
for {
// For all but the first request, create the next
// request hop and replace req.
if len(reqs) > 0 {
//省略了,不重要,看下面.......
}
//重点
reqs = append(reqs, req)
var err error
var didTimeout func() bool
//重点是下面的send方法
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
// TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancellation/
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}

var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}

req.closeBody()

可以看到,在调用do方法时,首先会调用send方法,我们再来看看send方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// didTimeout is non-nil only if err != nil.
// send applies cookies from the Jar to the request, delegates to the
// package-level send function, and stores any returned cookies back
// into the Jar on success.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
// delegate to the package-level send function
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}

又再次调用了send函数,继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
req := ireq // req is either the original request, or a modified fork

if rt == nil {
req.closeBody()
return nil, alwaysFalse, errors.New("http: no Client.Transport or DefaultTransport")
}
//省略
//重点
stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
//调用了rt的RoundTrip方法
resp, err = rt.RoundTrip(req)
if err != nil {
stopTimer()
if resp != nil {
log.Printf("RoundTripper returned a response & error; ignoring response")
}
if tlsErr, ok := err.(tls.RecordHeaderError); ok {
// If we get a bad TLS record header, check to see if the
// response looks like HTTP and give a more helpful error.
// See golang.org/issue/11111.
if string(tlsErr.RecordHeader[:]) == "HTTP/" {
err = errors.New("http: server gave HTTP response to HTTPS client")
}
}
return nil, didTimeout, err

可以看到,send函数实际上调用了RoundTripper这个interface的RoundTrip方法,也就是上面c.transport()类型所实现的RoundTrip方法,那么c.transport()返回值是什么类型呢?来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// transport returns the client's configured RoundTripper, falling back
// to DefaultTransport when none was set.
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}
// DefaultTransport is the RoundTripper used by clients that leave
// Transport nil. It dials with a 30s timeout and keep-alive, and pools
// up to MaxIdleConns idle connections, dropping each after
// IdleConnTimeout of inactivity.
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

我们发现是Transport类型,那来看看Transport类型是如何实现RoundTrip方法的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// roundTrip implements a RoundTripper over HTTP.
// After validating the request, it loops: obtain a connection via
// getConn, send the request on it, and retry on retryable failures.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)

if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}

if t.useRegisteredProtocol(req) {
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
if altRT := altProto[scheme]; altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
}
}
if !isHTTP {
req.closeBody()
return nil, &badStringError{"unsupported protocol scheme", scheme}
}
if req.Method != "" && !validMethod(req.Method) {
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}

for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}

// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}

// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
// Key call: getConn returns a *persistConn (idle or freshly dialed).
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}

var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
return resp, nil
}
if http2isNoCachedConnError(err) {
t.removeIdleConn(pconn)
} else if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()

// Rewind the body if we're able to.
if req.GetBody != nil {
newReq := *req
var err error
newReq.Body, err = req.GetBody()
if err != nil {
return nil, err
}
req = &newReq
}
}
}

可以看到主要是调用了getConn方法返回一个*persistConn类型的变量,继续跟进看看getConn是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// getConn returns a usable *persistConn for the request: an idle pooled
// connection if one is available, otherwise it queues a dial and waits
// for the connection, cancellation, or an error.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}

w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()
// First try the idle connection pool.
// By default (DefaultMaxIdleConnsPerHost) the client keeps up to two
// idle connections per host for reuse.
// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
if trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(req, func(error) {})
return pc, nil
}

cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err })
// No reusable connection was found in the pool; dial a new one.
// Queue for permission to dial.
t.queueForDial(w)
// Wait for completion or cancellation.
select {
case <-w.ready:
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
// If the request has been cancelled, that's probably
// what caused w.err; if so, prefer to return the
// cancellation error (see golang.org/issue/16049).
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}

当我们新建连接时,若没有在Client结构体所维护的连接池中找到可用的连接,那么就调用queueForDial方法,这里需要注意,当Transport没有设置MaxIdleConnsPerHost这个值的情况下,client针对每个不同的host所能维持的connection默认值为2。

接下来看看queueForDial方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// queueForDial either dials immediately (when MaxConnsPerHost is
// unlimited or the per-host count is under the limit) or parks w on
// the per-host wait queue until a connection slot frees up.
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}

t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()

if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}

if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}

跟进dialConnFor方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// dialConnFor dials a connection for w and tries to deliver it; an
// undelivered (or shareable HTTP/2) connection goes to the idle pool,
// and a failed dial releases the per-host connection count.
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()

pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}

继续跟dialConn方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
// 生成一个*persistConn对象
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
//下面是连接代码
if cm.scheme() == "https" && t.DialTLS != nil {
var err error
pconn.conn, err = t.DialTLS("tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
if pconn.conn == nil {
return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
}
if tc, ok := pconn.conn.(*tls.Conn); ok {
// Handshake here, in case DialTLS didn't. TLSNextProto below
// depends on it for knowing the connection state.
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
if err := tc.Handshake(); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
}
} else {
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
//省略一部分不重要的代码
//重点来了
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
//起了俩goroutine,一个负责在conn上读,一个负责写
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil

兜兜绕绕终于来到了关键的地方,在connection成功建立之后,可以看见启动了两个goroutine来负责读写
我们挑readloop来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// readLoop runs in its own goroutine, reading responses off the
// persistent connection and delivering them to waiting callers. It
// loops while alive is true; whether the connection is returned to the
// idle pool — and whether this goroutine can make progress at all —
// depends on the caller reading the response body to EOF and/or
// closing it (see the bodyEOFSignal callbacks below).
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}

// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
eofc := make(chan struct{})
defer close(eofc) // unblock reader on errors

// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()

alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)

pc.mu.Lock()
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()

rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())

var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace) // read the response off the connection
} else {
err = transportReadFromServerError{err}
closeErr = err
}

if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}

select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64 // effectively no limit for response bodies

pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()

bodyWritable := resp.bodyIsWritable()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}

if !hasBody || bodyWritable {
pc.t.setReqCanceler(rc.req, nil)

// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)

if bodyWritable {
closeErr = errCallerOwnsConn
}

select {
case rc.ch <- responseAndError{res: resp}: // deliver the response to the caller over the channel
case <-rc.callerGone:
return
}

// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
testHookReadLoopBeforeNextRead()
continue
}

waitForBodyRead := make(chan bool, 2)
// Key point: closing an unread body ends up invoking earlyCloseFn,
// while fn fires as the caller reads the body to its first error
// (normally io.EOF). Discussed in detail below.
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil

},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}

resp.Body = body // wrap the body in a bodyEOFSignal
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}

select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}

// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
select {
// The culprit of the leak: if the caller neither reads the body
// to EOF nor closes it, none of these channels ever fires and
// this goroutine blocks here forever.
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace) // try to return the connection to the idle pool
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.req, rc.req.Context().Err())
case <-pc.closech:
alive = false
}

testHookReadLoopBeforeNextRead()
}
}

我们可以看到,负责读response的goroutine会以alive为条件不断循环,这时想让此goroutine退出,需要将alive置为false,但是我们可以看到最后有一个select语句,在一切正常的情况下(下面的几个错误信号不来的情况下)应当是由waitForBodyRead这个chan来推进整个goroutine,但是从上方代码我们可以看到,想让此chan中被写入数据,只有两个回调函数earlyCloseFn和fn,那我们来看看这两个函数应当如何被触发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Read forwards to the wrapped body. The first error returned by the
// underlying read (normally io.EOF) is recorded in es.rerr and passed
// through condfn, which fires the fn callback that unblocks readLoop.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
closed, rerr := es.closed, es.rerr
es.mu.Unlock()
if closed {
return 0, errReadOnClosedResBody
}
if rerr != nil {
return 0, rerr
}

n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
}

// Close marks the body closed. If the body was never read to EOF
// (es.rerr != io.EOF), earlyCloseFn runs and the connection is torn
// down; otherwise the wrapped body is closed and condfn is invoked.
func (es *bodyEOFSignal) Close() error {
es.mu.Lock()
defer es.mu.Unlock()
if es.closed {
return nil
}
es.closed = true
if es.earlyCloseFn != nil && es.rerr != io.EOF {
return es.earlyCloseFn()
}
err := es.body.Close()
return es.condfn(err)
}

// caller must hold es.mu.
// condfn invokes the one-shot fn callback with err, then clears it so
// it can fire at most once.
func (es *bodyEOFSignal) condfn(err error) error {
if es.fn == nil {
return err
}
err = es.fn(err)
es.fn = nil
return err
}

终于看到了我们心心念念的Close方法,可以看到Close方法在es.earlyCloseFn != nil && es.rerr != io.EOF的情况下会触发earlyCloseFn方法(这里有很多情况,后面详细讨论)

而当这一条件不满足时,Close方法将会调用condfn方法,仔细看,其实这一方法在上面的Read方法中也会调用,而Read方法实际和io里Reader有关,也就是说Read方法实际在我们读resp.Body的时候会触发(相当于我们在读出Body中数据的过程中会触发这个方法)

那么也就是说,如果我们既不读Body中的数据也不调用Close方法,那么这个goroutine和writeloop的goroutine将无法退出(因为writeloop是当readerloop这个goroutine退出时才会退出,可看readloop开头的defer语句),这就将导致大量的goroutine阻塞在select语句无法退出,而且占用了此persistconn,导致连接池中一直无法有空闲的persistconn放回,这会导致HTTP1.1直接退化到1.0(一个http请求一个tcp connection),相关资源也无法被gc回收,最终导致内存泄露的问题。

不过从代码上来看,实际上不仅仅是一定要调用Close方法才可以防止内存泄露,实际上如果能将Body数据读完也是可以的,但建议最好还是调用Close方法,因为也许有许多异常状态会发生(其实如果正常读写,并在读完后调用Close方法,类似于我最开始那个正确的程序,那么Close方法的判断条件中es.rerr会一直等于io.EOF,所以不是说调用了Close方法就会关闭读写的goroutine(实际在既读又close的情况下,只要connection不断开,Close方法只是关闭了io.Reader而已,而且connection的异常实际上也不是由Close方法负责,而是由readloop方法中的resp.Close来进行判断,所以实际上Close方法并非是我们所认为的关闭connection,释放资源(除了io.Reader出现错误时,或者没有读body时才是扮演这一角色),绝大多数情况下,Close方法只是用来关闭读取Body的io.Reader,与connection异常并无关联,也不会实际上执行退出goroutine以供gs释放资源的行为)

所以在正常情况下(尤其是既Close又read的情况下),调用Close方法实际就是关闭了io.Reader后调用了condfn方法,而且正常情况下,condfn方法中的es.fn其实一直是nil,直到Body中的数据被读完,此时Read方法再次调用condfn方法时,es.fn才会被赋值为之前在readloop中定义的回调函数的地址(猜想这也是为了保证先等上层读完数据之后才继续接着取数据),从而将goroutine不阻塞在select,并把connection重新放回连接池备用(此时goroutine也不会退出,因为err是io.Eof,alive并不会变为false导致退出循环)),当Read操作做完之后,Close方法关闭Reader,调用condfn方法,但这时候es.fn又变为了nil(读body的操作已经做完,es.fn重新变为nil,其实可以它看作驱动器,只有当上层读完数据,需要读写goroutine进行下一次循环时,才会被赋值,执行回调函数,推进整个循环结构的进行,防止阻塞),故而不会执行任何其余操作了。

简而言之,每一个connection都有一对goroutine负责读写,在读写完一次之后,readloop这个routine会负责将这个connection放回连接池,并且俩goroutine都不会退出,等到connection上有新的request时,这两个goroutine将会再次被激活,如此往复。

KeepAlive机制

继续上文,那么什么时候Close函数会被调用并使得goroutine退出呢?那就是当我一开始的代码改为下面这样的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// doGet (no-read variant): close the body WITHOUT reading any of it.
// es.rerr is still nil when Close runs (Read was never called, so it
// was never set to io.EOF), so earlyCloseFn fires, the read/write
// goroutines exit, and the connection is torn down instead of reused.
func doGet(client *http.Client, url string, id int) {
resp, err := client.Get(url)
if err != nil {
fmt.Println(err)
return
}
//io.Copy(ioutil.Discard, resp.Body)
//fmt.Println("copy")
// buf, err := ioutil.ReadAll(resp.Body)
// fmt.Printf("%d: %s -- %v\n", id, string(buf[0:1]), err)
if err := resp.Body.Close(); err == nil {
fmt.Println("close")
}
}

此时我们不读Body中的数据,直接关闭,那么这个时候,Close函数中的es.rerr仍然是nil(因为从未调用Read,它不会被赋值为io.EOF),从而触发earlyCloseFn函数,使得读写goroutine全部退出,connection关闭

在这个问题上,我还想讨论一个机制,那就是keepalive的机制,我们都知道keepalive是旨在让connection更多的承载http信息,而不用一个http创建一个tcp,从而节省资源。

在go中,我们可以通过Transport的Disablekeepalive选项来选择关闭或者启用,当我们关闭了keepalive机制的时候,那么每一个http请求都会新建一个tcp连接,而每个连接将都不会放入client的连接池中,这意味着所有连接都不可复用,两个负责读写的goroutine也是只处理一个request后立即退出

这里有一点要注意,那就是如果你启用了keepalive选项,并不代表你的所有连接真的可以复用了,因为我在实际测试中发现,如果对方服务器的返回头当中Connection这一项置空,那么哪怕你自己启用了,实际上还是有可能无法复用的,这一点很奇怪,因为我看了不少资料表示http1.1应该是默认keepalive,除非显式指明Connection头为close才会导致不启用复用的情况,但是在实际测试中我发现这在一定程度上不是很准确,并不是所有的都默认启用,有些特例是不会的,例如python的SimpleHttpServer,就是默认不启用,返回的头中也没有keealive,所以具体还是看实现

做了个实验,证实了就算服务端包头没有connection alive也可能会keepalive,主要还是看服务器具体如何实现的(绝大多数都是默认支持keep的,不管返回包是否写明,因为http1.1默认实现),附上一张抓包图

capture

可以看见虽然服务器返回头当中没有keepalive(客户端request有keepalive),但是仍然保持了长连接并没有断开,说明还是默认实现了keepalive机制

注:我这里举的例子是在两边均同意复用的情况下才称为keepalive机制启用,vice versa。

keepalive关闭时分四种情况:

首先每次都会新建连接,读写goroutine也都是新的

(1)如果我们既读Body数据,也close了body,那么最后导致读写goroutine退出的将是因为下方代码中resp.Close变为true(因为这个标志表示连接已经断开,不可读,并且由于这句判断语句是在goroutine读取了connection之后判断的,所以实际上在第一次的request读完之后就已经变为了false)->Read方法->condfn方法->goroutine解除阻塞,此时由于alive已经变为false,从而使得两个goroutine在下个循环之前退出。

1
2
3
4
5
6
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}

(2)如果我们没有读数据,直接Close了Body,那么最后导致读写goroutine退出的将是由Close方法中es.rerr等于nil导致earlyCloseFn方法被触发,从而直接退出goroutine(er.rerr正常情况下应该是io.EOF,表示body在被正常读取):

1
2
3
if es.earlyCloseFn != nil && es.rerr != io.EOF {
return es.earlyCloseFn()
}

(3)如果我们只读数据,不Close,那么情况和第一种一致,但是如果不Close,那么读body的io.Reader将不会被关闭,没关闭的多了,也可能会内存泄露(因为Close方法代码里有err := es.body.Close()),详情可看下面,写在后面了

(4)如果我们不读也不Close,这里由于keepalive本身就被关闭,connection在一次request/response后也失效了,但是由于goroutine一直阻塞,无法退出,所以占用的资源一直无法释放,最后会导致内存泄露

keepalive开启时也分四种情况:

首先每次连接只要连接池中有足够的空闲connection,则不需要新建,可以直接复用,默认情况下每个host对应最多两个connection

(1)如果我们既读Body数据,也close了body,那么读写goroutine在keepalive到期之前将不会退出,一直会重复处理connection数据,将connection放回连接池,处理connection数据。。。这样的循环操作,直到keepalive到期,然后就和keepalive关闭时第一种情况一样了

(2)如果我们不读数据,直接close,那么keepalive将会失效(其实不是真的失效,只是因为不读数据的情况下,es.rerr为nil,导致goroutine被退出,无法放入连接池以供复用,所以看起来好像是失效了,实际只是我们强行关闭了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// bodyEOFSignal.Read, annotated with the author's debug print. The fn
// callback only fires once the underlying read returns an error
// (normally io.EOF after the body has been fully consumed).
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
closed, rerr := es.closed, es.rerr
es.mu.Unlock()
if closed {
return 0, errReadOnClosedResBody
}
if rerr != nil {
return 0, rerr
}

n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
// If Read is never called, es.rerr stays nil and never gets a new
// value, so the early-close condition in Close is satisfied.
es.rerr = err
}
fmt.Println("wo zai read han shu zheer")
err = es.condfn(err)
}
return
}

(3)如果我们只读数据,不close,和第一种一致,但是如果不Close,那么读body的io.Reader将不会被关闭,没关闭的多了,也可能会内存泄露(因为Close方法代码里有err := es.body.Close())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// bodyEOFSignal.Close, annotated with the author's debug prints. It
// chooses between the early-close path (body never read to EOF) and
// the normal close path.
func (es *bodyEOFSignal) Close() error {
fmt.Println("wori...")
es.mu.Lock()
defer es.mu.Unlock()
if es.closed {
return nil
}
es.closed = true
// es.rerr records the reader's last state; io.EOF means the body was
// read to completion normally, so the goroutines need not be torn down.
if es.earlyCloseFn != nil && es.rerr != io.EOF {
fmt.Print("i am close")
return es.earlyCloseFn()
}
// A fully-read body reaches here instead of returning in the if above.
// The next line closes the io.Reader that reads the body.
err := es.body.Close()
fmt.Println("err is:", err)
return es.condfn(err)
}

(4)如果我们既不读也不close,那么keepalive将会失效,因为读写goroutine被阻塞,无法将connection放入连接池,导致后续数据传输无法复用connection,只能一个http请求一个tcp连接,最终导致内存泄露。

另一个小坑点

切记不要在每个请求里都新建一个client结构体。。。也就是说下面的代码是错误的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ANTI-PATTERN (kept verbatim as the article's negative example): a new
// Transport and Client are built for every request, so no connection
// pool is ever reused and memory grows without bound.
// NOTE(review): this listing is illustrative only — `target` is
// undefined and resp/err are unused, so it does not compile as-is.
package main

import (
"net/http"
"time"
)

func main() {
for {
test()
}
}
func test() {
// Building the Transport/Client inside the request path defeats pooling.
transport := http.Transport{
DisableKeepAlives: true,
}

client := &http.Client{
Transport: &transport,
Timeout: 10 * time.Second,
}
request, _ := http.NewRequest("GET", target, nil)

resp, err := client.Do(request)

}

这样会导致内存疯长,实测。。。

而且在源码中,作者也有注释:

1
2
3
4
5
6
7
8
9
10
// A Client is an HTTP client. Its zero value (DefaultClient) is a
// usable client that uses DefaultTransport.
//
// The Client's Transport typically has internal state (cached TCP
// connections), so Clients should be reused instead of created as
// needed. Clients are safe for concurrent use by multiple goroutines.
//
// A Client is higher-level than a RoundTripper (such as Transport)
// and additionally handles HTTP details such as cookies and
// redirects.

因为client结构底层维护了一个连接池,所以不需要每次都新建,正确代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Correct pattern: the Transport and Client are package-level
// singletons, so every call to test() shares one client (and its
// connection cache) instead of building a new one per request.
package main

import (
	"io"
	"io/ioutil"
	"net/http"
	"time"
)

// target is the URL fetched by test. The original listing left it
// undefined, which kept the example from compiling.
const target = "http://example.com/"

var (
	transport = http.Transport{
		DisableKeepAlives: true,
	}

	// client is reused for every request; Clients are safe for
	// concurrent use and typically cache TCP connections in their
	// Transport, which is why they should not be created per request.
	client = &http.Client{
		Transport: &transport,
		Timeout:   10 * time.Second,
	}
)

func main() {
	for {
		test()
	}
}

// test issues one GET and — per the rest of this article — drains and
// closes the response body so the connection's goroutines and resources
// are released instead of leaking.
func test() {
	request, err := http.NewRequest("GET", target, nil)
	if err != nil {
		return
	}
	resp, err := client.Do(request)
	if err != nil {
		return
	}
	io.Copy(ioutil.Discard, resp.Body)
	resp.Body.Close()
}

结语

其实看似简单的功能背后也有很多玄机,随便看看都是一个个大坑(笑)