1717package io .opencensus .exporter .trace .elasticsearch ;
1818
1919import com .google .common .io .BaseEncoding ;
20- import com .google .common .util .concurrent .SimpleTimeLimiter ;
21- import com .google .common .util .concurrent .TimeLimiter ;
22- import io .opencensus .common .Duration ;
23- import io .opencensus .common .Scope ;
24- import io .opencensus .trace .Sampler ;
25- import io .opencensus .trace .Status ;
26- import io .opencensus .trace .Tracer ;
27- import io .opencensus .trace .Tracing ;
20+ import io .opencensus .exporter .trace .util .TimeLimitedHandler ;
2821import io .opencensus .trace .export .SpanData ;
29- import io .opencensus .trace .export .SpanExporter ;
30- import io .opencensus .trace .samplers .Samplers ;
22+ import java .io .Closeable ;
3123import java .io .IOException ;
3224import java .io .InputStream ;
3325import java .io .OutputStream ;
3729import java .nio .charset .Charset ;
3830import java .util .Collection ;
3931import java .util .List ;
40- import java .util .concurrent .Callable ;
41- import java .util .concurrent .Executors ;
42- import java .util .concurrent .TimeUnit ;
43- import java .util .concurrent .TimeoutException ;
32+ import javax .annotation .Nullable ;
4433
45- final class ElasticsearchTraceHandler extends SpanExporter . Handler {
34+ final class ElasticsearchTraceHandler extends TimeLimitedHandler {
4635
4736 private final ElasticsearchTraceConfiguration elasticsearchTraceConfiguration ;
4837 private final String appName ;
4938 private final URL indexUrl ;
50- private final Duration deadline ;
5139 private static final String CONTENT_TYPE = "application/json" ;
5240 private static final String REQUEST_METHOD = "POST" ;
5341 private static final int CONNECTION_TIMEOUT_MILLISECONDS = 6000 ;
54- private static final Tracer tracer = Tracing .getTracer ();
55- private static final Sampler probabilitySampler = Samplers .probabilitySampler (0.0001 );
42+ private static final String EXPORT_SPAN_NAME = "ExportElasticsearchTraces" ;
5643
5744 ElasticsearchTraceHandler (ElasticsearchTraceConfiguration elasticsearchTraceConfiguration )
5845 throws MalformedURLException {
59-
46+ super ( elasticsearchTraceConfiguration . getDeadline (), EXPORT_SPAN_NAME );
6047 this .elasticsearchTraceConfiguration = elasticsearchTraceConfiguration ;
6148 StringBuilder sb = new StringBuilder ();
6249 sb .append (elasticsearchTraceConfiguration .getElasticsearchUrl ()).append ("/" );
6350 sb .append (elasticsearchTraceConfiguration .getElasticsearchIndex ()).append ("/" );
6451 sb .append (elasticsearchTraceConfiguration .getElasticsearchType ()).append ("/" );
6552 indexUrl = new URL (sb .toString ());
6653 appName = elasticsearchTraceConfiguration .getAppName ();
67- deadline = elasticsearchTraceConfiguration .getDeadline ();
6854 }
6955
7056 /**
@@ -73,33 +59,7 @@ final class ElasticsearchTraceHandler extends SpanExporter.Handler {
7359 * @param spanDataList Collection of {@code SpanData} to be exported.
7460 */
7561 @ Override
76- public void export (final Collection <SpanData > spanDataList ) {
77- Scope scope =
78- tracer
79- .spanBuilder ("ExportElasticsearchTraces" )
80- .setSampler (probabilitySampler )
81- .setRecordEvents (true )
82- .startScopedSpan ();
83- try {
84- TimeLimiter timeLimiter = SimpleTimeLimiter .create (Executors .newSingleThreadExecutor ());
85- timeLimiter .callWithTimeout (
86- new Callable <Void >() {
87- @ Override
88- public Void call () {
89- doExport (spanDataList );
90- return null ;
91- }
92- },
93- deadline .toMillis (),
94- TimeUnit .MILLISECONDS );
95- } catch (Exception e ) {
96- handleException (e );
97- } finally {
98- scope .close ();
99- }
100- }
101-
102- private void doExport (Collection <SpanData > spanDataList ) {
62+ public void timeLimitedExport (Collection <SpanData > spanDataList ) throws Exception {
10363 List <String > jsonList = JsonConversionUtils .convertToJson (appName , spanDataList );
10464 if (jsonList .isEmpty ()) {
10565 return ;
@@ -130,36 +90,23 @@ private void doExport(Collection<SpanData> spanDataList) {
13090 outputStream .flush ();
13191 inputStream = connection .getInputStream ();
13292 if (connection .getResponseCode () != 200 ) {
133- handleException ( new Exception ("Response " + connection .getResponseCode () ));
93+ throw new Exception ("Response " + connection .getResponseCode ());
13494 }
135- } catch (IOException e ) {
136- handleException (e );
137- // dropping span batch
13895 } finally {
139- if (inputStream != null ) {
140- try {
141- inputStream .close ();
142- } catch (IOException e ) {
143- // ignore
144- }
145- }
146- if (outputStream != null ) {
147- try {
148- outputStream .close ();
149- } catch (IOException e ) {
150- // ignore
151- }
152- }
96+ closeStream (inputStream );
97+ closeStream (outputStream );
15398 }
15499 }
155100 }
156101
157- private static void handleException (Exception e ) {
158- Status status = e instanceof TimeoutException ? Status .DEADLINE_EXCEEDED : Status .UNKNOWN ;
159- tracer
160- .getCurrentSpan ()
161- .setStatus (
162- status .withDescription (
163- e .getMessage () == null ? e .getClass ().getSimpleName () : e .getMessage ()));
102+ // Closes an input or output stream and ignores potential IOException.
103+ private static void closeStream (@ Nullable Closeable stream ) {
104+ if (stream != null ) {
105+ try {
106+ stream .close ();
107+ } catch (IOException e ) {
108+ // ignore
109+ }
110+ }
164111 }
165112}
0 commit comments