opentelemetry

This commit is contained in:
jflyfox
2021-01-22 23:04:29 +08:00
parent ce640048b8
commit c27ddb1d79
7 changed files with 372 additions and 7 deletions

View File

@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Command jaeger is an example program that creates spans
// and uploads to Jaeger.
package main
import (
"context"
"fmt"
"github.com/gogf/gf/frame/g"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
"go.opentelemetry.io/otel/trace"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/label"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithProcess(jaeger.Process{
ServiceName: "http-trace-demo",
Tags: []label.KeyValue{
label.String("exporter", "jaeger"),
label.Float64("float", 312.23),
},
}),
jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
log.Fatal(err)
}
return flush
}
func main() {
ctx := context.Background()
flush := initTracer()
defer flush()
ctx, span := otel.Tracer("component-main").Start(ctx, "foo")
defer span.End()
content := g.Client().Ctx(ctx).Header(g.MapStrStr{
"test": "123",
"john": "smith",
}).Cookie(g.MapStrStr{
"cookieKey":"cookieValue",
}).GetContent("http://baidu.com/?q=goframe")
fmt.Println(content)
}
func bar(ctx context.Context) {
_, span := otel.Tracer("test").Start(ctx, "bar")
defer span.End()
span.AddEvent("Nice operation!", trace.WithAttributes(label.Int("bogons", 100)))
span.SetAttributes(label.String("test2", "123"))
time.Sleep(time.Second * 2)
// Do bar...
}

11
go.mod
View File

@ -10,9 +10,16 @@ require (
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/websocket v1.4.1
github.com/grokify/html-strip-tags-go v0.0.0-20190921062105-daaa06bf1aaf
github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/olekukonko/tablewriter v0.0.1
golang.org/x/net v0.0.0-20200602114024-627f9648deb9
golang.org/x/text v0.3.2
go.opentelemetry.io/otel v0.16.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.16.0
go.opentelemetry.io/otel/sdk v0.16.0
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3 // indirect
golang.org/x/text v0.3.4
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
)

View File

