@@ -118,7 +118,7 @@ private static final class Worker implements Runnable {
118118 private void addSpan (RecordEventsSpanImpl span ) {
119119 synchronized (monitor ) {
120120 this .spans .add (span );
121- if (spans .size () > bufferSize ) {
121+ if (spans .size () >= bufferSize ) {
122122 monitor .notifyAll ();
123123 }
124124 }
@@ -152,27 +152,17 @@ private void onBatchExport(List<SpanData> spanDataList) {
152152 }
153153
154154 private Worker (int bufferSize , Duration scheduleDelay ) {
155- spans = new ArrayList <RecordEventsSpanImpl >(bufferSize );
155+ spans = new ArrayList <>(bufferSize );
156156 this .bufferSize = bufferSize ;
157157 this .scheduleDelayMillis = scheduleDelay .toMillis ();
158158 }
159159
160- // Returns an unmodifiable list of all buffered spans data to ensure that any registered
161- // service handler cannot modify the list.
162- private static List <SpanData > fromSpanImplToSpanData (List <RecordEventsSpanImpl > spans ) {
163- List <SpanData > spanDatas = new ArrayList <SpanData >(spans .size ());
164- for (RecordEventsSpanImpl span : spans ) {
165- spanDatas .add (span .toSpanData ());
166- }
167- return Collections .unmodifiableList (spanDatas );
168- }
169-
170160 @ Override
171161 public void run () {
172162 while (true ) {
173163 // Copy all the batched spans in a separate list to release the monitor lock asap to
174164 // avoid blocking the producer thread.
175- List <RecordEventsSpanImpl > spansCopy ;
165+ ArrayList <RecordEventsSpanImpl > spansCopy ;
176166 synchronized (monitor ) {
177167 if (spans .size () < bufferSize ) {
178168 do {
@@ -187,27 +177,44 @@ public void run() {
187177 }
188178 } while (spans .isEmpty ());
189179 }
190- spansCopy = new ArrayList <RecordEventsSpanImpl >(spans );
180+ spansCopy = new ArrayList <>(spans );
191181 spans .clear ();
192182 }
193183 // Execute the batch export outside the synchronized to not block all producers.
194- final List <SpanData > spanDataList = fromSpanImplToSpanData (spansCopy );
195- if (!spanDataList .isEmpty ()) {
196- onBatchExport (spanDataList );
197- }
184+ exportBatches (spansCopy );
198185 }
199186 }
200187
201- void flush () {
202- List <RecordEventsSpanImpl > spansCopy ;
188+ private void flush () {
189+ ArrayList <RecordEventsSpanImpl > spansCopy ;
203190 synchronized (monitor ) {
204- spansCopy = new ArrayList <RecordEventsSpanImpl >(spans );
191+ spansCopy = new ArrayList <>(spans );
205192 spans .clear ();
206193 }
194+ // Execute the batch export outside the synchronized to not block all producers.
195+ exportBatches (spansCopy );
196+ }
207197
208- final List <SpanData > spanDataList = fromSpanImplToSpanData (spansCopy );
198+ @ SuppressWarnings ("argument.type.incompatible" )
199+ private void exportBatches (ArrayList <RecordEventsSpanImpl > spanList ) {
200+ ArrayList <SpanData > spanDataList = new ArrayList <>(bufferSize );
201+ for (int i = 0 ; i < spanList .size (); i ++) {
202+ spanDataList .add (spanList .get (i ).toSpanData ());
203+ // Remove the reference to the RecordEventsSpanImpl to allow GC to free the memory.
204+ spanList .set (i , null );
205+ if (spanDataList .size () == bufferSize ) {
206+ // One full batch, export it now. Wrap the list with unmodifiableList to ensure exporter
207+ // does not change the list.
208+ onBatchExport (Collections .unmodifiableList (spanDataList ));
209+ // Cannot clear because the exporter may still have a reference to this list (e.g. async
210+ // scheduled work), so just create a new list.
211+ spanDataList = new ArrayList <>(bufferSize );
212+ }
213+ }
214+ // Last incomplete batch, send this as well.
209215 if (!spanDataList .isEmpty ()) {
210- onBatchExport (spanDataList );
216+ // Wrap the list with unmodifiableList to ensure exporter does not change the list.
217+ onBatchExport (Collections .unmodifiableList (spanDataList ));
211218 }
212219 }
213220 }
0 commit comments