dialoptions.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "net"
  22. "time"
  23. "golang.org/x/net/context"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/internal"
  27. "google.golang.org/grpc/internal/backoff"
  28. "google.golang.org/grpc/internal/envconfig"
  29. "google.golang.org/grpc/internal/transport"
  30. "google.golang.org/grpc/keepalive"
  31. "google.golang.org/grpc/resolver"
  32. "google.golang.org/grpc/stats"
  33. )
  34. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  35. // values passed to Dial.
  36. type dialOptions struct {
  37. unaryInt UnaryClientInterceptor
  38. streamInt StreamClientInterceptor
  39. cp Compressor
  40. dc Decompressor
  41. bs backoff.Strategy
  42. block bool
  43. insecure bool
  44. timeout time.Duration
  45. scChan <-chan ServiceConfig
  46. authority string
  47. copts transport.ConnectOptions
  48. callOptions []CallOption
  49. // This is used by v1 balancer dial option WithBalancer to support v1
  50. // balancer, and also by WithBalancerName dial option.
  51. balancerBuilder balancer.Builder
  52. // This is to support grpclb.
  53. resolverBuilder resolver.Builder
  54. waitForHandshake bool
  55. channelzParentID int64
  56. disableServiceConfig bool
  57. disableRetry bool
  58. }
  59. // DialOption configures how we set up the connection.
  60. type DialOption interface {
  61. apply(*dialOptions)
  62. }
  63. // EmptyDialOption does not alter the dial configuration. It can be embedded in
  64. // another structure to build custom dial options.
  65. //
  66. // This API is EXPERIMENTAL.
  67. type EmptyDialOption struct{}
  68. func (EmptyDialOption) apply(*dialOptions) {}
  69. // funcDialOption wraps a function that modifies dialOptions into an
  70. // implementation of the DialOption interface.
  71. type funcDialOption struct {
  72. f func(*dialOptions)
  73. }
  74. func (fdo *funcDialOption) apply(do *dialOptions) {
  75. fdo.f(do)
  76. }
  77. func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
  78. return &funcDialOption{
  79. f: f,
  80. }
  81. }
  82. // WithWaitForHandshake blocks until the initial settings frame is received from
  83. // the server before assigning RPCs to the connection. Experimental API.
  84. func WithWaitForHandshake() DialOption {
  85. return newFuncDialOption(func(o *dialOptions) {
  86. o.waitForHandshake = true
  87. })
  88. }
  89. // WithWriteBufferSize determines how much data can be batched before doing a
  90. // write on the wire. The corresponding memory allocation for this buffer will
  91. // be twice the size to keep syscalls low. The default value for this buffer is
  92. // 32KB.
  93. //
  94. // Zero will disable the write buffer such that each write will be on underlying
  95. // connection. Note: A Send call may not directly translate to a write.
  96. func WithWriteBufferSize(s int) DialOption {
  97. return newFuncDialOption(func(o *dialOptions) {
  98. o.copts.WriteBufferSize = s
  99. })
  100. }
  101. // WithReadBufferSize lets you set the size of read buffer, this determines how
  102. // much data can be read at most for each read syscall.
  103. //
  104. // The default value for this buffer is 32KB. Zero will disable read buffer for
  105. // a connection so data framer can access the underlying conn directly.
  106. func WithReadBufferSize(s int) DialOption {
  107. return newFuncDialOption(func(o *dialOptions) {
  108. o.copts.ReadBufferSize = s
  109. })
  110. }
  111. // WithInitialWindowSize returns a DialOption which sets the value for initial
  112. // window size on a stream. The lower bound for window size is 64K and any value
  113. // smaller than that will be ignored.
  114. func WithInitialWindowSize(s int32) DialOption {
  115. return newFuncDialOption(func(o *dialOptions) {
  116. o.copts.InitialWindowSize = s
  117. })
  118. }
  119. // WithInitialConnWindowSize returns a DialOption which sets the value for
  120. // initial window size on a connection. The lower bound for window size is 64K
  121. // and any value smaller than that will be ignored.
  122. func WithInitialConnWindowSize(s int32) DialOption {
  123. return newFuncDialOption(func(o *dialOptions) {
  124. o.copts.InitialConnWindowSize = s
  125. })
  126. }
  127. // WithMaxMsgSize returns a DialOption which sets the maximum message size the
  128. // client can receive.
  129. //
  130. // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  131. func WithMaxMsgSize(s int) DialOption {
  132. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  133. }
  134. // WithDefaultCallOptions returns a DialOption which sets the default
  135. // CallOptions for calls over the connection.
  136. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  137. return newFuncDialOption(func(o *dialOptions) {
  138. o.callOptions = append(o.callOptions, cos...)
  139. })
  140. }
  141. // WithCodec returns a DialOption which sets a codec for message marshaling and
  142. // unmarshaling.
  143. //
  144. // Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
  145. func WithCodec(c Codec) DialOption {
  146. return WithDefaultCallOptions(CallCustomCodec(c))
  147. }
  148. // WithCompressor returns a DialOption which sets a Compressor to use for
  149. // message compression. It has lower priority than the compressor set by the
  150. // UseCompressor CallOption.
  151. //
  152. // Deprecated: use UseCompressor instead.
  153. func WithCompressor(cp Compressor) DialOption {
  154. return newFuncDialOption(func(o *dialOptions) {
  155. o.cp = cp
  156. })
  157. }
  158. // WithDecompressor returns a DialOption which sets a Decompressor to use for
  159. // incoming message decompression. If incoming response messages are encoded
  160. // using the decompressor's Type(), it will be used. Otherwise, the message
  161. // encoding will be used to look up the compressor registered via
  162. // encoding.RegisterCompressor, which will then be used to decompress the
  163. // message. If no compressor is registered for the encoding, an Unimplemented
  164. // status error will be returned.
  165. //
  166. // Deprecated: use encoding.RegisterCompressor instead.
  167. func WithDecompressor(dc Decompressor) DialOption {
  168. return newFuncDialOption(func(o *dialOptions) {
  169. o.dc = dc
  170. })
  171. }
  172. // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
  173. // Name resolver will be ignored if this DialOption is specified.
  174. //
  175. // Deprecated: use the new balancer APIs in balancer package and
  176. // WithBalancerName.
  177. func WithBalancer(b Balancer) DialOption {
  178. return newFuncDialOption(func(o *dialOptions) {
  179. o.balancerBuilder = &balancerWrapperBuilder{
  180. b: b,
  181. }
  182. })
  183. }
  184. // WithBalancerName sets the balancer that the ClientConn will be initialized
  185. // with. Balancer registered with balancerName will be used. This function
  186. // panics if no balancer was registered by balancerName.
  187. //
  188. // The balancer cannot be overridden by balancer option specified by service
  189. // config.
  190. //
  191. // This is an EXPERIMENTAL API.
  192. func WithBalancerName(balancerName string) DialOption {
  193. builder := balancer.Get(balancerName)
  194. if builder == nil {
  195. panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
  196. }
  197. return newFuncDialOption(func(o *dialOptions) {
  198. o.balancerBuilder = builder
  199. })
  200. }
  201. // withResolverBuilder is only for grpclb.
  202. func withResolverBuilder(b resolver.Builder) DialOption {
  203. return newFuncDialOption(func(o *dialOptions) {
  204. o.resolverBuilder = b
  205. })
  206. }
  207. // WithServiceConfig returns a DialOption which has a channel to read the
  208. // service configuration.
  209. //
  210. // Deprecated: service config should be received through name resolver, as
  211. // specified here.
  212. // https://github.com/grpc/grpc/blob/master/doc/service_config.md
  213. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  214. return newFuncDialOption(func(o *dialOptions) {
  215. o.scChan = c
  216. })
  217. }
  218. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  219. // when backing off after failed connection attempts.
  220. func WithBackoffMaxDelay(md time.Duration) DialOption {
  221. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  222. }
  223. // WithBackoffConfig configures the dialer to use the provided backoff
  224. // parameters after connection failures.
  225. //
  226. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  227. // for use.
  228. func WithBackoffConfig(b BackoffConfig) DialOption {
  229. return withBackoff(backoff.Exponential{
  230. MaxDelay: b.MaxDelay,
  231. })
  232. }
  233. // withBackoff sets the backoff strategy used for connectRetryNum after a failed
  234. // connection attempt.
  235. //
  236. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  237. func withBackoff(bs backoff.Strategy) DialOption {
  238. return newFuncDialOption(func(o *dialOptions) {
  239. o.bs = bs
  240. })
  241. }
  242. // WithBlock returns a DialOption which makes caller of Dial blocks until the
  243. // underlying connection is up. Without this, Dial returns immediately and
  244. // connecting the server happens in background.
  245. func WithBlock() DialOption {
  246. return newFuncDialOption(func(o *dialOptions) {
  247. o.block = true
  248. })
  249. }
  250. // WithInsecure returns a DialOption which disables transport security for this
  251. // ClientConn. Note that transport security is required unless WithInsecure is
  252. // set.
  253. func WithInsecure() DialOption {
  254. return newFuncDialOption(func(o *dialOptions) {
  255. o.insecure = true
  256. })
  257. }
  258. // WithTransportCredentials returns a DialOption which configures a connection
  259. // level security credentials (e.g., TLS/SSL). This should not be used together
  260. // with WithCredentialsBundle.
  261. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  262. return newFuncDialOption(func(o *dialOptions) {
  263. o.copts.TransportCredentials = creds
  264. })
  265. }
  266. // WithPerRPCCredentials returns a DialOption which sets credentials and places
  267. // auth state on each outbound RPC.
  268. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  269. return newFuncDialOption(func(o *dialOptions) {
  270. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  271. })
  272. }
  273. // WithCredentialsBundle returns a DialOption to set a credentials bundle for
  274. // the ClientConn.WithCreds. This should not be used together with
  275. // WithTransportCredentials.
  276. //
  277. // This API is experimental.
  278. func WithCredentialsBundle(b credentials.Bundle) DialOption {
  279. return newFuncDialOption(func(o *dialOptions) {
  280. o.copts.CredsBundle = b
  281. })
  282. }
  283. // WithTimeout returns a DialOption that configures a timeout for dialing a
  284. // ClientConn initially. This is valid if and only if WithBlock() is present.
  285. //
  286. // Deprecated: use DialContext and context.WithTimeout instead.
  287. func WithTimeout(d time.Duration) DialOption {
  288. return newFuncDialOption(func(o *dialOptions) {
  289. o.timeout = d
  290. })
  291. }
  292. func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
  293. return newFuncDialOption(func(o *dialOptions) {
  294. o.copts.Dialer = f
  295. })
  296. }
  297. func init() {
  298. internal.WithContextDialer = withContextDialer
  299. internal.WithResolverBuilder = withResolverBuilder
  300. }
  301. // WithDialer returns a DialOption that specifies a function to use for dialing
  302. // network addresses. If FailOnNonTempDialError() is set to true, and an error
  303. // is returned by f, gRPC checks the error's Temporary() method to decide if it
  304. // should try to reconnect to the network address.
  305. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  306. return withContextDialer(
  307. func(ctx context.Context, addr string) (net.Conn, error) {
  308. if deadline, ok := ctx.Deadline(); ok {
  309. return f(addr, deadline.Sub(time.Now()))
  310. }
  311. return f(addr, 0)
  312. })
  313. }
  314. // WithStatsHandler returns a DialOption that specifies the stats handler for
  315. // all the RPCs and underlying network connections in this ClientConn.
  316. func WithStatsHandler(h stats.Handler) DialOption {
  317. return newFuncDialOption(func(o *dialOptions) {
  318. o.copts.StatsHandler = h
  319. })
  320. }
  321. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
  322. // non-temporary dial errors. If f is true, and dialer returns a non-temporary
  323. // error, gRPC will fail the connection to the network address and won't try to
  324. // reconnect. The default value of FailOnNonTempDialError is false.
  325. //
  326. // FailOnNonTempDialError only affects the initial dial, and does not do
  327. // anything useful unless you are also using WithBlock().
  328. //
  329. // This is an EXPERIMENTAL API.
  330. func FailOnNonTempDialError(f bool) DialOption {
  331. return newFuncDialOption(func(o *dialOptions) {
  332. o.copts.FailOnNonTempDialError = f
  333. })
  334. }
  335. // WithUserAgent returns a DialOption that specifies a user agent string for all
  336. // the RPCs.
  337. func WithUserAgent(s string) DialOption {
  338. return newFuncDialOption(func(o *dialOptions) {
  339. o.copts.UserAgent = s
  340. })
  341. }
  342. // WithKeepaliveParams returns a DialOption that specifies keepalive parameters
  343. // for the client transport.
  344. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  345. return newFuncDialOption(func(o *dialOptions) {
  346. o.copts.KeepaliveParams = kp
  347. })
  348. }
  349. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for
  350. // unary RPCs.
  351. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  352. return newFuncDialOption(func(o *dialOptions) {
  353. o.unaryInt = f
  354. })
  355. }
  356. // WithStreamInterceptor returns a DialOption that specifies the interceptor for
  357. // streaming RPCs.
  358. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  359. return newFuncDialOption(func(o *dialOptions) {
  360. o.streamInt = f
  361. })
  362. }
  363. // WithAuthority returns a DialOption that specifies the value to be used as the
  364. // :authority pseudo-header. This value only works with WithInsecure and has no
  365. // effect if TransportCredentials are present.
  366. func WithAuthority(a string) DialOption {
  367. return newFuncDialOption(func(o *dialOptions) {
  368. o.authority = a
  369. })
  370. }
  371. // WithChannelzParentID returns a DialOption that specifies the channelz ID of
  372. // current ClientConn's parent. This function is used in nested channel creation
  373. // (e.g. grpclb dial).
  374. func WithChannelzParentID(id int64) DialOption {
  375. return newFuncDialOption(func(o *dialOptions) {
  376. o.channelzParentID = id
  377. })
  378. }
  379. // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
  380. // service config provided by the resolver and provides a hint to the resolver
  381. // to not fetch service configs.
  382. func WithDisableServiceConfig() DialOption {
  383. return newFuncDialOption(func(o *dialOptions) {
  384. o.disableServiceConfig = true
  385. })
  386. }
  387. // WithDisableRetry returns a DialOption that disables retries, even if the
  388. // service config enables them. This does not impact transparent retries, which
  389. // will happen automatically if no data is written to the wire or if the RPC is
  390. // unprocessed by the remote server.
  391. //
  392. // Retry support is currently disabled by default, but will be enabled by
  393. // default in the future. Until then, it may be enabled by setting the
  394. // environment variable "GRPC_GO_RETRY" to "on".
  395. //
  396. // This API is EXPERIMENTAL.
  397. func WithDisableRetry() DialOption {
  398. return newFuncDialOption(func(o *dialOptions) {
  399. o.disableRetry = true
  400. })
  401. }
  402. // WithMaxHeaderListSize returns a DialOption that specifies the maximum
  403. // (uncompressed) size of header list that the client is prepared to accept.
  404. func WithMaxHeaderListSize(s uint32) DialOption {
  405. return newFuncDialOption(func(o *dialOptions) {
  406. o.copts.MaxHeaderListSize = &s
  407. })
  408. }
  409. func defaultDialOptions() dialOptions {
  410. return dialOptions{
  411. disableRetry: !envconfig.Retry,
  412. copts: transport.ConnectOptions{
  413. WriteBufferSize: defaultWriteBufSize,
  414. ReadBufferSize: defaultReadBufSize,
  415. },
  416. }
  417. }