从上一篇入门:https://www.360us.net/article/86.html我们知道用OpenTelemetry实现应用的可观测性需要三个部分:
exporter
:负责遥测数据输出,可以输出到控制台,文件,后端存储或者中间的收集节点服务器。instrumentation
:这个部分就是产生追踪数据,也就是创建span
。TracerProvider
:扮演了中间角色,把生成的遥测数据输出到exporter
。
目前Go是不支持自动追踪的,一些公共库可以在这里https://opentelemetry.io/ecosystem/registry/?language=go 找到封装好的追踪代码。
比如otelhttp
是对net/http
的包装,还有gin
,gRPC
的等等,自己的私有库、包、或者函数就需要自己手动添加代码了。
本文的内容是实现在多个服务之间的追踪。
创建三个服务,分别是main
、as sevice
和bs service
,main
作为外部入口,调用as
和bs
,或者as
,bs
互相调用。
在测试项目根目录添加文件main.go
作为main
服务:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
)
// serviceId
const serviceId = "HelloService"
var l *log.Logger
func main() {
l = log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
errCh := make(chan error)
ctx := context.Background()
go runApp()
select {
case <-sigCh:
l.Println("\ngoodbye")
return
case err := <-errCh:
if err != nil {
l.Fatal(err)
}
}
}
// runApp 启动http服务,设置了一个handler
func runApp() {
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello")
})
log.Fatal(http.ListenAndServe(":8080", nil))
}
功能是定义个服务ID,添加了一个/hello
路由,然后监听8080
端口。
然后定义as
服务,创建一个as
目录,添加文件main.go
:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
)
// serviceId
const serviceId = "AsService"
func main() {
l := log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go runApp()
select {
case <-sigCh:
l.Println("\ngoodbye")
return
case err := <-errCh:
if err != nil {
l.Fatal(err)
}
}
}
func HelloHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello as service")
}
func runApp() {
http.Handle("/hello", http.HandlerFunc(HelloHandler))
log.Fatal(http.ListenAndServe(":8088", nil))
}
定义服务AsService
,监听8088
端口。
最后定义bs
服务,新建目录bs
,添加文件main.go
:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
)
// serviceId
const serviceId = "BsService"
func main() {
l := log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go runApp()
select {
case <-sigCh:
l.Println("\ngoodbye")
return
case err := <-errCh:
if err != nil {
l.Fatal(err)
}
}
}
func HelloHandler(w http.ResponseWriter, r *http.Request) {
log.Println("bs service")
fmt.Fprintf(w, "hello bs service")
}
func runApp() {
http.Handle("/hello", http.HandlerFunc(HelloHandler))
log.Fatal(http.ListenAndServe(":8089", nil))
}
BsService
监听在8089
端口。
添加追踪代码
这一步是在三个服务器里面都添加上追踪代码。
main
服务,修改main.go
:
import (
// ............
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
// ........
// serviceId
const serviceId = "HelloService"
// tracer Tracer
var tracer trace.Tracer
// newExport create export
func newExporter(url string) (sdktrace.SpanExporter, error) {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return nil, err
}
return exp, nil
}
// newTracerProvider return TracerProvider
func newTracerProvider(exp sdktrace.SpanExporter) *sdktrace.TracerProvider {
// Ensure default SDK resources and the required service name are set.
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceId),
),
)
if err != nil {
panic(err)
}
return sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(r),
)
}
var l *log.Logger
// func main() {
// .......
newExporter
函数创建了一个jaeger
exporter
,url
参数是jaeger
的数据输入地址。
newTracerProvider
函数创建TracerProvider
。
然后在main
里面注册TracerProvider
:
// .......
l = log.New(os.Stdout, "", 0)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
errCh := make(chan error)
ctx := context.Background()
exp, err := newExporter("http://127.0.0.1:14268/api/traces")
if err != nil {
log.Fatalf("failed to initialize exporter: %v", err)
}
tp := newTracerProvider(exp)
// Handle shutdown properly so nothing leaks.
defer func() { _ = tp.Shutdown(ctx) }()
otel.SetTracerProvider(tp)
tracer = tp.Tracer(serviceId)
go runApp()
// .........
最后添加追踪代码到handler里面去:
//.......
// 添加追踪代码
ctx, span := tracer.Start(context.Background(), "hello")
defer span.End()
span.SetAttributes(attribute.String("path", "/hello"))
fmt.Fprintf(w, "hello")
//.......
as
和bs
服务也都做同样的操作,之后三个服务都具有了独立的追踪功能了。
手动运行下,可以到jaeger
UI界面查看一下数据是否是正常的。
上下文传播
这一步实现服务之间的跨服务追踪,多个服务组成一条链路。
注册Propagator
传播者,在mian
函数里面添加如下代码:
// ........
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.TraceContext{}) //添加的代码
tracer = tp.Tracer(serviceId)
// ........
需要导入go.opentelemetry.io/otel/propagation
。
三个服务都需要添加这一行代码。
本例使用的是http
协议测试服务间调用,新增一个包装的http
客户端包。
新增目录httpclient
,在里面新建文件client.go
:
package httpclient
import (
"context"
"errors"
"io"
"net/http"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
const (
ServiceNameAs = "as"
ServiceNameBs = "bs"
)
var host = map[string]string{
"as": "http://127.0.0.1:8088",
"bs": "http://127.0.0.1:8089",
}
var client http.Client
func init() {
client = http.Client{
Transport: otelhttp.NewTransport(nil),
Timeout: time.Second * 2,
}
}
// Get http get请求服务接口
func Get(ctx context.Context, serviceName string, path string) ([]byte, error) {
if _, ok := host[serviceName]; !ok {
return nil, errors.New("服务不存在")
}
url := host[serviceName] + path
//..... 直接Get
//otelhttp.DefaultClient.Timeout = time.Second * 2
//resp, err := otelhttp.Get(ctx, url)
//.... 自定义client对象
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
用go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
包装了原生的net/http
。
添加main
调用as
的代码,runApp
:
// ......
data, err := httpclient.Get(ctx, httpclient.ServiceNameAs, "/hello")
if err != nil {
log.Println("main get error:", err)
} else {
log.Println("main get data:", string(data))
}
fmt.Fprintf(w, "hello")
// ......
as
添加调用bs
服务的代码, HelloHandler
:
// .......
data, err := httpclient.Get(ctx, httpclient.ServiceNameBs, "/hello")
if err != nil {
log.Println("as get error:", err)
} else {
log.Println("as get data:", string(data))
}
fmt.Fprintf(w, "hello as service")
// .....
再修改as
和bs
的runApp
方法:
// as
http.Handle("/hello", otelhttp.NewHandler(
http.HandlerFunc(HelloHandler), "bs service hello",
))
//bs
http.Handle("/hello", otelhttp.NewHandler(
http.HandlerFunc(HelloHandler), "as service hello",
))
因为as
和bs
需要被调用,所以注册handler的时候需要用otelhttp
包装一下,才能获取到传播的上下文信息。
至此上下文传播的代码就添加完了,分别运行三个服务,然后访问http://127.0.0.1:8080/hello
,输出hello
。
查看jaeger
,效果如下图:
上下文传播除了传播追踪所需信息外,还可以使用下面的Baggage
传输自定义的数据。
Baggage
Baggage
可以传播自定义的键值数据。
修改三个服务注册Propagator
处的代码,main
函数:
// .....
otel.SetTracerProvider(tp)
//otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{}, // 用这个可以传自定义的值到其他服务
))
tracer = tp.Tracer(serviceId)
// ....
比如我在main
服务传递memberId=234324
的数据到as
服务,as
服务获取到main
的memberId
并输出。
修改main
服务的/hello
handler:
// 传自定义值
// .....
member, _ := baggage.NewMember("memberId", "234324")
bgg, _ := baggage.New(member)
data, err := httpclient.Get(baggage.ContextWithBaggage(ctx, bgg), httpclient.ServiceNameAs, "/hello")
//data, err := httpclient.Get(ctx, httpclient.ServiceNameAs, "/hello")
// .......
在as
服务的HelloHandler
获取值:
// ......
// 获取自定义值
bagg := baggage.FromContext(ctx)
log.Println(bagg.String(), bagg.Member("memberId").Value())
data, err := httpclient.Get(ctx, httpclient.ServiceNameBs, "/hello")
// .....
运行,可以在as
的控制台可以看到memeberId
的值输出。
总结
跨服务追踪的关键是propagation
包,他实现了把trace id
注入到http头和从http头取回trace id
的功能。
完整代码:https://github.com/ilaziness/gopkg/tree/main/opentelemetry/advanced
参考文章
https://opentelemetry.io/docs/instrumentation/go/libraries/
https://opentelemetry.io/docs/instrumentation/go/manual/#propagators-and-context
本文链接:https://360us.net/article/87.html