@ -9,6 +9,8 @@ package ghttp
import (
"context"
"crypto/tls"
"fmt"
"github.com/gogf/gf"
"github.com/gogf/gf/text/gstr"
"golang.org/x/net/proxy"
"net"
@ -24,6 +26,7 @@ import (
type Client struct {
http.Client // Underlying HTTP Client.
ctx context.Context // Context for each request.
agent string // Client agent.
parent *Client // Parent http client, this is used for chaining operations.
header map[string]string // Custom header map.
cookies map[string]string // Custom cookie map.
@ -36,6 +39,10 @@ type Client struct {
middlewareHandler []ClientHandlerFunc // Interceptor handlers
}
var (
defaultClientAgent = fmt.Sprintf(`GoFrameHTTPClient %s`, gf.VERSION)
)
// NewClient creates and returns a new HTTP client object.
func NewClient() *Client {
return &Client{
@ -50,6 +57,7 @@ func NewClient() *Client {
},
header: make(map[string]string),
cookies: make(map[string]string),
agent: defaultClientAgent,
}
}
@ -90,6 +98,12 @@ func (c *Client) SetHeaderMap(m map[string]string) *Client {
return c
}
// SetAgent sets the User-Agent header for client.
func (c *Client) SetAgent(agent string) *Client {
c.header["User-Agent"] = agent
return c
}
// SetContentType sets HTTP content type for the client.
func (c *Client) SetContentType(contentType string) *Client {
c.header["Content-Type"] = contentType

View File

@ -11,9 +11,13 @@ import (
"context"
"errors"
"fmt"
"github.com/gogf/gf"
"github.com/gogf/gf/internal/intlog"
"github.com/gogf/gf/internal/json"
"github.com/gogf/gf/internal/utils"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/trace"
"io"
"io/ioutil"
"mime/multipart"
@ -97,6 +101,19 @@ func (c *Client) DoRequest(method, url string, data ...interface{}) (resp *Clien
return nil, err
}
// Tracing.
tr := otel.GetTracerProvider().Tracer(
"github.com/gogf/gf/net/ghttp.client",
trace.WithInstrumentationVersion(fmt.Sprintf(`%s`, gf.VERSION)),
)
ctx, span := tr.Start(req.Context(), req.URL.String())
defer span.End()
// Header (Cookie is in it).
if len(req.Header) > 0 {
span.SetAttributes(label.Any(`http.headers`, req.Header))
}
req = req.WithContext(ctx)
// Client middleware.
if len(c.middlewareHandler) > 0 {
mdlHandlers := make([]ClientHandlerFunc, 0, len(c.middlewareHandler)+1)
@ -278,6 +295,10 @@ func (c *Client) prepareRequest(method, url string, data ...interface{}) (req *h
if len(c.authUser) > 0 {
req.SetBasicAuth(c.authUser, c.authPass)
}
// Client agent.
if c.agent != "" {
req.Header.Set("User-Agent", c.agent)
}
return req, nil
}

View File

@ -0,0 +1,242 @@
package gtrace
import (
"context"
"crypto/tls"
"fmt"
"github.com/gogf/gf"
"net/http/httptrace"
"net/textproto"
"strings"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
)
var (
HTTPStatus = label.Key("http.status")
HTTPHeaderMIME = label.Key("http.mime")
HTTPRemoteAddr = label.Key("http.remote")
HTTPLocalAddr = label.Key("http.local")
)
var (
hookMap = map[string]string{
"http.dns": "http.getconn",
"http.connect": "http.getconn",
"http.tls": "http.getconn",
}
)
func parentHook(hook string) string {
if strings.HasPrefix(hook, "http.connect") {
return hookMap["http.connect"]
}
return hookMap[hook]
}
type clientTracer struct {
context.Context
tr trace.Tracer
activeHooks map[string]context.Context
root trace.Span
mtx sync.Mutex
}
func NewClientTrace(ctx context.Context) *httptrace.ClientTrace {
ct := &clientTracer{
Context: ctx,
activeHooks: make(map[string]context.Context),
}
ct.tr = otel.GetTracerProvider().Tracer(
"github.com/gogf/gf/net/ghttp.client",
trace.WithInstrumentationVersion(fmt.Sprintf(`%s`, gf.VERSION)),
)
return &httptrace.ClientTrace{
GetConn: ct.getConn,
GotConn: ct.gotConn,
PutIdleConn: ct.putIdleConn,
GotFirstResponseByte: ct.gotFirstResponseByte,
Got100Continue: ct.got100Continue,
Got1xxResponse: ct.got1xxResponse,
DNSStart: ct.dnsStart,
DNSDone: ct.dnsDone,
ConnectStart: ct.connectStart,
ConnectDone: ct.connectDone,
TLSHandshakeStart: ct.tlsHandshakeStart,
TLSHandshakeDone: ct.tlsHandshakeDone,
WroteHeaderField: ct.wroteHeaderField,
WroteHeaders: ct.wroteHeaders,
Wait100Continue: ct.wait100Continue,
WroteRequest: ct.wroteRequest,
}
}
func (ct *clientTracer) start(hook, spanName string, attrs ...label.KeyValue) {
ct.mtx.Lock()
defer ct.mtx.Unlock()
if hookCtx, found := ct.activeHooks[hook]; !found {
var sp trace.Span
ct.activeHooks[hook], sp = ct.tr.Start(ct.getParentContext(hook), spanName, trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindClient))
if ct.root == nil {
ct.root = sp
}
} else {
// end was called before start finished, add the start attributes and end the span here
span := trace.SpanFromContext(hookCtx)
span.SetAttributes(attrs...)
span.End()
delete(ct.activeHooks, hook)
}
}
func (ct *clientTracer) end(hook string, err error, attrs ...label.KeyValue) {
ct.mtx.Lock()
defer ct.mtx.Unlock()
if ctx, ok := ct.activeHooks[hook]; ok {
span := trace.SpanFromContext(ctx)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
span.SetAttributes(attrs...)
span.End()
delete(ct.activeHooks, hook)
} else {
// start is not finished before end is called.
// Start a span here with the ending attributes that will be finished when start finishes.
// Yes, it's backwards. v0v
ctx, span := ct.tr.Start(ct.getParentContext(hook), hook, trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindClient))
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
ct.activeHooks[hook] = ctx
}
}
func (ct *clientTracer) getParentContext(hook string) context.Context {
ctx, ok := ct.activeHooks[parentHook(hook)]
if !ok {
return ct.Context
}
return ctx
}
func (ct *clientTracer) span(hook string) trace.Span {
ct.mtx.Lock()
defer ct.mtx.Unlock()
if ctx, ok := ct.activeHooks[hook]; ok {
return trace.SpanFromContext(ctx)
}
return nil
}
func (ct *clientTracer) getConn(host string) {
ct.start("http.getconn", "http.getconn", semconv.HTTPHostKey.String(host))
}
func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) {
ct.end("http.getconn",
nil,
HTTPRemoteAddr.String(info.Conn.RemoteAddr().String()),
HTTPLocalAddr.String(info.Conn.LocalAddr().String()),
)
}
func (ct *clientTracer) putIdleConn(err error) {
ct.end("http.receive", err)
}
func (ct *clientTracer) gotFirstResponseByte() {
ct.start("http.receive", "http.receive")
}
func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) {
ct.start("http.dns", "http.dns", semconv.HTTPHostKey.String(info.Host))
}
func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) {
ct.end("http.dns", info.Err)
}
func (ct *clientTracer) connectStart(network, addr string) {
ct.start("http.connect."+addr, "http.connect", HTTPRemoteAddr.String(addr))
}
func (ct *clientTracer) connectDone(network, addr string, err error) {
ct.end("http.connect."+addr, err)
}
func (ct *clientTracer) tlsHandshakeStart() {
ct.start("http.tls", "http.tls")
}
func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) {
ct.end("http.tls", err)
}
func (ct *clientTracer) wroteHeaderField(k string, v []string) {
if ct.span("http.headers") == nil {
ct.start("http.headers", "http.headers")
}
ct.root.SetAttributes(label.String("http."+strings.ToLower(k), sliceToString(v)))
}
func (ct *clientTracer) wroteHeaders() {
if ct.span("http.headers") != nil {
ct.end("http.headers", nil)
}
ct.start("http.send", "http.send")
}
func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
if info.Err != nil {
ct.root.SetStatus(codes.Error, info.Err.Error())
}
ct.end("http.send", info.Err)
}
func (ct *clientTracer) got100Continue() {
ct.span("http.receive").AddEvent("GOT 100 - Continue")
}
func (ct *clientTracer) wait100Continue() {
ct.span("http.receive").AddEvent("GOT 100 - Wait")
}
func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error {
ct.span("http.receive").AddEvent("GOT 1xx", trace.WithAttributes(
HTTPStatus.Int(code),
HTTPHeaderMIME.String(sm2s(header)),
))
return nil
}
func sliceToString(value []string) string {
if len(value) == 0 {
return "undefined"
}
return strings.Join(value, ",")
}
func sm2s(value map[string][]string) string {
var buf strings.Builder
for k, v := range value {
if buf.Len() != 0 {
buf.WriteString(",")
}
buf.WriteString(k)
buf.WriteString("=")
buf.WriteString(sliceToString(v))
}
return buf.String()
}

