Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 2e6b582

Browse files
author
Paulo Janotti
authored
Add zipkinv1 thrift 01 (#350)
* Add support to zipkin V1 thrift * Revert go.mod and go.sum * PR Feedback Main change: specialized functions to convert numerical binary annotations. * Remove no more spans sentinel error * Use slices instead of indexer function
1 parent 904c6ad commit 2e6b582

5 files changed

Lines changed: 657 additions & 16 deletions

File tree

receiver/zipkin/trace_receiver.go

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,19 @@ import (
2929
"strings"
3030
"sync"
3131

32-
"github.com/census-instrumentation/opencensus-service/translator/trace"
33-
34-
"go.opencensus.io/trace"
35-
36-
zipkinmodel "github.com/openzipkin/zipkin-go/model"
37-
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
38-
32+
"github.com/apache/thrift/lib/go/thrift"
3933
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
4034
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
4135
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
36+
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
37+
zipkinmodel "github.com/openzipkin/zipkin-go/model"
38+
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
39+
"go.opencensus.io/trace"
4240

4341
"github.com/census-instrumentation/opencensus-service/data"
4442
"github.com/census-instrumentation/opencensus-service/internal"
4543
"github.com/census-instrumentation/opencensus-service/receiver"
44+
"github.com/census-instrumentation/opencensus-service/translator/trace"
4645
)
4746

4847
// ZipkinReceiver type is used to handle spans received in the Zipkin format.
@@ -113,10 +112,46 @@ func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink rece
113112
}
114113

115114
// v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans.
116-
func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
115+
func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
116+
if hdr.Get("Content-Type") == "application/x-thrift" {
117+
zSpans, err := deserializeThrift(blob)
118+
if err != nil {
119+
return nil, err
120+
}
121+
122+
return tracetranslator.ZipkinV1ThriftBatchToOCProto(zSpans)
123+
}
117124
return tracetranslator.ZipkinV1JSONBatchToOCProto(blob)
118125
}
119126

127+
// deserializeThrift decodes Thrift bytes to a list of spans.
128+
// This code comes from jaegertracing/jaeger, ideally we should have imported
129+
// it but this was creating many conflicts so brought the code to here.
130+
// https://github.com/jaegertracing/jaeger/blob/6bc0c122bfca8e737a747826ae60a22a306d7019/model/converter/thrift/zipkin/deserialize.go#L36
131+
func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {
132+
buffer := thrift.NewTMemoryBuffer()
133+
buffer.Write(b)
134+
135+
transport := thrift.NewTBinaryProtocolTransport(buffer)
136+
_, size, err := transport.ReadListBegin() // Ignore the returned element type
137+
if err != nil {
138+
return nil, err
139+
}
140+
141+
// We don't depend on the size returned by ReadListBegin to preallocate the array because it
142+
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
143+
var spans []*zipkincore.Span
144+
for i := 0; i < size; i++ {
145+
zs := &zipkincore.Span{}
146+
if err = zs.Read(transport); err != nil {
147+
return nil, err
148+
}
149+
spans = append(spans, zs)
150+
}
151+
152+
return spans, nil
153+
}
154+
120155
// v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans.
121156
func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
122157
// This flag's reference is from:
@@ -253,7 +288,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
253288

254289
var receiverNameTag string
255290
if asZipkinv1 {
256-
ereqs, err = zr.v1ToTraceSpans(slurp)
291+
ereqs, err = zr.v1ToTraceSpans(slurp, r.Header)
257292
receiverNameTag = "zipkinV1"
258293
} else {
259294
ereqs, err = zr.v2ToTraceSpans(slurp, r.Header)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
[
2+
{
3+
"trace_id": 1068169210207794600,
4+
"name": "checkAvailability",
5+
"id": 1068169210207794600,
6+
"annotations": [
7+
{
8+
"timestamp": 1544805927448081,
9+
"value": "sr",
10+
"host": {
11+
"ipv4": -1407254524,
12+
"port": 0,
13+
"service_name": "service1"
14+
}
15+
},
16+
{
17+
"timestamp": 1544805927460102,
18+
"value": "ss",
19+
"host": {
20+
"ipv4": -1407254524,
21+
"port": 0,
22+
"service_name": "service1"
23+
}
24+
}
25+
]
26+
},
27+
{
28+
"trace_id": 1068169210207794600,
29+
"name": "checkStock",
30+
"id": -438055438563385046,
31+
"parent_id": 1068169210207794600,
32+
"timestamp": 1544805927453923,
33+
"duration": 3740,
34+
"annotations": [
35+
{
36+
"timestamp": 1544805927453923,
37+
"value": "cs",
38+
"host": {
39+
"ipv4": -1407254524,
40+
"port": 0,
41+
"service_name": "service1"
42+
}
43+
},
44+
{
45+
"timestamp": 1544805927457717,
46+
"value": "cr",
47+
"host": {
48+
"ipv4": -1407254524,
49+
"port": 0,
50+
"service_name": "service1"
51+
}
52+
}
53+
]
54+
},
55+
{
56+
"trace_id": 1068169210207794600,
57+
"name": "checkAvailability",
58+
"id": 1068169210207794600,
59+
"timestamp": 1544805927446743,
60+
"duration": 12956,
61+
"annotations": [
62+
{
63+
"timestamp": 1544805927446743,
64+
"value": "cs",
65+
"host": {
66+
"ipv4": -1407254526,
67+
"port": 0,
68+
"service_name": "front-proxy"
69+
}
70+
},
71+
{
72+
"timestamp": 1544805927460510,
73+
"value": "cr",
74+
"host": {
75+
"ipv4": -1407254526,
76+
"port": 0,
77+
"service_name": "front-proxy"
78+
}
79+
}
80+
]
81+
},
82+
{
83+
"trace_id": 1068169210207794600,
84+
"name": "checkStock",
85+
"id": -438055438563385046,
86+
"parent_id": 1068169210207794600,
87+
"annotations": [
88+
{
89+
"timestamp": 1544805927454487,
90+
"value": "sr",
91+
"host": {
92+
"ipv4": -1407254521,
93+
"port": 0,
94+
"service_name": "service2"
95+
}
96+
},
97+
{
98+
"timestamp": 1544805927457320,
99+
"value": "ss",
100+
"host": {
101+
"ipv4": -1407254521,
102+
"port": 0,
103+
"service_name": "service2"
104+
}
105+
}
106+
],
107+
"binary_annotations": [
108+
{
109+
"key": "http.url",
110+
"annotation_type": "STRING",
111+
"value": "aHR0cDovL2xvY2FsaG9zdDo5MDAwL3RyYWNlLzI="
112+
},
113+
{
114+
"key": "http.status_code",
115+
"annotation_type": "I64",
116+
"value": "AAAAAAAAAMgAAA=="
117+
},
118+
{
119+
"key": "success",
120+
"annotation_type": "BOOL",
121+
"value": "AQ=="
122+
}
123+
]
124+
},
125+
{
126+
"trace_id": 1068169210207794600,
127+
"name": "checkStock",
128+
"id": -129168404463703009,
129+
"parent_id": 1068169210207794600,
130+
"timestamp": 1544805927453923,
131+
"duration": 3740,
132+
"annotations": []
133+
}
134+
]

0 commit comments

Comments
 (0)