var ( deadline = c.deadline() reqs []*Request resp *Response copyHeaders = c.makeHeadersCopier(req) reqBodyClosed = false// have we closed the current req.Body?
// Redirect behavior: redirectMethod string includeBody bool ) uerr := func(err error)error { // the body may have been closed already by c.send() if !reqBodyClosed { req.closeBody() } var urlStr string if resp != nil && resp.Request != nil { urlStr = stripPassword(resp.Request.URL) } else { urlStr = stripPassword(req.URL) } return &url.Error{ Op: urlErrorOp(reqs[0].Method), URL: urlStr, Err: err, } } for { // For all but the first request, create the next // request hop and replace req. iflen(reqs) > 0 { //省略了,不重要,看下面....... } //重点 reqs = append(reqs, req) var err error var didTimeout func()bool //重点是下面的send方法 if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true if !deadline.IsZero() && didTimeout() { err = &httpError{ // TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancellation/ err: err.Error() + " (Client.Timeout exceeded while awaiting headers)", timeout: true, } } returnnil, uerr(err) }
var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil }
req.closeBody()
可以看到,在调用do方法时,首先会调用send方法,我们再来看看send方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// didTimeout is non-nil only if err != nil. func(c *Client)send(req *Request, deadline time.Time)(resp *Response, didTimeout func()bool, errerror) { if c.Jar != nil { for _, cookie := range c.Jar.Cookies(req.URL) { req.AddCookie(cookie) } } //调用了send函数 resp, didTimeout, err = send(req, c.transport(), deadline) if err != nil { returnnil, didTimeout, err } if c.Jar != nil { if rc := resp.Cookies(); len(rc) > 0 { c.Jar.SetCookies(req.URL, rc) } } return resp, nil, nil }
funcsend(ireq *Request, rt RoundTripper, deadline time.Time)(resp *Response, didTimeout func()bool, errerror) { req := ireq // req is either the original request, or a modified fork
if rt == nil { req.closeBody() returnnil, alwaysFalse, errors.New("http: no Client.Transport or DefaultTransport") } //省略 //重点 stopTimer, didTimeout := setRequestCancel(req, rt, deadline) //调用了rt的RoundTrip方法 resp, err = rt.RoundTrip(req) if err != nil { stopTimer() if resp != nil { log.Printf("RoundTripper returned a response & error; ignoring response") } if tlsErr, ok := err.(tls.RecordHeaderError); ok { // If we get a bad TLS record header, check to see if the // response looks like HTTP and give a more helpful error. // See golang.org/issue/11111. ifstring(tlsErr.RecordHeader[:]) == "HTTP/" { err = errors.New("http: server gave HTTP response to HTTPS client") } } returnnil, didTimeout, err
// roundTrip implements a RoundTripper over HTTP. func(t *Transport)roundTrip(req *Request)(*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx := req.Context() trace := httptrace.ContextClientTrace(ctx)
if req.URL == nil { req.closeBody() returnnil, errors.New("http: nil Request.URL") } if req.Header == nil { req.closeBody() returnnil, errors.New("http: nil Request.Header") } scheme := req.URL.Scheme isHTTP := scheme == "http" || scheme == "https" if isHTTP { for k, vv := range req.Header { if !httpguts.ValidHeaderFieldName(k) { returnnil, fmt.Errorf("net/http: invalid header field name %q", k) } for _, v := range vv { if !httpguts.ValidHeaderFieldValue(v) { returnnil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k) } } } }
if t.useRegisteredProtocol(req) { altProto, _ := t.altProto.Load().(map[string]RoundTripper) if altRT := altProto[scheme]; altRT != nil { if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { return resp, err } } } if !isHTTP { req.closeBody() returnnil, &badStringError{"unsupported protocol scheme", scheme} } if req.Method != "" && !validMethod(req.Method) { returnnil, fmt.Errorf("net/http: invalid method %q", req.Method) } if req.URL.Host == "" { req.closeBody() returnnil, errors.New("http: no Host in request URL") }
for { select { case <-ctx.Done(): req.closeBody() returnnil, ctx.Err() default: }
// treq gets modified by roundTrip, so we need to recreate for each retry. treq := &transportRequest{Request: req, trace: trace} cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() returnnil, err }
// Get the cached or newly-created connection to either the // host (for http or https), the http proxy, or the http proxy // pre-CONNECTed to https server. In any case, we'll be ready // to send it requests. //重点是这个函数getConn pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() returnnil, err }
var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { return resp, nil } if http2isNoCachedConnError(err) { t.removeIdleConn(pconn) } elseif !pconn.shouldRetryRequest(req, err) { // Issue 16465: return underlying net.Conn.Read error from peek, // as we've historically done. if e, ok := err.(transportReadFromServerError); ok { err = e.err } returnnil, err } testHookRoundTripRetried()
// Rewind the body if we're able to. if req.GetBody != nil { newReq := *req var err error newReq.Body, err = req.GetBody() if err != nil { returnnil, err } req = &newReq } } }
w := &wantConn{ cm: cm, key: cm.key(), ctx: ctx, ready: make(chanstruct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } deferfunc() { if err != nil { w.cancel(t, err) } }() //尝试去空闲的连接池中寻找 //client针对每个host最多可以分别复用两个连接 // Queue for idle connection. if delivered := t.queueForIdleConn(w); delivered { pc := w.pc if trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(req, func(error) {}) return pc, nil }
cancelc := make(chan error, 1) t.setReqCanceler(req, func(err error) { cancelc <- err }) //当没有在连接池中找到可用连接时,新建一个 // Queue for permission to dial. t.queueForDial(w) // Wait for completion or cancellation. select { case <-w.ready: // Trace success but only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) } if w.err != nil { // If the request has been cancelled, that's probably // what caused w.err; if so, prefer to return the // cancellation error (see golang.org/issue/16049). select { case <-req.Cancel: returnnil, errRequestCanceledConn case <-req.Context().Done(): returnnil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } returnnil, err default: // return below } } return w.pc, w.err case <-req.Cancel: returnnil, errRequestCanceledConn case <-req.Context().Done(): returnnil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } returnnil, err } }
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return }
pc, err := t.dialConn(w.ctx, w.cm) delivered := w.tryDeliver(pc, err) if err == nil && (!delivered || pc.alt != nil) { // pconn was not passed to w, // or it is HTTP/2 and can be shared. // Add to the idle connection pool. t.putOrCloseIdleConn(pc) } if err != nil { t.decConnsPerHost(w.key) } }
// eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool. eofc := make(chanstruct{}) deferclose(eofc) // unblock reader on errors
// Read this once, before loop starts. (to avoid races in tests) testHookMu.Lock() testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead testHookMu.Unlock()
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false }
if !hasBody || bodyWritable { pc.t.setReqCanceler(rc.req, nil)
// Put the idle conn back into the pool before we send the response // so if they process it quickly and make another request, they'll // get this same conn. But we use the unbuffered channel 'rc' // to guarantee that persistConn.roundTrip got out of its select // potentially waiting for this persistConn to close. // but after alive = alive && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace)
if bodyWritable { closeErr = errCallerOwnsConn }
select { case rc.ch <- responseAndError{res: resp}: //利用chan将respone数据发送到上层 case <-rc.callerGone: return }
// Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) testHookReadLoopBeforeNextRead() continue }
waitForBodyRead := make(chanbool, 2) //这里是最关键的地方,close函数实际上就是调用了earlyCloseFn函数,后面详细讲 body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func()error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function returnnil
select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return }
// Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) select { //内存泄露的元凶 case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) //尝试将connection放回连接池 if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false }
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false }
// A Client is an HTTP client. Its zero value (DefaultClient) is a // usable client that uses DefaultTransport. // // The Client's Transport typically has internal state (cached TCP // connections), so Clients should be reused instead of created as // needed. Clients are safe for concurrent use by multiple goroutines. // // A Client is higher-level than a RoundTripper (such as Transport) // and additionally handles HTTP details such as cookies and // redirects.