How Longhorn decides which longhorn-manager instance is responsible for reconciling a resource

Posted on 2021-07-30

Take volume creation as an example.

After a volume resource is created, the volume informer watches the change and a volume controller worker starts processing it. Since there are multiple longhorn-manager instances, there are also multiple informers — so which manager's informer is actually in charge of handling a given resource change?

Let's dig into the code:

==controller/volume_controller.go==

// Is the manager on the current node responsible for running the sync logic?
isResponsible, err := vc.isResponsibleFor(volume, defaultEngineImage)
if err != nil {
return err
}
if !isResponsible {
return nil
}

Let's look at the logic of isResponsibleFor; error handling and code that doesn't affect the main flow is omitted.

==controller/volume_controller.go==

// isResponsibleFor picks a running node that has the default engine image deployed.
// We need the default engine image deployed on the node to perform operation like backup operations.
// Prefer picking the node v.Spec.NodeID if it meets the above requirement.
func (vc *VolumeController) isResponsibleFor(v *longhorn.Volume, defaultEngineImage string) (bool, error) {
var err error


isResponsible := isControllerResponsibleFor(vc.controllerID, vc.ds, v.Name, v.Spec.NodeID, v.Status.OwnerID)

// No node in the system has the default engine image,
// Fall back to the default logic where we pick a running node to be the owner
if len(readyNodesWithDefaultEI) == 0 {
return isResponsible, nil
}

preferredOwnerEngineAvailable, err := vc.ds.CheckEngineImageReadiness(defaultEngineImage, v.Spec.NodeID)

currentOwnerEngineAvailable, err := vc.ds.CheckEngineImageReadiness(defaultEngineImage, v.Status.OwnerID)

currentNodeEngineAvailable, err := vc.ds.CheckEngineImageReadiness(defaultEngineImage, vc.controllerID)


// If the engine on the current node is unavailable, this node does not take responsibility.
// Current node's engine is available && this node is the responsible one (preferred, continuing, or newly required)
isPreferredOwner := currentNodeEngineAvailable && isResponsible

// (current node's engine available && preferred node's engine unavailable && this controller's node is the existing owner) => keep the current node
continueToBeOwner := currentNodeEngineAvailable && !preferredOwnerEngineAvailable && vc.controllerID == v.Status.OwnerID

// (current node's engine available && preferred node's engine unavailable && the owner node's engine unavailable) => pick the current node
requiresNewOwner := currentNodeEngineAvailable && !preferredOwnerEngineAvailable && !currentOwnerEngineAvailable

return isPreferredOwner || continueToBeOwner || requiresNewOwner, nil
}

Next, let's look at isControllerResponsibleFor, the generic function longhorn-manager resources use to decide whether the current manager is responsible for the sync logic.
==controller/controller_manager.go==

// Selection guideline: prefer Spec.NodeID, i.e. the preferred node
func isControllerResponsibleFor(controllerID string, ds *datastore.DataStore, name, preferredOwnerID, currentOwnerID string) bool {
// we use this approach so that if there is an issue with the data store
// we don't accidentally transfer ownership
isOwnerUnavailable := func(node string) bool {
isUnavailable, err := ds.IsNodeDownOrDeletedOrMissingManager(node)
if node != "" && err != nil {
logrus.Errorf("Error while checking IsNodeDownOrDeletedOrMissingManager for object %v, node %v: %v", name, node, err)
}
return node == "" || isUnavailable
}

// The current node is the preferred node
isPreferredOwner := controllerID == preferredOwnerID
// The current node is the owner and the preferred node is unavailable, so keep the current owner
continueToBeOwner := currentOwnerID == controllerID && isOwnerUnavailable(preferredOwnerID)
// Both the owner node and the preferred node are unavailable, so a new owner is needed
requiresNewOwner := isOwnerUnavailable(currentOwnerID) && isOwnerUnavailable(preferredOwnerID)
return isPreferredOwner || continueToBeOwner || requiresNewOwner
}

Summary:

From the code above we can conclude that each longhorn-manager instance decides whether its own node may take responsibility for the sync logic. The decision works as follows:

Highest priority: the current node is the preferred node (the preferred value is usually xx.Spec.NodeID).

Next: the current node is already the owner node.

Last resort: both the preferred node and the owner node are unavailable, so the current node takes over.

In other words, node selection converges toward "whichever node is set in the resource's Spec.NodeID, the manager on that desired node handles it".

==Note: the volume controller additionally checks the node's engine image, which differs slightly from other resources (e.g. replica), but the core logic is the same.==

When both the preferred node and the owner are unavailable, two or more nodes may claim responsibility at the same time. In that case each of them updates Status.OwnerID, and Kubernetes' version-conflict mechanism guarantees that only the manager whose update lands first actually wins ownership — that is this piece of code:

// This decides which vc instance gets to handle the volume: for a volume create, the first vc to update wins ownership.
// For other volume operations (update, etc.) it is likewise whoever updates first that handles it.
if volume.Status.OwnerID != vc.controllerID {
volume.Status.OwnerID = vc.controllerID
volume, err = vc.ds.UpdateVolumeStatus(volume)
if err != nil {
// we don't mind others coming first
if apierrors.IsConflict(errors.Cause(err)) {
return nil
}
return err
}
log.Debugf("Volume got new owner %v", vc.controllerID)
}
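
The conflict branch above is worth dwelling on: when several managers claim responsibility at once, each writes its own ID into Status.OwnerID, the API server rejects every update whose resourceVersion is stale, and the losers simply return nil. For comparison, a controller that must get its change applied would instead retry on conflict, e.g. with client-go's retry.RetryOnConflict. The sketch below contrasts the two patterns; claimOwnership is a hypothetical stand-in for vc.ds.UpdateVolumeStatus, not Longhorn code.

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/retry"
)

// claimOwnership is a hypothetical stand-in for "re-read the object, set
// Status.OwnerID to this controller and issue the update"; the returned error
// is a Conflict error if the resourceVersion was stale.
func claimOwnership() error {
	// ... get the latest object, set Status.OwnerID, update ...
	return nil
}

func main() {
	// Longhorn's pattern: try once and yield on conflict, so the first
	// manager whose update lands wins ownership.
	if err := claimOwnership(); err != nil {
		if apierrors.IsConflict(err) {
			fmt.Println("another manager claimed ownership first; giving up")
			return
		}
		panic(err)
	}

	// For contrast, a controller that must apply its change can wrap the same
	// update in retry.RetryOnConflict, which re-runs the function with backoff
	// whenever the API server reports a version conflict.
	_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
		return claimOwnership()
	})
}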

Exploring Longhorn's disk discovery mechanism

Posted on 2021-07-30

Longhorn's CRD nodes.longhorn.io stores disk information. The node custom resource is created by initDaemonNode when the longhorn-manager daemon starts.

datastore/longhorn.go:CreateDefaultNode


// For newly added node, the customized default disks will be applied only if the setting is enabled.
if !requireLabel {
// Note: this part wasn't moved to the controller is because
// this will be done only once.
// If user remove all the disks on the node, the default disk
// will not be recreated automatically
dataPath, err := s.GetSettingValueExisted(types.SettingNameDefaultDataPath)
// dataPath: defaults to /var/lib/longhorn/, read from the settings CRD
if err != nil {
return nil, err
}
disks, err := types.CreateDefaultDisk(dataPath)
if err != nil {
return nil, err
}
node.Spec.Disks = disks
}

The function that creates the default disk:


func CreateDefaultDisk(dataPath string) (map[string]DiskSpec, error) {
if err := util.CreateDiskPathReplicaSubdirectory(dataPath); err != nil {
return nil, err
}
diskInfo, err := util.GetDiskInfo(dataPath)
if err != nil {
return nil, err
}
return map[string]DiskSpec{
DefaultDiskPrefix + diskInfo.Fsid: {
Path: diskInfo.Path,
AllowScheduling: true,
EvictionRequested: false,
StorageReserved: diskInfo.StorageMaximum * 30 / 100, // reserved disk space, i.e. the Reserved value shown under the node size on the Longhorn dashboard
},
}, nil
}

CreateDiskPathReplicaSubdirectory merely creates the replicas subdirectory; the core logic is GetDiskInfo, which gathers the disk information. Let's look at its implementation next.

func GetDiskInfo(directory string) (info *DiskInfo, err error) {
defer func() {
err = errors.Wrapf(err, "cannot get disk info of directory %v", directory)
}()
initiatorNSPath := iscsi_util.GetHostNamespacePath(HostProcPath)
mountPath := fmt.Sprintf("--mount=%s/mnt", initiatorNSPath)
// mountPath: /host/proc/pid/ns/
// stat -f -c "{\"path\":\"%n\",\"fsid\":\"%i\",\"type\":\"%T\",\"freeBlock\":%f,\"totalBlock\":%b,\"blockSize\":%S}" /var/lib/longhorn/
// nsenter --mount=/host/proc/pid/ns/mnt command(stat -fc ...)
output, err := Execute([]string{}, "nsenter", mountPath, "stat", "-fc", "{\"path\":\"%n\",\"fsid\":\"%i\",\"type\":\"%T\",\"freeBlock\":%f,\"totalBlock\":%b,\"blockSize\":%S}", directory)
if err != nil {
return nil, err
}
output = strings.Replace(output, "\n", "", -1)

diskInfo := &DiskInfo{}
err = json.Unmarshal([]byte(output), diskInfo)
if err != nil {
return nil, err
}

diskInfo.StorageMaximum = diskInfo.TotalBlock * diskInfo.BlockSize
diskInfo.StorageAvailable = diskInfo.FreeBlock * diskInfo.BlockSize

return diskInfo, nil
}

As you can see, the core is just the Linux nsenter command plus stat to fetch the filesystem information, and finally unmarshalling the output into the custom disk struct.
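
Since the stat output is a small JSON document, the conversion is just json.Unmarshal plus two multiplications. Below is a minimal, self-contained sketch that mirrors that step; the DiskInfo struct here is a simplified stand-in for the one in longhorn-manager's util package, and the numbers are made up.

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for longhorn-manager's DiskInfo.
type DiskInfo struct {
	Path             string `json:"path"`
	Fsid             string `json:"fsid"`
	Type             string `json:"type"`
	FreeBlock        int64  `json:"freeBlock"`
	TotalBlock       int64  `json:"totalBlock"`
	BlockSize        int64  `json:"blockSize"`
	StorageMaximum   int64
	StorageAvailable int64
}

func main() {
	// Example of what `stat -fc '{"path":"%n",...}'` prints for /var/lib/longhorn/.
	output := `{"path":"/var/lib/longhorn/","fsid":"1e2f3a4b","type":"ext2/ext3","freeBlock":20000000,"totalBlock":25000000,"blockSize":4096}`

	var info DiskInfo
	if err := json.Unmarshal([]byte(output), &info); err != nil {
		panic(err)
	}
	// Capacity figures are derived exactly as GetDiskInfo does.
	info.StorageMaximum = info.TotalBlock * info.BlockSize
	info.StorageAvailable = info.FreeBlock * info.BlockSize

	fmt.Printf("max=%d bytes, available=%d bytes, reserved(30%%)=%d bytes\n",
		info.StorageMaximum, info.StorageAvailable, info.StorageMaximum*30/100)
}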

Appendix:
The nsenter command: https://staight.github.io/2019/09/23/nsenter%E5%91%BD%E4%BB%A4%E7%AE%80%E4%BB%8B/

grpc-go source analysis: how the server receives requests and sends responses

Posted on 2020-05-09 | Category: Golang

Server-side code

type TigerServer struct {}

func (t *TigerServer) HelloTiger(ctx context.Context,req *server_hello_proto.HelloRequest) ( *server_hello_proto.HelloResponse, error) {
resp := &server_hello_proto.HelloResponse{}
resp.Name = req.Name
resp.Age = req.Age + 11
return resp,nil
}

func main() {
grpcServer := grpc.NewServer()
server_hello_proto.RegisterTigerServiceServer(grpcServer,new(TigerServer))

listener, err := net.Listen("tcp", "127.0.0.1:1235")
if err != nil{
log.Fatal(err)
}
_ = grpcServer.Serve(listener)
}

As the code above shows, spinning up a gRPC service takes very little code:

  1. Instantiate a grpc.Server
  2. Register the service object that implements the pb-generated service interface with the grpc.Server
  3. Listen on a port
  4. Accept incoming connections

Instantiating the server

First, let's see what instantiating a server actually does.

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}

if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
return s
}

As you can see, this function is very similar to the client-side Dial flow: it takes a variadic list of options to override the defaults, initializes the grpc.Server struct, wires up the interceptors, and does some initialization for gRPC call monitoring.

The key part is initializing the grpc.Server struct.

type Server struct {
opts serverOptions //server-side options

mu sync.Mutex // guards following
lis map[net.Listener]bool //set of listeners
conns map[transport.ServerTransport]bool //set of connections
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info; look up a service's info by its name
events trace.EventLog

quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop

channelzID int64 // channelz unique identification number
czData *channelzData
}

Server-side options

type serverOptions struct {
creds credentials.TransportCredentials //transport credentials (TLS)
codec baseCodec //codec, e.g. protobuf
cp Compressor //compressor, e.g. gzip
dc Decompressor //decompressor, e.g. gzip
unaryInt UnaryServerInterceptor //unary interceptor
streamInt StreamServerInterceptor //stream interceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
//HTTP/2 protocol limits
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
//keepalive settings; a dedicated goroutine handles keepalive for long-lived connections
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
}

Registering the service object with the server

The function that registers the service object with the server is auto-generated in the pb file by the protoc code-generation plugin protoc-gen-go.

Server-related code in the pb file:

// service descriptor
var _TigerService_serviceDesc = grpc.ServiceDesc{
ServiceName: "server_hello_proto.TigerService", //service name
HandlerType: (*TigerServiceServer)(nil), //declares the interface the service must implement
Methods: []grpc.MethodDesc{ //unary method descriptors
{
MethodName: "HelloTiger", //rpc method name
Handler: _TigerService_HelloTiger_Handler, //handler for the rpc request
},
{
MethodName: "FeedTiger",
Handler: _TigerService_FeedTiger_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Channel",
Handler: _TigerService_Channel_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "hello.proto",
}

func RegisterTigerServiceServer(s *grpc.Server, srv TigerServiceServer) {
//pass in the service descriptor and the concrete service implementation
s.RegisterService(&_TigerService_serviceDesc, srv)
}

As the code below shows, this ultimately calls the registration method in the grpc package.

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
// This method is called by the code generated from the IDL (protobuf). It must be called before the Server starts accepting requests.
// It mainly checks that the concrete service implements the service interface.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
//registration is not allowed once the server is already serving
if s.serve {
grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
//duplicate service names are not allowed
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv
}

As you can see, register mainly adds the service name and the concrete service info into the Server's map m — a mapping from service name to service info, much like the routing table of an HTTP web framework; the sketch below makes the analogy concrete.
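
Here is a stripped-down sketch of the two-level lookup the server performs later in handleStream — service name to service, then method name to handler. The types and maps below are illustrative only, not gRPC's internal ones.

package main

import (
	"fmt"
	"strings"
)

type methodHandler func(req string) string

// Two-level "routing table": service name -> method name -> handler,
// mirroring Server.m (services) and service.md (unary methods).
var services = map[string]map[string]methodHandler{
	"server_hello_proto.TigerService": {
		"HelloTiger": func(req string) string { return "hello " + req },
	},
}

// dispatch mimics how handleStream splits the ":path" and looks up the handler.
func dispatch(fullMethod, req string) (string, error) {
	sm := strings.TrimPrefix(fullMethod, "/")
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		return "", fmt.Errorf("malformed method name: %q", fullMethod)
	}
	service, method := sm[:pos], sm[pos+1:]

	methods, ok := services[service]
	if !ok {
		return "", fmt.Errorf("unknown service %v", service)
	}
	h, ok := methods[method]
	if !ok {
		return "", fmt.Errorf("unknown method %v for service %v", method, service)
	}
	return h(req), nil
}

func main() {
	out, err := dispatch("/server_hello_proto.TigerService/HelloTiger", "tiger")
	fmt.Println(out, err)
}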

The server starts listening and accepting requests

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
//setting this to true marks the Server as actually serving
s.serve = true
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}

s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
}
}()

//a single Server can listen on multiple addresses
ls := &listenSocket{Listener: lis}
s.lis[ls] = true

if channelz.IsOn() {
ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
}
s.mu.Unlock()

//on exit, remove this listener ls from the lis map
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()

var tempDelay time.Duration // how long to sleep on accept failure

for {
rawConn, err := lis.Accept()
if err != nil {
if ne, ok := err.(interface {
Temporary() bool
}); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
case <-s.quit.Done():
timer.Stop()
return nil
}
continue
}
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()

if s.quit.HasFired() {
return nil
}
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}

As the code shows, for every incoming connection the Server starts a goroutine to handle it; the real work of handling the client connection is done by the handleRawConn method.

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
}
// set the read and write deadlines
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
//if TLS credentials and the like are configured, perform the TLS handshake here
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
return
}

//Finish the HTTP/2 handshake (establishing the HTTP/2 connection also requires a client/server exchange, the HTTP/2 connection preface)
//Wrap the client's HTTP/2 connection into a transport.ServerTransport, i.e. an http2Server
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}

//clear the connection deadline: the handshake is done and the client won't necessarily keep sending requests continuously
rawConn.SetDeadline(time.Time{})
//add the HTTP/2 connection to the Server's connection map
if !s.addConn(st) {
return
}
go func() {
//wait for rpc requests and handle them
s.serveStreams(st)
//remove the HTTP/2 connection
s.removeConn(st)
}()
}
// each HTTP/2 connection gets a ServerTransport on the server side, here an http2Server
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
InitialConnWindowSize: s.opts.initialConnWindowSize,
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
ChannelzParentID: s.channelzID,
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
}
//returns an http2Server, which implements the ServerTransport interface
//the ServerTransport interface specifies methods such as HandleStreams and WriteHeader
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
c.Close()
channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
return nil
}

return st
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
return newHTTP2Server(conn, config)
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
//initialize the read/write buffer size limits
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
maxHeaderListSize := defaultServerMaxHeaderListSize
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}
//wraps frame reading and writing; built on the http2 framer underneath
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
//initial SETTINGS frame (part of the HTTP/2 connection preface)
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
// maximum number of concurrent streams
maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
isettings = append(isettings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: maxStreams,
})
}
dynamicWindow := true
iwz := int32(initialWindowSize)
if config.InitialWindowSize >= defaultWindowSize {
iwz = config.InitialWindowSize
dynamicWindow = false
}
icwz := int32(initialWindowSize)
if config.InitialConnWindowSize >= defaultWindowSize {
icwz = config.InitialConnWindowSize
dynamicWindow = false
}
if iwz != defaultWindowSize {
isettings = append(isettings, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(iwz)})
}
if config.MaxHeaderListSize != nil {
isettings = append(isettings, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *config.MaxHeaderListSize,
})
}
if config.HeaderTableSize != nil {
isettings = append(isettings, http2.Setting{
ID: http2.SettingHeaderTableSize,
Val: *config.HeaderTableSize,
})
}
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
}
// keepalive parameters for the TCP connection
kp := config.KeepaliveParams
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge.
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
// maximum idle time; a client connection exceeding it is closed (unbounded by default)
if kp.Time == 0 {
kp.Time = defaultServerKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
done := make(chan struct{})
t := &http2Server{
ctx: context.Background(),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
}
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
}

t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)

t.framer.writer.Flush()

defer func() {
if err != nil {
t.Close()
}
}()

// validate the client's HTTP/2 connection preface
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
if !bytes.Equal(preface, clientPreface) {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
}

frame, err := t.framer.fr.ReadFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, err
}
if err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
}
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
}
t.handleSettings(sf)

go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
errorf("transport: loopyWriter.run returning. Err: %v", err)
}
t.conn.Close()
close(t.writerDone)
}()
//start a dedicated goroutine for keepalive
go t.keepalive()
return t, nil
}

When an HTTP/2 request comes in, the server first completes the HTTP/2 handshake to establish the connection (handling the frames exchanged during connection setup), then keeps looping to handle the creation of new streams (new HTTP/2 requests arriving) and the sending and receiving of data frames (thanks to HTTP/2 multiplexing, a client can send multiple requests concurrently over the same connection). In the code, the HTTP/2 connection is a ServerTransport, i.e. an http2Server.

Next, let's look at the serveStreams method.

func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
//spawn a child goroutine inside the callback to handle the rpc request
go func() {
defer wg.Done()
//grpc runs on HTTP/2: each rpc request uses one stream, and a single connection can carry multiple rpc requests, i.e. multiple incoming streams
//whenever a new rpc request arrives this callback fires and calls the server's handleStream method to process it
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}

As you can see, a WaitGroup is used to block here while requests on the same HTTP/2 connection keep being processed.

HandleStreams processes the request streams using the registered handler

HandleStreams(func(*Stream), func(context.Context, string) context.Context)

This is an interface method; the concrete implementation here is http2Server's HandleStreams, which takes a handler function for streams and a trace function.

func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
rst: true,
rstCode: se.Code,
onWrite: func() {},
})
}
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}

This method loops reading the frames sent over the client connection: a HEADERS frame means a new rpc request has arrived, so the handler callback is invoked; a DATA frame is dispatched to its stream; an RST frame …

Next, let's look at the server's handleStream method, which handles a new rpc request.

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
//the code below mainly splits the string to extract the service name and the method name
sm := stream.Method()
//strip the leading '/'
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
return
}
service := sm[:pos]
method := sm[pos+1:]

//look up the service by name in the server's service map
srv, knownService := s.m[service]
if knownService { //if found
//look up the method descriptor in the unary rpc method map
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
//look up the stream descriptor in the streaming rpc map
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
//if the requested service or method doesn't exist but the server is configured with a handler for unknown services, hand the request to that handler
// Unknown service, or known server unknown method.
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
var errDesc string
if !knownService {
errDesc = fmt.Sprintf("unknown service %v", service)
} else {
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
}
if trInfo != nil {
trInfo.tr.LazyPrintf("%s", errDesc)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
}

Next, let's look at processUnaryRPC for ordinary unary rpc requests (non-core code unrelated to the main flow has been trimmed; the streaming rpc handler isn't shown since the idea is much the same).

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
// ...


//set up compression and decompression
// comp and cp are used for compression. decomp and dc are used for
// decompression. If comp and decomp are both set, they are the same;
// however they are kept separate to ensure that at most one of the
// compressor/decompressor variable pairs are set for use later.
var comp, decomp encoding.Compressor
var cp Compressor
var dc Decompressor


// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
dc = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
decomp = encoding.GetCompressor(rc)
if decomp == nil {
st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
t.WriteStatus(stream, st)
return st.Err()
}
}

// If cp is set, use it. Otherwise, attempt to compress the response using
// the incoming message compression method.
//
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
cp = s.opts.cp
stream.SetSendCompress(cp.Type())
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
stream.SetSendCompress(rc)
}
}

var payInfo *payloadInfo
if sh != nil || binlog != nil {
payInfo = &payloadInfo{}
}
//receive the data and decompress it
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if st, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, st); e != nil {
channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
}
}
return err
}
if channelz.IsOn() {
t.IncrMsgRecv()
}
// deserialization: the data ends up in v, and v points to the req argument of the service method implementation
df := func(v interface{}) error {
//unmarshal the request
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
WireLength: payInfo.wireLength,
Data: d,
Length: len(d),
})
}
//...
return nil
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
//invoke the Handler, which is generated by the proto tooling; internally it calls the corresponding service method. df is the decode function, and the last argument is the interceptor configured when the grpc.Server was created
reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
if appErr != nil {
appStatus, ok := status.FromError(appErr)
if !ok {
// Convert appErr if it is not a grpc status error.
appErr = status.Error(codes.Unknown, appErr.Error())
appStatus, _ = status.FromError(appErr)
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
trInfo.tr.SetError()
}
// write the error status to the stream
if e := t.WriteStatus(stream, appStatus); e != nil {
channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
//...
}
return appErr
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
}
opts := &transport.Options{Last: true}

// serialize reply and send it back to the client
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
}
if sts, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, sts); e != nil {
channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
} else {
switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
}
if binlog != nil {
h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{
Header: h,
})
binlog.Log(&binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
return err
}

err = t.WriteStatus(stream, statusOK)

return err
}

The main logic above is: read the request from the stream, deserialize it, call the handler from the MethodDesc, and once the request has been handled, serialize the response and write it back to the stream for the client.

Let's look at the message-parsing function, which extracts the raw protobuf-serialized payload from the buffer.

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
// recvMsg extracts the actual message (the first 5 bytes are a prefix: byte 1 is the compressed flag, bytes 2-5 are the body length; everything after that is read as the request payload)
pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return nil, err
}
if payInfo != nil {
payInfo.wireLength = len(d)
}
// verify the compression type
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
return nil, st.Err()
}

var size int
if pf == compressionMade {
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
d, err = dc.Do(bytes.NewReader(d))
size = len(d)
} else {
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
} else {
size = len(d)
}
if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
return d, nil
}
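
The 5-byte prefix mentioned in the comment is gRPC's length-prefixed message framing: one byte for the compressed flag followed by a 4-byte big-endian payload length. A minimal sketch of reading one such message (roughly what parser.recvMsg does; the helper name is mine):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readGRPCMessage reads one length-prefixed gRPC message from r:
// byte 0 is the compressed flag, bytes 1-4 are the big-endian payload length.
func readGRPCMessage(r io.Reader) (compressed bool, payload []byte, err error) {
	var header [5]byte
	if _, err = io.ReadFull(r, header[:]); err != nil {
		return false, nil, err
	}
	length := binary.BigEndian.Uint32(header[1:5])
	payload = make([]byte, length)
	if _, err = io.ReadFull(r, payload); err != nil {
		return false, nil, err
	}
	return header[0] == 1, payload, nil
}

func main() {
	// Build a fake frame: uncompressed, 5-byte payload "hello".
	var buf bytes.Buffer
	buf.WriteByte(0)
	binary.Write(&buf, binary.BigEndian, uint32(5))
	buf.WriteString("hello")

	compressed, payload, err := readGRPCMessage(&buf)
	fmt.Println(compressed, string(payload), err)
}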

Now let's look at the handler generated by the proto tooling that we mentioned above (HelloTiger as the example).

func _TigerService_HelloTiger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
//dec is the decode (unmarshal) function passed in
if err := dec(in); err != nil {
return nil, err
}
//if no interceptor is configured, call the method directly
if interceptor == nil {
return srv.(TigerServiceServer).HelloTiger(ctx, in)
}
//service info
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/server_hello_proto.TigerService/HelloTiger",
}
//callback
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TigerServiceServer).HelloTiger(ctx, req.(*HelloRequest))
}
//with an interceptor configured, call the interceptor first and pass it the callback
return interceptor(ctx, in, info, handler)
}

Let's also look at the NewContextWithServerTransportStream function used in processUnaryRPC.

//this function derives a new context from ctx and stores the stream on it
// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
return context.WithValue(ctx, streamKey{}, stream)
}


//this interface is used by the server to set the headers sent back to the client
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
Method() string
SetHeader(md metadata.MD) error
SendHeader(md metadata.MD) error
SetTrailer(md metadata.MD) error
}

As the code shows, NewContextWithServerTransportStream is called with the current request's stream.Context().

Now let's see how stream.ctx is created.

// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
streamID := frame.Header().StreamID
state := &decodeState{
serverSide: true,
}
//decode the HEADERS frame and extract its fields
if err := state.decodeHeader(frame); err != nil {
if se, ok := status.FromError(err); ok {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: statusCodeConvTab[se.Code()],
onWrite: func() {},
})
}
return false
}

buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
recvCompress: state.data.encoding,
method: state.data.method,
contentSubtype: state.data.contentSubtype,
}
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
if state.data.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
if len(state.data.mdata) > 0 {
// state.data.mdata is stored into a new context here
s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
}
if state.data.statsTags != nil {
s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
}
if state.data.statsTrace != nil {
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
}
if t.inTapHandle != nil {
var err error
info := &tap.Info{
FullMethodName: state.data.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: true,
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
s.cancel()
return true
}
t.maxStreamID = streamID
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
Header: metadata.MD(state.data.mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
}
// Register the stream with loopy.
t.controlBuf.put(&registerStream{
streamID: s.id,
wq: s.wq,
})
handle(s)
return false
}

Receiving a HEADERS frame means a new rpc request has arrived; the frame is decoded and a stream is created, and while creating the stream the user-defined header fields are stored into stream.ctx.
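
Because operateHeaders attaches the decoded metadata with metadata.NewIncomingContext, a service handler can read those user-defined headers back from the ctx it receives via metadata.FromIncomingContext. A small self-contained sketch that simulates both sides (the handler function here is just a stand-in for a generated-stub method body):

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// handler stands in for a service method body; in a real server the ctx comes
// from the stream created in operateHeaders.
func handler(ctx context.Context) {
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		// md is a metadata.MD (map[string][]string) with the client's headers.
		fmt.Println("incoming metadata:", md)
	}
}

func main() {
	// Simulate what operateHeaders does when it attaches the decoded
	// HEADERS-frame fields to the stream context.
	md := metadata.Pairs("x-request-id", "42")
	ctx := metadata.NewIncomingContext(context.Background(), md)
	handler(ctx)
}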

Next, let's look at how the response is returned to the client.

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
//serialize the response message
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
if err != nil {
channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
return err
}
//compress it
compData, err := compress(data, cp, comp)
if err != nil {
channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
return err
}
//build the message header (the 5-byte prefix)
hdr, payload := msgHeader(data, compData)
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > s.opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
}
//write it to the stream
err = t.Write(stream, hdr, payload, opts)
if err == nil && s.opts.statsHandler != nil {
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
}
return err
}

// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
//if the HEADERS frame hasn't been sent yet, send it first
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
if _, ok := err.(ConnectionError); ok {
return err
}
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
}
}
// Add some data to header frame so that we can equally distribute bytes across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
//build the DATA frame
df := &dataFrame{
streamID: s.id,
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
}
//enqueue the DATA frame for sending
return t.controlBuf.put(df)
}

grpc-go source analysis: the client-side invoke call

Posted on 2020-05-09 | Category: Golang
func main() {
conn, err := grpc.Dial("127.0.0.1:1235", grpc.WithInsecure())
defer conn.Close()
if err != nil{
log.Fatal(err)
}

client := server_hello_proto.NewTigerServiceClient(conn)
response, err := client.HelloTiger(context.Background(), &server_hello_proto.HelloRequest{
Name: "ban",
Age: 11,
})
if err != nil{
log.Fatal(err)
}
log.Println(response)
}

In the previous post we analyzed how Dial establishes the connection. Let's continue from there.

client := server_hello_proto.NewTigerServiceClient(conn)

Here the generated pb code creates a client for the connection — really just a struct value (tigerServiceClient) that holds the ClientConn.

type tigerServiceClient struct {
cc *grpc.ClientConn
}

The tigerServiceClient type implements the TigerServiceClient interface — the client stub API interface auto-generated from the rpc service definition in the proto file.

type TigerServiceClient interface {
HelloTiger(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
...
}

HelloTiger takes three parameters. Looking at the grpc.CallOption parameter type, it is an interface:

// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
// before is called before the call is sent to any server. If before
// returns a non-nil error, the RPC fails with that error.
before(*callInfo) error

// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
}

This interface has before and after methods; we can implement it, and the before and after hooks are invoked before and after the RPC call, letting us run our own logic around it.
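
grpc also ships ready-made CallOption implementations built on these hooks; for instance grpc.Header and grpc.Trailer hand the response metadata back to the caller once the call has completed. A sketch of using them on the HelloTiger call, assuming the client set up at the top of this post plus the google.golang.org/grpc/metadata import:

var header, trailer metadata.MD
resp, err := client.HelloTiger(context.Background(),
	&server_hello_proto.HelloRequest{Name: "ban", Age: 11},
	grpc.Header(&header),   // populated with the response header metadata after the call
	grpc.Trailer(&trailer), // populated with the trailing metadata after the call
)
if err != nil {
	log.Fatal(err)
}
log.Println(resp, header, trailer)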

Now let's look at the implementation of the client stub API method.

func (c *tigerServiceClient) HelloTiger(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
out := new(HelloResponse)
err := c.cc.Invoke(ctx, "/server_hello_proto.TigerService/HelloTiger", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

The core here is the Invoke call, which is the focus of this post.


// Invoke sends the RPC request on the wire and returns after response is
// received. This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
//merge the CallOptions set at Dial time with those passed to this Invoke into one slice, so interceptors can see all applicable call options
opts = combine(cc.dopts.callOptions, opts)


if cc.dopts.unaryInt != nil {
//if an interceptor is configured, call it (it will eventually call the invoke function too)
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
//no interceptor, call the invoke function directly
return invoke(ctx, method, args, reply, cc, opts...)
}
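
The unaryInt branch above is where a client-side interceptor, installed with grpc.WithUnaryInterceptor at Dial time, gets to wrap invoke. A minimal logging interceptor as a self-contained sketch (the address is the example server's):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

// timingInterceptor logs every unary call's method name and latency, then
// delegates to invoker (grpc's invoke function shown above).
func timingInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	start := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...)
	log.Printf("%s took %v, err=%v", method, time.Since(start), err)
	return err
}

func main() {
	conn, err := grpc.Dial("127.0.0.1:1235",
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(timingInterceptor), // becomes cc.dopts.unaryInt
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}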

The invoke function:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
//obtain the transport and wrap it into a ClientStream; this involves load balancing, timeout control, encoding and stream handling, largely mirroring the server side
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
//send the request
if err := cs.SendMsg(req); err != nil {
return err
}
//receive the response
return cs.RecvMsg(reply)
}

When the client stream is created, each CallOption's before method is invoked in turn to do some preprocessing before the request is sent.

for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}

The SendMsg method that sends the request:


func (cs *clientStream) SendMsg(m interface{}) (err error) {
defer func() {
if err != nil && err != io.EOF {
// Call finish on the client stream for errors generated by this SendMsg
// call, as these indicate problems created by this client. (Transport
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
// error will be returned from RecvMsg eventually in that case, or be
// retried.)
cs.finish(err)
}
}()
// has the client stream already been closed for sending?
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
// is this a client-streaming RPC?
if !cs.desc.ClientStreams {
cs.sentLast = true
}

//pre-process the request: serialize and compress it
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}

// TODO(dfawley): should we be checking len(data) instead?
//check whether the compressed+serialized message exceeds the preset maxSendMessageSize (math.MaxInt32 by default); error out if it does
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
msgBytes := data // Store the pointer before setting to nil. For binary logging.
// build the function that will actually send the message
op := func(a *csAttempt) error {
//this is where the data is really sent
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
// start sending (with retry support)
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
Message: msgBytes,
})
}
return
}

The client stream method that receives the response:

func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
if cs.binlog != nil {
recvInfo = &payloadInfo{}
}
//receive the server's result and unmarshal it into m — m is the reply value
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
})
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)

if cs.binlog != nil {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: cs.Trailer(),
Err: err,
}
if logEntry.Err == io.EOF {
logEntry.Err = nil
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
}
}
return err
}

grpc-go source analysis: the client Dial process

Posted on 2020-05-09 | Category: Golang

Preface: I recently gave a talk within my team, an informal look at RPC and gRPC source analysis, so I'm posting the notes to my personal blog. The next few articles are my own understanding of the gRPC source; corrections are welcome if you spot mistakes.

GitHub repository with the example code used in this analysis: https://github.com/BansheeLW/gRPCTest

Client code that establishes a connection, sends one request and gets the response:

func main() {
conn, err := grpc.Dial("127.0.0.1:1235", grpc.WithInsecure())
defer conn.Close()
if err != nil{
log.Fatal(err)
}

client := server_hello_proto.NewTigerServiceClient(conn)
response, err := client.HelloTiger(context.Background(), &server_hello_proto.HelloRequest{
Name: "ban",
Age: 11,
})
if err != nil{
log.Fatal(err)
}
log.Println(response)
}

First, let's look at the Dial function.

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}

As the declaration shows, Dial is just a thin wrapper around DialContext. Dial takes two parameters: target, the address to connect to, and opts, a variadic parameter accepting values of type DialOption.

DialContext takes one extra parameter: a context.

// DialOption configures how we set up the connection.
type DialOption interface {
apply(*dialOptions)
}

As you can see, DialOption is an interface with a single apply method (which receives a pointer to dialOptions).

Next, let's look at where it is used.

DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
...
for _, opt := range opts {
opt.apply(&cc.dopts)
}
....
}

Now let's see how grpc produces values that implement the DialOption interface.

// EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options.
//
// This API is EXPERIMENTAL.
type EmptyDialOption struct{}

func (EmptyDialOption) apply(*dialOptions) {}

// funcDialOption wraps a function that modifies dialOptions into an
// implementation of the DialOption interface.
type funcDialOption struct {
f func(*dialOptions)
}

func (fdo *funcDialOption) apply(do *dialOptions) {
fdo.f(do)
}

func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
return &funcDialOption{
f: f,
}
}

The grpc package provides two types that implement DialOption to make producing options easy; in particular funcDialOption, whose constructor newFuncDialOption lets callers inject a function that mutates the options.
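
This is the standard Go functional-options pattern, and the same shape works for any configuration struct whose fields you don't want to export. A small self-contained sketch with illustrative names (not grpc's own):

package main

import "fmt"

type options struct {
	timeoutSec int
	insecure   bool
}

// Option mirrors DialOption: an interface with a single apply method.
type Option interface{ apply(*options) }

// funcOption mirrors funcDialOption: it wraps a mutation function.
type funcOption struct{ f func(*options) }

func (fo funcOption) apply(o *options) { fo.f(o) }

func WithTimeout(sec int) Option {
	return funcOption{f: func(o *options) { o.timeoutSec = sec }}
}

func WithInsecure() Option {
	return funcOption{f: func(o *options) { o.insecure = true }}
}

func New(opts ...Option) options {
	o := options{timeoutSec: 30} // defaults
	for _, opt := range opts {
		opt.apply(&o) // exactly what DialContext does with cc.dopts
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", New(WithInsecure(), WithTimeout(5)))
}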

The option we passed to Dial, grpc.WithInsecure(), simply calls newFuncDialOption with a function that sets the insecure field of the options (disabling transport security checks for the client connection).

// WithInsecure returns a DialOption which disables transport security for this
// ClientConn. Note that transport security is required unless WithInsecure is
// set.
func WithInsecure() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.insecure = true
})
}

Let's also look at the definition of dialOptions.

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor //unary interceptor
streamInt StreamClientInterceptor //stream interceptor

chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor

cp Compressor //compressor
dc Decompressor //decompressor
bs internalbackoff.Strategy //backoff/retry strategy
block bool //whether Dial blocks until the connection is up
insecure bool //disable transport security
timeout time.Duration
scChan <-chan ServiceConfig
authority string
copts transport.ConnectOptions
callOptions []CallOption
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
channelzParentID int64
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
healthCheckFunc internal.HealthChecker
minConnectTimeout func() time.Duration
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
// This is used by ccResolverWrapper to backoff between successive calls to
// resolver.ResolveNow(). The user will have no need to configure this, but
// we need to be able to configure this in tests.
resolveNowBackoff func(int) time.Duration
resolvers []resolver.Builder
}

All of the above boils down to this: when the client dials a connection there is a set of defaults, and to change them you pass in values implementing the DialOption interface to override those defaults. By exposing an interface, the grpc package lets callers modify fields of types it does not export.
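
In practice that just means passing extra DialOptions to Dial. For example, a sketch that blocks until the connection is ready and bounds the dial with a timeout — WithBlock and WithTimeout are existing options, though newer grpc-go versions prefer DialContext with a context deadline over WithTimeout:

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:1235",
		grpc.WithInsecure(),             // overrides dopts.insecure
		grpc.WithBlock(),                // dopts.block: Dial waits until the connection is up
		grpc.WithTimeout(3*time.Second), // dopts.timeout: give up after 3s
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}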

ClientConn

ClientConn is the most important type in the whole Dial process; it is a struct.

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc

target string //target address
parsedTarget resolver.Target //parsed target for the resolver
authority string
dopts dialOptions //dial options used to establish the connection
csMgr *connectivityStateManager //connectivity state manager

balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper

mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters //keepalive parameters for the long-lived connection
curBalancerName string
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value

firstResolveEvent *grpcsync.Event

channelzID int64 // channelz unique identification number
czData *channelzData
}

In other words, ClientConn is the client's logical connection to an endpoint, used to perform RPCs. It encapsulates name (target address) resolution, TCP connection establishment (with retries and backoff), and error handling on established connections.

The ClientConn struct contains a field of type dialOptions, and dialOptions in turn has a ConnectOptions field worth noting: it holds all the options relevant to communicating with the server.

// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client
// connection. Only one of TransportCredentials and CredsBundle is non-nil.
TransportCredentials credentials.TransportCredentials
// CredsBundle is the credentials bundle to be used. Only one of
// TransportCredentials and CredsBundle is non-nil.
CredsBundle credentials.Bundle
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.
InitialConnWindowSize int32
// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID int64
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
}
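
Several of these fields are populated indirectly through dial options. A hedged sketch (the server address and the concrete values are assumptions, not taken from the source):

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	// These options end up in cc.dopts.copts (transport.ConnectOptions):
	// WithUserAgent -> UserAgent, WithKeepaliveParams -> KeepaliveParams,
	// WithWriteBufferSize / WithReadBufferSize -> WriteBufferSize / ReadBufferSize.
	conn, err := grpc.Dial(
		"localhost:50051", // assumed address
		grpc.WithInsecure(),
		grpc.WithUserAgent("my-client/0.1"), // hypothetical user agent
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                30 * time.Second, // send a ping if the connection is idle this long
			Timeout:             10 * time.Second, // wait this long for the ping ack
			PermitWithoutStream: true,
		}),
		grpc.WithWriteBufferSize(64*1024),
		grpc.WithReadBufferSize(64*1024),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}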

Overall call flow

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())

//apply the caller's DialOptions on top of the defaults
for _, opt := range opts {
opt.apply(&cc.dopts)
}

//chain the unary/stream interceptors into a single call chain (i.e. store them in cc.dopts.chainUnaryInts / cc.dopts.chainStreamInts)
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)

defer func() {
if err != nil {
cc.Close()
}
}()
// channelz: register the channel for runtime RPC instrumentation
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
Severity: channelz.CtINFO,
},
})
} else {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
channelz.Info(cc.channelzID, "Channel Created")
}
cc.csMgr.channelzID = cc.channelzID
}

//sanity checks on the transport security (TLS credentials) configuration
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
} else {
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
return nil, errCredentialsConflict
}
for _, cd := range cc.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
return nil, errTransportCredentialsMissing
}
}
}
// if a default service config JSON was provided
if cc.dopts.defaultServiceConfigRawJSON != nil {
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
// store the parsed default service config
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
//keepalive parameters
cc.mkp = cc.dopts.copts.KeepaliveParams

//if no dialer was set, fall back to the default proxy-aware dialer, which honors the HTTP_PROXY/HTTPS_PROXY environment variables
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
)
}
//append the gRPC user agent string (grpcUA) to UserAgent
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
// if a dial timeout was configured, derive a context with that timeout
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
}()

scSet := false
// if scChan was provided, the ServiceConfig can be hot-updated; try a non-blocking read of the initial config
if cc.dopts.scChan != nil {
// Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
scSet = true // got the initial ServiceConfig
}
default:
}
}
// backoff strategy used for retries/reconnects
if cc.dopts.bs == nil {
cc.dopts.bs = backoff.DefaultExponential
}

// resolverBuilder resolves target into a list of backend addresses
// Determine the resolver to use.
cc.parsedTarget = grpcutil.ParseTarget(cc.target)
channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
}
}
//derive the :authority from the transport credentials / dial options / parsed target
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
cc.authority = cc.parsedTarget.Endpoint
}
// if scChan was provided but the initial ServiceConfig has not arrived yet, block until it does
if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
// start a goroutine that watches scChan for ServiceConfig hot updates
if cc.dopts.scChan != nil {
go cc.scWatcher()
}

var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}

// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()

// A blocking dial blocks until the clientConn is ready.
// by default Dial does not wait for the connection to be established; with block set it only returns once the connection is ready
if cc.dopts.block {
for {
s := cc.GetState()
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
return nil, ctx.Err()
}
}
}

return cc, nil
}

So, as the code above shows, calling Dial is really a call to DialContext, and DialContext mainly initializes the ClientConn struct, the dialOptions struct (cc.dopts) and the ConnectOptions struct (cc.dopts.copts). A usage sketch of the blocking path follows.
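
As a hedged usage sketch (the target address and the timeout are assumptions), the caller-supplied context bounds the whole dial, and together with WithBlock it makes DialContext return an error if the connection is not ready in time:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx,
		"localhost:50051", // assumed address
		grpc.WithInsecure(),
		grpc.WithBlock(), // block until the connection is ready or ctx expires
	)
	if err != nil {
		log.Fatalf("DialContext failed: %v", err)
	}
	defer conn.Close()
	log.Printf("connection state: %v", conn.GetState())
}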

RPC - some personal notes

Posted on 2020-05-09 | Category RPC

RPC (Remote Procedure Call) is usually rendered in Chinese as 远程过程调用, though I find 远程程序调用 ("remote program call") easier to grasp. The called procedure lives in a different address space and is invoked over the network, without the caller having to understand the underlying network protocols (function calls made network-transparent); it assumes some transport protocol exists to carry the call data between the communicating programs.

RPC follows the classic client/server model: send a request, receive a response.

The flow of an RPC call (a sketch of the marshaling step follows the list):

  1. The client calls the client stub; this is a local call, and the parameters are pushed onto the stack
  2. The client stub packs the parameters into a message and sends it; this packing is called marshaling
  3. The client's system sends the message to the server
  4. The server's system hands the received packet to the server stub
  5. The server stub unpacks the parameters; unpacking is called unmarshaling
  6. Finally the server stub calls the actual service routine, and the result travels back to the client along the reverse path
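
A minimal sketch of steps 2 and 5 (marshaling/unmarshaling), using encoding/gob purely for illustration; the Args type and its fields are assumptions, not part of any particular framework:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// Args is a hypothetical request payload a client stub might marshal.
type Args struct {
	A, B int
}

func main() {
	// client stub side: marshal the call parameters into a byte stream
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(Args{A: 7, B: 8}); err != nil {
		log.Fatal(err)
	}

	// server stub side: unmarshal the byte stream back into parameters
	var got Args
	if err := gob.NewDecoder(&buf).Decode(&got); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("server stub received: %+v\n", got) // {A:7 B:8}
}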

Comparison with RESTful

Transport protocol:

  • RPC can run over TCP, UDP, or HTTP
  • RESTful runs over HTTP

What is operated on:

  • RPC operates on objects and methods
  • RESTful operates on resources

Functionality

Point-to-point calls between client and server:

  • stubs
  • communication
  • RPC message serialization

Service governance:

  • service discovery and deregistration
  • service high availability
  • load balancing

Looking at current RPC frameworks through these responsibilities, they mostly fall into two camps:

  1. Focused on cross-language calls
  • gRPC
  • Thrift (no service governance)
  • rpcx
  2. Focused on service governance
  • Alibaba Dubbo
  • Motan

Two properties stand out when using RPC:

  1. It hides the difference between local and remote calls: invoking a method on another machine feels like calling a local method.
  2. It hides the complexity of network communication: we don't have to deal with low-level network programming and can focus on business logic.

HTTP hijack

Posted on 2020-04-25 | Category Golang

While reading the net/rpc standard library today, the HTTP-based RPC part turned up something interesting.

RPC over HTTP only uses the HTTP protocol to establish the connection; once the connection is up, HTTP is out of the picture. The term for this is hijack.

RPC OVER HTTP

func main() {

// create an Arith value
arith := new(Arith)
// register arith with the default RPC server
rpc.Register(arith)
// hook the RPC server into HTTP (registers handlers on http.DefaultServeMux)
rpc.HandleHTTP()

// let net/http listen on the port
err := http.ListenAndServe(":1234", nil)
if err != nil {
fmt.Println(err.Error())
}
}

rpc.HandleHTTP()

Internally, HandleHTTP registers the RPC server, which implements the http.Handler interface, as the handler for the RPC path on the http package's default ServeMux (a rough equivalent is sketched below).
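
A hedged sketch of what rpc.HandleHTTP() boils down to (the debug handler it also registers is omitted):

package main

import (
	"net/http"
	"net/rpc"
)

func main() {
	// rpc.DefaultServer is an *rpc.Server, and *rpc.Server implements
	// http.Handler, so it can be handed to net/http directly.
	http.Handle(rpc.DefaultRPCPath, rpc.DefaultServer) // DefaultRPCPath is "/_goRPC_"
	_ = http.ListenAndServe(":1234", nil)
}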

The RPC server implements the http.Handler interface, i.e. it has a ServeHTTP method:

    // ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}

ServeHTTP handles the HTTP side of the request: it only accepts an HTTP CONNECT request, hijacks the connection, and then hands the raw conn to ServeConn. In other words, net/rpc only uses HTTP CONNECT to establish the connection, which is quite different from an ordinary RESTful API (the client-side counterpart is sketched below).
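
For completeness, a hedged sketch of the client side, which is what sends that CONNECT request (the address and the service method name are assumptions):

package main

import (
	"log"
	"net/rpc"
)

// Args mirrors a hypothetical Arith.Multiply signature, used for illustration.
type Args struct {
	A, B int
}

func main() {
	// DialHTTP sends "CONNECT /_goRPC_ HTTP/1.0" and, once the server replies
	// that it is connected, speaks the plain net/rpc wire protocol on the same conn.
	client, err := rpc.DialHTTP("tcp", "localhost:1234") // assumed address
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer client.Close()

	var reply int
	// "Arith.Multiply" is assumed to be registered on the server side.
	if err := client.Call("Arith.Multiply", &Args{A: 7, B: 8}, &reply); err != nil {
		log.Fatal("call:", err)
	}
	log.Println("7 * 8 =", reply)
}

Back to the server side: the Hijacker interface itself is defined as follows.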

type Hijacker interface {
// Hijack lets the caller take over the connection.
// After a call to Hijack the HTTP server library
// will not do anything else with the connection.
//
// It becomes the caller's responsibility to manage
// and close the connection.
//
// The returned net.Conn may have read or write deadlines
// already set, depending on the configuration of the
// Server. It is the caller's responsibility to set
// or clear those deadlines as needed.
//
// The returned bufio.Reader may contain unprocessed buffered
// data from the client.
//
// After a call to Hijack, the original Request.Body must not
// be used. The original Request's Context remains valid and
// is not canceled until the Request's ServeHTTP method
// returns.
Hijack() (net.Conn, *bufio.ReadWriter, error)
}

Calling Hijack() takes over the TCP connection underlying the HTTP request; from then on the HTTP server no longer manages that connection, and it becomes the caller's responsibility.

So what does the HTTP response look like after a Hijack?

func main() {
http.HandleFunc("/", handler2)
_ = http.ListenAndServe(":8008", nil)
}

func handler1(writer http.ResponseWriter, request *http.Request) {
hijacker,_ := writer.(http.Hijacker)
conn, buf, _ := hijacker.Hijack()
defer conn.Close()
_, _ = buf.WriteString("hello world")
_ = buf.Flush()
}

func handler2(writer http.ResponseWriter, request *http.Request) {
_, _ = fmt.Fprint(writer, "hello world")
}

Response with handler1 registered at "/":

$ curl -i "http://localhost:8008/"
hello world%

Response with handler2 registered at "/":

curl -i "http://localhost:8008/"
HTTP/1.1 200 OK
Date: Fri, 24 Apr 2020 15:43:26 GMT
Content-Length: 11
Content-Type: text/plain; charset=utf-8

hello world%

As you can see, after a Hijack the response has no status line and no headers: only the bytes the handler wrote to the connection arrive. The reason is in net/http's connection-serving code:

func (c *conn) serve(ctx context.Context) {
...
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
...
}

This method lives in the net/http package and is at the core of request serving. It calls ServeHTTP (i.e. the handlers above); if the connection has been hijacked it simply returns, whereas an ordinary HTTP request continues into finishRequest, which adds the headers and finishes the request.

In Go, Hijack is typically used in two scenarios:

  1. RPC over HTTP
  2. Upgrading from HTTP to WebSocket (a rough sketch follows)
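
As a rough illustration of the second case (not a real WebSocket implementation; the key negotiation and framing are omitted, and the "/ws" path is an assumption), a handler can hijack the connection and keep speaking another protocol on it:

package main

import (
	"log"
	"net/http"
)

func upgrade(w http.ResponseWriter, r *http.Request) {
	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "hijacking not supported", http.StatusInternalServerError)
		return
	}
	conn, buf, err := hj.Hijack()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer conn.Close()

	// From here on net/http is out of the picture: we write the upgrade
	// response by hand and then own the raw TCP connection.
	buf.WriteString("HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n")
	buf.Flush()
}

func main() {
	http.HandleFunc("/ws", upgrade)
	log.Fatal(http.ListenAndServe(":8008", nil))
}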

Notes on a boot-order pitfall with autostart services on Linux

Posted on 2019-12-19 | Category Linux

Background: an industrial PC runs a program (hereafter "ck") supervised by supervisor, and ck depends on MySQL at startup. Both supervisor and MySQL are enabled to start at boot.

Problem: field engineers found that ck was not running at a pilot site. Investigation showed the site had had a power outage; when the industrial PC rebooted, ck was started before MySQL, and because ck's automatic restart limit was set too low (the retries were exhausted before MySQL had fully started), it never came up.

Fix: make the supervisor unit start only after MySQL.

$ cd /lib/systemd/system
$ sudo vim supervisor.service

# append mysql.service to After=, separated by a space
[Unit]
After=network.target mysql.service

Notes on a Golang cross-compilation failure on macOS

Posted on 2019-12-18 | Category Golang

I have been helping out the AIoT team recently and needed an HTTP service running on an industrial PC (Ubuntu), written in Golang with sqlite3 as storage.

Cross-compiling the Linux executable on macOS hit a few snags; here are the notes.

$ CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./httpServer ./

The build went through without any problem, but when the binary actually ran on the industrial PC it reported that the go-sqlite3 driver had not been imported:

models setup fail err:sql: unknown driver “sqlite3” (forgotten import?)

A quick search turned up:
go-sqlite3 is cgo module. so you need to install cross C compiler.
In other words, go-sqlite3 uses cgo, so CGO must be enabled at build time, i.e.

CGO_ENABLED=1

You also need to install a C cross compiler:

$ brew install FiloSottile/musl-cross/musl-cross

The install takes a while, roughly 40 minutes (my laptop's fans spun up loudly and it ran hot), so be patient and let it finish.

After that, the following command builds successfully:

CC=x86_64-linux-musl-gcc CXX=x86_64-linux-musl-g++ GOARCH=amd64 GOOS=linux CGO_ENABLED=1 go build -ldflags "-linkmode external -extldflags -static" -o ./httpServer ./

References:

  1. The related go-sqlite3 issue on GitHub

Unicode, UTF-8, and byte sequences in Golang

Posted on 2019-04-03 | Category Golang

Whenever you manipulate strings in Go you run into the terms Unicode, UTF-8, and byte sequence, and the relationship between them is easy to half-understand. Let's sort it out.

Unicode defines a mapping from characters to code points, but it does not prescribe how those code points are stored. Take the uppercase letter A: its code point is 0x0041, but since no storage rule is mandated, storing it in four bytes would give 0x00000041. A scheme that fixes how many bytes are used to store each code point is a concrete implementation of the Unicode standard, for example UTF-8 (one to four bytes per character) and UTF-16 (at least two bytes per character).

A Go string value consists of Unicode characters (also called Unicode code points), each of which can be carried by a value of type rune. Underneath, the characters are encoded as UTF-8, and the UTF-8 encoding is stored as a byte sequence; so a string value is, at bottom, a byte sequence that holds the UTF-8 encodings of its characters.

We often iterate over a string with a for ... range statement, for example:

package main

import "fmt"

func main() {
str := "UTF 码点 "
for i, c := range str {
fmt.Printf("%d: %q [% x]\n", i, c, []byte(string(c)))
}
}

/*
index  Unicode code point  byte sequence (stored as UTF-8: 1 to 4 bytes per character; these ASCII characters take 1 byte, the Chinese characters take 3)
0: 'U' [55]
1: 'T' [54]
2: 'F' [46]
3: ' ' [20]
4: '码' [e7 a0 81]
7: '点' [e7 82 b9]
10: ' ' [20]
*/

As this code shows, the for statement iterates over the string one Unicode character at a time, while the underlying representation is a UTF-8 byte sequence whose per-character length depends on the code point. Iterating the string therefore means walking the byte sequence one UTF-8 encoded value, i.e. one Unicode character, at a time.

Summary: a string is a sequence of Unicode characters, stored underneath as UTF-8 (a concrete implementation of the Unicode standard that uses one to four bytes per character). A small follow-up sketch:
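
A small sketch contrasting byte length with character count (purely illustrative):

package main

import (
	"fmt"
	"unicode/utf8"
)

func main() {
	s := "UTF 码点"
	fmt.Println(len(s))                    // 10: number of bytes in the UTF-8 sequence
	fmt.Println(utf8.RuneCountInString(s)) // 6: number of Unicode characters (runes)
	fmt.Println(len([]rune(s)))            // 6: converting to []rune decodes the UTF-8 bytes
	fmt.Printf("% x\n", s)                 // 55 54 46 20 e7 a0 81 e7 82 b9
}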
