上一篇文章用的是http协议做服务间的调用协议,这篇改成gRPC。
首先安装包otelgrpc
:
go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc
创建一个api
目录,创建rpc.proto
文件:
syntax = "proto3";
package api;
// advanced目录执行编译: protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative api/rpc.proto
option go_package ="github.com/ilaziness/gopkg/opentelemetry/as/api";
service AsRpc {
rpc Test(Req) returns (Resp) {}
}
service BsRpc {
rpc Test(Req) returns (Resp) {}
}
message Req {
string a = 1;
int32 b = 2;
}
message Resp {
string s = 1;
}
给as
和bs
服务都定义了一个Test
方法。
定义好后生成代码。
然后在被调用端加上rpc服务监听和服务实现的代码。
as
:
// runRpcServer 启动RPC服务
func runRpcServer() {
lis, err := net.Listen("tcp", ":7000")
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)
api.RegisterAsRpcServer(s, &AsRpc{})
log.Println("rpc server listening at port", "7000")
if err := s.Serve(lis); err != nil {
log.Fatal(err)
}
}
type AsRpc struct {
api.UnimplementedAsRpcServer
}
// Test test rpc服务接口
func (a *AsRpc) Test(ctx context.Context, req *api.Req) (*api.Resp, error) {
ctx, span := tracer.Start(ctx, "as rpc Test")
defer span.End()
log.Println("as rpc server receive data:", req.GetA(), req.GetB())
return &api.Resp{S: "this is as rpc server response"}, nil
}
实现了as
服务器的Test
接口,然后启动服务在7000
端口。
在grpc.NewServer
方法要加上otelgrpc
的拦截器,这样才能收到传播的追踪上下文信息。
bs
:
// runRpcServer 启动RPC服务
func runRpcServer() {
lis, err := net.Listen("tcp", ":7001")
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)
api.RegisterBsRpcServer(s, &BsRpc{})
log.Println("rpc server listening at port", "7000")
if err := s.Serve(lis); err != nil {
log.Fatal(err)
}
}
type BsRpc struct {
api.UnimplementedBsRpcServer
}
// Test test rpc服务接口
func (a *BsRpc) Test(ctx context.Context, req *api.Req) (*api.Resp, error) {
_, span := tracer.Start(ctx, "bs rpc Test")
defer span.End()
log.Println("bs rpc server receive data:", req.GetA(), req.GetB())
return &api.Resp{S: "this is bs rpc server response"}, nil
}
和上面as
一样,bs
监听在7001
端口。
然后是实现客户端调用,本例的调用关系是mian
-> as
-> bs
。
定义两个函数:
// CallAsTest 调用as服务的test
func CallAsTest(ctx context.Context, c api.AsRpcClient) {
resp, err := c.Test(ctx, &api.Req{A: "a", B: 34})
if err != nil {
log.Fatal("call as test error:", err)
}
log.Println("call as response:", resp.S)
}
// CallBsTest 调用bs服务的test
func CallBsTest(ctx context.Context, c api.BsRpcClient) {
resp, err := c.Test(ctx, &api.Req{A: "b", B: 35})
if err != nil {
log.Fatal("call bs test error:", err)
}
log.Println("call bs response:", resp.S)
}
上面函数的功能是用grpc
分别调用as
和bs
的Test
。
然后在main
添加调用as
的代码, runApp
:
// ......
// rpc client
addr := "127.0.0.1:7000"
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
rpcclient := api.NewAsRpcClient(conn)
_, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
//......
// 新增一条rpc的span
ctx, span = tracer.Start(ctx, "main hello call as")
defer span.End()
// rpc call 调用as服务的Test
rpcall.CallAsTest(ctx, rpcclient)
fmt.Fprintf(w, "hello")
}
//.....
创建了客户端对象,在/hello
handler里面调用了CallAsTest
函数。
添加as
调用bs
的Test
代码:
func runApp() {
// rpc client
addr := "127.0.0.1:7001"
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
rpcClient = api.NewBsRpcClient(conn)
_, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// .......
}
var rpcClient api.BsRpcClient
// ....
// Test test rpc服务接口
func (a *AsRpc) Test(ctx context.Context, req *api.Req) (*api.Resp, error) {
ctx, span := tracer.Start(ctx, "as rpc Test")
defer span.End()
log.Println("as rpc server receive data:", req.GetA(), req.GetB())
// rpc call 调用Bs服务的Test
rpcall.CallBsTest(ctx, rpcClient)
return &api.Resp{S: "this is as rpc server response"}, nil
}
创建客户端对象,在Test
服务方法里面添加了调用bs
的函数CallBsTest
。
到这里三个服务应该都是可以正常运行,并且可以互相通过gRPC
调用了。
分别运行三个服务,浏览器访问http://127.0.0.1:8080/hello
,正常输出一个hello
字符串。
打开jaeger UI
查看追踪记录,效果如下:
有两条链路,http的是http协议的调用链路,rpc的是gRPC
的调用链路。
从图上可以看到,rpc链路在客户端和服务的端都会生成一条数据,而http
只会在发起端记录一条http
请求的记录。
总结
gRPC
的链路追踪是在服务端NewServer
和客户端Dial
时注册otelgrpc
的拦截器。
客户端调用时把span
的context
做为具体rpc
客户端方法的context
参数。
服务端对应方法取出context
,用来继续添加span
就可以连上了。
完整代码参考:https://github.com/ilaziness/gopkg/tree/main/opentelemetry/advanced
本来链接:https://360us.net/article/88.html