Skip to content

Commit 6698af4

Browse files
stuartp44, Brend-Smits, Copilot, and npalm
committed
fix: job retry mechanism not triggering (#4961)
This pull request adds comprehensive tests for the retry mechanism in the `scaleUp` functionality and **re**introduces the `publishRetryMessage` call to the scale-up process. The tests ensure that the retry logic works correctly under various scenarios, such as when jobs are queued, when the maximum number of runners is reached, and when queue checks are disabled. **Testing and Retry Mechanism Enhancements:** * Added a new test suite "Retry mechanism tests" in `scale-up.test.ts` to cover scenarios where `publishRetryMessage` should be called, including: when jobs are queued, when maximum runners are reached, with correct message structure, and when job queue checks are disabled. **Other code Updates:** * Fixed logic to skip runner creation if no new runners are needed by checking if `newRunners <= 0` instead of comparing counts, improving clarity and correctness. <details><summary>Example scenarios for the above bug</summary> <p> Scenario 1 - Admin sets RUNNERS_MAXIMUM_COUNT=20 - System scales up to 15 active runners - Admin reduces RUNNERS_MAXIMUM_COUNT=10 (cost control, policy change) - Before those 15 runners terminate, new jobs arrive - Bug triggers: newRunners = Math.min(scaleUp, 10-15) = -5 - Code tries to call createRunners({numberOfRunners: -5}) and fails Scenario 2 - RUNNERS_MAXIMUM_COUNT=5 - Someone manually launches 8 EC2 instances with runner tags - New jobs arrive - Bug triggers: newRunners = Math.min(2, 5-8) = -3 - Code tries to call createRunners({numberOfRunners: -3}) and fails Scenario 3 - Admin sets RUNNERS_MAXIMUM_COUNT=20 - System scales up to 15 active runners - Admin reduces RUNNERS_MAXIMUM_COUNT=10 (cost control, policy change) - Before those 15 runners terminate, new jobs arrive - Bug triggers: newRunners = Math.min(scaleUp, 10-15) = -5 - Code tries to call createRunners({numberOfRunners: -5}) and fails </p> </details> We tested this in our staging environment and verified it's working. 
Closes #4960 --------- Co-authored-by: Brend Smits <brend.smits@philips.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: npalm <11609620+npalm@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Brend-Smits <15904543+Brend-Smits@users.noreply.github.com> Co-authored-by: Niek Palm <npalm@users.noreply.github.com>
1 parent 06f1841 commit 6698af4

File tree

2 files changed

+196
-6
lines changed

2 files changed

+196
-6
lines changed

lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { createRunner, listEC2Runners } from './../aws/runners';
1010
import { RunnerInputParameters } from './../aws/runners.d';
1111
import * as scaleUpModule from './scale-up';
1212
import { getParameter } from '@aws-github-runner/aws-ssm-util';
13+
import { publishRetryMessage } from './job-retry';
1314
import { describe, it, expect, beforeEach, vi } from 'vitest';
1415
import type { Octokit } from '@octokit/rest';
1516

@@ -33,6 +34,7 @@ const mockCreateRunner = vi.mocked(createRunner);
3334
const mockListRunners = vi.mocked(listEC2Runners);
3435
const mockSSMClient = mockClient(SSMClient);
3536
const mockSSMgetParameter = vi.mocked(getParameter);
37+
const mockPublishRetryMessage = vi.mocked(publishRetryMessage);
3638

3739
vi.mock('@octokit/rest', () => ({
3840
Octokit: vi.fn().mockImplementation(function () {
@@ -63,6 +65,11 @@ vi.mock('@aws-github-runner/aws-ssm-util', async () => {
6365
};
6466
});
6567

68+
vi.mock('./job-retry', () => ({
69+
publishRetryMessage: vi.fn(),
70+
checkAndRetryJob: vi.fn(),
71+
}));
72+
6673
export type RunnerType = 'ephemeral' | 'non-ephemeral';
6774

6875
// for ephemeral and non-ephemeral runners
@@ -1846,6 +1853,171 @@ describe('scaleUp with Github Data Residency', () => {
18461853
});
18471854
});
18481855

1856+
describe('Retry mechanism tests', () => {
1857+
beforeEach(() => {
1858+
process.env.ENABLE_ORGANIZATION_RUNNERS = 'true';
1859+
process.env.ENABLE_EPHEMERAL_RUNNERS = 'true';
1860+
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
1861+
process.env.RUNNERS_MAXIMUM_COUNT = '10';
1862+
expectedRunnerParams = { ...EXPECTED_RUNNER_PARAMS };
1863+
mockSSMClient.reset();
1864+
});
1865+
1866+
const createTestMessages = (
1867+
count: number,
1868+
overrides: Partial<scaleUpModule.ActionRequestMessageSQS>[] = [],
1869+
): scaleUpModule.ActionRequestMessageSQS[] => {
1870+
return Array.from({ length: count }, (_, i) => ({
1871+
...TEST_DATA_SINGLE,
1872+
id: i + 1,
1873+
messageId: `message-${i + 1}`,
1874+
...overrides[i],
1875+
}));
1876+
};
1877+
1878+
it('calls publishRetryMessage for each valid message when job is queued', async () => {
1879+
const messages = createTestMessages(3);
1880+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef']); // Create all requested runners
1881+
1882+
await scaleUpModule.scaleUp(messages);
1883+
1884+
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(3);
1885+
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
1886+
1,
1887+
expect.objectContaining({
1888+
id: 1,
1889+
messageId: 'message-1',
1890+
}),
1891+
);
1892+
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
1893+
2,
1894+
expect.objectContaining({
1895+
id: 2,
1896+
messageId: 'message-2',
1897+
}),
1898+
);
1899+
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
1900+
3,
1901+
expect.objectContaining({
1902+
id: 3,
1903+
messageId: 'message-3',
1904+
}),
1905+
);
1906+
});
1907+
1908+
it('does not call publishRetryMessage when job is not queued', async () => {
1909+
mockOctokit.actions.getJobForWorkflowRun.mockImplementation((params) => {
1910+
const isQueued = params.job_id === 1; // Only job 1 is queued
1911+
return {
1912+
data: {
1913+
status: isQueued ? 'queued' : 'completed',
1914+
},
1915+
};
1916+
});
1917+
1918+
const messages = createTestMessages(3);
1919+
1920+
await scaleUpModule.scaleUp(messages);
1921+
1922+
// Only message with id 1 should trigger retry
1923+
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(1);
1924+
expect(mockPublishRetryMessage).toHaveBeenCalledWith(
1925+
expect.objectContaining({
1926+
id: 1,
1927+
messageId: 'message-1',
1928+
}),
1929+
);
1930+
});
1931+
1932+
it('does not call publishRetryMessage when maximum runners is reached and messages are marked invalid', async () => {
1933+
process.env.RUNNERS_MAXIMUM_COUNT = '0'; // No runners can be created
1934+
1935+
const messages = createTestMessages(2);
1936+
1937+
await scaleUpModule.scaleUp(messages);
1938+
1939+
// Verify listEC2Runners is called to check current runner count
1940+
expect(listEC2Runners).toHaveBeenCalledWith({
1941+
environment: 'unit-test-environment',
1942+
runnerType: 'Org',
1943+
runnerOwner: TEST_DATA_SINGLE.repositoryOwner,
1944+
});
1945+
1946+
// publishRetryMessage should NOT be called because messages are marked as invalid
1947+
// Invalid messages go back to the SQS queue and will be retried there
1948+
expect(mockPublishRetryMessage).not.toHaveBeenCalled();
1949+
expect(createRunner).not.toHaveBeenCalled();
1950+
});
1951+
1952+
it('calls publishRetryMessage with correct message structure including retry counter', async () => {
1953+
const message = {
1954+
...TEST_DATA_SINGLE,
1955+
messageId: 'test-message-id',
1956+
retryCounter: 2,
1957+
};
1958+
1959+
await scaleUpModule.scaleUp([message]);
1960+
1961+
expect(mockPublishRetryMessage).toHaveBeenCalledWith(
1962+
expect.objectContaining({
1963+
id: message.id,
1964+
messageId: 'test-message-id',
1965+
retryCounter: 2,
1966+
}),
1967+
);
1968+
});
1969+
1970+
it('calls publishRetryMessage when ENABLE_JOB_QUEUED_CHECK is false', async () => {
1971+
process.env.ENABLE_JOB_QUEUED_CHECK = 'false';
1972+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890']); // Create all requested runners
1973+
1974+
const messages = createTestMessages(2);
1975+
1976+
await scaleUpModule.scaleUp(messages);
1977+
1978+
// Should always call publishRetryMessage when queue check is disabled
1979+
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
1980+
expect(mockOctokit.actions.getJobForWorkflowRun).not.toHaveBeenCalled();
1981+
});
1982+
1983+
it('calls publishRetryMessage for each message in a multi-runner scenario', async () => {
1984+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef', 'i-11111', 'i-22222']); // Create all requested runners
1985+
const messages = createTestMessages(5);
1986+
1987+
await scaleUpModule.scaleUp(messages);
1988+
1989+
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(5);
1990+
messages.forEach((msg, index) => {
1991+
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
1992+
index + 1,
1993+
expect.objectContaining({
1994+
id: msg.id,
1995+
messageId: msg.messageId,
1996+
}),
1997+
);
1998+
});
1999+
});
2000+
2001+
it('calls publishRetryMessage after runner creation', async () => {
2002+
const messages = createTestMessages(1);
2003+
mockCreateRunner.mockResolvedValue(['i-12345']); // Create the requested runner
2004+
2005+
const callOrder: string[] = [];
2006+
mockPublishRetryMessage.mockImplementation(() => {
2007+
callOrder.push('publishRetryMessage');
2008+
return Promise.resolve();
2009+
});
2010+
mockCreateRunner.mockImplementation(async () => {
2011+
callOrder.push('createRunner');
2012+
return ['i-12345'];
2013+
});
2014+
2015+
await scaleUpModule.scaleUp(messages);
2016+
2017+
expect(callOrder).toEqual(['createRunner', 'publishRetryMessage']);
2018+
});
2019+
});
2020+
18492021
function defaultOctokitMockImpl() {
18502022
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
18512023
data: {

lambdas/functions/control-plane/src/scale-runners/scale-up.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { createGithubAppAuth, createGithubInstallationAuth, createOctokitClient
77
import { createRunner, listEC2Runners, tag, terminateRunner } from './../aws/runners';
88
import { RunnerInputParameters } from './../aws/runners.d';
99
import { metricGitHubAppRateLimit } from '../github/rate-limit';
10+
import { publishRetryMessage } from './job-retry';
1011

1112
const logger = createChildLogger('scale-up');
1213

@@ -337,7 +338,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
337338
};
338339

339340
const validMessages = new Map<string, MessagesWithClient>();
340-
const invalidMessages: string[] = [];
341+
const rejectedMessageIds = new Set<string>();
341342
for (const payload of payloads) {
342343
const { eventType, messageId, repositoryName, repositoryOwner } = payload;
343344
if (ephemeralEnabled && eventType !== 'workflow_job') {
@@ -346,7 +347,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
346347
{ eventType, messageId },
347348
);
348349

349-
invalidMessages.push(messageId);
350+
rejectedMessageIds.add(messageId);
350351

351352
continue;
352353
}
@@ -401,6 +402,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
401402
for (const [group, { githubInstallationClient, messages }] of validMessages.entries()) {
402403
// Work out how much we want to scale up by.
403404
let scaleUp = 0;
405+
const queuedMessages: ActionRequestMessageSQS[] = [];
404406

405407
for (const message of messages) {
406408
const messageLogger = logger.createChild({
@@ -419,6 +421,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
419421
}
420422

421423
scaleUp++;
424+
queuedMessages.push(message);
422425
}
423426

424427
if (scaleUp === 0) {
@@ -454,11 +457,18 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
454457
if (ephemeralEnabled) {
455458
// This removes `missingInstanceCount` items from the start of the array
456459
// so that, if we retry more messages later, we pick fresh ones.
457-
invalidMessages.push(...messages.splice(0, missingInstanceCount).map(({ messageId }) => messageId));
460+
const removedMessages = messages.splice(0, missingInstanceCount);
461+
removedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
458462
}
459463

460464
// No runners will be created, so skip calling the EC2 API.
461-
if (missingInstanceCount === scaleUp) {
465+
if (newRunners <= 0) {
466+
// Publish retry messages for messages that are not rejected
467+
for (const message of queuedMessages) {
468+
if (!rejectedMessageIds.has(message.messageId)) {
469+
await publishRetryMessage(message as ActionRequestMessageRetry);
470+
}
471+
}
462472
continue;
463473
}
464474
}
@@ -512,11 +522,19 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
512522
failedInstanceCount,
513523
});
514524

515-
invalidMessages.push(...messages.slice(0, failedInstanceCount).map(({ messageId }) => messageId));
525+
const failedMessages = messages.slice(0, failedInstanceCount);
526+
failedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
527+
}
528+
529+
// Publish retry messages for messages that are not rejected
530+
for (const message of queuedMessages) {
531+
if (!rejectedMessageIds.has(message.messageId)) {
532+
await publishRetryMessage(message as ActionRequestMessageRetry);
533+
}
516534
}
517535
}
518536

519-
return invalidMessages;
537+
return Array.from(rejectedMessageIds);
520538
}
521539

522540
export function getGitHubEnterpriseApiUrl() {

0 commit comments

Comments (0)