diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index 2fa8168..195ce9e 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -7,6 +7,7 @@ import { import {RunMQFailedMessageRejecterProcessor} from "@src/core/consumer/processors/RunMQFailedMessageRejecterProcessor"; import {RunMQRetriesCheckerProcessor} from "@src/core/consumer/processors/RunMQRetriesCheckerProcessor"; import {RunMQFailureLoggerProcessor} from "@src/core/consumer/processors/RunMQFailureLoggerProcessor"; +import {RunMQSchemaFailureProcessor} from "@src/core/consumer/processors/RunMQSchemaFailureProcessor"; import {RunMQBaseProcessor} from "@src/core/consumer/processors/RunMQBaseProcessor"; import {RunMQExceptionLoggerProcessor} from "@src/core/consumer/processors/RunMQExceptionLoggerProcessor"; import {RunMQLogger} from "@src/core/logging/RunMQLogger"; @@ -88,10 +89,15 @@ export class RunMQConsumerCreator { new RunMQFailedMessageRejecterProcessor( new RunMQRetriesCheckerProcessor( new RunMQFailureLoggerProcessor( - new RunMQBaseProcessor( - consumerConfiguration.processor, + new RunMQSchemaFailureProcessor( + new RunMQBaseProcessor( + consumerConfiguration.processor, + consumerConfiguration.processorConfig, + new DefaultDeserializer() + ), consumerConfiguration.processorConfig, - new DefaultDeserializer() + DLQPublisher, + this.logger ), this.logger ), diff --git a/src/core/consumer/processors/RunMQSchemaFailureProcessor.ts b/src/core/consumer/processors/RunMQSchemaFailureProcessor.ts new file mode 100644 index 0000000..9b7061d --- /dev/null +++ b/src/core/consumer/processors/RunMQSchemaFailureProcessor.ts @@ -0,0 +1,82 @@ +import {RunMQConsumer, RunMQProcessorConfiguration, RunMQPublisher} from "@src/types"; +import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; +import {RunMQLogger} from "@src/core/logging/RunMQLogger"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; +import {RunMQMessage} from "@src/core/message/RunMQMessage"; +import {RunMQSchemaValidationError} from "@src/core/serializers/deserializer/DefaultDeserializer"; + +/** + * Honors the configured `failureStrategy` for schema-validation failures. + * + * Without this processor, a schema-validation error takes the same path as + * any handler error: it gets retried `attempts` times before reaching the + * DLQ. That's wasteful — a malformed message will never become valid by + * retrying it. + * + * When `messageSchema.failureStrategy === 'dlq'`, this processor catches + * `RunMQSchemaValidationError` from the deserializer and routes the message + * straight to the DLQ. The original message is then acked (we return true, + * letting `RunMQSucceededMessageAcknowledgerProcessor` do the ack). + * + * Other errors (handler exceptions, JSON parse errors) propagate up the + * chain unchanged. + */ +export class RunMQSchemaFailureProcessor implements RunMQConsumer { + constructor( + private readonly consumer: RunMQConsumer, + private readonly config: RunMQProcessorConfiguration, + private readonly DLQPublisher: RunMQPublisher, + private readonly logger: RunMQLogger, + ) { + } + + public async consume(message: RabbitMQMessage): Promise { + try { + return await this.consumer.consume(message); + } catch (e: unknown) { + if (this.shouldRouteToDLQ(e)) { + this.logger.warn('Schema validation failed — routing message to DLQ.', { + correlationId: message.correlationId, + error: e instanceof RunMQSchemaValidationError ? e.error : undefined, + }); + this.routeToDLQ(message); + return true; + } + throw e; + } + } + + private shouldRouteToDLQ(e: unknown): boolean { + if (!(e instanceof RunMQSchemaValidationError)) return false; + return this.config.messageSchema?.failureStrategy === 'dlq'; + } + + private routeToDLQ(message: RabbitMQMessage) { + const dlqMessage = new RabbitMQMessage( + this.extractOriginalPayload(message), + message.id, + message.correlationId, + message.channel, + message.amqpMessage, + message.headers, + ); + this.DLQPublisher.publish( + ConsumerCreatorUtils.getDLQTopicName(this.config.name), + dlqMessage, + ); + } + + private extractOriginalPayload(message: RabbitMQMessage): any { + if (typeof message.message === 'string') { + try { + const parsed = JSON.parse(message.message); + if (RunMQMessage.isValid(parsed)) { + return parsed.message; + } + } catch { + // Not valid JSON, use as-is + } + } + return message.message; + } +} diff --git a/tests/e2e/RunMQ.e2e.test.ts b/tests/e2e/RunMQ.e2e.test.ts index 41e3f17..a744f51 100644 --- a/tests/e2e/RunMQ.e2e.test.ts +++ b/tests/e2e/RunMQ.e2e.test.ts @@ -100,30 +100,35 @@ describe('RunMQ E2E Tests', () => { describe('processing', () => { it('Should end up in DLQ when message is not meeting the schema validation', async () => { + // simpleNoSchema's default messageSchema has failureStrategy: 'dlq', + // so per issue #23 a schema-validation failure must short-circuit + // straight to the DLQ — no retries, no handler invocation. const configuration = RunMQProcessorConfigurationExample.simpleNoSchema() const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - await runMQ.process("ad.played", configuration, + let handlerCalls = 0; + await runMQ.process("runmq.e2e.ad.played", configuration, (): Promise => { + handlerCalls++; return Promise.resolve(); } ) - channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'ad.played', MessageTestUtils.buffer(MessageExample.person())) + channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'runmq.e2e.ad.played', MessageTestUtils.buffer(MessageExample.person())) await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1) await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0) await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0) - await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message processing failed', configuration.attempts as number) - await LoggerTestHelpers.assertLoggedWithCountAndParameters(MockedRunMQLogger.error, 'Message reached maximum attempts. Moving to dead-letter queue.', { - attempts: configuration.attempts, - max: configuration.attempts, - }, - 1 - ) + // Direct-to-DLQ path: the user handler is never reached and the + // retry pipeline is never engaged, so neither the per-attempt + // failure log nor the max-retries log should appear. + expect(handlerCalls).toBe(0); + await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.warn, 'Schema validation failed — routing message to DLQ.', 1) + await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message processing failed', 0) + await LoggerTestHelpers.assertLoggedWithCount(MockedRunMQLogger.error, 'Message reached maximum attempts. Moving to dead-letter queue.', 0) await runMQ.disconnect(); await testingConnection.disconnect(); }) diff --git a/tests/e2e/RunMQ.schemaFailure.e2e.test.ts b/tests/e2e/RunMQ.schemaFailure.e2e.test.ts new file mode 100644 index 0000000..5de949e --- /dev/null +++ b/tests/e2e/RunMQ.schemaFailure.e2e.test.ts @@ -0,0 +1,115 @@ +import {RunMQ} from '@src/core/RunMQ'; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; +import {Constants} from "@src/core/constants"; +import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; +import {RunMQProcessorConfigurationExample, MessageSchemaExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; +import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample"; +import {MessageTestUtils} from "@tests/helpers/MessageTestUtils"; +import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage"; + +/** + * Integration tests for issue #23: schema-validation `failureStrategy: 'dlq'` + * must route invalid messages straight to the DLQ on first delivery, with no + * retries. A malformed message will never become valid by retrying — that's + * just dead time and broker churn. + */ +describe('RunMQ Schema Failure Strategy E2E', () => { + const validConfig = RunMQConnectionConfigExample.valid(); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('routes schema-invalid message straight to DLQ on first delivery (no retries)', async () => { + const configuration = RunMQProcessorConfigurationExample.random( + 'schema_dlq_strategy', + 1, + 5, // 5 attempts — the test must prove these aren't used + 500, // 500ms retry delay — gives plenty of time to observe retries if they happen + MessageSchemaExample.simplePersonSchema(), + ); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let handlerCalls = 0; + await runMQ.process('schemaFailure.user.created', configuration, () => { + handlerCalls++; + return Promise.resolve(); + }); + + // Publish a message that is structurally a RunMQMessage but fails the + // person schema (age is a string, not an integer). + const invalidPayload = JSON.stringify(new RunMQMessage( + {name: 'Alice', age: 'not-a-number', email: 'a@b.com'} as any, + new RunMQMessageMeta('msg-id', Date.now(), 'corr-id'), + )); + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'schemaFailure.user.created', + Buffer.from(invalidPayload), + ); + + // Wait long enough that retries WOULD have fired (5 × 500ms = 2.5s). + // If failureStrategy is honoured the message lands in DLQ within the + // first ~200ms and stays there. + await RunMQUtils.delay(800); + + // The user handler must NEVER have been invoked. + expect(handlerCalls).toBe(0); + // DLQ holds exactly the one message. + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + // Main queue is empty. + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + // Retry-delay queue is empty — no retry attempts were scheduled. + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0); + + await runMQ.disconnect(); + await testingConnection.disconnect(); + }); + + it('still retries handler errors normally (the fix only short-circuits schema errors)', async () => { + const configuration = RunMQProcessorConfigurationExample.random( + 'schema_handler_retries', + 1, + 3, // 3 attempts + 100, + MessageSchemaExample.simplePersonSchema(), // Schema configured, but message will be VALID + ); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let handlerCalls = 0; + await runMQ.process('schemaFailure.user.created', configuration, () => { + handlerCalls++; + throw new Error('handler intentionally fails'); + }); + + // Publish a SCHEMA-VALID message — the handler will be the thing failing. + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'schemaFailure.user.created', + MessageTestUtils.buffer(RunMQMessageExample.person()), + ); + + await RunMQUtils.delay(800); + + // Handler invoked 3 times (full retry budget), then DLQ'd. + expect(handlerCalls).toBe(3); + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0); + + await runMQ.disconnect(); + await testingConnection.disconnect(); + }); +}); diff --git a/tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts b/tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts new file mode 100644 index 0000000..036db1f --- /dev/null +++ b/tests/unit/core/consumer/processors/RunMQSchemaFailureProcessor.test.ts @@ -0,0 +1,132 @@ +import {RunMQSchemaFailureProcessor} from "@src/core/consumer/processors/RunMQSchemaFailureProcessor"; +import {RunMQSchemaValidationError, DeserializationError} from "@src/core/serializers/deserializer/DefaultDeserializer"; +import {RunMQConsumer, RunMQProcessorConfiguration} from "@src/types"; +import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {MockedRabbitMQPublisher} from "@tests/mocks/MockedRunMQPublisher"; +import {MockedAMQPChannel} from "@tests/mocks/MockedAMQPChannel"; +import {MockedAmqpMessage} from "@tests/mocks/MockedAmqpMessage"; +import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; + +class MockedConsumer implements RunMQConsumer { + constructor(private readonly behavior: 'success' | Error) {} + + consume(): Promise { + if (this.behavior === 'success') return Promise.resolve(true); + return Promise.reject(this.behavior); + } +} + +describe('RunMQSchemaFailureProcessor', () => { + const configWithDLQStrategy: RunMQProcessorConfiguration = { + name: 'test-processor', + consumersCount: 1, + attempts: 3, + attemptsDelay: 100, + messageSchema: { + type: 'ajv', + schema: {type: 'object'}, + failureStrategy: 'dlq', + }, + }; + + const configWithoutSchema: RunMQProcessorConfiguration = { + name: 'test-processor', + consumersCount: 1, + }; + + const buildMessage = (payload: any) => new RabbitMQMessage( + payload, + 'msg-id', + 'corr-id', + new MockedAMQPChannel(), + MockedAmqpMessage, + {}, + ); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('passes through successful consume calls unchanged', async () => { + const inner = new MockedConsumer('success'); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + const result = await processor.consume(buildMessage('payload')); + + expect(result).toBe(true); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('routes message directly to DLQ when schema validation fails and strategy is "dlq"', async () => { + const validationError = new RunMQSchemaValidationError('schema invalid', 'details'); + const inner = new MockedConsumer(validationError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + const originalPayload = {bad: 'data'}; + const wrappedMessage = JSON.stringify(new RunMQMessage( + originalPayload, + new RunMQMessageMeta('msg-id', Date.now(), 'corr-id'), + )); + + const result = await processor.consume(buildMessage(wrappedMessage)); + + // Returns true so the outer chain acks the original message. + expect(result).toBe(true); + // Published once, to the right DLQ topic, with the unwrapped payload. + expect(publisher.publish).toHaveBeenCalledTimes(1); + const [topic, dlqMessage] = publisher.publish.mock.calls[0]; + expect(topic).toBe(ConsumerCreatorUtils.getDLQTopicName(configWithDLQStrategy.name)); + expect((dlqMessage as RabbitMQMessage).message).toEqual(originalPayload); + // Warn log fires. + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + expect.stringContaining('Schema validation failed'), + expect.objectContaining({correlationId: 'corr-id', error: 'details'}), + ); + }); + + it('rethrows handler errors (non-schema) for the outer chain to handle', async () => { + const handlerError = new Error('handler failed'); + const inner = new MockedConsumer(handlerError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + await expect(processor.consume(buildMessage('payload'))).rejects.toBe(handlerError); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('rethrows JSON parse / deserialization errors (not schema errors)', async () => { + const parseError = new DeserializationError('invalid json'); + const inner = new MockedConsumer(parseError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + await expect(processor.consume(buildMessage('payload'))).rejects.toBe(parseError); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('rethrows schema validation errors when no schema is configured', async () => { + const validationError = new RunMQSchemaValidationError('schema invalid'); + const inner = new MockedConsumer(validationError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithoutSchema, publisher, MockedRunMQLogger); + + await expect(processor.consume(buildMessage('payload'))).rejects.toBe(validationError); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('keeps payload as-is when message content is not a wrapped RunMQMessage', async () => { + const validationError = new RunMQSchemaValidationError('schema invalid'); + const inner = new MockedConsumer(validationError); + const publisher = new MockedRabbitMQPublisher(); + const processor = new RunMQSchemaFailureProcessor(inner, configWithDLQStrategy, publisher, MockedRunMQLogger); + + await processor.consume(buildMessage('plain text not json')); + + const [, dlqMessage] = publisher.publish.mock.calls[0]; + expect((dlqMessage as RabbitMQMessage).message).toBe('plain text not json'); + }); +});