Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit afdb3a1

Browse files
author
Steven Karis
authored
nodebatcher: Allow batcher pre-allocations to reduce on new batch (#419)
* Allow batcher pre-allocations to reduce on new batch Currently, we re-use the previous capacity size when pre-allocating space for the new batch in nodebatcher. This doesn't work well when a node has had high throughput and then stops sending traffic, or maybe the process that was sending a lot of traffic shifts to another node. This PR reduces allocation space based not just on current cap, but also current size to fix this. * Move comment placement * Refactor marginally * PR comments
1 parent 9b5a4ab commit afdb3a1

1 file changed

Lines changed: 9 additions & 2 deletions

File tree

internal/collector/processor/nodebatcher/node_batcher.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import (
3737
)
3838

3939
const (
40-
initialBatchCapacity = uint32(1024)
40+
initialBatchCapacity = uint32(512)
4141
nodeStatusDead = uint32(1)
4242
batchStatusClosed = uint32(1)
4343
tickerPendingNodesBuffer = 16
@@ -237,13 +237,20 @@ func (nb *nodeBatcher) add(spans []*tracepb.Span) {
237237

238238
// cutBatch cuts the provided batch, and sets a new batch on this nodeBatcher
239239
func (nb *nodeBatcher) cutBatch(b *batch) {
240+
initialCap := b.getCurrentCap()
241+
currSize := b.getCurrentItemCount()
242+
reducedCap := initialCap >> 1
243+
for currSize < reducedCap && reducedCap > initialBatchCapacity {
244+
initialCap = reducedCap
245+
reducedCap = reducedCap >> 1
246+
}
240247
// atomic.CompareAndSwapPointer only takes unsafe.Pointer interfaces. We do not use unsafe
241248
// to skirt around the golang type system.
242249
currBatchPtr := (*unsafe.Pointer)(unsafe.Pointer(&nb.currBatch))
243250
swapped := atomic.CompareAndSwapPointer(
244251
currBatchPtr,
245252
unsafe.Pointer(b),
246-
unsafe.Pointer(newBatch(b.getCurrentCap(), nb.sendBatchSize)),
253+
unsafe.Pointer(newBatch(initialCap, nb.sendBatchSize)),
247254
)
248255
// Since we are doing an atomic compare and swap, this batch will only be sent once.
249256
if swapped {

0 commit comments

Comments
 (0)