Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/core/consumer/RunMQConsumerCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import {RunMQFailedMessageRejecterProcessor} from "@src/core/consumer/processors/RunMQFailedMessageRejecterProcessor";
import {RunMQRetriesCheckerProcessor} from "@src/core/consumer/processors/RunMQRetriesCheckerProcessor";
import {RunMQFailureLoggerProcessor} from "@src/core/consumer/processors/RunMQFailureLoggerProcessor";
import {RunMQSchemaFailureProcessor} from "@src/core/consumer/processors/RunMQSchemaFailureProcessor";
import {RunMQBaseProcessor} from "@src/core/consumer/processors/RunMQBaseProcessor";
import {RunMQExceptionLoggerProcessor} from "@src/core/consumer/processors/RunMQExceptionLoggerProcessor";
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
Expand Down Expand Up @@ -88,10 +89,15 @@ export class RunMQConsumerCreator {
new RunMQFailedMessageRejecterProcessor(
new RunMQRetriesCheckerProcessor(
new RunMQFailureLoggerProcessor(
new RunMQBaseProcessor<T>(
consumerConfiguration.processor,
new RunMQSchemaFailureProcessor(
new RunMQBaseProcessor<T>(
consumerConfiguration.processor,
consumerConfiguration.processorConfig,
new DefaultDeserializer<T>()
),
consumerConfiguration.processorConfig,
new DefaultDeserializer<T>()
DLQPublisher,
this.logger
),
this.logger
),
Expand Down
82 changes: 82 additions & 0 deletions src/core/consumer/processors/RunMQSchemaFailureProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import {RunMQConsumer, RunMQProcessorConfiguration, RunMQPublisher} from "@src/types";
import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage";
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {RunMQMessage} from "@src/core/message/RunMQMessage";
import {RunMQSchemaValidationError} from "@src/core/serializers/deserializer/DefaultDeserializer";

/**
* Honors the configured `failureStrategy` for schema-validation failures.
*
* Without this processor, a schema-validation error takes the same path as
* any handler error: it gets retried `attempts` times before reaching the
* DLQ. That's wasteful — a malformed message will never become valid by
* retrying it.
*
* When `messageSchema.failureStrategy === 'dlq'`, this processor catches
* `RunMQSchemaValidationError` from the deserializer and routes the message
* straight to the DLQ. The original message is then acked (we return true,
* letting `RunMQSucceededMessageAcknowledgerProcessor` do the ack).
*
* Other errors (handler exceptions, JSON parse errors) propagate up the
* chain unchanged.
*/
export class RunMQSchemaFailureProcessor implements RunMQConsumer {
constructor(
private readonly consumer: RunMQConsumer,
private readonly config: RunMQProcessorConfiguration,
private readonly DLQPublisher: RunMQPublisher,
private readonly logger: RunMQLogger,
) {
}

public async consume(message: RabbitMQMessage): Promise<boolean> {
try {
return await this.consumer.consume(message);
} catch (e: unknown) {
if (this.shouldRouteToDLQ(e)) {
this.logger.warn('Schema validation failed — routing message to DLQ.', {
correlationId: message.correlationId,
error: e instanceof RunMQSchemaValidationError ? e.error : undefined,
});
this.routeToDLQ(message);
return true;
}
throw e;
}
}

private shouldRouteToDLQ(e: unknown): boolean {
if (!(e instanceof RunMQSchemaValidationError)) return false;
return this.config.messageSchema?.failureStrategy === 'dlq';
}

private routeToDLQ(message: RabbitMQMessage) {
const dlqMessage = new RabbitMQMessage(
this.extractOriginalPayload(message),
message.id,
message.correlationId,
message.channel,
message.amqpMessage,
message.headers,
);
this.DLQPublisher.publish(
ConsumerCreatorUtils.getDLQTopicName(this.config.name),
dlqMessage,
);
}

private extractOriginalPayload(message: RabbitMQMessage): any {
if (typeof message.message === 'string') {
try {
const parsed = JSON.parse(message.message);
if (RunMQMessage.isValid(parsed)) {
return parsed.message;
}
} catch {
// Not valid JSON, use as-is
}
}
return message.message;
}
}
23 changes: 14 additions & 9 deletions tests/e2e/RunMQ.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,35 @@ describe('RunMQ E2E Tests', () => {

describe('processing', () => {
it('Should end up in DLQ when message is not meeting the schema validation', async () => {
// simpleNoSchema's default messageSchema has failureStrategy: 'dlq',
// so per issue #23 a schema-validation failure must short-circuit
// straight to the DLQ — no retries, no handler invocation.
const configuration = RunMQProcessorConfigurationExample.simpleNoSchema()
const testingConnection = new RabbitMQClientAdapter(validConfig);
const channel = await testingConnection.getChannel();
await ChannelTestHelpers.deleteQueue(channel, configuration.name);

const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
await runMQ.process<TestingMessage>("ad.played", configuration,
let handlerCalls = 0;
await runMQ.process<TestingMessage>("runmq.e2e.ad.played", configuration,
(): Promise<void> => {
handlerCalls++;
return Promise.resolve();
}
)

channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'ad.played', MessageTestUtils.buffer(MessageExample.person()))
channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'runmq.e2e.ad.played', MessageTestUtils.buffer(MessageExample.person()))
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1)
await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0)
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0)

await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message processing failed', configuration.attempts as number)
await LoggerTestHelpers.assertLoggedWithCountAndParameters(MockedRunMQLogger.error, 'Message reached maximum attempts. Moving to dead-letter queue.', {
attempts: configuration.attempts,
max: configuration.attempts,
},
1
)
// Direct-to-DLQ path: the user handler is never reached and the
// retry pipeline is never engaged, so neither the per-attempt
// failure log nor the max-retries log should appear.
expect(handlerCalls).toBe(0);
await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.warn, 'Schema validation failed — routing message to DLQ.', 1)
await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message processing failed', 0)
await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message reached maximum attempts. Moving to dead-letter queue.', 0)
await runMQ.disconnect();
await testingConnection.disconnect();
})
Expand Down
115 changes: 115 additions & 0 deletions tests/e2e/RunMQ.schemaFailure.e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import {RunMQ} from '@src/core/RunMQ';
import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter";
import {Constants} from "@src/core/constants";
import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers";
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample";
import {RunMQProcessorConfigurationExample, MessageSchemaExample} from "@tests/Examples/RunMQProcessorConfigurationExample";
import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample";
import {MessageTestUtils} from "@tests/helpers/MessageTestUtils";
import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage";

/**
* Integration tests for issue #23: schema-validation `failureStrategy: 'dlq'`
* must route invalid messages straight to the DLQ on first delivery, with no
* retries. A malformed message will never become valid by retrying — that's
* just dead time and broker churn.
*/
describe('RunMQ Schema Failure Strategy E2E', () => {
const validConfig = RunMQConnectionConfigExample.valid();

beforeEach(() => {
jest.clearAllMocks();
});

it('routes schema-invalid message straight to DLQ on first delivery (no retries)', async () => {
const configuration = RunMQProcessorConfigurationExample.random(
'schema_dlq_strategy',
1,
5, // 5 attempts — the test must prove these aren't used
500, // 500ms retry delay — gives plenty of time to observe retries if they happen
MessageSchemaExample.simplePersonSchema(),
);

const testingConnection = new RabbitMQClientAdapter(validConfig);
const channel = await testingConnection.getChannel();
await ChannelTestHelpers.deleteQueue(channel, configuration.name);

const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
let handlerCalls = 0;
await runMQ.process('schemaFailure.user.created', configuration, () => {
handlerCalls++;
return Promise.resolve();
});

// Publish a message that is structurally a RunMQMessage but fails the
// person schema (age is a string, not an integer).
const invalidPayload = JSON.stringify(new RunMQMessage(
{name: 'Alice', age: 'not-a-number', email: 'a@b.com'} as any,
new RunMQMessageMeta('msg-id', Date.now(), 'corr-id'),
));
channel.publish(
Constants.ROUTER_EXCHANGE_NAME,
'schemaFailure.user.created',
Buffer.from(invalidPayload),
);

// Wait long enough that retries WOULD have fired (5 × 500ms = 2.5s).
// If failureStrategy is honoured the message lands in DLQ within the
// first ~200ms and stays there.
await RunMQUtils.delay(800);

// The user handler must NEVER have been invoked.
expect(handlerCalls).toBe(0);
// DLQ holds exactly the one message.
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1);
// Main queue is empty.
await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0);
// Retry-delay queue is empty — no retry attempts were scheduled.
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0);

await runMQ.disconnect();
await testingConnection.disconnect();
});

it('still retries handler errors normally (the fix only short-circuits schema errors)', async () => {
const configuration = RunMQProcessorConfigurationExample.random(
'schema_handler_retries',
1,
3, // 3 attempts
100,
MessageSchemaExample.simplePersonSchema(), // Schema configured, but message will be VALID
);

const testingConnection = new RabbitMQClientAdapter(validConfig);
const channel = await testingConnection.getChannel();
await ChannelTestHelpers.deleteQueue(channel, configuration.name);

const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
let handlerCalls = 0;
await runMQ.process('schemaFailure.user.created', configuration, () => {
handlerCalls++;
throw new Error('handler intentionally fails');
});

// Publish a SCHEMA-VALID message — the handler will be the thing failing.
channel.publish(
Constants.ROUTER_EXCHANGE_NAME,
'schemaFailure.user.created',
MessageTestUtils.buffer(RunMQMessageExample.person()),
);

await RunMQUtils.delay(800);

// Handler invoked 3 times (full retry budget), then DLQ'd.
expect(handlerCalls).toBe(3);
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1);
await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0);
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0);

await runMQ.disconnect();
await testingConnection.disconnect();
});
});
Loading
Loading