Skip to content

Commit 90874d7

Browse files
Copilotnpalm
andcommitted
fix: move publishRetryMessage to end of loop and skip invalid messages
Per reviewer feedback, publishRetryMessage is now called at the end of the processing loop for each message that is NOT marked as invalid. This prevents duplicate retry messages for the same event, since invalid messages already go back to the SQS queue for retry. Key changes: - Track validMessagesForRetry separately from all messages - Only messages that pass job queued check are added to validMessagesForRetry - publishRetryMessage is called after runner creation, not before - Messages marked as invalid (e.g., max runners reached, creation failed) are excluded from retry message publishing Tests updated to reflect new behavior: - publishRetryMessage is called AFTER runner creation - Messages marked invalid do not get retry messages published - All test scenarios updated with proper mock runner creation Co-authored-by: npalm <11609620+npalm@users.noreply.github.com>
1 parent 3e24ec6 commit 90874d7

2 files changed

Lines changed: 25 additions & 6 deletions

File tree

lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1698,6 +1698,7 @@ describe('Retry mechanism tests', () => {
16981698

16991699
it('calls publishRetryMessage for each valid message when job is queued', async () => {
17001700
const messages = createTestMessages(3);
1701+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef']); // Create all requested runners
17011702

17021703
await scaleUpModule.scaleUp(messages);
17031704

@@ -1749,15 +1750,16 @@ describe('Retry mechanism tests', () => {
17491750
);
17501751
});
17511752

1752-
it('calls publishRetryMessage even when maximum runners is reached', async () => {
1753+
it('does not call publishRetryMessage when maximum runners is reached and messages are marked invalid', async () => {
17531754
process.env.RUNNERS_MAXIMUM_COUNT = '0'; // No runners can be created
17541755

17551756
const messages = createTestMessages(2);
17561757

17571758
await scaleUpModule.scaleUp(messages);
17581759

1759-
// publishRetryMessage should still be called even though no runners will be created
1760-
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
1760+
// publishRetryMessage should NOT be called because messages are marked as invalid
1761+
// Invalid messages go back to the SQS queue and will be retried there
1762+
expect(mockPublishRetryMessage).not.toHaveBeenCalled();
17611763
expect(createRunner).not.toHaveBeenCalled();
17621764
});
17631765

@@ -1781,6 +1783,7 @@ describe('Retry mechanism tests', () => {
17811783

17821784
it('calls publishRetryMessage when ENABLE_JOB_QUEUED_CHECK is false', async () => {
17831785
process.env.ENABLE_JOB_QUEUED_CHECK = 'false';
1786+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890']); // Create all requested runners
17841787

17851788
const messages = createTestMessages(2);
17861789

@@ -1792,6 +1795,7 @@ describe('Retry mechanism tests', () => {
17921795
});
17931796

17941797
it('calls publishRetryMessage for each message in a multi-runner scenario', async () => {
1798+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef', 'i-11111', 'i-22222']); // Create all requested runners
17951799
const messages = createTestMessages(5);
17961800

17971801
await scaleUpModule.scaleUp(messages);
@@ -1808,8 +1812,9 @@ describe('Retry mechanism tests', () => {
18081812
});
18091813
});
18101814

1811-
it('calls publishRetryMessage before runner creation', async () => {
1815+
it('calls publishRetryMessage after runner creation', async () => {
18121816
const messages = createTestMessages(1);
1817+
mockCreateRunner.mockResolvedValue(['i-12345']); // Create the requested runner
18131818

18141819
const callOrder: string[] = [];
18151820
mockPublishRetryMessage.mockImplementation(() => {
@@ -1823,7 +1828,7 @@ describe('Retry mechanism tests', () => {
18231828

18241829
await scaleUpModule.scaleUp(messages);
18251830

1826-
expect(callOrder).toEqual(['publishRetryMessage', 'createRunner']);
1831+
expect(callOrder).toEqual(['createRunner', 'publishRetryMessage']);
18271832
});
18281833
});
18291834

lambdas/functions/control-plane/src/scale-runners/scale-up.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
339339
for (const [group, { githubInstallationClient, messages }] of validMessages.entries()) {
340340
// Work out how much we want to scale up by.
341341
let scaleUp = 0;
342+
const validMessagesForRetry: ActionRequestMessageSQS[] = [];
342343

343344
for (const message of messages) {
344345
const messageLogger = logger.createChild({
@@ -357,7 +358,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
357358
}
358359

359360
scaleUp++;
360-
await publishRetryMessage(message as ActionRequestMessageRetry);
361+
validMessagesForRetry.push(message);
361362
}
362363

363364
if (scaleUp === 0) {
@@ -398,6 +399,12 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
398399

399400
// No runners will be created, so skip calling the EC2 API.
400401
if (newRunners <= 0) {
402+
// Publish retry messages for all remaining messages that are not marked as invalid
403+
for (const message of validMessagesForRetry) {
404+
if (!invalidMessages.includes(message.messageId)) {
405+
await publishRetryMessage(message as ActionRequestMessageRetry);
406+
}
407+
}
401408
continue;
402409
}
403410
}
@@ -451,6 +458,13 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
451458

452459
invalidMessages.push(...messages.slice(0, failedInstanceCount).map(({ messageId }) => messageId));
453460
}
461+
462+
// Publish retry messages for all messages that are not marked as invalid
463+
for (const message of validMessagesForRetry) {
464+
if (!invalidMessages.includes(message.messageId)) {
465+
await publishRetryMessage(message as ActionRequestMessageRetry);
466+
}
467+
}
454468
}
455469

456470
return invalidMessages;

0 commit comments

Comments
 (0)