Skip to content
This repository was archived by the owner on Dec 18, 2023. It is now read-only.

Commit 8b012a0

Browse files
Liudmila MolkovaSergeyKanzhelev
authored andcommitted
Ocagent exporter implementation (#57)
* First ocagent exporter implementation * First ocagent exporter implementation * cleanup and more todos
1 parent 383f8d1 commit 8b012a0

File tree

14 files changed

+7426
-3
lines changed

14 files changed

+7426
-3
lines changed

OpenCensus.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "testdata", "testdata", "{77
5757
EndProject
5858
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestApp.AspNetCore.2.0", "test\TestApp.AspNetCore.2.0\TestApp.AspNetCore.2.0.csproj", "{F2F81E76-6A0E-466B-B673-EBBF1A9ED075}"
5959
EndProject
60+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenCensus.Exporter.Ocagent", "src\OpenCensus.Exporter.Ocagent\OpenCensus.Exporter.Ocagent.csproj", "{56B0ED25-8A14-4AA2-B59D-FAAFCBACDD4A}"
61+
EndProject
6062
Global
6163
GlobalSection(SolutionConfigurationPlatforms) = preSolution
6264
Debug|Any CPU = Debug|Any CPU
@@ -115,6 +117,10 @@ Global
115117
{F2F81E76-6A0E-466B-B673-EBBF1A9ED075}.Debug|Any CPU.Build.0 = Debug|Any CPU
116118
{F2F81E76-6A0E-466B-B673-EBBF1A9ED075}.Release|Any CPU.ActiveCfg = Release|Any CPU
117119
{F2F81E76-6A0E-466B-B673-EBBF1A9ED075}.Release|Any CPU.Build.0 = Release|Any CPU
120+
{56B0ED25-8A14-4AA2-B59D-FAAFCBACDD4A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
121+
{56B0ED25-8A14-4AA2-B59D-FAAFCBACDD4A}.Debug|Any CPU.Build.0 = Debug|Any CPU
122+
{56B0ED25-8A14-4AA2-B59D-FAAFCBACDD4A}.Release|Any CPU.ActiveCfg = Release|Any CPU
123+
{56B0ED25-8A14-4AA2-B59D-FAAFCBACDD4A}.Release|Any CPU.Build.0 = Release|Any CPU
118124
EndGlobalSection
119125
GlobalSection(SolutionProperties) = preSolution
120126
HideSolutionNode = FALSE

src/OpenCensus.Collector.AspNetCore/Implementation/HttpInListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public override void OnStartActivity(Activity activity, object payload)
5959
{
6060
span.PutServerSpanKindAttribute();
6161
span.PutHttpMethodAttribute(request.Method);
62-
span.PutHttpHostAttribute(request.Host.Value, request.Host.Port ?? 80);
62+
span.PutHttpHostAttribute(request.Host.Host, request.Host.Port ?? 80);
6363
span.PutHttpPathAttribute(request.Path);
6464
}
6565
}
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// <copyright file="SpanDataExtentions.cs" company="OpenCensus Authors">
2+
// Copyright 2018, OpenCensus Authors
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of theLicense at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
// </copyright>
16+
17+
namespace OpenCensus.Exporter.Ocagent.Implementation
18+
{
19+
using System;
20+
using System.Collections.Generic;
21+
using System.Linq;
22+
23+
using Google.Protobuf;
24+
using Google.Protobuf.WellKnownTypes;
25+
26+
using Opencensus.Proto.Trace.V1;
27+
using OpenCensus.Trace;
28+
using OpenCensus.Trace.Export;
29+
30+
internal static class SpanDataExtentions
31+
{
32+
internal static Span ToProtoSpan(this ISpanData spanData)
33+
{
34+
try
35+
{
36+
return new Span
37+
{
38+
Name = new TruncatableString { Value = spanData.Name },
39+
Kind = spanData.Kind == SpanKind.Client ? Span.Types.SpanKind.Client : Span.Types.SpanKind.Server,
40+
TraceId = ByteString.CopyFrom(spanData.Context.TraceId.Bytes),
41+
SpanId = ByteString.CopyFrom(spanData.Context.SpanId.Bytes),
42+
ParentSpanId =
43+
ByteString.CopyFrom(spanData.ParentSpanId?.Bytes ?? new byte[0]),
44+
45+
StartTime = new Timestamp
46+
{
47+
Nanos = spanData.StartTimestamp.Nanos,
48+
Seconds = spanData.StartTimestamp.Seconds,
49+
},
50+
EndTime = new Timestamp
51+
{
52+
Nanos = spanData.EndTimestamp.Nanos,
53+
Seconds = spanData.EndTimestamp.Seconds,
54+
},
55+
Status = spanData.Status == null
56+
? null
57+
: new Opencensus.Proto.Trace.V1.Status
58+
{
59+
Code = (int)spanData.Status.CanonicalCode,
60+
Message = spanData.Status.Description ?? string.Empty,
61+
},
62+
SameProcessAsParentSpan =
63+
!spanData.HasRemoteParent.GetValueOrDefault() && spanData.ParentSpanId != null,
64+
ChildSpanCount = spanData.ChildSpanCount.HasValue ? (uint)spanData.ChildSpanCount.Value : 0,
65+
Attributes = FromIAttributes(spanData.Attributes),
66+
TimeEvents = FromITimeEvents(spanData.MessageEvents, spanData.Annotations),
67+
Links = new Span.Types.Links
68+
{
69+
DroppedLinksCount = spanData.Links.DroppedLinksCount,
70+
Link = { spanData.Links.Links.Select(FromILink), },
71+
},
72+
};
73+
}
74+
catch (Exception e)
75+
{
76+
Console.WriteLine(e);
77+
78+
// TODO: log
79+
}
80+
81+
return null;
82+
}
83+
84+
private static Span.Types.Attributes FromIAttributes(IAttributes source)
85+
{
86+
var attributes = new Span.Types.Attributes
87+
{
88+
DroppedAttributesCount = source.DroppedAttributesCount,
89+
};
90+
91+
attributes.AttributeMap.Add(source.AttributeMap.ToDictionary(
92+
kvp => kvp.Key,
93+
kvp => new Opencensus.Proto.Trace.V1.AttributeValue
94+
{
95+
StringValue = new TruncatableString
96+
{
97+
Value = kvp.Value.Match(s => s, b => b.ToString(), l => l.ToString(), d => d.ToString(), o => o?.ToString()),
98+
},
99+
100+
// todo: how to determine AttributeValue type?
101+
}));
102+
103+
return attributes;
104+
}
105+
106+
private static Span.Types.TimeEvent FromITimeEvent(ITimedEvent<IMessageEvent> source)
107+
{
108+
return new Span.Types.TimeEvent
109+
{
110+
Time = new Timestamp
111+
{
112+
Nanos = source.Timestamp.Nanos,
113+
Seconds = source.Timestamp.Seconds,
114+
},
115+
MessageEvent = new Span.Types.TimeEvent.Types.MessageEvent
116+
{
117+
Type = source.Event.Type == MessageEventType.SENT ? Span.Types.TimeEvent.Types.MessageEvent.Types.Type.Sent : Span.Types.TimeEvent.Types.MessageEvent.Types.Type.Received,
118+
CompressedSize = (ulong)source.Event.CompressedMessageSize,
119+
UncompressedSize = (ulong)source.Event.UncompressedMessageSize,
120+
Id = (ulong)source.Event.MessageId,
121+
},
122+
};
123+
}
124+
125+
private static Span.Types.TimeEvents FromITimeEvents(ITimedEvents<IMessageEvent> messages, ITimedEvents<IAnnotation> annotations)
126+
{
127+
var timedEvents = new Span.Types.TimeEvents
128+
{
129+
DroppedMessageEventsCount = messages.DroppedEventsCount,
130+
DroppedAnnotationsCount = annotations.DroppedEventsCount,
131+
TimeEvent = { messages.Events.Select(FromITimeEvent), },
132+
};
133+
134+
timedEvents.TimeEvent.AddRange(annotations.Events.Select(FromITimeEvent));
135+
136+
return timedEvents;
137+
}
138+
139+
private static Span.Types.Link FromILink(ILink source)
140+
{
141+
return new Span.Types.Link
142+
{
143+
TraceId = ByteString.CopyFrom(source.TraceId.Bytes),
144+
SpanId = ByteString.CopyFrom(source.SpanId.Bytes),
145+
Type = source.Type == LinkType.CHILD_LINKED_SPAN ? Span.Types.Link.Types.Type.ChildLinkedSpan : Span.Types.Link.Types.Type.ParentLinkedSpan,
146+
Attributes = FromIAttributeMap(source.Attributes),
147+
};
148+
}
149+
150+
private static Span.Types.TimeEvent FromITimeEvent(ITimedEvent<IAnnotation> source)
151+
{
152+
return new Span.Types.TimeEvent
153+
{
154+
Time = new Timestamp
155+
{
156+
Nanos = source.Timestamp.Nanos,
157+
Seconds = source.Timestamp.Seconds,
158+
},
159+
Annotation = new Span.Types.TimeEvent.Types.Annotation
160+
{
161+
Description = new TruncatableString { Value = source.Event.Description },
162+
Attributes = FromIAttributeMap(source.Event.Attributes),
163+
},
164+
};
165+
}
166+
167+
private static Span.Types.Attributes FromIAttributeMap(IDictionary<string, IAttributeValue> source)
168+
{
169+
var attributes = new Span.Types.Attributes();
170+
171+
attributes.AttributeMap.Add(source.ToDictionary(
172+
kvp => kvp.Key,
173+
kvp => new Opencensus.Proto.Trace.V1.AttributeValue
174+
{
175+
StringValue = new TruncatableString
176+
{
177+
Value = kvp.Value.Match(s => s, b => b.ToString(), l => l.ToString(), d => d.ToString(), o => o?.ToString()),
178+
},
179+
180+
// todo: how to determine AttributeValue type?
181+
}));
182+
183+
return attributes;
184+
}
185+
}
186+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// <copyright file="TraceExporterHandler.cs" company="OpenCensus Authors">
2+
// Copyright 2018, OpenCensus Authors
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of theLicense at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
// </copyright>
16+
17+
namespace OpenCensus.Exporter.Ocagent.Implementation
18+
{
19+
using System;
20+
using System.Collections.Concurrent;
21+
using System.Collections.Generic;
22+
using System.Diagnostics;
23+
using System.Reflection;
24+
using System.Threading;
25+
using System.Threading.Tasks;
26+
27+
using Google.Protobuf.WellKnownTypes;
28+
29+
using Grpc.Core;
30+
using Grpc.Core.Utils;
31+
32+
using Opencensus.Proto.Agent.Common.V1;
33+
using Opencensus.Proto.Agent.Trace.V1;
34+
using OpenCensus.Trace.Export;
35+
36+
internal class TraceExporterHandler : IHandler, IDisposable
37+
{
38+
private readonly Channel channel;
39+
private readonly Opencensus.Proto.Agent.Trace.V1.TraceService.TraceServiceClient traceClient;
40+
private readonly ConcurrentQueue<ISpanData> spans = new ConcurrentQueue<ISpanData>();
41+
private readonly Node node;
42+
43+
private CancellationTokenSource cts;
44+
private Task runTask;
45+
46+
public TraceExporterHandler(string agentEndpoint, string hostName, string serviceName, ChannelCredentials credentials)
47+
{
48+
this.channel = new Channel(agentEndpoint, credentials);
49+
this.traceClient = new TraceService.TraceServiceClient(this.channel);
50+
51+
this.node = new Node
52+
{
53+
Identifier = new ProcessIdentifier
54+
{
55+
HostName = hostName,
56+
Pid = (uint)Process.GetCurrentProcess().Id,
57+
StartTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
58+
},
59+
LibraryInfo = new LibraryInfo
60+
{
61+
Language = LibraryInfo.Types.Language.CSharp,
62+
CoreLibraryVersion = GetAssemblyVersion(typeof(ISpanData).Assembly),
63+
ExporterVersion = GetAssemblyVersion(typeof(OcagentExporter).Assembly),
64+
},
65+
ServiceInfo = new ServiceInfo
66+
{
67+
Name = serviceName,
68+
},
69+
};
70+
71+
this.Start();
72+
}
73+
74+
public void Export(IList<ISpanData> spanDataList)
75+
{
76+
if (this.cts == null || this.cts.IsCancellationRequested)
77+
{
78+
return;
79+
}
80+
81+
foreach (var spanData in spanDataList)
82+
{
83+
this.spans.Enqueue(spanData);
84+
}
85+
}
86+
87+
public void Dispose()
88+
{
89+
this.Stop().Wait();
90+
}
91+
92+
private static string GetAssemblyVersion(Assembly assembly)
93+
{
94+
AssemblyFileVersionAttribute fileVersionAttr = assembly.GetCustomAttribute<AssemblyFileVersionAttribute>();
95+
return fileVersionAttr?.Version ?? "0.0.0";
96+
}
97+
98+
private void Start()
99+
{
100+
// TODO Config
101+
// TODO handle connection errors & retries
102+
103+
this.cts = new CancellationTokenSource();
104+
this.runTask = this.RunAsync(this.cts.Token);
105+
}
106+
107+
private async Task Stop()
108+
{
109+
if (this.cts != null)
110+
{
111+
this.cts.Cancel(false);
112+
113+
// ignore all exceptions
114+
await this.runTask.ContinueWith(t => { }).ConfigureAwait(false);
115+
116+
this.cts.Dispose();
117+
this.cts = null;
118+
this.runTask = null;
119+
}
120+
}
121+
122+
private async Task RunAsync(CancellationToken cancellationToken)
123+
{
124+
try
125+
{
126+
// TODO backpressure on the queue
127+
128+
while (!cancellationToken.IsCancellationRequested)
129+
{
130+
// Spans
131+
if (this.spans.TryDequeue(out var spanData))
132+
{
133+
var protoSpan = spanData.ToProtoSpan();
134+
if (protoSpan == null)
135+
{
136+
continue;
137+
}
138+
139+
var spanExport = new ExportTraceServiceRequest();
140+
spanExport.Node = this.node;
141+
spanExport.Spans.Add(protoSpan);
142+
143+
// TODO:
144+
// write stream and read response stream (do not close)
145+
// add node to the first request only
146+
// workaround for https://github.com/Microsoft/ApplicationInsights-LocalForwarder/issues/31
147+
var duplexCall = this.traceClient.Export();
148+
await duplexCall.RequestStream.WriteAllAsync(new ExportTraceServiceRequest[] { spanExport }).ConfigureAwait(false);
149+
}
150+
else
151+
{
152+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
153+
}
154+
}
155+
}
156+
catch (RpcException)
157+
{
158+
// TODO: log
159+
160+
throw;
161+
}
162+
}
163+
}
164+
}

0 commit comments

Comments
 (0)