gRPC-go 連接管理
(金慶的專欄 2017.12)
把 example greeter 改一下,處理 SayHello() 請求時,不僅僅返回本次請求者的名字,
還返回上次請求的名字,如:
```
λ go run greeter_client/main.go
2017/12/25 17:59:13 Greeting: Hello 'world' (prev '')
2017/12/25 17:59:15 Greeting: Hello 'world2' (prev 'world')
```
先將客戶端單次請求改為多次請求:
```go
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
log.Printf("Greeting: %s", r.Message)
time.Sleep(2 * time.Second)
r, err = c.SayHello(context.Background(), &pb.HelloRequest{Name: name + "2"})
log.Printf("Greeting: %s", r.Message)
...
```
服務器需要為每個連接保存各自的數據。連接創建時初始化數據,連接斷開時清理數據。
這里利用了連接統計的接口,不知道是否是最適當的實現方式?
服務器創建時添加 StatsHandler 選項,輸入一個 stats.Handler 的實現。
```
- s := grpc.NewServer()
+ s := grpc.NewServer(grpc.StatsHandler(&statshandler{}))
```
statshandler 需實現4個方法,只用到2個連接相關的方法,TagConn() 和 HandleConn(),
另外2個 TagRPC() 和 HandleRPC() 用于RPC統計, 實現為空。
```go
type statshandler struct{}
// TagConn 用來給連接打個標簽,以此來標識連接(實在是找不出還有什么辦法來標識連接).
// 這個標簽是個指針,可保證每個連接唯一。
// 將該指針添加到上下文中去,鍵為 connCtxKey{}.
func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return context.WithValue(ctx, connCtxKey{}, info)
}
// TagRPC 為空.
func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
// HandleConn 會在連接開始和結束時被調用,分別會輸入不同的狀態.
func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
tag, ok := getConnTagFromContext(ctx)
if !ok {
log.Fatal("can not get conn tag")
}
connsMutex.Lock()
defer connsMutex.Unlock()
switch s.(type) {
case *stats.ConnBegin:
conns[tag] = ""
log.Printf("begin conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns))
case *stats.ConnEnd:
delete(conns, tag)
log.Printf("end conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns))
default:
log.Printf("illegal ConnStats type\n")
}
}
// HandleRPC 為空.
func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
}
```
用一個map來管理所有連接,以連接的標簽(是個指針)為鍵,值為上次請求者的名字。
因為有多線程訪問,所有加個 Mutex 來保護。
連接結束時,將從 conns 中刪除連接相關的數據。
```go
var connsMutex sync.Mutex
var conns map[*stats.ConnTagInfo]string = make(map[*stats.ConnTagInfo]string)
```
getConnTagFromContext() 從上下文中取連接標簽:
```go
type connCtxKey struct{}
func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) {
tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo)
return tag, ok
}
```
最后將 SayHello() 改為記錄請求者名字,并返回上次請求者的名字。
```go
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
tag, _ := getConnTagFromContext(ctx)
log.Printf("SayHello(), conn tag = (%p)%#v\n", tag, tag)
connsMutex.Lock()
defer connsMutex.Unlock()
prev := conns[tag]
conns[tag] = in.Name
return &pb.HelloReply{Message: fmt.Sprintf("Hello '%s' (prev '%s')", in.Name, prev)}, nil
}
```
測試多個客戶端連接,可以看到每個客戶端有自己的狀態,互不影響。
```
E:\Git\grpc-go\examples\helloworld (master)
λ go run greeter_server/main.go
2017/12/25 18:39:03 start
2017/12/25 18:39:11 begin conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1
2017/12/25 18:39:11 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:13 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:13 begin conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 2
2017/12/25 18:39:13 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:15 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:15 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:17 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:17 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:19 end conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1
2017/12/25 18:39:19 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:21 end conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 0
```
(金慶的專欄 2017.12)
把 example greeter 改一下,處理 SayHello() 請求時,不僅僅返回本次請求者的名字,
還返回上次請求的名字,如:
```
λ go run greeter_client/main.go
2017/12/25 17:59:13 Greeting: Hello 'world' (prev '')
2017/12/25 17:59:15 Greeting: Hello 'world2' (prev 'world')
```
先將客戶端單次請求改為多次請求:
```go
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
log.Printf("Greeting: %s", r.Message)
time.Sleep(2 * time.Second)
r, err = c.SayHello(context.Background(), &pb.HelloRequest{Name: name + "2"})
log.Printf("Greeting: %s", r.Message)
...
```
服務器需要為每個連接保存各自的數據。連接創建時初始化數據,連接斷開時清理數據。
這里利用了連接統計的接口,不知道是否是最適當的實現方式?
服務器創建時添加 StatsHandler 選項,輸入一個 stats.Handler 的實現。
```
- s := grpc.NewServer()
+ s := grpc.NewServer(grpc.StatsHandler(&statshandler{}))
```
statshandler 需實現4個方法,只用到2個連接相關的方法,TagConn() 和 HandleConn(),
另外2個 TagRPC() 和 HandleRPC() 用于RPC統計, 實現為空。
```go
type statshandler struct{}
// TagConn 用來給連接打個標簽,以此來標識連接(實在是找不出還有什么辦法來標識連接).
// 這個標簽是個指針,可保證每個連接唯一。
// 將該指針添加到上下文中去,鍵為 connCtxKey{}.
func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return context.WithValue(ctx, connCtxKey{}, info)
}
// TagRPC 為空.
func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
// HandleConn 會在連接開始和結束時被調用,分別會輸入不同的狀態.
func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
tag, ok := getConnTagFromContext(ctx)
if !ok {
log.Fatal("can not get conn tag")
}
connsMutex.Lock()
defer connsMutex.Unlock()
switch s.(type) {
case *stats.ConnBegin:
conns[tag] = ""
log.Printf("begin conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns))
case *stats.ConnEnd:
delete(conns, tag)
log.Printf("end conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns))
default:
log.Printf("illegal ConnStats type\n")
}
}
// HandleRPC 為空.
func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
}
```
用一個map來管理所有連接,以連接的標簽(是個指針)為鍵,值為上次請求者的名字。
因為有多線程訪問,所有加個 Mutex 來保護。
連接結束時,將從 conns 中刪除連接相關的數據。
```go
var connsMutex sync.Mutex
var conns map[*stats.ConnTagInfo]string = make(map[*stats.ConnTagInfo]string)
```
getConnTagFromContext() 從上下文中取連接標簽:
```go
type connCtxKey struct{}
func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) {
tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo)
return tag, ok
}
```
最后將 SayHello() 改為記錄請求者名字,并返回上次請求者的名字。
```go
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
tag, _ := getConnTagFromContext(ctx)
log.Printf("SayHello(), conn tag = (%p)%#v\n", tag, tag)
connsMutex.Lock()
defer connsMutex.Unlock()
prev := conns[tag]
conns[tag] = in.Name
return &pb.HelloReply{Message: fmt.Sprintf("Hello '%s' (prev '%s')", in.Name, prev)}, nil
}
```
測試多個客戶端連接,可以看到每個客戶端有自己的狀態,互不影響。
```
E:\Git\grpc-go\examples\helloworld (master)
λ go run greeter_server/main.go
2017/12/25 18:39:03 start
2017/12/25 18:39:11 begin conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1
2017/12/25 18:39:11 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:13 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:13 begin conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 2
2017/12/25 18:39:13 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:15 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:15 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:17 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:17 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:19 end conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1
2017/12/25 18:39:19 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:21 end conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 0
```