http2_client.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "io"
  21. "math"
  22. "net"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "golang.org/x/net/context"
  29. "golang.org/x/net/http2"
  30. "golang.org/x/net/http2/hpack"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/credentials"
  33. "google.golang.org/grpc/internal/channelz"
  34. "google.golang.org/grpc/keepalive"
  35. "google.golang.org/grpc/metadata"
  36. "google.golang.org/grpc/peer"
  37. "google.golang.org/grpc/stats"
  38. "google.golang.org/grpc/status"
  39. )
  // http2Client implements the ClientTransport interface with HTTP2.
  //
  // An instance is built by newHTTP2Client, which dials the connection,
  // optionally performs the transport-credentials handshake, and starts the
  // reader, writer (loopy) and keepalive goroutines.
  type http2Client struct {
  	// ctx is derived from the caller's context via context.WithCancel in
  	// newHTTP2Client; cancel is invoked by Close.
  	ctx     context.Context
  	cancel  context.CancelFunc
  	ctxDone <-chan struct{} // Cache the ctx.Done() chan.
  	// userAgent is sent as the "user-agent" header on every stream.
  	userAgent string
  	// md is the address metadata (TargetInfo.Metadata); when it holds a
  	// *metadata.MD its entries are added to every stream's headers.
  	md   interface{}
  	conn net.Conn // underlying communication channel
  	// loopy is created and run by the writer goroutine started in
  	// newHTTP2Client.
  	loopy      *loopyWriter
  	remoteAddr net.Addr
  	localAddr  net.Addr
  	authInfo   credentials.AuthInfo // auth info about the connection
  	readerDone chan struct{}        // sync point to enable testing.
  	writerDone chan struct{}        // sync point to enable testing.
  	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  	// that the server sent GoAway on this transport.
  	goAway chan struct{}
  	// awakenKeepalive is used to wake up keepalive after it has gone dormant.
  	// It has capacity 1 and is kept full (non-writable) except while the
  	// keepalive goroutine is waiting to be awakened.
  	awakenKeepalive chan struct{}
  	framer *framer
  	// controlBuf delivers all the control related tasks (e.g., window
  	// updates, reset streams, and various settings) to the controller.
  	controlBuf *controlBuffer
  	// fc is the connection-level inbound flow control state.
  	fc *trInFlow
  	// The scheme used: https if TLS is on, http otherwise.
  	scheme string
  	// isSecure is set once a transport-credentials handshake has succeeded.
  	isSecure bool
  	// perRPCCreds are the dial-time per-RPC credentials (including any from
  	// the credentials bundle) applied to every stream.
  	perRPCCreds []credentials.PerRPCCredentials
  	// Boolean to keep track of reading activity on transport.
  	// 1 is true and 0 is false.
  	activity         uint32 // Accessed atomically.
  	kp               keepalive.ClientParameters
  	// keepaliveEnabled is true when kp.Time is not infinity, i.e. when the
  	// keepalive goroutine was started.
  	keepaliveEnabled bool
  	statsHandler     stats.Handler
  	// initialWindowSize is the per-stream inbound window used for new streams.
  	initialWindowSize int32
  	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
  	maxSendHeaderListSize *uint32
  	// bdpEst is non-nil only when dynamic window sizing is in effect.
  	bdpEst *bdpEstimator
  	// onSuccess is a callback that client transport calls upon
  	// receiving server preface to signal that a successful HTTP2
  	// connection was established.
  	onSuccess func()
  	// The stream-quota bookkeeping below (and nextID) is read and written
  	// inside controlBuf.executeAndPut callbacks (see NewStream and
  	// closeStream), not under mu.
  	// NOTE(review): presumably controlBuf serializes those callbacks — confirm.
  	maxConcurrentStreams  uint32
  	streamQuota           int64
  	streamsQuotaAvailable chan struct{}
  	waitingStreams        uint32
  	// nextID is the ID handed to the next stream; it starts at 1 and advances
  	// by 2 per stream.
  	nextID uint32
  	mu            sync.Mutex // guard the following variables
  	state         transportState
  	// activeStreams maps stream ID to stream; Close sets it to nil.
  	activeStreams map[uint32]*Stream
  	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  	prevGoAwayID uint32
  	// goAwayReason records the http2.ErrCode and debug data received with the
  	// GoAway frame.
  	goAwayReason GoAwayReason
  	// Fields below are for channelz metric collection.
  	channelzID int64 // channelz unique identification number
  	czData     *channelzData
  	// onGoAway and onClose are callbacks supplied to newHTTP2Client; Close
  	// runs onClose in its own goroutine.
  	onGoAway func(GoAwayReason)
  	onClose  func()
  }
  101. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  102. if fn != nil {
  103. return fn(ctx, addr)
  104. }
  105. return dialContext(ctx, "tcp", addr)
  106. }
  // isTemporary reports whether err should be treated as transient.
  //
  // An error exposing Temporary() is trusted on that answer alone. Otherwise
  // an error exposing Timeout() is temporary exactly when it timed out, since
  // timeouts may be resolved upon retry. Any other error (including nil) is
  // considered temporary.
  func isTemporary(err error) bool {
  	type temporary interface {
  		Temporary() bool
  	}
  	type timeout interface {
  		Timeout() bool
  	}
  	if te, ok := err.(temporary); ok {
  		return te.Temporary()
  	}
  	if to, ok := err.(timeout); ok {
  		return to.Timeout()
  	}
  	return true
  }
  // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  // and starts to receive messages on it. Non-nil error returns if construction
  // fails.
  //
  // connectCtx bounds dialing and the credentials handshake; ctx is the parent
  // of the transport's own cancellable context. onSuccess, onGoAway and onClose
  // are stored on the transport as callbacks. On any error the derived context
  // is cancelled and the dialed connection is closed by the deferred functions
  // below.
  func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
  	scheme := "http"
  	ctx, cancel := context.WithCancel(ctx)
  	// Release the derived context if construction fails; on success the
  	// transport keeps cancel and calls it from Close.
  	defer func() {
  		if err != nil {
  			cancel()
  		}
  	}()
  	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
  	if err != nil {
  		// Only when FailOnNonTempDialError is set is the dial error classified;
  		// otherwise every dial error is reported as temporary.
  		if opts.FailOnNonTempDialError {
  			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  		}
  		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
  	}
  	// Any further errors will close the underlying connection
  	// conn is passed by value, so this closes the connection returned by dial
  	// even though the handshake below may reassign conn.
  	defer func(conn net.Conn) {
  		if err != nil {
  			conn.Close()
  		}
  	}(conn)
  	var (
  		isSecure bool
  		authInfo credentials.AuthInfo
  	)
  	transportCreds := opts.TransportCredentials
  	perRPCCreds := opts.PerRPCCredentials
  	// A credentials bundle overrides the transport credentials and contributes
  	// one additional per-RPC credential.
  	if b := opts.CredsBundle; b != nil {
  		if t := b.TransportCredentials(); t != nil {
  			transportCreds = t
  		}
  		if t := b.PerRPCCredentials(); t != nil {
  			perRPCCreds = append(perRPCCreds, t)
  		}
  	}
  	if transportCreds != nil {
  		scheme = "https"
  		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
  		if err != nil {
  			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  		}
  		isSecure = true
  	}
  	kp := opts.KeepaliveParams
  	// Validate keepalive parameters.
  	if kp.Time == 0 {
  		kp.Time = defaultClientKeepaliveTime
  	}
  	if kp.Timeout == 0 {
  		kp.Timeout = defaultClientKeepaliveTimeout
  	}
  	// Dynamic window sizing stays on only if neither the connection nor the
  	// stream window option is at least defaultWindowSize.
  	dynamicWindow := true
  	icwz := int32(initialWindowSize)
  	if opts.InitialConnWindowSize >= defaultWindowSize {
  		icwz = opts.InitialConnWindowSize
  		dynamicWindow = false
  	}
  	writeBufSize := opts.WriteBufferSize
  	readBufSize := opts.ReadBufferSize
  	maxHeaderListSize := defaultClientMaxHeaderListSize
  	if opts.MaxHeaderListSize != nil {
  		maxHeaderListSize = *opts.MaxHeaderListSize
  	}
  	t := &http2Client{
  		ctx:                   ctx,
  		ctxDone:               ctx.Done(), // Cache Done chan.
  		cancel:                cancel,
  		userAgent:             opts.UserAgent,
  		md:                    addr.Metadata,
  		conn:                  conn,
  		remoteAddr:            conn.RemoteAddr(),
  		localAddr:             conn.LocalAddr(),
  		authInfo:              authInfo,
  		readerDone:            make(chan struct{}),
  		writerDone:            make(chan struct{}),
  		goAway:                make(chan struct{}),
  		awakenKeepalive:       make(chan struct{}, 1),
  		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
  		fc:                    &trInFlow{limit: uint32(icwz)},
  		scheme:                scheme,
  		activeStreams:         make(map[uint32]*Stream),
  		isSecure:              isSecure,
  		perRPCCreds:           perRPCCreds,
  		kp:                    kp,
  		statsHandler:          opts.StatsHandler,
  		initialWindowSize:     initialWindowSize,
  		onSuccess:             onSuccess,
  		nextID:                1,
  		maxConcurrentStreams:  defaultMaxStreamsClient,
  		streamQuota:           defaultMaxStreamsClient,
  		streamsQuotaAvailable: make(chan struct{}, 1),
  		czData:                new(channelzData),
  		onGoAway:              onGoAway,
  		onClose:               onClose,
  	}
  	t.controlBuf = newControlBuffer(t.ctxDone)
  	if opts.InitialWindowSize >= defaultWindowSize {
  		t.initialWindowSize = opts.InitialWindowSize
  		dynamicWindow = false
  	}
  	if dynamicWindow {
  		t.bdpEst = &bdpEstimator{
  			bdp:               initialWindowSize,
  			updateFlowControl: t.updateFlowControl,
  		}
  	}
  	// Make sure awakenKeepalive can't be written upon.
  	// keepalive routine will make it writable, if need be.
  	t.awakenKeepalive <- struct{}{}
  	if t.statsHandler != nil {
  		// The stats handler may replace the transport context with a tagged one.
  		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
  			RemoteAddr: t.remoteAddr,
  			LocalAddr:  t.localAddr,
  		})
  		connBegin := &stats.ConnBegin{
  			Client: true,
  		}
  		t.statsHandler.HandleConn(t.ctx, connBegin)
  	}
  	if channelz.IsOn() {
  		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "")
  	}
  	// A keepalive time of infinity disables the keepalive goroutine.
  	if t.kp.Time != infinity {
  		t.keepaliveEnabled = true
  		go t.keepalive()
  	}
  	// Start the reader goroutine for incoming message. Each transport has
  	// a dedicated goroutine which reads HTTP2 frame from network. Then it
  	// dispatches the frame to the corresponding stream entity.
  	go t.reader()
  	// Send connection preface to server.
  	n, err := t.conn.Write(clientPreface)
  	if err != nil {
  		t.Close()
  		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  	}
  	if n != len(clientPreface) {
  		t.Close()
  		// err is nil on this path; the short write itself is the failure.
  		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  	}
  	// Only settings that differ from the defaults are advertised.
  	var ss []http2.Setting
  	if t.initialWindowSize != defaultWindowSize {
  		ss = append(ss, http2.Setting{
  			ID:  http2.SettingInitialWindowSize,
  			Val: uint32(t.initialWindowSize),
  		})
  	}
  	if opts.MaxHeaderListSize != nil {
  		ss = append(ss, http2.Setting{
  			ID:  http2.SettingMaxHeaderListSize,
  			Val: *opts.MaxHeaderListSize,
  		})
  	}
  	err = t.framer.fr.WriteSettings(ss...)
  	if err != nil {
  		t.Close()
  		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  	}
  	// Adjust the connection flow control window if needed.
  	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  			t.Close()
  			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  		}
  	}
  	t.framer.writer.Flush()
  	// Writer goroutine: runs the loopy writer until it returns, then signals
  	// writerDone.
  	go func() {
  		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
  		err := t.loopy.run()
  		if err != nil {
  			errorf("transport: loopyWriter.run returning. Err: %v", err)
  		}
  		// If it's a connection error, let reader goroutine handle it
  		// since there might be data in the buffers.
  		if _, ok := err.(net.Error); !ok {
  			t.conn.Close()
  		}
  		close(t.writerDone)
  	}()
  	return t, nil
  }
  306. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  307. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  308. s := &Stream{
  309. done: make(chan struct{}),
  310. method: callHdr.Method,
  311. sendCompress: callHdr.SendCompress,
  312. buf: newRecvBuffer(),
  313. headerChan: make(chan struct{}),
  314. contentSubtype: callHdr.ContentSubtype,
  315. }
  316. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  317. s.requestRead = func(n int) {
  318. t.adjustWindow(s, uint32(n))
  319. }
  320. // The client side stream context should have exactly the same life cycle with the user provided context.
  321. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  322. // So we use the original context here instead of creating a copy.
  323. s.ctx = ctx
  324. s.trReader = &transportReader{
  325. reader: &recvBufferReader{
  326. ctx: s.ctx,
  327. ctxDone: s.ctx.Done(),
  328. recv: s.buf,
  329. },
  330. windowHandler: func(n int) {
  331. t.updateWindow(s, uint32(n))
  332. },
  333. }
  334. return s
  335. }
  336. func (t *http2Client) getPeer() *peer.Peer {
  337. pr := &peer.Peer{
  338. Addr: t.remoteAddr,
  339. }
  340. // Attach Auth info if there is any.
  341. if t.authInfo != nil {
  342. pr.AuthInfo = t.authInfo
  343. }
  344. return pr
  345. }
  346. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  347. aud := t.createAudience(callHdr)
  348. authData, err := t.getTrAuthData(ctx, aud)
  349. if err != nil {
  350. return nil, err
  351. }
  352. callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
  353. if err != nil {
  354. return nil, err
  355. }
  356. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  357. // first and create a slice of that exact size.
  358. // Make the slice of certain predictable size to reduce allocations made by append.
  359. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  360. hfLen += len(authData) + len(callAuthData)
  361. headerFields := make([]hpack.HeaderField, 0, hfLen)
  362. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  363. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  364. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  365. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  366. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  367. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  368. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  369. if callHdr.PreviousAttempts > 0 {
  370. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  371. }
  372. if callHdr.SendCompress != "" {
  373. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  374. }
  375. if dl, ok := ctx.Deadline(); ok {
  376. // Send out timeout regardless its value. The server can detect timeout context by itself.
  377. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  378. timeout := dl.Sub(time.Now())
  379. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  380. }
  381. for k, v := range authData {
  382. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  383. }
  384. for k, v := range callAuthData {
  385. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  386. }
  387. if b := stats.OutgoingTags(ctx); b != nil {
  388. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  389. }
  390. if b := stats.OutgoingTrace(ctx); b != nil {
  391. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  392. }
  393. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  394. var k string
  395. for _, vv := range added {
  396. for i, v := range vv {
  397. if i%2 == 0 {
  398. k = v
  399. continue
  400. }
  401. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  402. if isReservedHeader(k) {
  403. continue
  404. }
  405. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  406. }
  407. }
  408. for k, vv := range md {
  409. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  410. if isReservedHeader(k) {
  411. continue
  412. }
  413. for _, v := range vv {
  414. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  415. }
  416. }
  417. }
  418. if md, ok := t.md.(*metadata.MD); ok {
  419. for k, vv := range *md {
  420. if isReservedHeader(k) {
  421. continue
  422. }
  423. for _, v := range vv {
  424. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  425. }
  426. }
  427. }
  428. return headerFields, nil
  429. }
  430. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  431. // Create an audience string only if needed.
  432. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  433. return ""
  434. }
  435. // Construct URI required to get auth request metadata.
  436. // Omit port if it is the default one.
  437. host := strings.TrimSuffix(callHdr.Host, ":443")
  438. pos := strings.LastIndex(callHdr.Method, "/")
  439. if pos == -1 {
  440. pos = len(callHdr.Method)
  441. }
  442. return "https://" + host + callHdr.Method[:pos]
  443. }
  444. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  445. authData := map[string]string{}
  446. for _, c := range t.perRPCCreds {
  447. data, err := c.GetRequestMetadata(ctx, audience)
  448. if err != nil {
  449. if _, ok := status.FromError(err); ok {
  450. return nil, err
  451. }
  452. return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
  453. }
  454. for k, v := range data {
  455. // Capital header names are illegal in HTTP/2.
  456. k = strings.ToLower(k)
  457. authData[k] = v
  458. }
  459. }
  460. return authData, nil
  461. }
  462. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  463. callAuthData := map[string]string{}
  464. // Check if credentials.PerRPCCredentials were provided via call options.
  465. // Note: if these credentials are provided both via dial options and call
  466. // options, then both sets of credentials will be applied.
  467. if callCreds := callHdr.Creds; callCreds != nil {
  468. if !t.isSecure && callCreds.RequireTransportSecurity() {
  469. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  470. }
  471. data, err := callCreds.GetRequestMetadata(ctx, audience)
  472. if err != nil {
  473. return nil, status.Errorf(codes.Internal, "transport: %v", err)
  474. }
  475. for k, v := range data {
  476. // Capital header names are illegal in HTTP/2
  477. k = strings.ToLower(k)
  478. callAuthData[k] = v
  479. }
  480. }
  481. return callAuthData, nil
  482. }
  483. // NewStream creates a stream and registers it into the transport as "active"
  484. // streams.
  485. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  486. ctx = peer.NewContext(ctx, t.getPeer())
  487. headerFields, err := t.createHeaderFields(ctx, callHdr)
  488. if err != nil {
  489. return nil, err
  490. }
  491. s := t.newStream(ctx, callHdr)
  492. cleanup := func(err error) {
  493. if s.swapState(streamDone) == streamDone {
  494. // If it was already done, return.
  495. return
  496. }
  497. // The stream was unprocessed by the server.
  498. atomic.StoreUint32(&s.unprocessed, 1)
  499. s.write(recvMsg{err: err})
  500. close(s.done)
  501. // If headerChan isn't closed, then close it.
  502. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  503. close(s.headerChan)
  504. }
  505. }
  506. hdr := &headerFrame{
  507. hf: headerFields,
  508. endStream: false,
  509. initStream: func(id uint32) (bool, error) {
  510. t.mu.Lock()
  511. if state := t.state; state != reachable {
  512. t.mu.Unlock()
  513. // Do a quick cleanup.
  514. err := error(errStreamDrain)
  515. if state == closing {
  516. err = ErrConnClosing
  517. }
  518. cleanup(err)
  519. return false, err
  520. }
  521. t.activeStreams[id] = s
  522. if channelz.IsOn() {
  523. atomic.AddInt64(&t.czData.streamsStarted, 1)
  524. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  525. }
  526. var sendPing bool
  527. // If the number of active streams change from 0 to 1, then check if keepalive
  528. // has gone dormant. If so, wake it up.
  529. if len(t.activeStreams) == 1 && t.keepaliveEnabled {
  530. select {
  531. case t.awakenKeepalive <- struct{}{}:
  532. sendPing = true
  533. // Fill the awakenKeepalive channel again as this channel must be
  534. // kept non-writable except at the point that the keepalive()
  535. // goroutine is waiting either to be awaken or shutdown.
  536. t.awakenKeepalive <- struct{}{}
  537. default:
  538. }
  539. }
  540. t.mu.Unlock()
  541. return sendPing, nil
  542. },
  543. onOrphaned: cleanup,
  544. wq: s.wq,
  545. }
  546. firstTry := true
  547. var ch chan struct{}
  548. checkForStreamQuota := func(it interface{}) bool {
  549. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  550. if firstTry {
  551. t.waitingStreams++
  552. }
  553. ch = t.streamsQuotaAvailable
  554. return false
  555. }
  556. if !firstTry {
  557. t.waitingStreams--
  558. }
  559. t.streamQuota--
  560. h := it.(*headerFrame)
  561. h.streamID = t.nextID
  562. t.nextID += 2
  563. s.id = h.streamID
  564. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  565. if t.streamQuota > 0 && t.waitingStreams > 0 {
  566. select {
  567. case t.streamsQuotaAvailable <- struct{}{}:
  568. default:
  569. }
  570. }
  571. return true
  572. }
  573. var hdrListSizeErr error
  574. checkForHeaderListSize := func(it interface{}) bool {
  575. if t.maxSendHeaderListSize == nil {
  576. return true
  577. }
  578. hdrFrame := it.(*headerFrame)
  579. var sz int64
  580. for _, f := range hdrFrame.hf {
  581. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  582. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  583. return false
  584. }
  585. }
  586. return true
  587. }
  588. for {
  589. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  590. if !checkForStreamQuota(it) {
  591. return false
  592. }
  593. if !checkForHeaderListSize(it) {
  594. return false
  595. }
  596. return true
  597. }, hdr)
  598. if err != nil {
  599. return nil, err
  600. }
  601. if success {
  602. break
  603. }
  604. if hdrListSizeErr != nil {
  605. return nil, hdrListSizeErr
  606. }
  607. firstTry = false
  608. select {
  609. case <-ch:
  610. case <-s.ctx.Done():
  611. return nil, ContextErr(s.ctx.Err())
  612. case <-t.goAway:
  613. return nil, errStreamDrain
  614. case <-t.ctx.Done():
  615. return nil, ErrConnClosing
  616. }
  617. }
  618. if t.statsHandler != nil {
  619. outHeader := &stats.OutHeader{
  620. Client: true,
  621. FullMethod: callHdr.Method,
  622. RemoteAddr: t.remoteAddr,
  623. LocalAddr: t.localAddr,
  624. Compression: callHdr.SendCompress,
  625. }
  626. t.statsHandler.HandleRPC(s.ctx, outHeader)
  627. }
  628. return s, nil
  629. }
  630. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  631. // This must not be executed in reader's goroutine.
  632. func (t *http2Client) CloseStream(s *Stream, err error) {
  633. var (
  634. rst bool
  635. rstCode http2.ErrCode
  636. )
  637. if err != nil {
  638. rst = true
  639. rstCode = http2.ErrCodeCancel
  640. }
  641. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  642. }
  // closeStream marks s as done, records its final status and trailers,
  // unblocks its readers and writers, and queues a cleanupStream item that
  // removes it from activeStreams and hands back one unit of stream quota.
  //
  // err, if non-nil, is delivered to the stream's receive buffer. rst and
  // rstCode are carried on the cleanup item. st becomes the stream status and
  // mdata, if non-empty, its trailer. eosReceived selects which channelz
  // counter (succeeded or failed) is incremented.
  //
  // If the stream was already done, the call waits for s.done to be closed
  // and returns.
  func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  	// Set stream status to done.
  	if s.swapState(streamDone) == streamDone {
  		// If it was already done, return. If multiple closeStream calls
  		// happen simultaneously, wait for the first to finish.
  		<-s.done
  		return
  	}
  	// status and trailers can be updated here without any synchronization because the stream goroutine will
  	// only read it after it sees an io.EOF error from read or write and we'll write those errors
  	// only after updating this.
  	s.status = st
  	if len(mdata) > 0 {
  		s.trailer = mdata
  	}
  	if err != nil {
  		// This will unblock reads eventually.
  		s.write(recvMsg{err: err})
  	}
  	// If headerChan isn't closed, then close it.
  	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  		s.noHeaders = true
  		close(s.headerChan)
  	}
  	cleanup := &cleanupStream{
  		streamID: s.id,
  		// onWrite deregisters the stream and updates channelz counters.
  		onWrite: func() {
  			t.mu.Lock()
  			// activeStreams is nil once Close has run.
  			if t.activeStreams != nil {
  				delete(t.activeStreams, s.id)
  			}
  			t.mu.Unlock()
  			if channelz.IsOn() {
  				if eosReceived {
  					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  				} else {
  					atomic.AddInt64(&t.czData.streamsFailed, 1)
  				}
  			}
  		},
  		rst:     rst,
  		rstCode: rstCode,
  	}
  	// addBackStreamQuota releases this stream's quota and, if streams are
  	// waiting, signals streamsQuotaAvailable without blocking.
  	addBackStreamQuota := func(interface{}) bool {
  		t.streamQuota++
  		if t.streamQuota > 0 && t.waitingStreams > 0 {
  			select {
  			case t.streamsQuotaAvailable <- struct{}{}:
  			default:
  			}
  		}
  		return true
  	}
  	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  	// This will unblock write.
  	close(s.done)
  }
  // Close kicks off the shutdown process of the transport. This should be called
  // only once on a transport. Once it is called, the transport should not be
  // accessed any more.
  //
  // A repeated call is a no-op that returns nil. Otherwise Close finishes the
  // control buffer, cancels the transport context, closes the connection,
  // closes every active stream with ErrConnClosing, and reports the connection
  // end to the stats handler. t.onClose() is run in its own goroutine, so Close
  // does not wait for it to return.
  //
  // The returned error is the one from closing the underlying connection.
  func (t *http2Client) Close() error {
  	t.mu.Lock()
  	// Make sure we only Close once.
  	if t.state == closing {
  		t.mu.Unlock()
  		return nil
  	}
  	t.state = closing
  	// Detach the stream map under the lock so it can be walked without it.
  	streams := t.activeStreams
  	t.activeStreams = nil
  	t.mu.Unlock()
  	t.controlBuf.finish()
  	t.cancel()
  	err := t.conn.Close()
  	if channelz.IsOn() {
  		channelz.RemoveEntry(t.channelzID)
  	}
  	// Notify all active streams.
  	for _, s := range streams {
  		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
  	}
  	if t.statsHandler != nil {
  		connEnd := &stats.ConnEnd{
  			Client: true,
  		}
  		t.statsHandler.HandleConn(t.ctx, connEnd)
  	}
  	go t.onClose()
  	return err
  }
  737. // GracefulClose sets the state to draining, which prevents new streams from
  738. // being created and causes the transport to be closed when the last active
  739. // stream is closed. If there are no active streams, the transport is closed
  740. // immediately. This does nothing if the transport is already draining or
  741. // closing.
  742. func (t *http2Client) GracefulClose() error {
  743. t.mu.Lock()
  744. // Make sure we move to draining only from active.
  745. if t.state == draining || t.state == closing {
  746. t.mu.Unlock()
  747. return nil
  748. }
  749. t.state = draining
  750. active := len(t.activeStreams)
  751. t.mu.Unlock()
  752. if active == 0 {
  753. return t.Close()
  754. }
  755. t.controlBuf.put(&incomingGoAway{})
  756. return nil
  757. }
  758. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  759. // should proceed only if Write returns nil.
  760. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  761. if opts.Last {
  762. // If it's the last message, update stream state.
  763. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  764. return errStreamDone
  765. }
  766. } else if s.getState() != streamActive {
  767. return errStreamDone
  768. }
  769. df := &dataFrame{
  770. streamID: s.id,
  771. endStream: opts.Last,
  772. }
  773. if hdr != nil || data != nil { // If it's not an empty data frame.
  774. // Add some data to grpc message header so that we can equally
  775. // distribute bytes across frames.
  776. emptyLen := http2MaxFrameLen - len(hdr)
  777. if emptyLen > len(data) {
  778. emptyLen = len(data)
  779. }
  780. hdr = append(hdr, data[:emptyLen]...)
  781. data = data[emptyLen:]
  782. df.h, df.d = hdr, data
  783. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  784. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  785. return err
  786. }
  787. }
  788. return t.controlBuf.put(df)
  789. }
  790. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  791. t.mu.Lock()
  792. defer t.mu.Unlock()
  793. s, ok := t.activeStreams[f.Header().StreamID]
  794. return s, ok
  795. }
  796. // adjustWindow sends out extra window update over the initial window size
  797. // of stream if the application is requesting data larger in size than
  798. // the window.
  799. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  800. if w := s.fc.maybeAdjust(n); w > 0 {
  801. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  802. }
  803. }
  804. // updateWindow adjusts the inbound quota for the stream.
  805. // Window updates will be sent out when the cumulative quota
  806. // exceeds the corresponding threshold.
  807. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  808. if w := s.fc.onRead(n); w > 0 {
  809. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  810. }
  811. }
  812. // updateFlowControl updates the incoming flow control windows
  813. // for the transport and the stream based on the current bdp
  814. // estimation.
  815. func (t *http2Client) updateFlowControl(n uint32) {
  816. t.mu.Lock()
  817. for _, s := range t.activeStreams {
  818. s.fc.newLimit(n)
  819. }
  820. t.mu.Unlock()
  821. updateIWS := func(interface{}) bool {
  822. t.initialWindowSize = int32(n)
  823. return true
  824. }
  825. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  826. t.controlBuf.put(&outgoingSettings{
  827. ss: []http2.Setting{
  828. {
  829. ID: http2.SettingInitialWindowSize,
  830. Val: n,
  831. },
  832. },
  833. })
  834. }
  835. func (t *http2Client) handleData(f *http2.DataFrame) {
  836. size := f.Header().Length
  837. var sendBDPPing bool
  838. if t.bdpEst != nil {
  839. sendBDPPing = t.bdpEst.add(size)
  840. }
  841. // Decouple connection's flow control from application's read.
  842. // An update on connection's flow control should not depend on
  843. // whether user application has read the data or not. Such a
  844. // restriction is already imposed on the stream's flow control,
  845. // and therefore the sender will be blocked anyways.
  846. // Decoupling the connection flow control will prevent other
  847. // active(fast) streams from starving in presence of slow or
  848. // inactive streams.
  849. //
  850. if w := t.fc.onData(size); w > 0 {
  851. t.controlBuf.put(&outgoingWindowUpdate{
  852. streamID: 0,
  853. increment: w,
  854. })
  855. }
  856. if sendBDPPing {
  857. // Avoid excessive ping detection (e.g. in an L7 proxy)
  858. // by sending a window update prior to the BDP ping.
  859. if w := t.fc.reset(); w > 0 {
  860. t.controlBuf.put(&outgoingWindowUpdate{
  861. streamID: 0,
  862. increment: w,
  863. })
  864. }
  865. t.controlBuf.put(bdpPing)
  866. }
  867. // Select the right stream to dispatch.
  868. s, ok := t.getStream(f)
  869. if !ok {
  870. return
  871. }
  872. if size > 0 {
  873. if err := s.fc.onData(size); err != nil {
  874. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  875. return
  876. }
  877. if f.Header().Flags.Has(http2.FlagDataPadded) {
  878. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  879. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  880. }
  881. }
  882. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  883. // guarantee f.Data() is consumed before the arrival of next frame.
  884. // Can this copy be eliminated?
  885. if len(f.Data()) > 0 {
  886. data := make([]byte, len(f.Data()))
  887. copy(data, f.Data())
  888. s.write(recvMsg{data: data})
  889. }
  890. }
  891. // The server has closed the stream without sending trailers. Record that
  892. // the read direction is closed, and set the status appropriately.
  893. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  894. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  895. }
  896. }
  897. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  898. s, ok := t.getStream(f)
  899. if !ok {
  900. return
  901. }
  902. if f.ErrCode == http2.ErrCodeRefusedStream {
  903. // The stream was unprocessed by the server.
  904. atomic.StoreUint32(&s.unprocessed, 1)
  905. }
  906. statusCode, ok := http2ErrConvTab[f.ErrCode]
  907. if !ok {
  908. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  909. statusCode = codes.Unknown
  910. }
  911. if statusCode == codes.Canceled {
  912. // Our deadline was already exceeded, and that was likely the cause of
  913. // this cancelation. Alter the status code accordingly.
  914. if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
  915. statusCode = codes.DeadlineExceeded
  916. }
  917. }
  918. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  919. }
  920. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  921. if f.IsAck() {
  922. return
  923. }
  924. var maxStreams *uint32
  925. var ss []http2.Setting
  926. var updateFuncs []func()
  927. f.ForeachSetting(func(s http2.Setting) error {
  928. switch s.ID {
  929. case http2.SettingMaxConcurrentStreams:
  930. maxStreams = new(uint32)
  931. *maxStreams = s.Val
  932. case http2.SettingMaxHeaderListSize:
  933. updateFuncs = append(updateFuncs, func() {
  934. t.maxSendHeaderListSize = new(uint32)
  935. *t.maxSendHeaderListSize = s.Val
  936. })
  937. default:
  938. ss = append(ss, s)
  939. }
  940. return nil
  941. })
  942. if isFirst && maxStreams == nil {
  943. maxStreams = new(uint32)
  944. *maxStreams = math.MaxUint32
  945. }
  946. sf := &incomingSettings{
  947. ss: ss,
  948. }
  949. if maxStreams != nil {
  950. updateStreamQuota := func() {
  951. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  952. t.maxConcurrentStreams = *maxStreams
  953. t.streamQuota += delta
  954. if delta > 0 && t.waitingStreams > 0 {
  955. close(t.streamsQuotaAvailable) // wake all of them up.
  956. t.streamsQuotaAvailable = make(chan struct{}, 1)
  957. }
  958. }
  959. updateFuncs = append(updateFuncs, updateStreamQuota)
  960. }
  961. t.controlBuf.executeAndPut(func(interface{}) bool {
  962. for _, f := range updateFuncs {
  963. f()
  964. }
  965. return true
  966. }, sf)
  967. }
  968. func (t *http2Client) handlePing(f *http2.PingFrame) {
  969. if f.IsAck() {
  970. // Maybe it's a BDP ping.
  971. if t.bdpEst != nil {
  972. t.bdpEst.calculate(f.Data)
  973. }
  974. return
  975. }
  976. pingAck := &ping{ack: true}
  977. copy(pingAck.data[:], f.Data[:])
  978. t.controlBuf.put(pingAck)
  979. }
  980. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  981. t.mu.Lock()
  982. if t.state == closing {
  983. t.mu.Unlock()
  984. return
  985. }
  986. if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
  987. infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
  988. }
  989. id := f.LastStreamID
  990. if id > 0 && id%2 != 1 {
  991. t.mu.Unlock()
  992. t.Close()
  993. return
  994. }
  995. // A client can receive multiple GoAways from the server (see
  996. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  997. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  998. // sent after an RTT delay with the ID of the last stream the server will
  999. // process.
  1000. //
  1001. // Therefore, when we get the first GoAway we don't necessarily close any
  1002. // streams. While in case of second GoAway we close all streams created after
  1003. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1004. // server was being sent don't get killed.
  1005. select {
  1006. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1007. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1008. if id > t.prevGoAwayID {
  1009. t.mu.Unlock()
  1010. t.Close()
  1011. return
  1012. }
  1013. default:
  1014. t.setGoAwayReason(f)
  1015. close(t.goAway)
  1016. t.state = draining
  1017. t.controlBuf.put(&incomingGoAway{})
  1018. // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
  1019. t.onGoAway(t.goAwayReason)
  1020. }
  1021. // All streams with IDs greater than the GoAwayId
  1022. // and smaller than the previous GoAway ID should be killed.
  1023. upperLimit := t.prevGoAwayID
  1024. if upperLimit == 0 { // This is the first GoAway Frame.
  1025. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1026. }
  1027. for streamID, stream := range t.activeStreams {
  1028. if streamID > id && streamID <= upperLimit {
  1029. // The stream was unprocessed by the server.
  1030. atomic.StoreUint32(&stream.unprocessed, 1)
  1031. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1032. }
  1033. }
  1034. t.prevGoAwayID = id
  1035. active := len(t.activeStreams)
  1036. t.mu.Unlock()
  1037. if active == 0 {
  1038. t.Close()
  1039. }
  1040. }
  1041. // setGoAwayReason sets the value of t.goAwayReason based
  1042. // on the GoAway frame received.
  1043. // It expects a lock on transport's mutext to be held by
  1044. // the caller.
  1045. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1046. t.goAwayReason = GoAwayNoReason
  1047. switch f.ErrCode {
  1048. case http2.ErrCodeEnhanceYourCalm:
  1049. if string(f.DebugData()) == "too_many_pings" {
  1050. t.goAwayReason = GoAwayTooManyPings
  1051. }
  1052. }
  1053. }
  1054. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1055. t.mu.Lock()
  1056. defer t.mu.Unlock()
  1057. return t.goAwayReason
  1058. }
  1059. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1060. t.controlBuf.put(&incomingWindowUpdate{
  1061. streamID: f.Header().StreamID,
  1062. increment: f.Increment,
  1063. })
  1064. }
  1065. // operateHeaders takes action on the decoded headers.
  1066. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1067. s, ok := t.getStream(frame)
  1068. if !ok {
  1069. return
  1070. }
  1071. atomic.StoreUint32(&s.bytesReceived, 1)
  1072. var state decodeState
  1073. if err := state.decodeHeader(frame); err != nil {
  1074. t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
  1075. // Something wrong. Stops reading even when there is remaining.
  1076. return
  1077. }
  1078. endStream := frame.StreamEnded()
  1079. var isHeader bool
  1080. defer func() {
  1081. if t.statsHandler != nil {
  1082. if isHeader {
  1083. inHeader := &stats.InHeader{
  1084. Client: true,
  1085. WireLength: int(frame.Header().Length),
  1086. }
  1087. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1088. } else {
  1089. inTrailer := &stats.InTrailer{
  1090. Client: true,
  1091. WireLength: int(frame.Header().Length),
  1092. }
  1093. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1094. }
  1095. }
  1096. }()
  1097. // If headers haven't been received yet.
  1098. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  1099. if !endStream {
  1100. // Headers frame is not actually a trailers-only frame.
  1101. isHeader = true
  1102. // These values can be set without any synchronization because
  1103. // stream goroutine will read it only after seeing a closed
  1104. // headerChan which we'll close after setting this.
  1105. s.recvCompress = state.encoding
  1106. if len(state.mdata) > 0 {
  1107. s.header = state.mdata
  1108. }
  1109. } else {
  1110. s.noHeaders = true
  1111. }
  1112. close(s.headerChan)
  1113. }
  1114. if !endStream {
  1115. return
  1116. }
  1117. // if client received END_STREAM from server while stream was still active, send RST_STREAM
  1118. rst := s.getState() == streamActive
  1119. t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
  1120. }
  1121. // reader runs as a separate goroutine in charge of reading data from network
  1122. // connection.
  1123. //
  1124. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1125. // optimal.
  1126. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1127. func (t *http2Client) reader() {
  1128. defer close(t.readerDone)
  1129. // Check the validity of server preface.
  1130. frame, err := t.framer.fr.ReadFrame()
  1131. if err != nil {
  1132. t.Close() // this kicks off resetTransport, so must be last before return
  1133. return
  1134. }
  1135. t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
  1136. if t.keepaliveEnabled {
  1137. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1138. }
  1139. sf, ok := frame.(*http2.SettingsFrame)
  1140. if !ok {
  1141. t.Close() // this kicks off resetTransport, so must be last before return
  1142. return
  1143. }
  1144. t.onSuccess()
  1145. t.handleSettings(sf, true)
  1146. // loop to keep reading incoming messages on this transport.
  1147. for {
  1148. frame, err := t.framer.fr.ReadFrame()
  1149. if t.keepaliveEnabled {
  1150. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1151. }
  1152. if err != nil {
  1153. // Abort an active stream if the http2.Framer returns a
  1154. // http2.StreamError. This can happen only if the server's response
  1155. // is malformed http2.
  1156. if se, ok := err.(http2.StreamError); ok {
  1157. t.mu.Lock()
  1158. s := t.activeStreams[se.StreamID]
  1159. t.mu.Unlock()
  1160. if s != nil {
  1161. // use error detail to provide better err message
  1162. code := http2ErrConvTab[se.Code]
  1163. msg := t.framer.fr.ErrorDetail().Error()
  1164. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1165. }
  1166. continue
  1167. } else {
  1168. // Transport error.
  1169. t.Close()
  1170. return
  1171. }
  1172. }
  1173. switch frame := frame.(type) {
  1174. case *http2.MetaHeadersFrame:
  1175. t.operateHeaders(frame)
  1176. case *http2.DataFrame:
  1177. t.handleData(frame)
  1178. case *http2.RSTStreamFrame:
  1179. t.handleRSTStream(frame)
  1180. case *http2.SettingsFrame:
  1181. t.handleSettings(frame, false)
  1182. case *http2.PingFrame:
  1183. t.handlePing(frame)
  1184. case *http2.GoAwayFrame:
  1185. t.handleGoAway(frame)
  1186. case *http2.WindowUpdateFrame:
  1187. t.handleWindowUpdate(frame)
  1188. default:
  1189. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1190. }
  1191. }
  1192. }
  1193. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
  1194. func (t *http2Client) keepalive() {
  1195. p := &ping{data: [8]byte{}}
  1196. timer := time.NewTimer(t.kp.Time)
  1197. for {
  1198. select {
  1199. case <-timer.C:
  1200. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1201. timer.Reset(t.kp.Time)
  1202. continue
  1203. }
  1204. // Check if keepalive should go dormant.
  1205. t.mu.Lock()
  1206. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1207. // Make awakenKeepalive writable.
  1208. <-t.awakenKeepalive
  1209. t.mu.Unlock()
  1210. select {
  1211. case <-t.awakenKeepalive:
  1212. // If the control gets here a ping has been sent
  1213. // need to reset the timer with keepalive.Timeout.
  1214. case <-t.ctx.Done():
  1215. return
  1216. }
  1217. } else {
  1218. t.mu.Unlock()
  1219. if channelz.IsOn() {
  1220. atomic.AddInt64(&t.czData.kpCount, 1)
  1221. }
  1222. // Send ping.
  1223. t.controlBuf.put(p)
  1224. }
  1225. // By the time control gets here a ping has been sent one way or the other.
  1226. timer.Reset(t.kp.Timeout)
  1227. select {
  1228. case <-timer.C:
  1229. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1230. timer.Reset(t.kp.Time)
  1231. continue
  1232. }
  1233. t.Close()
  1234. return
  1235. case <-t.ctx.Done():
  1236. if !timer.Stop() {
  1237. <-timer.C
  1238. }
  1239. return
  1240. }
  1241. case <-t.ctx.Done():
  1242. if !timer.Stop() {
  1243. <-timer.C
  1244. }
  1245. return
  1246. }
  1247. }
  1248. }
  1249. func (t *http2Client) Error() <-chan struct{} {
  1250. return t.ctx.Done()
  1251. }
  1252. func (t *http2Client) GoAway() <-chan struct{} {
  1253. return t.goAway
  1254. }
  1255. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1256. s := channelz.SocketInternalMetric{
  1257. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1258. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1259. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1260. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1261. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1262. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1263. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1264. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1265. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1266. LocalFlowControlWindow: int64(t.fc.getSize()),
  1267. SocketOptions: channelz.GetSocketOption(t.conn),
  1268. LocalAddr: t.localAddr,
  1269. RemoteAddr: t.remoteAddr,
  1270. // RemoteName :
  1271. }
  1272. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1273. s.Security = au.GetSecurityValue()
  1274. }
  1275. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1276. return &s
  1277. }
  1278. func (t *http2Client) IncrMsgSent() {
  1279. atomic.AddInt64(&t.czData.msgSent, 1)
  1280. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1281. }
  1282. func (t *http2Client) IncrMsgRecv() {
  1283. atomic.AddInt64(&t.czData.msgRecv, 1)
  1284. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1285. }
  1286. func (t *http2Client) getOutFlowWindow() int64 {
  1287. resp := make(chan uint32, 1)
  1288. timer := time.NewTimer(time.Second)
  1289. defer timer.Stop()
  1290. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1291. select {
  1292. case sz := <-resp:
  1293. return int64(sz)
  1294. case <-t.ctxDone:
  1295. return -1
  1296. case <-timer.C:
  1297. return -2
  1298. }
  1299. }