resolver_conn_wrapper.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "strings"
  22. "google.golang.org/grpc/grpclog"
  23. "google.golang.org/grpc/internal/channelz"
  24. "google.golang.org/grpc/resolver"
  25. )
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It is the value handed to the resolver builder's Build call (see
// newCCResolverWrapper), and it receives the resolver's updates through
// NewAddress and NewServiceConfig.
type ccResolverWrapper struct {
	// cc is the ClientConn that resolved addresses and service configs are
	// forwarded to by watcher.
	cc *ClientConn
	// resolver is the resolver produced by the builder; resolveNow and close
	// delegate to it.
	resolver resolver.Resolver
	// addrCh carries address lists from NewAddress to watcher. It is created
	// with a buffer of one in newCCResolverWrapper.
	addrCh chan []resolver.Address
	// scCh carries service config strings from NewServiceConfig to watcher.
	// It is created with a buffer of one in newCCResolverWrapper.
	scCh chan string
	// done is closed by close() to tell watcher to return.
	done chan struct{}
	// lastAddressesCount is the length of the address list most recently
	// passed to addChannelzTraceEvent.
	lastAddressesCount int
}
  36. // split2 returns the values from strings.SplitN(s, sep, 2).
  37. // If sep is not found, it returns ("", s, false) instead.
  38. func split2(s, sep string) (string, string, bool) {
  39. spl := strings.SplitN(s, sep, 2)
  40. if len(spl) < 2 {
  41. return "", "", false
  42. }
  43. return spl[0], spl[1], true
  44. }
  45. // parseTarget splits target into a struct containing scheme, authority and
  46. // endpoint.
  47. //
  48. // If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
  49. // target}.
  50. func parseTarget(target string) (ret resolver.Target) {
  51. var ok bool
  52. ret.Scheme, ret.Endpoint, ok = split2(target, "://")
  53. if !ok {
  54. return resolver.Target{Endpoint: target}
  55. }
  56. ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
  57. if !ok {
  58. return resolver.Target{Endpoint: target}
  59. }
  60. return ret
  61. }
  62. // newCCResolverWrapper parses cc.target for scheme and gets the resolver
  63. // builder for this scheme and builds the resolver. The monitoring goroutine
  64. // for it is not started yet and can be created by calling start().
  65. //
  66. // If withResolverBuilder dial option is set, the specified resolver will be
  67. // used instead.
  68. func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
  69. rb := cc.dopts.resolverBuilder
  70. if rb == nil {
  71. return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
  72. }
  73. ccr := &ccResolverWrapper{
  74. cc: cc,
  75. addrCh: make(chan []resolver.Address, 1),
  76. scCh: make(chan string, 1),
  77. done: make(chan struct{}),
  78. }
  79. var err error
  80. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
  81. if err != nil {
  82. return nil, err
  83. }
  84. return ccr, nil
  85. }
// start launches the watcher goroutine, which forwards address and service
// config updates to cc until close() is called.
func (ccr *ccResolverWrapper) start() {
	go ccr.watcher()
}
// watcher processes address updates and service config updates sequentially.
// Otherwise, we need to resolve possible races between address and service
// config (e.g. they specify different balancer types).
//
// It loops until ccr.done is closed. Updates arrive on ccr.addrCh and
// ccr.scCh and are handed to ccr.cc one at a time from this single goroutine.
func (ccr *ccResolverWrapper) watcher() {
	for {
		// Non-blocking check so that a closed wrapper exits before waiting
		// for (or handling) another update.
		select {
		case <-ccr.done:
			return
		default:
		}

		// Block until there is an update to forward or the wrapper is closed.
		select {
		case addrs := <-ccr.addrCh:
			// select picks among ready cases at random, so done may already
			// be closed even though an address update was chosen; re-check
			// before touching cc.
			select {
			case <-ccr.done:
				return
			default:
			}
			grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
			if channelz.IsOn() {
				ccr.addChannelzTraceEvent(addrs)
			}
			ccr.cc.handleResolvedAddrs(addrs, nil)
		case sc := <-ccr.scCh:
			// Same re-check as above, for service config updates.
			select {
			case <-ccr.done:
				return
			default:
			}
			grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
			ccr.cc.handleServiceConfig(sc)
		case <-ccr.done:
			return
		}
	}
}
// resolveNow forwards the request, with option o, to the wrapped resolver's
// ResolveNow method.
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
	ccr.resolver.ResolveNow(o)
}
// close shuts the wrapper down: it closes the wrapped resolver first and then
// closes ccr.done, which makes watcher return.
//
// NOTE(review): closing ccr.done a second time would panic, so this is
// presumably called at most once per wrapper — confirm against callers.
func (ccr *ccResolverWrapper) close() {
	ccr.resolver.Close()
	close(ccr.done)
}
// NewAddress is called by the resolver implementation to send addresses to gRPC.
//
// Only the most recent address list is kept: any list still waiting in
// ccr.addrCh is discarded before the new one is queued for watcher.
//
// NOTE(review): the drain and the send are two separate steps; if several
// goroutines call this concurrently, or watcher has already exited, the send
// could block — confirm how resolvers are expected to call it.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
	// Drop a pending, not-yet-consumed update (if any) without blocking.
	select {
	case <-ccr.addrCh:
	default:
	}
	ccr.addrCh <- addrs
}
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
//
// Only the most recent service config is kept: any config still waiting in
// ccr.scCh is discarded before the new one is queued for watcher.
//
// NOTE(review): the drain and the send are two separate steps; if several
// goroutines call this concurrently, or watcher has already exited, the send
// could block — confirm how resolvers are expected to call it.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
	// Drop a pending, not-yet-consumed update (if any) without blocking.
	select {
	case <-ccr.scCh:
	default:
	}
	ccr.scCh <- sc
}
  148. func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {
  149. if len(addrs) == 0 && ccr.lastAddressesCount != 0 {
  150. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  151. Desc: "Resolver returns an empty address list",
  152. Severity: channelz.CtWarning,
  153. })
  154. } else if len(addrs) != 0 && ccr.lastAddressesCount == 0 {
  155. var s string
  156. for i, a := range addrs {
  157. if a.ServerName != "" {
  158. s += a.Addr + "(" + a.ServerName + ")"
  159. } else {
  160. s += a.Addr
  161. }
  162. if i != len(addrs)-1 {
  163. s += " "
  164. }
  165. }
  166. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  167. Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s),
  168. Severity: channelz.CtINFO,
  169. })
  170. }
  171. ccr.lastAddressesCount = len(addrs)
  172. }