从上一篇入门:https://www.360us.net/article/86.html我们知道用OpenTelemetry实现应用的可观测性需要三个部分:
exporter:负责遥测数据输出,可以输出到控制台,文件,后端存储或者中间的收集节点服务器。instrumentation:这个部分就是产生追踪数据,也就是创建span。TracerProvider:扮演了中间角色,把生成的遥测数据输出到exporter。
目前Go是不支持自动追踪的,一些公共库可以在这里https://opentelemetry.io/ecosystem/registry/?language=go 找到封装好的追踪代码。
比如otelhttp是对net/http的包装,还有gin,gRPC的等等,自己的私有库、包、或者函数就需要自己手动添加代码了。
本文的内容是实现在多个服务之间的追踪。
创建三个服务,分别是main、as sevice和bs service,main作为外部入口,调用as和bs,或者as,bs互相调用。
在测试项目根目录添加文件main.go作为main服务:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
)
// serviceId
const serviceId = "HelloService"
var l *log.Logger
func main() {
l = log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
errCh := make(chan error)
ctx := context.Background()
go runApp()
select {
case <-sigCh:
l.Println("\ngoodbye")
return
case err := <-errCh:
if err != nil {
l.Fatal(err)
}
}
}
// runApp 启动http服务,设置了一个handler
func runApp() {
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello")
})
log.Fatal(http.ListenAndServe(":8080", nil))
}
功能是定义个服务ID,添加了一个/hello路由,然后监听8080端口。
然后定义as服务,创建一个as目录,添加文件main.go:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
)
// serviceId
const serviceId = "AsService"
func main() {
l := log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go runApp()
select {
case <-sigCh:
l.Println("\ngoodbye")
return
case err := <-errCh:
if err != nil {
l.Fatal(err)
}
}
}
func HelloHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello as service")
}
func runApp() {
http.Handle("/hello", http.HandlerFunc(HelloHandler))
log.Fatal(http.ListenAndServe(":8088", nil))
}
定义服务AsService,监听8088端口。
最后定义bs服务,新建目录bs,添加文件main.go:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
)
// serviceId
const serviceId = "BsService"
func main() {
l := log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go runApp()
select {
case <-sigCh:
l.Println("\ngoodbye")
return
case err := <-errCh:
if err != nil {
l.Fatal(err)
}
}
}
func HelloHandler(w http.ResponseWriter, r *http.Request) {
log.Println("bs service")
fmt.Fprintf(w, "hello bs service")
}
func runApp() {
http.Handle("/hello", http.HandlerFunc(HelloHandler))
log.Fatal(http.ListenAndServe(":8089", nil))
}
BsService监听在8089端口。
添加追踪代码
这一步是在三个服务器里面都添加上追踪代码。
main服务,修改main.go:
import (
// ............
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
// ........
// serviceId
const serviceId = "HelloService"
// tracer Tracer
var tracer trace.Tracer
// newExport create export
func newExporter(url string) (sdktrace.SpanExporter, error) {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return nil, err
}
return exp, nil
}
// newTracerProvider return TracerProvider
func newTracerProvider(exp sdktrace.SpanExporter) *sdktrace.TracerProvider {
// Ensure default SDK resources and the required service name are set.
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceId),
),
)
if err != nil {
panic(err)
}
return sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(r),
)
}
var l *log.Logger
// func main() {
// .......
newExporter函数创建了一个jaeger exporter,url参数是jaeger的数据输入地址。
newTracerProvider函数创建TracerProvider。
然后在main里面注册TracerProvider:
// .......
l = log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
errCh := make(chan error)
ctx := context.Background()
exp, err := newExporter("http://127.0.0.1:14268/api/traces")
if err != nil {
log.Fatalf("failed to initialize exporter: %v", err)
}
tp := newTracerProvider(exp)
// Handle shutdown properly so nothing leaks.
defer func() { _ = tp.Shutdown(ctx) }()
otel.SetTracerProvider(tp)
tracer = tp.Tracer(serviceId)
go runApp()
// .........
最后添加追踪代码到handler里面去:
//.......
// 添加追踪代码
ctx, span := tracer.Start(context.Background(), "hello")
defer span.End()
span.SetAttributes(attribute.String("path", "/hello"))
fmt.Fprintf(w, "hello")
//.......
as和bs服务也都做同样的操作,之后三个服务都具有了独立的追踪功能了。
手动运行下,可以到jaegerUI界面查看一下数据是否是正常的。
上下文传播
这一步实现服务之间的跨服务追踪,多个服务组成一条链路。
注册Propagator传播者,在mian函数里面添加如下代码:
// ........
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.TraceContext{}) //添加的代码
tracer = tp.Tracer(serviceId)
// ........
需要导入go.opentelemetry.io/otel/propagation。
三个服务都需要添加这一行代码。
本例使用的是http协议测试服务间调用,新增一个包装的http客户端包。
新增目录httpclient,在里面新建文件client.go:
package httpclient
import (
"context"
"errors"
"io"
"net/http"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
const (
ServiceNameAs = "as"
ServiceNameBs = "bs"
)
var host = map[string]string{
"as": "http://127.0.0.1:8088",
"bs": "http://127.0.0.1:8089",
}
var client http.Client
func init() {
client = http.Client{
Transport: otelhttp.NewTransport(nil),
Timeout: time.Second * 2,
}
}
// Get http get请求服务接口
func Get(ctx context.Context, serviceName string, path string) ([]byte, error) {
if _, ok := host[serviceName]; !ok {
return nil, errors.New("服务不存在")
}
url := host[serviceName] + path
//..... 直接Get
//otelhttp.DefaultClient.Timeout = time.Second * 2
//resp, err := otelhttp.Get(ctx, url)
//.... 自定义client对象
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
用go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp包装了原生的net/http。
添加main调用as的代码,runApp:
// ......
data, err := httpclient.Get(ctx, httpclient.ServiceNameAs, "/hello")
if err != nil {
log.Println("main get error:", err)
} else {
log.Println("main get data:", string(data))
}
fmt.Fprintf(w, "hello")
// ......
as添加调用bs服务的代码, HelloHandler:
// .......
data, err := httpclient.Get(ctx, httpclient.ServiceNameBs, "/hello")
if err != nil {
log.Println("as get error:", err)
} else {
log.Println("as get data:", string(data))
}
fmt.Fprintf(w, "hello as service")
// .....
再修改as和bs的runApp方法:
// as
http.Handle("/hello", otelhttp.NewHandler(
http.HandlerFunc(HelloHandler), "bs service hello",
))
//bs
http.Handle("/hello", otelhttp.NewHandler(
http.HandlerFunc(HelloHandler), "as service hello",
))
因为as和bs需要被调用,所以注册handler的时候需要用otelhttp包装一下,才能获取到传播的上下文信息。
至此上下文传播的代码就添加完了,分别运行三个服务,然后访问http://127.0.0.1:8080/hello,输出hello。
查看jaeger,效果如下图:

上下文传播除了传播追踪所需信息外,还可以使用下面的Baggage传输自定义的数据。
Baggage
Baggage可以传播自定义的键值数据。
修改三个服务注册Propagator处的代码,main函数:
// .....
otel.SetTracerProvider(tp)
//otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{}, // 用这个可以传自定义的值到其他服务
))
tracer = tp.Tracer(serviceId)
// ....
比如我在main服务传递memberId=234324的数据到as服务,as服务获取到main的memberId并输出。
修改main服务的/hello handler:
// 传自定义值
// .....
member, _ := baggage.NewMember("memberId", "234324")
bgg, _ := baggage.New(member)
data, err := httpclient.Get(baggage.ContextWithBaggage(ctx, bgg), httpclient.ServiceNameAs, "/hello")
//data, err := httpclient.Get(ctx, httpclient.ServiceNameAs, "/hello")
// .......
在as服务的HelloHandler获取值:
// ......
// 获取自定义值
bagg := baggage.FromContext(ctx)
log.Println(bagg.String(), bagg.Member("memberId").Value())
data, err := httpclient.Get(ctx, httpclient.ServiceNameBs, "/hello")
// .....
运行,可以在as的控制台可以看到memeberId的值输出。
总结
跨服务追踪的关键是propagation包,他实现了把trace id注入到http头和从http头取回trace id的功能。
完整代码:https://github.com/ilaziness/gopkg/tree/main/opentelemetry/advanced
参考文章
https://opentelemetry.io/docs/instrumentation/go/libraries/
https://opentelemetry.io/docs/instrumentation/go/manual/#propagators-and-context
本文链接:https://360us.net/article/87.html