improve tracing feature

This commit is contained in:
jianchenma
2021-01-26 16:06:20 +08:00
parent 52c17dfce0
commit c72c7dbe9a
13 changed files with 235 additions and 88 deletions

View File

@ -13,7 +13,7 @@ import (
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpClient"
ServiceName = "tracing-http-client"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.

View File

@ -11,7 +11,7 @@ import (
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServer"
ServiceName = "tracing-http-server"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.

View File

@ -10,6 +10,27 @@ import (
type tracingApi struct{}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "tracing-demo-db"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint(JaegerEndpoint),
jaeger.WithProcess(jaeger.Process{
ServiceName: ServiceName,
}),
jaeger.WithSDK(&sdkTrace.Config{DefaultSampler: sdkTrace.AlwaysSample()}),
)
if err != nil {
g.Log().Fatal(err)
}
return flush
}
func (api *tracingApi) Insert(r *ghttp.Request) {
result, err := g.Table("user").Ctx(r.Context()).Insert(g.Map{
"name": r.GetString("name"),
@ -29,27 +50,6 @@ func (api *tracingApi) Query(r *ghttp.Request) {
r.Response.Write("user:", one)
}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServerWithDB"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint(JaegerEndpoint),
jaeger.WithProcess(jaeger.Process{
ServiceName: ServiceName,
}),
jaeger.WithSDK(&sdkTrace.Config{DefaultSampler: sdkTrace.AlwaysSample()}),
)
if err != nil {
g.Log().Fatal(err)
}
return flush
}
func main() {
flush := initTracer()
defer flush()

View File

@ -12,7 +12,7 @@ type tracingApi struct{}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServerWithRedis"
ServiceName = "tracing-demo-redis"
)
func (api *tracingApi) Set(r *ghttp.Request) {

View File

@ -0,0 +1,71 @@
package main
import (
"context"
"github.com/gogf/gf/frame/g"
"github.com/gogf/gf/net/ghttp"
"github.com/gogf/gf/net/gtrace"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
)
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "tracing-http-client"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint(JaegerEndpoint),
jaeger.WithProcess(jaeger.Process{
ServiceName: ServiceName,
}),
jaeger.WithSDK(&sdkTrace.Config{DefaultSampler: sdkTrace.AlwaysSample()}),
)
if err != nil {
g.Log().Fatal(err)
}
return flush
}
func main() {
flush := initTracer()
defer flush()
ctx, span := gtrace.Tracer().Start(context.Background(), "root")
defer span.End()
client := g.Client().Use(ghttp.MiddlewareClientTracing)
//client := g.Client()
// Add user info.
idStr := client.Ctx(ctx).PostContent(
"http://127.0.0.1:8199/user/insert",
g.Map{
"name": "john",
},
)
if idStr == "" {
g.Log().Ctx(ctx).Fatal("retrieve empty id string")
}
g.Log().Ctx(ctx).Print("insert:", idStr)
// Query user info.
userJson := client.Ctx(ctx).GetContent(
"http://127.0.0.1:8199/user/query",
g.Map{
"id": idStr,
},
)
g.Log().Ctx(ctx).Print("query:", idStr, userJson)
// Delete user info.
deleteResult := client.Ctx(ctx).PostContent(
"http://127.0.0.1:8199/user/delete",
g.Map{
"id": idStr,
},
)
g.Log().Ctx(ctx).Print("delete:", idStr, deleteResult)
}

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"github.com/gogf/gcache-adapter/adapter"
"github.com/gogf/gf/errors/gerror"
"github.com/gogf/gf/frame/g"
@ -14,7 +15,7 @@ type tracingApi struct{}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServerWithDBRedisLog"
ServiceName = "tracing-http-server"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
@ -33,26 +34,76 @@ func initTracer() func() {
return flush
}
type userApiInsert struct {
Name string `v:"required#Please input user name."`
}
// Insert is a route handler for inserting user info into dtabase.
func (api *tracingApi) Insert(r *ghttp.Request) {
var (
dataReq *userApiInsert
)
if err := r.Parse(&dataReq); err != nil {
r.Response.WriteExit(gerror.Current(err))
}
result, err := g.Table("user").Ctx(r.Context()).Insert(g.Map{
"name": r.GetString("name"),
"name": dataReq.Name,
})
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
id, _ := result.LastInsertId()
r.Response.Write("id:", id)
r.Response.Write(id)
}
type userApiQuery struct {
Id int `v:"min:1#User id is required for querying."`
}
// Query is a route handler for querying user info. It firstly retrieves the info from redis,
// if there's nothing in the redis, it then does db select.
func (api *tracingApi) Query(r *ghttp.Request) {
var (
dataReq *userApiQuery
)
if err := r.Parse(&dataReq); err != nil {
r.Response.WriteExit(gerror.Current(err))
}
one, err := g.Table("user").
Ctx(r.Context()).
Cache(5 * time.Second).
FindOne(r.GetInt("id"))
Cache(5*time.Second, api.userCacheKey(dataReq.Id)).
FindOne(dataReq.Id)
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("user:", one)
r.Response.WriteJson(one)
}
type userApiDelete struct {
Id int `v:"min:1#User id is required for deleting."`
}
// Delete is a route handler for deleting specified user info.
func (api *tracingApi) Delete(r *ghttp.Request) {
var (
dataReq *userApiDelete
)
if err := r.Parse(&dataReq); err != nil {
r.Response.WriteExit(gerror.Current(err))
}
_, err := g.Table("user").
Ctx(r.Context()).
Cache(-1, api.userCacheKey(dataReq.Id)).
WherePri(dataReq.Id).
Delete()
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("ok")
}
func (api *tracingApi) userCacheKey(id int) string {
return fmt.Sprintf(`userInfo:%d`, id)
}
func main() {

View File

@ -11,13 +11,8 @@ import (
"context"
"database/sql"
"fmt"
"github.com/gogf/gf"
"github.com/gogf/gf/errors/gerror"
"github.com/gogf/gf/text/gstr"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/trace"
"reflect"
"strings"
@ -146,52 +141,6 @@ func (c *Core) DoQuery(link Link, sql string, args ...interface{}) (rows *sql.Ro
return nil, err
}
func (c *Core) addSqlToTracing(ctx context.Context, sql *Sql) {
if ctx == nil {
return
}
spanCtx := trace.SpanContextFromContext(ctx)
if traceId := spanCtx.TraceID; !traceId.IsValid() {
return
}
tr := otel.GetTracerProvider().Tracer(
"github.com/gogf/gf/database/gdb",
trace.WithInstrumentationVersion(fmt.Sprintf(`%s`, gf.VERSION)),
)
ctx, span := tr.Start(ctx, sql.Type)
defer span.End()
if sql.Error != nil {
span.SetStatus(codes.Error, fmt.Sprintf(`%+v`, sql.Error))
}
labels := make([]label.KeyValue, 0)
labels = append(labels, label.String("db.type", c.DB.GetConfig().Type))
if c.DB.GetConfig().Host != "" {
labels = append(labels, label.String("db.host", c.DB.GetConfig().Host))
}
if c.DB.GetConfig().Port != "" {
labels = append(labels, label.String("db.port", c.DB.GetConfig().Port))
}
if c.DB.GetConfig().Name != "" {
labels = append(labels, label.String("db.name", c.DB.GetConfig().Name))
}
if c.DB.GetConfig().User != "" {
labels = append(labels, label.String("db.user", c.DB.GetConfig().User))
}
if filteredLinkInfo := c.DB.FilteredLinkInfo(); filteredLinkInfo != "" {
labels = append(labels, label.String("db.link", c.DB.FilteredLinkInfo()))
}
if group := c.DB.GetGroup(); group != "" {
labels = append(labels, label.String("db.group", group))
}
span.SetAttributes(labels...)
span.AddEvent("db.execution", trace.WithAttributes(
label.String(`db.execution.sql`, sql.Format),
label.String(`db.execution.cost`, fmt.Sprintf(`%d ms`, sql.End-sql.Start)),
label.String(`db.execution.type`, sql.Type),
))
}
// Exec commits one query SQL to underlying driver and returns the execution result.
// It is most commonly used for data inserting and updating.
func (c *Core) Exec(sql string, args ...interface{}) (result sql.Result, err error) {

View File

@ -0,0 +1,65 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
//
package gdb
import (
"context"
"fmt"
"github.com/gogf/gf"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/trace"
)
// addSqlToTracing adds sql information to tracer if it's enabled.
func (c *Core) addSqlToTracing(ctx context.Context, sql *Sql) {
if ctx == nil {
return
}
spanCtx := trace.SpanContextFromContext(ctx)
if traceId := spanCtx.TraceID; !traceId.IsValid() {
return
}
tr := otel.GetTracerProvider().Tracer(
"github.com/gogf/gf/database/gdb",
trace.WithInstrumentationVersion(fmt.Sprintf(`%s`, gf.VERSION)),
)
ctx, span := tr.Start(ctx, sql.Type)
defer span.End()
if sql.Error != nil {
span.SetStatus(codes.Error, fmt.Sprintf(`%+v`, sql.Error))
}
labels := make([]label.KeyValue, 0)
labels = append(labels, label.String("db.type", c.DB.GetConfig().Type))
if c.DB.GetConfig().Host != "" {
labels = append(labels, label.String("db.host", c.DB.GetConfig().Host))
}
if c.DB.GetConfig().Port != "" {
labels = append(labels, label.String("db.port", c.DB.GetConfig().Port))
}
if c.DB.GetConfig().Name != "" {
labels = append(labels, label.String("db.name", c.DB.GetConfig().Name))
}
if c.DB.GetConfig().User != "" {
labels = append(labels, label.String("db.user", c.DB.GetConfig().User))
}
if filteredLinkInfo := c.DB.FilteredLinkInfo(); filteredLinkInfo != "" {
labels = append(labels, label.String("db.link", c.DB.FilteredLinkInfo()))
}
if group := c.DB.GetGroup(); group != "" {
labels = append(labels, label.String("db.group", group))
}
span.SetAttributes(labels...)
span.AddEvent("db.execution", trace.WithAttributes(
label.String(`db.execution.sql`, sql.Format),
label.String(`db.execution.cost`, fmt.Sprintf(`%d ms`, sql.End-sql.Start)),
label.String(`db.execution.type`, sql.Type),
))
}

View File

@ -80,7 +80,7 @@ func (c *Conn) do(timeout time.Duration, commandName string, args ...interface{}
if ctx == nil {
ctx = context.Background()
}
_, span := tr.Start(ctx, commandName)
_, span := tr.Start(ctx, "Redis."+commandName)
defer span.End()
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf(`%+v`, err))

View File

@ -52,7 +52,7 @@ func MiddlewareTracing(c *Client, r *http.Request) (response *Response, err erro
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf(`%+v`, err))
}
if response == nil {
if response == nil || response.Response == nil {
return
}
var resBodyContent string

View File

@ -38,6 +38,11 @@ func newClientTrace(ctx context.Context, span trace.Span, request *http.Request)
request: request,
headers: make(map[string]interface{}),
}
if ct.request.ContentLength <= tracingMaxContentLogSize {
reqBodyContent, _ := ioutil.ReadAll(ct.request.Body)
ct.requestBody = reqBodyContent
ct.request.Body = utils.NewReadCloser(reqBodyContent, false)
}
return &httptrace.ClientTrace{
GetConn: ct.getConn,
GotConn: ct.gotConn,
@ -131,11 +136,7 @@ func (ct *clientTracer) wroteHeaderField(k string, v []string) {
}
func (ct *clientTracer) wroteHeaders() {
if ct.request.ContentLength <= tracingMaxContentLogSize {
reqBodyContent, _ := ioutil.ReadAll(ct.request.Body)
ct.requestBody = reqBodyContent
ct.request.Body = utils.NewReadCloser(reqBodyContent, false)
}
}
func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {

View File

@ -25,6 +25,16 @@ func Tracer(name ...string) trace.Tracer {
return otel.Tracer(tracerName)
}
// GetTraceId retrieves and returns TraceId from context.
func GetTraceId(ctx context.Context) string {
return trace.SpanContextFromContext(ctx).TraceID.String()
}
// GetSpanId retrieves and returns SpanId from context.
func GetSpanId(ctx context.Context) string {
return trace.SpanContextFromContext(ctx).SpanID.String()
}
// SetBaggageValue is a convenient function for adding one key-value pair to baggage.
// Note that it uses label.Any to set the key-value pair.
func SetBaggageValue(ctx context.Context, key string, value interface{}) context.Context {