
Commit 5039ae5

Authored by stuartp44, Brend-Smits, npalm, and Copilot
fix: job retry mechanism not triggering (#4961)
This pull request adds comprehensive tests for the retry mechanism in the `scaleUp` functionality and reintroduces the `publishRetryMessage` call to the scale-up process. The tests ensure that the retry logic works correctly under various scenarios, such as when jobs are queued, when the maximum number of runners is reached, and when queue checks are disabled.

**Testing and Retry Mechanism Enhancements:**

* Added a new test suite, "Retry mechanism tests", in `scale-up.test.ts` to cover the scenarios in which `publishRetryMessage` should be called: when jobs are queued, when the maximum number of runners is reached, with the correct message structure, and when job queue checks are disabled.

**Other Code Updates:**

* Fixed the logic that skips runner creation when no new runners are needed by checking `newRunners <= 0` instead of comparing counts, improving clarity and correctness.

<details><summary>Example scenarios for the above bug</summary>
<p>

Scenario 1

- Admin sets RUNNERS_MAXIMUM_COUNT=20
- System scales up to 15 active runners
- Admin reduces RUNNERS_MAXIMUM_COUNT=10 (cost control, policy change)
- Before those 15 runners terminate, new jobs arrive
- Bug triggers: newRunners = Math.min(scaleUp, 10 - 15) = -5
- Code tries to call createRunners({numberOfRunners: -5}) and fails

Scenario 2

- RUNNERS_MAXIMUM_COUNT=5
- Someone manually launches 8 EC2 instances with runner tags
- New jobs arrive
- Bug triggers: newRunners = Math.min(2, 5 - 8) = -3
- Code tries to call createRunners({numberOfRunners: -3}) and fails

</p>
</details>

We tested this in our staging environment and verified it is working.

Closes #4960

---------

Co-authored-by: Brend Smits <brend.smits@philips.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: npalm <11609620+npalm@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Brend-Smits <15904543+Brend-Smits@users.noreply.github.com>
Co-authored-by: Niek Palm <npalm@users.noreply.github.com>
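Both scenarios reduce to the same arithmetic. Here is a minimal sketch of the capped scale-up computation described above (variable names follow the PR description; the real `scaleUp` does considerably more):

```typescript
// Sketch only: `scaleUp` is the number of queued jobs in the batch,
// `maximumCount` is RUNNERS_MAXIMUM_COUNT, and `currentCount` is the
// number of runners reported by listEC2Runners.
function plannedNewRunners(scaleUp: number, maximumCount: number, currentCount: number): number {
  // The headroom (maximumCount - currentCount) goes negative whenever the
  // maximum was lowered or instances were launched out of band.
  return Math.min(scaleUp, maximumCount - currentCount);
}

console.log(plannedNewRunners(5, 10, 15)); // -5 (scenario 1)
console.log(plannedNewRunners(2, 5, 8)); // -3 (scenario 2)
// The old guard only skipped the EC2 call when missingInstanceCount === scaleUp,
// so a negative result slipped through; the fix checks `newRunners <= 0` instead.
```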
1 parent e80baa2 · commit 5039ae5

2 files changed (+196 −6 lines)

**lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts** (172 additions, 0 deletions)
```diff
@@ -10,6 +10,7 @@ import { createRunner, listEC2Runners } from './../aws/runners';
 import { RunnerInputParameters } from './../aws/runners.d';
 import * as scaleUpModule from './scale-up';
 import { getParameter } from '@aws-github-runner/aws-ssm-util';
+import { publishRetryMessage } from './job-retry';
 import { describe, it, expect, beforeEach, vi } from 'vitest';
 import type { Octokit } from '@octokit/rest';
@@ -33,6 +34,7 @@ const mockCreateRunner = vi.mocked(createRunner);
 const mockListRunners = vi.mocked(listEC2Runners);
 const mockSSMClient = mockClient(SSMClient);
 const mockSSMgetParameter = vi.mocked(getParameter);
+const mockPublishRetryMessage = vi.mocked(publishRetryMessage);
 
 vi.mock('@octokit/rest', () => ({
   Octokit: vi.fn().mockImplementation(function () {
@@ -63,6 +65,11 @@ vi.mock('@aws-github-runner/aws-ssm-util', async () => {
   };
 });
 
+vi.mock('./job-retry', () => ({
+  publishRetryMessage: vi.fn(),
+  checkAndRetryJob: vi.fn(),
+}));
+
 export type RunnerType = 'ephemeral' | 'non-ephemeral';
 
 // for ephemeral and non-ephemeral runners
@@ -1680,6 +1687,171 @@ describe('scaleUp with Github Data Residency', () => {
   });
 });
 
+describe('Retry mechanism tests', () => {
+  beforeEach(() => {
+    process.env.ENABLE_ORGANIZATION_RUNNERS = 'true';
+    process.env.ENABLE_EPHEMERAL_RUNNERS = 'true';
+    process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+    process.env.RUNNERS_MAXIMUM_COUNT = '10';
+    expectedRunnerParams = { ...EXPECTED_RUNNER_PARAMS };
+    mockSSMClient.reset();
+  });
+
+  const createTestMessages = (
+    count: number,
+    overrides: Partial<scaleUpModule.ActionRequestMessageSQS>[] = [],
+  ): scaleUpModule.ActionRequestMessageSQS[] => {
+    return Array.from({ length: count }, (_, i) => ({
+      ...TEST_DATA_SINGLE,
+      id: i + 1,
+      messageId: `message-${i + 1}`,
+      ...overrides[i],
+    }));
+  };
+
+  it('calls publishRetryMessage for each valid message when job is queued', async () => {
+    const messages = createTestMessages(3);
+    mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef']); // Create all requested runners
+
+    await scaleUpModule.scaleUp(messages);
+
+    expect(mockPublishRetryMessage).toHaveBeenCalledTimes(3);
+    expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
+      1,
+      expect.objectContaining({
+        id: 1,
+        messageId: 'message-1',
+      }),
+    );
+    expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
+      2,
+      expect.objectContaining({
+        id: 2,
+        messageId: 'message-2',
+      }),
+    );
+    expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
+      3,
+      expect.objectContaining({
+        id: 3,
+        messageId: 'message-3',
+      }),
+    );
+  });
+
+  it('does not call publishRetryMessage when job is not queued', async () => {
+    mockOctokit.actions.getJobForWorkflowRun.mockImplementation((params) => {
+      const isQueued = params.job_id === 1; // Only job 1 is queued
+      return {
+        data: {
+          status: isQueued ? 'queued' : 'completed',
+        },
+      };
+    });
+
+    const messages = createTestMessages(3);
+
+    await scaleUpModule.scaleUp(messages);
+
+    // Only message with id 1 should trigger retry
+    expect(mockPublishRetryMessage).toHaveBeenCalledTimes(1);
+    expect(mockPublishRetryMessage).toHaveBeenCalledWith(
+      expect.objectContaining({
+        id: 1,
+        messageId: 'message-1',
+      }),
+    );
+  });
+
+  it('does not call publishRetryMessage when maximum runners is reached and messages are marked invalid', async () => {
+    process.env.RUNNERS_MAXIMUM_COUNT = '0'; // No runners can be created
+
+    const messages = createTestMessages(2);
+
+    await scaleUpModule.scaleUp(messages);
+
+    // Verify listEC2Runners is called to check current runner count
+    expect(listEC2Runners).toHaveBeenCalledWith({
+      environment: 'unit-test-environment',
+      runnerType: 'Org',
+      runnerOwner: TEST_DATA_SINGLE.repositoryOwner,
+    });
+
+    // publishRetryMessage should NOT be called because messages are marked as invalid
+    // Invalid messages go back to the SQS queue and will be retried there
+    expect(mockPublishRetryMessage).not.toHaveBeenCalled();
+    expect(createRunner).not.toHaveBeenCalled();
+  });
+
+  it('calls publishRetryMessage with correct message structure including retry counter', async () => {
+    const message = {
+      ...TEST_DATA_SINGLE,
+      messageId: 'test-message-id',
+      retryCounter: 2,
+    };
+
+    await scaleUpModule.scaleUp([message]);
+
+    expect(mockPublishRetryMessage).toHaveBeenCalledWith(
+      expect.objectContaining({
+        id: message.id,
+        messageId: 'test-message-id',
+        retryCounter: 2,
+      }),
+    );
+  });
+
+  it('calls publishRetryMessage when ENABLE_JOB_QUEUED_CHECK is false', async () => {
+    process.env.ENABLE_JOB_QUEUED_CHECK = 'false';
+    mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890']); // Create all requested runners
+
+    const messages = createTestMessages(2);
+
+    await scaleUpModule.scaleUp(messages);
+
+    // Should always call publishRetryMessage when queue check is disabled
+    expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
+    expect(mockOctokit.actions.getJobForWorkflowRun).not.toHaveBeenCalled();
+  });
+
+  it('calls publishRetryMessage for each message in a multi-runner scenario', async () => {
+    mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef', 'i-11111', 'i-22222']); // Create all requested runners
+    const messages = createTestMessages(5);
+
+    await scaleUpModule.scaleUp(messages);
+
+    expect(mockPublishRetryMessage).toHaveBeenCalledTimes(5);
+    messages.forEach((msg, index) => {
+      expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
+        index + 1,
+        expect.objectContaining({
+          id: msg.id,
+          messageId: msg.messageId,
+        }),
+      );
+    });
+  });
+
+  it('calls publishRetryMessage after runner creation', async () => {
+    const messages = createTestMessages(1);
+    mockCreateRunner.mockResolvedValue(['i-12345']); // Create the requested runner
+
+    const callOrder: string[] = [];
+    mockPublishRetryMessage.mockImplementation(() => {
+      callOrder.push('publishRetryMessage');
+      return Promise.resolve();
+    });
+    mockCreateRunner.mockImplementation(async () => {
+      callOrder.push('createRunner');
+      return ['i-12345'];
+    });
+
+    await scaleUpModule.scaleUp(messages);
+
+    expect(callOrder).toEqual(['createRunner', 'publishRetryMessage']);
+  });
+});
+
 function defaultOctokitMockImpl() {
   mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
     data: {
```
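For context on the mocking pattern above: Vitest hoists `vi.mock('./job-retry', ...)` above the imports, so `scale-up.ts` receives the stubbed module, and `vi.mocked(...)` merely re-types the stub so mock assertions type-check. A condensed sketch of the same pattern, assuming the repo's `./job-retry` module and a message shape trimmed to the fields the assertions use:

```typescript
import { describe, expect, it, vi } from 'vitest';
import { publishRetryMessage } from './job-retry';

// Hoisted above the imports by Vitest, so every importer of './job-retry'
// (including scale-up.ts) sees these stubs instead of the real module.
vi.mock('./job-retry', () => ({
  publishRetryMessage: vi.fn(),
  checkAndRetryJob: vi.fn(),
}));

// vi.mocked() changes only the static type, not the value: it lets
// .mockImplementation and call-count assertions type-check on the stub.
const mockPublishRetryMessage = vi.mocked(publishRetryMessage);

describe('mocking pattern used by the retry tests', () => {
  it('observes calls made through the mocked module', async () => {
    await publishRetryMessage({ id: 1, messageId: 'message-1' } as never);
    expect(mockPublishRetryMessage).toHaveBeenCalledTimes(1);
    expect(mockPublishRetryMessage).toHaveBeenCalledWith(expect.objectContaining({ messageId: 'message-1' }));
  });
});
```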

**lambdas/functions/control-plane/src/scale-runners/scale-up.ts** (24 additions, 6 deletions)
```diff
@@ -7,6 +7,7 @@ import { createGithubAppAuth, createGithubInstallationAuth, createOctokitClient
 import { createRunner, listEC2Runners, tag } from './../aws/runners';
 import { RunnerInputParameters } from './../aws/runners.d';
 import { metricGitHubAppRateLimit } from '../github/rate-limit';
+import { publishRetryMessage } from './job-retry';
 
 const logger = createChildLogger('scale-up');
 
@@ -315,7 +316,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
   };
 
   const validMessages = new Map<string, MessagesWithClient>();
-  const invalidMessages: string[] = [];
+  const rejectedMessageIds = new Set<string>();
   for (const payload of payloads) {
     const { eventType, messageId, repositoryName, repositoryOwner } = payload;
     if (ephemeralEnabled && eventType !== 'workflow_job') {
@@ -324,7 +325,7 @@
         { eventType, messageId },
       );
 
-      invalidMessages.push(messageId);
+      rejectedMessageIds.add(messageId);
 
       continue;
     }
@@ -379,6 +380,7 @@
   for (const [group, { githubInstallationClient, messages }] of validMessages.entries()) {
     // Work out how much we want to scale up by.
     let scaleUp = 0;
+    const queuedMessages: ActionRequestMessageSQS[] = [];
 
     for (const message of messages) {
      const messageLogger = logger.createChild({
@@ -397,6 +399,7 @@
       }
 
       scaleUp++;
+      queuedMessages.push(message);
     }
 
     if (scaleUp === 0) {
@@ -432,11 +435,18 @@
       if (ephemeralEnabled) {
         // This removes `missingInstanceCount` items from the start of the array
         // so that, if we retry more messages later, we pick fresh ones.
-        invalidMessages.push(...messages.splice(0, missingInstanceCount).map(({ messageId }) => messageId));
+        const removedMessages = messages.splice(0, missingInstanceCount);
+        removedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
       }
 
       // No runners will be created, so skip calling the EC2 API.
-      if (missingInstanceCount === scaleUp) {
+      if (newRunners <= 0) {
+        // Publish retry messages for messages that are not rejected
+        for (const message of queuedMessages) {
+          if (!rejectedMessageIds.has(message.messageId)) {
+            await publishRetryMessage(message as ActionRequestMessageRetry);
+          }
+        }
         continue;
       }
     }
@@ -490,11 +500,19 @@
        failedInstanceCount,
      });
 
-      invalidMessages.push(...messages.slice(0, failedInstanceCount).map(({ messageId }) => messageId));
+      const failedMessages = messages.slice(0, failedInstanceCount);
+      failedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
+    }
+
+    // Publish retry messages for messages that are not rejected
+    for (const message of queuedMessages) {
+      if (!rejectedMessageIds.has(message.messageId)) {
+        await publishRetryMessage(message as ActionRequestMessageRetry);
+      }
     }
   }
 
-  return invalidMessages;
+  return Array.from(rejectedMessageIds);
 }
 
 export function getGitHubEnterpriseApiUrl() {
```
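Distilled, the new control flow in `scaleUp` does two things per message group: deduplicate rejected message IDs in a `Set` for return to SQS, and schedule a retry check for every queued message that was not rejected. A simplified model of that bookkeeping (types reduced to the one field used here; not the actual module):

```typescript
// Simplified model of the new bookkeeping in scaleUp, not the real code.
interface QueuedMessage {
  messageId: string;
}

async function finishGroup(
  queuedMessages: QueuedMessage[],
  rejectedMessageIds: Set<string>,
  publishRetryMessage: (message: QueuedMessage) => Promise<void>,
): Promise<string[]> {
  // Retry checks are only scheduled for messages that survived runner
  // creation; rejected messages go back to SQS and are retried there.
  for (const message of queuedMessages) {
    if (!rejectedMessageIds.has(message.messageId)) {
      await publishRetryMessage(message);
    }
  }
  // Mirrors `return Array.from(rejectedMessageIds)` above: a message that
  // was rejected at several points is still reported to SQS exactly once.
  return Array.from(rejectedMessageIds);
}
```

Using a `Set` rather than the old `invalidMessages` array is what makes the double-rejection case (filtered out early, then also dropped for a missing instance) safe to report once.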
