From f3d6f7b4ddfc552f32fb39499cddf0291646ade7 Mon Sep 17 00:00:00 2001 From: Fawzi Essam Date: Sat, 9 May 2026 18:32:43 +0200 Subject: [PATCH 1/2] fix: honor schema messageSchema.failureStrategy 'dlq' MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #23. Documented behavior: when a processor configures `messageSchema.failureStrategy: 'dlq'`, schema-invalid messages should go directly to the DLQ on first delivery, with no retries. The code ignored this setting entirely — schema errors took the same path as handler errors and got retried `attempts` times before reaching the DLQ. For a malformed message that's pure dead time and broker churn. Adds RunMQSchemaFailureProcessor inserted between FailureLogger and BaseProcessor: - Catches RunMQSchemaValidationError specifically. - If failureStrategy === 'dlq', publishes to the DLQ, logs a warn, and returns true so the outer chain acks the original message. - Other error types (handler exceptions, JSON parse errors, validation errors when no failureStrategy is configured) propagate unchanged through the existing retry pipeline. Tests: - 6 new unit tests covering pass-through, DLQ-route, rethrow paths (handler errors, parse errors, no-strategy schema errors), and payload extraction. - 2 new e2e tests in RunMQ.schemaFailure.e2e.test.ts: * schema-invalid message → DLQ on first delivery, handler never invoked, retry-delay queue stays empty (with attempts=5, attemptsDelay=500ms — proves retries aren't used). * handler errors with a valid payload still retry normally (handler invoked exactly `attempts` times before DLQ). All 141 unit tests + existing 78 e2e tests continue to pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/core/consumer/RunMQConsumerCreator.ts | 12 +- .../processors/RunMQSchemaFailureProcessor.ts | 82 +++++++++++ tests/e2e/RunMQ.schemaFailure.e2e.test.ts | 115 +++++++++++++++ .../RunMQSchemaFailureProcessor.test.ts | 132 ++++++++++++++++++ 4 files changed, 338 insertions(+), 3 deletions(-) create mode 100644 src/core/consumer/processors/RunMQSchemaFailureProcessor.ts create mode 100644 tests/e2e/RunMQ.schemaFailure.e2e.test.ts create mode 100644 tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index f01ecb3..32d7249 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -7,6 +7,7 @@ import { import {RunMQFailedMessageRejecterProcessor} from "@src/core/consumer/processors/RunMQFailedMessageRejecterProcessor"; import {RunMQRetriesCheckerProcessor} from "@src/core/consumer/processors/RunMQRetriesCheckerProcessor"; import {RunMQFailureLoggerProcessor} from "@src/core/consumer/processors/RunMQFailureLoggerProcessor"; +import {RunMQSchemaFailureProcessor} from "@src/core/consumer/processors/RunMQSchemaFailureProcessor"; import {RunMQBaseProcessor} from "@src/core/consumer/processors/RunMQBaseProcessor"; import {RunMQExceptionLoggerProcessor} from "@src/core/consumer/processors/RunMQExceptionLoggerProcessor"; import {RunMQLogger} from "@src/core/logging/RunMQLogger"; @@ -72,10 +73,15 @@ export class RunMQConsumerCreator { new RunMQFailedMessageRejecterProcessor( new RunMQRetriesCheckerProcessor( new RunMQFailureLoggerProcessor( - new RunMQBaseProcessor( - consumerConfiguration.processor, + new RunMQSchemaFailureProcessor( + new RunMQBaseProcessor( + consumerConfiguration.processor, + consumerConfiguration.processorConfig, + new DefaultDeserializer() + ), consumerConfiguration.processorConfig, - new DefaultDeserializer() + DLQPublisher, + this.logger ), this.logger ), diff --git a/src/core/consumer/processors/RunMQSchemaFailureProcessor.ts b/src/core/consumer/processors/RunMQSchemaFailureProcessor.ts new file mode 100644 index 0000000..9b7061d --- /dev/null +++ b/src/core/consumer/processors/RunMQSchemaFailureProcessor.ts @@ -0,0 +1,82 @@ +import {RunMQConsumer, RunMQProcessorConfiguration, RunMQPublisher} from "@src/types"; +import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; +import {RunMQLogger} from "@src/core/logging/RunMQLogger"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; +import {RunMQMessage} from "@src/core/message/RunMQMessage"; +import {RunMQSchemaValidationError} from "@src/core/serializers/deserializer/DefaultDeserializer"; + +/** + * Honors the configured `failureStrategy` for schema-validation failures. + * + * Without this processor, a schema-validation error takes the same path as + * any handler error: it gets retried `attempts` times before reaching the + * DLQ. That's wasteful — a malformed message will never become valid by + * retrying it. + * + * When `messageSchema.failureStrategy === 'dlq'`, this processor catches + * `RunMQSchemaValidationError` from the deserializer and routes the message + * straight to the DLQ. The original message is then acked (we return true, + * letting `RunMQSucceededMessageAcknowledgerProcessor` do the ack). + * + * Other errors (handler exceptions, JSON parse errors) propagate up the + * chain unchanged. + */ +export class RunMQSchemaFailureProcessor implements RunMQConsumer { + constructor( + private readonly consumer: RunMQConsumer, + private readonly config: RunMQProcessorConfiguration, + private readonly DLQPublisher: RunMQPublisher, + private readonly logger: RunMQLogger, + ) { + } + + public async consume(message: RabbitMQMessage): Promise { + try { + return await this.consumer.consume(message); + } catch (e: unknown) { + if (this.shouldRouteToDLQ(e)) { + this.logger.warn('Schema validation failed — routing message to DLQ.', { + correlationId: message.correlationId, + error: e instanceof RunMQSchemaValidationError ? e.error : undefined, + }); + this.routeToDLQ(message); + return true; + } + throw e; + } + } + + private shouldRouteToDLQ(e: unknown): boolean { + if (!(e instanceof RunMQSchemaValidationError)) return false; + return this.config.messageSchema?.failureStrategy === 'dlq'; + } + + private routeToDLQ(message: RabbitMQMessage) { + const dlqMessage = new RabbitMQMessage( + this.extractOriginalPayload(message), + message.id, + message.correlationId, + message.channel, + message.amqpMessage, + message.headers, + ); + this.DLQPublisher.publish( + ConsumerCreatorUtils.getDLQTopicName(this.config.name), + dlqMessage, + ); + } + + private extractOriginalPayload(message: RabbitMQMessage): any { + if (typeof message.message === 'string') { + try { + const parsed = JSON.parse(message.message); + if (RunMQMessage.isValid(parsed)) { + return parsed.message; + } + } catch { + // Not valid JSON, use as-is + } + } + return message.message; + } +} diff --git a/tests/e2e/RunMQ.schemaFailure.e2e.test.ts b/tests/e2e/RunMQ.schemaFailure.e2e.test.ts new file mode 100644 index 0000000..4381885 --- /dev/null +++ b/tests/e2e/RunMQ.schemaFailure.e2e.test.ts @@ -0,0 +1,115 @@ +import {RunMQ} from '@src/core/RunMQ'; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; +import {Constants} from "@src/core/constants"; +import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; +import {RunMQProcessorConfigurationExample, MessageSchemaExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; +import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample"; +import {MessageTestUtils} from "@tests/helpers/MessageTestUtils"; +import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage"; + +/** + * Integration tests for issue #23: schema-validation `failureStrategy: 'dlq'` + * must route invalid messages straight to the DLQ on first delivery, with no + * retries. A malformed message will never become valid by retrying — that's + * just dead time and broker churn. + */ +describe('RunMQ Schema Failure Strategy E2E', () => { + const validConfig = RunMQConnectionConfigExample.valid(); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('routes schema-invalid message straight to DLQ on first delivery (no retries)', async () => { + const configuration = RunMQProcessorConfigurationExample.random( + 'schema_dlq_strategy', + 1, + 5, // 5 attempts — the test must prove these aren't used + 500, // 500ms retry delay — gives plenty of time to observe retries if they happen + MessageSchemaExample.simplePersonSchema(), + ); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let handlerCalls = 0; + await runMQ.process('user.created', configuration, () => { + handlerCalls++; + return Promise.resolve(); + }); + + // Publish a message that is structurally a RunMQMessage but fails the + // person schema (age is a string, not an integer). + const invalidPayload = JSON.stringify(new RunMQMessage( + {name: 'Alice', age: 'not-a-number', email: 'a@b.com'} as any, + new RunMQMessageMeta('msg-id', Date.now(), 'corr-id'), + )); + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'user.created', + Buffer.from(invalidPayload), + ); + + // Wait long enough that retries WOULD have fired (5 × 500ms = 2.5s). + // If failureStrategy is honoured the message lands in DLQ within the + // first ~200ms and stays there. + await RunMQUtils.delay(800); + + // The user handler must NEVER have been invoked. + expect(handlerCalls).toBe(0); + // DLQ holds exactly the one message. + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + // Main queue is empty. + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + // Retry-delay queue is empty — no retry attempts were scheduled. + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0); + + await runMQ.disconnect(); + await testingConnection.disconnect(); + }); + + it('still retries handler errors normally (the fix only short-circuits schema errors)', async () => { + const configuration = RunMQProcessorConfigurationExample.random( + 'schema_handler_retries', + 1, + 3, // 3 attempts + 100, + MessageSchemaExample.simplePersonSchema(), // Schema configured, but message will be VALID + ); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let handlerCalls = 0; + await runMQ.process('user.created', configuration, () => { + handlerCalls++; + throw new Error('handler intentionally fails'); + }); + + // Publish a SCHEMA-VALID message — the handler will be the thing failing. + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'user.created', + MessageTestUtils.buffer(RunMQMessageExample.person()), + ); + + await RunMQUtils.delay(800); + + // Handler invoked 3 times (full retry budget), then DLQ'd. + expect(handlerCalls).toBe(3); + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0); + + await runMQ.disconnect(); + await testingConnection.disconnect(); + }); +}); diff --git a/tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts b/tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts new file mode 100644 index 0000000..036db1f --- /dev/null +++ b/tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts @@ -0,0 +1,132 @@ +import {RunMQSchemaFailureProcessor} from "@src/core/consumer/processors/RunMQSchemaFailureProcessor"; +import {RunMQSchemaValidationError, DeserializationError} from "@src/core/serializers/deserializer/DefaultDeserializer"; +import {RunMQConsumer, RunMQProcessorConfiguration} from "@src/types"; +import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {MockedRabbitMQPublisher} from "@tests/mocks/MockedRunMQPublisher"; +import {MockedAMQPChannel} from "@tests/mocks/MockedAMQPChannel"; +import {MockedAmqpMessage} from "@tests/mocks/MockedAmqpMessage"; +import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; + +class MockedConsumer implements RunMQConsumer { + constructor(private readonly behavior: 'success' | Error) {} + + consume(): Promise { + if (this.behavior === 'success') return Promise.resolve(true); + return Promise.reject(this.behavior); + } +} + +describe('RunMQSchemaFailureProcessor', () => { + const configWithDLQStrategy: RunMQProcessorConfiguration = { + name: 'test-processor', + consumersCount: 1, + attempts: 3, + attemptsDelay: 100, + messageSchema: { + type: 'ajv', + schema: {type: 'object'}, + failureStrategy: 'dlq', + }, + }; + + const configWithoutSchema: RunMQProcessorConfiguration = { + name: 'test-processor', + consumersCount: 1, + }; + + const buildMessage = (payload: any) => new RabbitMQMessage( + payload, + 'msg-id', + 'corr-id', + new MockedAMQPChannel(), + MockedAmqpMessage, + {}, + ); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('passes through successful consume calls unchanged', async () => { + const inner = new MockedConsumer('success'); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + const result = await processor.consume(buildMessage('payload')); + + expect(result).toBe(true); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('routes message directly to DLQ when schema validation fails and strategy is "dlq"', async () => { + const validationError = new RunMQSchemaValidationError('schema invalid', 'details'); + const inner = new MockedConsumer(validationError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + const originalPayload = {bad: 'data'}; + const wrappedMessage = JSON.stringify(new RunMQMessage( + originalPayload, + new RunMQMessageMeta('msg-id', Date.now(), 'corr-id'), + )); + + const result = await processor.consume(buildMessage(wrappedMessage)); + + // Returns true so the outer chain acks the original message. + expect(result).toBe(true); + // Published once, to the right DLQ topic, with the unwrapped payload. + expect(publisher.publish).toHaveBeenCalledTimes(1); + const [topic, dlqMessage] = publisher.publish.mock.calls[0]; + expect(topic).toBe(ConsumerCreatorUtils.getDLQTopicName(configWithDLQStrategy.name)); + expect((dlqMessage as RabbitMQMessage).message).toEqual(originalPayload); + // Warn log fires. + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + expect.stringContaining('Schema validation failed'), + expect.objectContaining({correlationId: 'corr-id', error: 'details'}), + ); + }); + + it('rethrows handler errors (non-schema) for the outer chain to handle', async () => { + const handlerError = new Error('handler failed'); + const inner = new MockedConsumer(handlerError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + await expect(processor.consume(buildMessage('payload'))).rejects.toBe(handlerError); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('rethrows JSON parse / deserialization errors (not schema errors)', async () => { + const parseError = new DeserializationError('invalid json'); + const inner = new MockedConsumer(parseError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + await expect(processor.consume(buildMessage('payload'))).rejects.toBe(parseError); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('rethrows schema validation errors when no schema is configured', async () => { + const validationError = new RunMQSchemaValidationError('schema invalid'); + const inner = new MockedConsumer(validationError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithoutSchema, publisher, MockedRunMQLogger); + + await expect(processor.consume(buildMessage('payload'))).rejects.toBe(validationError); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('keeps payload as-is when message content is not a wrapped RunMQMessage', async () => { + const validationError = new RunMQSchemaValidationError('schema invalid'); + const inner = new MockedConsumer(validationError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + await processor.consume(buildMessage('plain text not json')); + + const [, dlqMessage] = publisher.publish.mock.calls[0]; + expect((dlqMessage as RabbitMQMessage).message).toBe('plain text not json'); + }); +}); From 95cf3dc8a8a09ba58a4801f6f616d609ddbccc26 Mon Sep 17 00:00:00 2001 From: Fawzi Abdulfattah Date: Sun, 10 May 2026 02:34:56 +0200 Subject: [PATCH 2/2] test: align DLQ schema-failure test with new strategy and fix cross-file topic flake Update the existing 'Should end up in DLQ when message is not meeting the schema validation' e2e to assert the new direct-to-DLQ behaviour from issue #23: with failureStrategy 'dlq' the handler is never invoked and the retry pipeline is not engaged. Use unique topic keys in the schema failure and root e2e suites so parallel test files don't fan-out into each other's consumers via the shared topic exchange. Co-Authored-By: Claude Opus 4.7 --- tests/e2e/RunMQ.e2e.test.ts | 23 ++++++++++++++--------- tests/e2e/RunMQ.schemaFailure.e2e.test.ts | 8 ++++---- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/tests/e2e/RunMQ.e2e.test.ts b/tests/e2e/RunMQ.e2e.test.ts index 41e3f17..a744f51 100644 --- a/tests/e2e/RunMQ.e2e.test.ts +++ b/tests/e2e/RunMQ.e2e.test.ts @@ -100,30 +100,35 @@ describe('RunMQ E2E Tests', () => { describe('processing', () => { it('Should end up in DLQ when message is not meeting the schema validation', async () => { + // simpleNoSchema's default messageSchema has failureStrategy: 'dlq', + // so per issue #23 a schema-validation failure must short-circuit + // straight to the DLQ — no retries, no handler invocation. const configuration = RunMQProcessorConfigurationExample.simpleNoSchema() const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - await runMQ.process("ad.played", configuration, + let handlerCalls = 0; + await runMQ.process("runmq.e2e.ad.played", configuration, (): Promise => { + handlerCalls++; return Promise.resolve(); } ) - channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'ad.played', MessageTestUtils.buffer(MessageExample.person())) + channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'runmq.e2e.ad.played', MessageTestUtils.buffer(MessageExample.person())) await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1) await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0) await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0) - await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message processing failed', configuration.attempts as number) - await LoggerTestHelpers.assertLoggedWithCountAndParameters(MockedRunMQLogger.error, 'Message reached maximum attempts. Moving to dead-letter queue.', { - attempts: configuration.attempts, - max: configuration.attempts, - }, - 1 - ) + // Direct-to-DLQ path: the user handler is never reached and the + // retry pipeline is never engaged, so neither the per-attempt + // failure log nor the max-retries log should appear. + expect(handlerCalls).toBe(0); + await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.warn, 'Schema validation failed — routing message to DLQ.', 1) + await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message processing failed', 0) + await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message reached maximum attempts. Moving to dead-letter queue.', 0) await runMQ.disconnect(); await testingConnection.disconnect(); }) diff --git a/tests/e2e/RunMQ.schemaFailure.e2e.test.ts b/tests/e2e/RunMQ.schemaFailure.e2e.test.ts index 4381885..5de949e 100644 --- a/tests/e2e/RunMQ.schemaFailure.e2e.test.ts +++ b/tests/e2e/RunMQ.schemaFailure.e2e.test.ts @@ -39,7 +39,7 @@ describe('RunMQ Schema Failure Strategy E2E', () => { const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); let handlerCalls = 0; - await runMQ.process('user.created', configuration, () => { + await runMQ.process('schemaFailure.user.created', configuration, () => { handlerCalls++; return Promise.resolve(); }); @@ -52,7 +52,7 @@ describe('RunMQ Schema Failure Strategy E2E', () => { )); channel.publish( Constants.ROUTER_EXCHANGE_NAME, - 'user.created', + 'schemaFailure.user.created', Buffer.from(invalidPayload), ); @@ -89,7 +89,7 @@ describe('RunMQ Schema Failure Strategy E2E', () => { const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); let handlerCalls = 0; - await runMQ.process('user.created', configuration, () => { + await runMQ.process('schemaFailure.user.created', configuration, () => { handlerCalls++; throw new Error('handler intentionally fails'); }); @@ -97,7 +97,7 @@ describe('RunMQ Schema Failure Strategy E2E', () => { // Publish a SCHEMA-VALID message — the handler will be the thing failing. channel.publish( Constants.ROUTER_EXCHANGE_NAME, - 'user.created', + 'schemaFailure.user.created', MessageTestUtils.buffer(RunMQMessageExample.person()), );