View File

@ -7,6 +7,7 @@
package gcron_test
import (
"github.com/gogf/gf/os/glog"
"testing"
"time"
@ -20,18 +21,19 @@ func TestCron_Add_Close(t *testing.T) {
cron := gcron.New()
array := garray.New(true)
_, err1 := cron.Add("* * * * * *", func() {
//glog.Println("cron1")
glog.Println("cron1")
array.Append(1)
})
_, err2 := cron.Add("* * * * * *", func() {
//glog.Println("cron2")
glog.Println("cron2")
array.Append(1)
}, "test")
_, err3 := cron.Add("* * * * * *", func() {
glog.Println("cron3")
array.Append(1)
}, "test")
_, err4 := cron.Add("@every 2s", func() {
//glog.Println("cron3")
glog.Println("cron4")
array.Append(1)
})
t.Assert(err1, nil)
@ -39,7 +41,7 @@ func TestCron_Add_Close(t *testing.T) {
t.AssertNE(err3, nil)
t.Assert(err4, nil)
t.Assert(cron.Size(), 3)
time.Sleep(1200 * time.Millisecond)
time.Sleep(1300 * time.Millisecond)
t.Assert(array.Len(), 2)
time.Sleep(1400 * time.Millisecond)
t.Assert(array.Len(), 5)

View File

@ -47,7 +47,7 @@ func TestCron_Entry_Operations(t *testing.T) {
t.Assert(array.Len(), 1)
t.Assert(cron.Size(), 1)
entry.Stop()
time.Sleep(2000 * time.Millisecond)
time.Sleep(5000 * time.Millisecond)
t.Assert(array.Len(), 1)
t.Assert(cron.Size(), 1)
entry.Start()