Skip to content
This repository was archived by the owner on Dec 18, 2023. It is now read-only.

Commit e14213a

Browse files
Liudmila MolkovaSergeyKanzhelev
authored andcommitted
Send spans in batches and stream export requests (#60)
1 parent 8b012a0 commit e14213a

File tree

1 file changed

+32
-18
lines changed

1 file changed

+32
-18
lines changed

src/OpenCensus.Exporter.Ocagent/Implementation/TraceExporterHandler.cs

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,27 @@ namespace OpenCensus.Exporter.Ocagent.Implementation
2727
using Google.Protobuf.WellKnownTypes;
2828

2929
using Grpc.Core;
30-
using Grpc.Core.Utils;
3130

3231
using Opencensus.Proto.Agent.Common.V1;
3332
using Opencensus.Proto.Agent.Trace.V1;
3433
using OpenCensus.Trace.Export;
3534

3635
internal class TraceExporterHandler : IHandler, IDisposable
3736
{
37+
private const uint MaxSpanBatchSize = 32;
3838
private readonly Channel channel;
3939
private readonly Opencensus.Proto.Agent.Trace.V1.TraceService.TraceServiceClient traceClient;
4040
private readonly ConcurrentQueue<ISpanData> spans = new ConcurrentQueue<ISpanData>();
4141
private readonly Node node;
42-
42+
private readonly uint spanBatchSize;
4343
private CancellationTokenSource cts;
4444
private Task runTask;
4545

46-
public TraceExporterHandler(string agentEndpoint, string hostName, string serviceName, ChannelCredentials credentials)
46+
public TraceExporterHandler(string agentEndpoint, string hostName, string serviceName, ChannelCredentials credentials, uint spanBatchSize = MaxSpanBatchSize)
4747
{
4848
this.channel = new Channel(agentEndpoint, credentials);
4949
this.traceClient = new TraceService.TraceServiceClient(this.channel);
50+
this.spanBatchSize = spanBatchSize;
5051

5152
this.node = new Node
5253
{
@@ -80,13 +81,14 @@ public void Export(IList<ISpanData> spanDataList)
8081

8182
foreach (var spanData in spanDataList)
8283
{
84+
// TODO back-pressure on the queue
8385
this.spans.Enqueue(spanData);
8486
}
8587
}
8688

8789
public void Dispose()
8890
{
89-
this.Stop().Wait();
91+
this.StopAsync().Wait();
9092
}
9193

9294
private static string GetAssemblyVersion(Assembly assembly)
@@ -104,7 +106,7 @@ private void Start()
104106
this.runTask = this.RunAsync(this.cts.Token);
105107
}
106108

107-
private async Task Stop()
109+
private async Task StopAsync()
108110
{
109111
if (this.cts != null)
110112
{
@@ -121,42 +123,54 @@ private async Task Stop()
121123

122124
private async Task RunAsync(CancellationToken cancellationToken)
123125
{
126+
var duplexCall = this.traceClient.Export();
124127
try
125128
{
126-
// TODO backpressure on the queue
127-
129+
bool firstRequest = true;
128130
while (!cancellationToken.IsCancellationRequested)
129131
{
132+
var spanExportRequest = new ExportTraceServiceRequest();
133+
if (firstRequest)
134+
{
135+
spanExportRequest.Node = this.node;
136+
}
137+
130138
// Spans
131-
if (this.spans.TryDequeue(out var spanData))
139+
bool hasSpans = false;
140+
141+
while (spanExportRequest.Spans.Count < this.spanBatchSize)
132142
{
143+
if (!this.spans.TryDequeue(out var spanData))
144+
{
145+
break;
146+
}
147+
133148
var protoSpan = spanData.ToProtoSpan();
134149
if (protoSpan == null)
135150
{
136151
continue;
137152
}
138153

139-
var spanExport = new ExportTraceServiceRequest();
140-
spanExport.Node = this.node;
141-
spanExport.Spans.Add(protoSpan);
154+
spanExportRequest.Spans.Add(protoSpan);
155+
hasSpans = true;
156+
}
142157

143-
// TODO:
144-
// write stream and read response stream (do not close)
145-
// add node to the first request only
146-
// workaround for https://github.com/Microsoft/ApplicationInsights-LocalForwarder/issues/31
147-
var duplexCall = this.traceClient.Export();
148-
await duplexCall.RequestStream.WriteAllAsync(new ExportTraceServiceRequest[] { spanExport }).ConfigureAwait(false);
158+
if (hasSpans)
159+
{
160+
await duplexCall.RequestStream.WriteAsync(spanExportRequest).ConfigureAwait(false);
161+
firstRequest = false;
149162
}
150163
else
151164
{
152165
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
153166
}
154167
}
168+
169+
await duplexCall.RequestStream.CompleteAsync().ConfigureAwait(false);
155170
}
156171
catch (RpcException)
157172
{
158173
// TODO: log
159-
160174
throw;
161175
}
162176
}

0 commit comments

Comments
 (0)