
grpc-go source analysis: the client Dial process

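Preface: I recently gave a talk to my team, a brief look at RPC and an analysis of the gRPC source code, so I am posting my notes on my personal blog. The next few articles record my own understanding of the gRPC source. If you spot any mistakes, please point them out.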

The example used for this source analysis lives in this GitHub repository: https://github.com/BansheeLW/gRPCTest

Client code that establishes a connection, sends one request, and reads the response

func main() {
	conn, err := grpc.Dial("127.0.0.1:1235", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	// Defer Close only after the error check: conn is nil when Dial fails.
	defer conn.Close()

	client := server_hello_proto.NewTigerServiceClient(conn)
	response, err := client.HelloTiger(context.Background(), &server_hello_proto.HelloRequest{
		Name: "ban",
		Age:  11,
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(response)
}

Let's start with the Dial function.

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

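As the declaration shows, Dial is a thin wrapper around DialContext. Dial takes two parameters: target is the address to connect to, and opts is a variadic parameter, which the function receives as a slice whose elements are of type DialOption.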

DialContext takes one additional parameter, a context.

// DialOption configures how we set up the connection.
type DialOption interface {
	apply(*dialOptions)
}

As you can see, DialOption is an interface type with a single method, apply, which takes a pointer to dialOptions.

Next, let's look at where it is used.

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	...
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}
	....
}

Now let's look at how grpc produces values that implement the DialOption interface.

// EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options.
//
// This API is EXPERIMENTAL.
type EmptyDialOption struct{}

func (EmptyDialOption) apply(*dialOptions) {}

// funcDialOption wraps a function that modifies dialOptions into an
// implementation of the DialOption interface.
type funcDialOption struct {
	f func(*dialOptions)
}

func (fdo *funcDialOption) apply(do *dialOptions) {
	fdo.f(do)
}

func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
	return &funcDialOption{
		f: f,
	}
}

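The grpc package provides two types that implement DialOption to make it easy to produce options. EmptyDialOption is exported and can be embedded in your own struct to build a custom option. funcDialOption deserves particular attention: it is unexported, and its constructor newFuncDialOption lets the package inject an arbitrary function that modifies dialOptions. The exported With... functions are built on top of it.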

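The option we passed to Dial, grpc.WithInsecure(), does exactly this: it calls newFuncDialOption with a function that sets the insecure field of dialOptions to true, which disables transport security for the client connection.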

// WithInsecure returns a DialOption which disables transport security for this
// ClientConn. Note that transport security is required unless WithInsecure is
// set.
func WithInsecure() DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.insecure = true
	})
}
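The note that transport security is required unless WithInsecure is set corresponds to a check in DialContext, listed in full in the call flow later in this article. As a rough sketch, that decision table can be reproduced with plain booleans. The function name checkSecurity and the three error values are assumptions made for illustration (the real errors are unexported variables in grpc), and the sketch leaves out the additional PerRPCCredentials check.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error values standing in for grpc's unexported ones.
var (
	errNoSecurity = errors.New("no transport security set")
	errBothSet    = errors.New("transport credentials and credentials bundle both set")
	errConflict   = errors.New("credentials set together with insecure")
)

// checkSecurity mirrors the branch structure DialContext applies to the
// insecure flag. hasCreds and hasBundle stand in for a non-nil
// TransportCredentials and a non-nil CredsBundle respectively.
func checkSecurity(insecure, hasCreds, hasBundle bool) error {
	if !insecure {
		if !hasCreds && !hasBundle {
			return errNoSecurity // secure mode needs some credentials
		}
		if hasCreds && hasBundle {
			return errBothSet // only one of the two may be set
		}
		return nil
	}
	if hasCreds || hasBundle {
		return errConflict // insecure mode must not carry credentials
	}
	return nil
}

func main() {
	fmt.Println(checkSecurity(false, false, false)) // neither WithInsecure nor credentials
	fmt.Println(checkSecurity(true, false, false))  // WithInsecure only
	fmt.Println(checkSecurity(true, true, false))   // WithInsecure plus credentials
}
```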

Now let's look at the definition of dialOptions.

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
	unaryInt  UnaryClientInterceptor  // unary interceptor
	streamInt StreamClientInterceptor // stream interceptor

	chainUnaryInts  []UnaryClientInterceptor
	chainStreamInts []StreamClientInterceptor

	cp          Compressor               // compressor
	dc          Decompressor             // decompressor
	bs          internalbackoff.Strategy // backoff strategy used for retries
	block       bool                     // whether Dial blocks
	insecure    bool                     // true disables transport security
	timeout     time.Duration
	scChan      <-chan ServiceConfig
	authority   string
	copts       transport.ConnectOptions
	callOptions []CallOption
	// This is used by v1 balancer dial option WithBalancer to support v1
	// balancer, and also by WithBalancerName dial option.
	balancerBuilder             balancer.Builder
	channelzParentID            int64
	disableServiceConfig        bool
	disableRetry                bool
	disableHealthCheck          bool
	healthCheckFunc             internal.HealthChecker
	minConnectTimeout           func() time.Duration
	defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
	defaultServiceConfigRawJSON *string
	// This is used by ccResolverWrapper to backoff between successive calls to
	// resolver.ResolveNow(). The user will have no need to configure this, but
	// we need to be able to configure this in tests.
	resolveNowBackoff func(int) time.Duration
	resolvers         []resolver.Builder
}

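That was a lot of detail, but it boils down to this: when the client dials, a set of default options applies, and to change them you pass in values that implement the DialOption interface (the variadic parameter collects them into a slice) to override the defaults. By exposing an interface in this way, the grpc package lets callers modify the fields of a type that is not exported from the package.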

ClientConn

ClientConn is the most important type in the whole Dial call path. It is a struct type.

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string                    // target address
	parsedTarget resolver.Target           // target after parsing
	authority    string
	dopts        dialOptions               // options for dialing and establishing connections
	csMgr        *connectivityStateManager // connectivity state manager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters // keepalive parameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

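In other words, a ClientConn is a virtual (logical) connection from the client to a conceptual endpoint, used to perform RPCs, and it may be backed by zero or more actual connections. It encapsulates several pieces of functionality: name (target address) resolution, TCP connection establishment (with retries and backoff), TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting.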

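The ClientConn struct contains a field of type dialOptions, and dialOptions in turn has a field that deserves attention: copts, of type transport.ConnectOptions. This struct holds all the options relevant to communicating with the server.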

// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to setup a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandler stores the handler for stats.
	StatsHandler stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
	ChannelzParentID int64
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
	MaxHeaderListSize *uint32
}

Overall call flow

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Override the default connection options with the caller's options.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Set up the interceptors: the configured interceptors are chained into
	// a single one, which is stored back in cc.dopts.unaryInt and
	// cc.dopts.streamInt.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	defer func() {
		if err != nil {
			cc.Close()
		}
	}()
	// Registration with channelz, the tool that collects RPC runtime information.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the transport security (TLS credentials) settings.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}
	// If a default service config was provided, parse it.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		// Set the default service config.
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	// Keepalive settings.
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// If no dialer was set, use the default one. If a proxy is configured,
	// this dialer connects according to the proxy settings in the system
	// environment variables (HTTP_PROXY or HTTPS_PROXY).
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}
	// Append grpcUA to the UserAgent.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}
	// If a timeout was set, derive a context with that timeout.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	// If scChan was provided, the service config can be updated at runtime.
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true // got the initial service config
			}
		default:
		}
	}
	// Backoff strategy used for retries.
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// resolverBuilder is used to resolve target into a list of backend addresses.
	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}
	// Determine the authority, preferring the ServerName from the transport credentials.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}
	// If scChan was provided but no initial service config has arrived yet,
	// block until one does.
	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Start a goroutine that watches scChan and applies service config updates.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	// By default Dial does not wait for the network connection to be
	// established. If block is set, it waits until the connection is ready
	// before returning.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
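The two chain...ClientInterceptors calls near the top of DialContext fold a list of interceptors into a single one. The idea can be sketched with simplified types; invoker, interceptor, chain, nextInvoker, and trace below are hypothetical names, and the signatures are deliberately much smaller than grpc's UnaryInvoker and UnaryClientInterceptor. The first interceptor in the slice ends up outermost.

```go
package main

import (
	"context"
	"fmt"
)

// invoker performs the call; interceptor wraps an invoker. Both are
// simplified, hypothetical versions of the grpc types.
type invoker func(ctx context.Context, method string) error
type interceptor func(ctx context.Context, method string, next invoker) error

// chain folds a slice of interceptors into one. The first element runs
// outermost and the last one calls the real invoker.
func chain(ints []interceptor) interceptor {
	switch len(ints) {
	case 0:
		return nil
	case 1:
		return ints[0]
	}
	return func(ctx context.Context, method string, final invoker) error {
		return ints[0](ctx, method, nextInvoker(ints, 0, final))
	}
}

// nextInvoker returns what interceptor number curr should call next: the
// following interceptor, or the real invoker once the list is exhausted.
func nextInvoker(ints []interceptor, curr int, final invoker) invoker {
	if curr == len(ints)-1 {
		return final
	}
	return func(ctx context.Context, method string) error {
		return ints[curr+1](ctx, method, nextInvoker(ints, curr+1, final))
	}
}

// trace runs a two-interceptor chain and records the order of events.
func trace() []string {
	var order []string
	logging := func(name string) interceptor {
		return func(ctx context.Context, method string, next invoker) error {
			order = append(order, name+" before")
			err := next(ctx, method)
			order = append(order, name+" after")
			return err
		}
	}
	final := func(ctx context.Context, method string) error {
		order = append(order, "invoke "+method)
		return nil
	}
	chained := chain([]interceptor{logging("a"), logging("b")})
	_ = chained(context.Background(), "HelloTiger", final)
	return order
}

func main() {
	for _, step := range trace() {
		fmt.Println(step)
	}
}
```

Running it prints the steps in onion order: a before, b before, invoke HelloTiger, b after, a after.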

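So, as the code above shows, calling Dial really means calling DialContext, and DialContext mainly initializes the ClientConn struct, the dialOptions struct (cc.dopts), and the ConnectOptions struct (cc.dopts.copts). It then builds the resolver and, if block is set, waits until the connection is ready.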