diff --git a/README.md b/README.md index 2db2b6c..997d369 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ What's happening here: ### Publish a message ```typescript -runMQ.publish('user.created', { +await runMQ.publish('user.created', { userId: '123', email: 'user@example.com', name: 'John Doe' @@ -108,6 +108,8 @@ runMQ.publish('user.created', { ✅ One publish, every subscribed processor receives the message — independently and atomically. +✅ **Confirmed delivery by default.** `runMQ.publish()` returns a promise that resolves only after RabbitMQ has accepted the message; if the broker rejects it (alarm state, mandatory routing failure, etc.), the promise rejects so your code can handle it. Set `usePublisherConfirms: false` in the connection config to opt out and fall back to fire-and-forget publishing if per-publish round-trip latency matters more to you than detecting silent drops. +
## Patterns RunMQ fits naturally diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts index 0fa2113..e71c616 100644 --- a/src/core/RunMQ.ts +++ b/src/core/RunMQ.ts @@ -57,18 +57,26 @@ export class RunMQ { } /** - * Publishes a message to the specified topic with an optional correlation ID + * Publishes a message to the specified topic with an optional correlation ID. + * + * If publisher confirms are enabled (`usePublisherConfirms: true` in the + * connection config), the returned promise resolves only after RabbitMQ + * acknowledges the message; if the broker rejects, the promise rejects. + * Otherwise it resolves once the message is flushed to the TCP socket + * (fire-and-forget, no delivery guarantee — same behavior as before + * publisher confirms were introduced). + * * @param topic The name of the topic to publish the message to * @param message The message payload to be published * @param correlationId (Optional) A unique identifier for correlating messages; if not provided, a new UUID will be generated */ - public publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): void { + public async publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): Promise { if (!this.publisher || !this.publishChannel) { throw new RunMQException(Exceptions.NOT_INITIALIZED, {}); } RunMQUtils.assertRecord(message); const messageId = RunMQUtils.generateUUID(); - this.publisher.publish(topic, + await this.publisher.publish(topic, RabbitMQMessage.from( message, this.publishChannel, @@ -148,6 +156,9 @@ export class RunMQ { // Use a dedicated channel for publishes so a setup-time channel close // (e.g. a precondition_failed on assertQueue) cannot break the publish path. this.publishChannel = await this.client.getChannel(); + if (this.config.usePublisherConfirms !== false) { + await this.publishChannel.confirmSelect(); + } this.publisher = new RunMQPublisherCreator(this.logger).createPublisher(); } } \ No newline at end of file diff --git a/src/core/clients/RabbitMQClientChannel.ts b/src/core/clients/RabbitMQClientChannel.ts index c710309..eff0f32 100644 --- a/src/core/clients/RabbitMQClientChannel.ts +++ b/src/core/clients/RabbitMQClientChannel.ts @@ -106,8 +106,8 @@ export class RabbitMQClientChannel implements AMQPChannel { }); } - publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean { - this.channel.basicPublish({ + async publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise { + await this.channel.basicPublish({ exchange, routingKey, correlationId: options?.correlationId, @@ -124,7 +124,10 @@ export class RabbitMQClientChannel implements AMQPChannel { userId: options?.userId, appId: options?.appId, }, content); - return true; + } + + async confirmSelect(): Promise { + await this.channel.confirmSelect(); } async consume( diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index 19dc8cd..b5e38bd 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -19,7 +19,6 @@ import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolic import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager"; import {RunMQException} from "@src/core/exceptions/RunMQException"; import {Exceptions} from "@src/core/exceptions/Exceptions"; -import {RunMQUtils} from "@src/core/utils/RunMQUtils"; export class RunMQConsumerCreator { private ttlPolicyManager: RunMQTTLPolicyManager; @@ -69,17 +68,19 @@ export class RunMQConsumerCreator { }); const DLQPublisher = new RunMQPublisherCreator(this.logger).createPublisher(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME); + // Always enable publisher confirms on the consumer channel: DLQ + // publishes flow through this channel and we cannot tolerate silent + // drops there (issue #19). + await consumerChannel.confirmSelect(); + const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT; await consumerChannel.prefetch(prefetchCount); await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => { if (!msg) return; const rabbitmqMessage = new RabbitMQMessage( msg.content.toString(), - // Synthesize ids when an external (non-RunMQ) publisher did - // not set the AMQP messageId/correlationId — keeps log tracing - // consistent for cross-tenant queues. - msg.properties.messageId ?? RunMQUtils.generateUUID(), - msg.properties.correlationId ?? RunMQUtils.generateUUID(), + msg.properties.messageId, + msg.properties.correlationId, consumerChannel, msg, msg.properties.headers, diff --git a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts index 49424c7..7c52e3c 100644 --- a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts +++ b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts @@ -21,7 +21,23 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer { } catch (e: unknown) { if (this.hasReachedMaxRetries(message)) { this.logMaxRetriesReached(message); - this.moveToFinalDeadLetter(message); + try { + await this.moveToFinalDeadLetter(message); + } catch (publishError) { + // DLQ publish failed (broker rejected, channel closed, etc.). + // Do NOT ack — that would lose the message. nack(false) + // sends it back through the retry pipeline, where it'll + // come right back here on the next attempt with a natural + // backoff (the retry-delay-queue TTL). If the underlying + // failure is transient, the next attempt's DLQ publish + // will succeed. + this.logger.error('Failed to publish to DLQ — message will be redelivered', { + correlationId: message.correlationId, + cause: publishError instanceof Error ? publishError.message : String(publishError), + }); + message.nack(false); + return false; + } this.acknowledgeMessage(message); return false; } @@ -47,10 +63,12 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer { } // Republish the original AMQP body verbatim so the envelope (including - // publishedAt) is preserved end-to-end for audit/replay. - private moveToFinalDeadLetter(message: RabbitMQMessage) { + // publishedAt) is preserved end-to-end for audit/replay. The consumer + // channel runs in confirm mode (see RunMQConsumerCreator), so awaiting + // this publish surfaces broker-side rejections instead of dropping them. + private async moveToFinalDeadLetter(message: RabbitMQMessage): Promise { if (!message.amqpMessage) return; - message.channel.publish( + await message.channel.publish( Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, ConsumerCreatorUtils.getDLQTopicName(this.config.name), message.amqpMessage.content, diff --git a/src/core/message/RabbitMQMessage.ts b/src/core/message/RabbitMQMessage.ts index 15e4ef2..579f75b 100644 --- a/src/core/message/RabbitMQMessage.ts +++ b/src/core/message/RabbitMQMessage.ts @@ -1,3 +1,4 @@ +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties"; import {AMQPMessage} from "@src/core/message/AmqpMessage"; import {AMQPChannel} from "@src/types"; @@ -5,8 +6,8 @@ import {AMQPChannel} from "@src/types"; export class RabbitMQMessage { constructor( readonly message: any, - readonly id: string, - readonly correlationId: string, + readonly id: string = RunMQUtils.generateUUID(), + readonly correlationId: string = RunMQUtils.generateUUID(), readonly channel: AMQPChannel, readonly amqpMessage: AMQPMessage = null, readonly headers: Record = {}) { diff --git a/src/core/publisher/producers/RunMQBaseProducer.ts b/src/core/publisher/producers/RunMQBaseProducer.ts index b220825..d1640cb 100644 --- a/src/core/publisher/producers/RunMQBaseProducer.ts +++ b/src/core/publisher/producers/RunMQBaseProducer.ts @@ -8,7 +8,7 @@ export class RunMQBaseProducer implements RunMQPublisher { constructor(private serializer: Serializer, private exchange = Constants.ROUTER_EXCHANGE_NAME) { } - publish(topic: string, message: RabbitMQMessage): void { + async publish(topic: string, message: RabbitMQMessage): Promise { const runMQMessage = new RunMQMessage( message.message, new RunMQMessageMeta( @@ -17,7 +17,7 @@ export class RunMQBaseProducer implements RunMQPublisher { message.correlationId, )); const serialized = this.serializer.serialize(runMQMessage); - message.channel.publish(this.exchange, topic, Buffer.from(serialized), { + await message.channel.publish(this.exchange, topic, Buffer.from(serialized), { correlationId: message.correlationId, messageId: message.id, headers: message.headers, diff --git a/src/core/publisher/producers/RunMQFailureLoggerProducer.ts b/src/core/publisher/producers/RunMQFailureLoggerProducer.ts index 91e0b2f..c427daf 100644 --- a/src/core/publisher/producers/RunMQFailureLoggerProducer.ts +++ b/src/core/publisher/producers/RunMQFailureLoggerProducer.ts @@ -6,12 +6,13 @@ export class RunMQFailureLoggerProducer implements RunMQPublisher { constructor(private producer: RunMQPublisher, private logger: RunMQLogger) { } - publish(topic: string, message: RabbitMQMessage): void { + async publish(topic: string, message: RabbitMQMessage): Promise { try { - this.producer.publish(topic, message); + await this.producer.publish(topic, message); } catch (e) { this.logger.error('Message publishing failed', { - message: message, + topic, + correlationId: message.correlationId, error: e instanceof Error ? e.message : JSON.stringify(e), stack: e instanceof Error ? e.stack : undefined, }); diff --git a/src/types/index.ts b/src/types/index.ts index a1ac29b..dd2b7f5 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -150,8 +150,20 @@ export interface AMQPChannel { /** * Publishes a message to an exchange. + * + * If `confirmSelect()` was called on this channel, the returned promise + * resolves only after the broker acknowledges the message; if the broker + * rejects, the promise rejects. Otherwise it resolves once the message is + * flushed to the TCP socket. + */ + publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise; + + /** + * Enables publisher confirms on this channel. After this is called every + * `publish()` waits for broker acknowledgement before resolving. + * https://www.rabbitmq.com/confirms.html#publisher-confirms */ - publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean; + confirmSelect(): Promise; /** * Starts consuming messages from a queue. @@ -221,6 +233,21 @@ export interface RunMQConnectionConfig { password: string; }; /** + * Controls publisher confirms on the user-publish path. When enabled + * (default), `runMQ.publish()` resolves only after RabbitMQ acknowledges + * each message, and rejects on broker error (e.g. mandatory routing + * failure, alarm state). Set to `false` to opt out and fall back to + * fire-and-forget — publish resolves once the message is written to the + * TCP socket, with no delivery guarantee. + * + * Trade-off: confirms add a broker round-trip per publish (typically a + * few hundred microseconds). They're the only way to detect silent + * publish failures, so we default to safety. DLQ publishes from the + * consumer chain are *always* confirmed regardless of this setting — + * the message-loss risk there is not negotiable. + */ + usePublisherConfirms?: boolean; + /* * If true, RunMQ includes the full message payload in info/error log * lines that mention a message (publish success, per-attempt failure, * max-retries-reached). Off by default — capturing megabyte-sized @@ -361,5 +388,5 @@ export interface RunMQConsumer { export interface RunMQPublisher { - publish: (topic: string, message: RabbitMQMessage) => void; + publish: (topic: string, message: RabbitMQMessage) => Promise; } \ No newline at end of file diff --git a/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts b/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts new file mode 100644 index 0000000..f5a3629 --- /dev/null +++ b/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts @@ -0,0 +1,131 @@ +import {RunMQ} from '@src/core/RunMQ'; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; +import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel"; +import {Constants} from "@src/core/constants"; +import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; +import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; +import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample"; +import {MessageTestUtils} from "@tests/helpers/MessageTestUtils"; + +/** + * Integration tests for issues #19 + #28: publisher confirms. + * + * Before this fix, DLQ publishes from RunMQRetriesCheckerProcessor were + * fire-and-forget — the message was ack'd immediately even if the DLQ + * publish silently failed. That meant a network blip during a + * retry-exhausted message's DLQ handoff resulted in permanent message loss. + * + * The fix: enable publisher confirms on the consumer channel and await the + * DLQ publish. On failure, nack(false) so the message goes back through the + * retry pipeline (with natural backoff via the retry-delay queue's TTL) and + * gets another shot at reaching the DLQ. + */ +describe('RunMQ DLQ Publisher Confirms E2E', () => { + const validConfig = RunMQConnectionConfigExample.valid(); + + beforeEach(() => { + jest.restoreAllMocks(); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('does not lose the message when the DLQ publish fails — message is redelivered', async () => { + const configuration = RunMQProcessorConfigurationExample.simpleNoSchema('dlq_confirm_redeliver'); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let handlerCalls = 0; + await runMQ.process('dlq.confirm', configuration, () => { + handlerCalls++; + throw new Error('handler intentionally fails'); + }); + + // Make the FIRST DLQ publish fail (simulating broker rejection or + // channel closure mid-flight). Subsequent publishes succeed. + // This is the scenario where the old code silently lost messages. + let dlqPublishesAttempted = 0; + const original = RabbitMQClientChannel.prototype.publish; + jest.spyOn(RabbitMQClientChannel.prototype, 'publish') + .mockImplementation(async function (this: RabbitMQClientChannel, exchange: string, routingKey: string, content: Buffer, options) { + if (exchange === Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME) { + dlqPublishesAttempted++; + if (dlqPublishesAttempted === 1) { + throw new Error('simulated broker rejection on first DLQ publish'); + } + } + return original.call(this, exchange, routingKey, content, options); + }); + + try { + // Publish a message. Handler fails on first delivery → DLQ publish + // attempts → first publish fails → message is nacked back into the + // retry pipeline → retry-delay TTL → handler fails again → DLQ + // publish succeeds → message lands in DLQ. + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'dlq.confirm', + MessageTestUtils.buffer(RunMQMessageExample.random()), + ); + + // Default attemptsDelay is 100ms (simpleNoSchema config). + // We need: handler call, DLQ publish fail, retry-delay wait, handler call, DLQ publish succeeds. + await RunMQUtils.delay(2000); + + // Both DLQ publish attempts were made (first failed, second succeeded). + expect(dlqPublishesAttempted).toBeGreaterThanOrEqual(2); + // Handler ran multiple times due to redelivery — proves the message + // was nack'd back, NOT silently dropped. + expect(handlerCalls).toBeGreaterThanOrEqual(2); + // Message landed in DLQ on the second attempt. + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + // Main queue is empty. + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + } finally { + await runMQ.disconnect(); + await testingConnection.disconnect(); + } + }); + + it('confirms DLQ publishes by default — broker ack required before original is acked', async () => { + const configuration = RunMQProcessorConfigurationExample.random( + 'dlq_confirm_happy_path', + 1, + 1, // attempts=1, so first failure → DLQ + 100, + ); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + await runMQ.process('dlq.happy', configuration, () => { + throw new Error('handler always fails'); + }); + + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'dlq.happy', + MessageTestUtils.buffer(RunMQMessageExample.random()), + ); + + await RunMQUtils.delay(500); + + // Message should be in DLQ (with confirm received from broker). + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0); + + await runMQ.disconnect(); + await testingConnection.disconnect(); + }); +}); diff --git a/tests/e2e/RunMQ.e2e.test.ts b/tests/e2e/RunMQ.e2e.test.ts index a744f51..ba5ce68 100644 --- a/tests/e2e/RunMQ.e2e.test.ts +++ b/tests/e2e/RunMQ.e2e.test.ts @@ -165,12 +165,11 @@ describe('RunMQ E2E Tests', () => { await testingConnection.disconnect(); }) - it("should throw error when publishing invalid message", async () => { + it("should reject when publishing invalid message", async () => { const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - expect(() => { - runMQ.publish("user.created", "invalid message" as any); - }).toThrow(RunMQException); + await expect(runMQ.publish("user.created", "invalid message" as any)) + .rejects.toThrow(RunMQException); await runMQ.disconnect(); }); diff --git a/tests/mocks/MockedAMQPChannel.ts b/tests/mocks/MockedAMQPChannel.ts index e46936d..b75ec63 100644 --- a/tests/mocks/MockedAMQPChannel.ts +++ b/tests/mocks/MockedAMQPChannel.ts @@ -39,7 +39,7 @@ export class MockedAMQPChannel implements AMQPChannel { bindQueue = jest.fn, [string, string, string, Record?]>().mockResolvedValue(); - publish = jest.fn().mockReturnValue(true); + publish = jest.fn, [string, string, Buffer, AMQPPublishOptions?]>().mockResolvedValue(); consume = jest.fn, [string, (msg: ConsumeMessage | null) => void, AMQPConsumeOptions?]>().mockResolvedValue({ consumerTag: 'test-consumer-tag' @@ -51,6 +51,8 @@ export class MockedAMQPChannel implements AMQPChannel { prefetch = jest.fn, [number, boolean?]>().mockResolvedValue(); + confirmSelect = jest.fn, []>().mockResolvedValue(); + close = jest.fn, []>().mockResolvedValue(); } diff --git a/tests/mocks/MockedRabbitMQChannel.ts b/tests/mocks/MockedRabbitMQChannel.ts index 91af045..bfaa059 100644 --- a/tests/mocks/MockedRabbitMQChannel.ts +++ b/tests/mocks/MockedRabbitMQChannel.ts @@ -9,12 +9,13 @@ export class MockedRabbitMQChannel implements AMQPChannel { deleteExchange = jest.fn(); checkExchange = jest.fn(); assertExchange = jest.fn(); - publish = jest.fn(); + publish = jest.fn().mockResolvedValue(undefined); consume = jest.fn(); get = jest.fn(); ack = jest.fn(); nack = jest.fn(); prefetch = jest.fn(); + confirmSelect = jest.fn().mockResolvedValue(undefined); close = jest.fn(); connection: Connection = {} as Connection; on = jest.fn(); diff --git a/tests/mocks/MockedRunMQPublisher.ts b/tests/mocks/MockedRunMQPublisher.ts index eb31b35..030e926 100644 --- a/tests/mocks/MockedRunMQPublisher.ts +++ b/tests/mocks/MockedRunMQPublisher.ts @@ -1,5 +1,5 @@ import {RunMQPublisher} from "@src/types"; export class MockedRabbitMQPublisher implements RunMQPublisher { - public publish = jest.fn(); + public publish = jest.fn().mockResolvedValue(undefined); } \ No newline at end of file diff --git a/tests/unit/core/RunMQ.test.ts b/tests/unit/core/RunMQ.test.ts index 6036933..a20d586 100644 --- a/tests/unit/core/RunMQ.test.ts +++ b/tests/unit/core/RunMQ.test.ts @@ -38,7 +38,7 @@ describe('RunMQ Unit Tests', () => { const setupPublisherMock = () => { const mockPublisherCreator = RunMQPublisherCreator as jest.MockedClass; - const mockPublisher = {publish: jest.fn()}; + const mockPublisher = {publish: jest.fn().mockResolvedValue(undefined)}; mockPublisherCreator.prototype.createPublisher.mockReturnValue(mockPublisher as any); return {mockPublisherCreator, mockPublisher}; }; @@ -143,13 +143,12 @@ describe('RunMQ Unit Tests', () => { }); describe('producer', () => { - it('should throw error if message is not a valid record', async () => { + it('should reject if message is not a valid record', async () => { setupSuccessfulClientMock(); const runMQ = await RunMQ.start(validConfig); - expect(() => { - runMQ.publish('test.topic', "invalid message" as any); - }).toThrow(RunMQException); + await expect(runMQ.publish('test.topic', "invalid message" as any)) + .rejects.toThrow(RunMQException); }); it('should publish message correctly if valid record', async () => { @@ -157,7 +156,7 @@ describe('RunMQ Unit Tests', () => { const {mockPublisher} = setupPublisherMock(); const runMQ = await RunMQ.start(validConfig); - runMQ.publish('test.topic', MessageExample.person()); + await runMQ.publish('test.topic', MessageExample.person()); expect(mockPublisher.publish).toHaveBeenCalledWith('test.topic', expect.any(Object)); }); @@ -168,7 +167,7 @@ describe('RunMQ Unit Tests', () => { (RunMQUtils.generateUUID as jest.Mock).mockReturnValue('msg-uuid'); const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - runMQ.publish('test.topic', MessageExample.person(), 'corr-1'); + await runMQ.publish('test.topic', MessageExample.person(), 'corr-1'); expect(MockedRunMQLogger.info).toHaveBeenCalledWith('Published message', { topic: 'test.topic', @@ -187,7 +186,7 @@ describe('RunMQ Unit Tests', () => { MockedRunMQLogger, ); const payload = MessageExample.person(); - runMQ.publish('test.topic', payload, 'corr-1'); + await runMQ.publish('test.topic', payload, 'corr-1'); expect(MockedRunMQLogger.info).toHaveBeenCalledWith('Published message', { topic: 'test.topic', diff --git a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts index 7674205..7f99f2a 100644 --- a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts +++ b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts @@ -28,11 +28,11 @@ describe('RunMQBaseProducer Unit Tests', () => { }); describe('publish', () => { - it('should create RunMQMessage with generated ID and current timestamp', () => { + it('should create RunMQMessage with generated ID and current timestamp', async () => { const testMessage = MockedRabbitMQMessage; const testTopic = 'test.topic'; - producer.publish(testTopic, testMessage); + await producer.publish(testTopic, testMessage); expect(RunMQMessage).toHaveBeenCalledWith( testMessage.message, @@ -44,11 +44,11 @@ describe('RunMQBaseProducer Unit Tests', () => { ); }); - it('should serialize the RunMQMessage', () => { + it('should serialize the RunMQMessage', async () => { const testMessage = MockedRabbitMQMessage; const testTopic = 'test.topic'; - producer.publish(testTopic, testMessage); + await producer.publish(testTopic, testMessage); expect(mockedSerializer.serialize).toHaveBeenCalledTimes(1); expect(mockedSerializer.serialize).toHaveBeenCalledWith( @@ -63,7 +63,7 @@ describe('RunMQBaseProducer Unit Tests', () => { ); }); - it('should publish to the correct exchange and routing key', () => { + it('should publish to the correct exchange and routing key', async () => { jest.useFakeTimers().setSystemTime(new Date('2025-10-10T00:00:00Z')); const message = RunMQMessageExample.person(); const testMessage = mockedRabbitMQMessageWithChannelAndMessage( @@ -75,7 +75,7 @@ describe('RunMQBaseProducer Unit Tests', () => { const testTopic = 'test.topic'; const serializedMessage = JSON.stringify(message); - producer.publish(testTopic, testMessage); + await producer.publish(testTopic, testMessage); expect(mockedChannel.publish).toHaveBeenCalledWith( Constants.ROUTER_EXCHANGE_NAME, diff --git a/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts b/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts index 168370e..4b55b7d 100644 --- a/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts +++ b/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts @@ -10,136 +10,83 @@ describe('RunMQFailureLoggerProducer Unit Tests', () => { MockedRunMQLogger ); + beforeEach(() => { + jest.clearAllMocks(); + mockProducer.publish.mockResolvedValue(undefined); + }); + describe('publish', () => { - it('should delegate to wrapped producer when publish succeeds', () => { + it('should delegate to wrapped producer when publish succeeds', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; - failureLoggerProducer.publish(testTopic, testMessage); + await failureLoggerProducer.publish(testTopic, testMessage); expect(mockProducer.publish).toHaveBeenCalledWith(testTopic, testMessage); expect(MockedRunMQLogger.error).not.toHaveBeenCalled(); }); - it('should log error and rethrow when publish fails', () => { + it('should log error and rethrow when publish rejects', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; const publishError = new Error('Publish failed'); + mockProducer.publish.mockRejectedValueOnce(publishError); - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('Publish failed'); + await expect(failureLoggerProducer.publish(testTopic, testMessage)) + .rejects.toThrow('Publish failed'); expect(mockProducer.publish).toHaveBeenCalledWith(testTopic, testMessage); expect(MockedRunMQLogger.error).toHaveBeenCalledWith( 'Message publishing failed', - { - message: testMessage, + expect.objectContaining({ + topic: testTopic, + correlationId: testMessage.correlationId, error: 'Publish failed', stack: publishError.stack, - } + }) ); }); - it('should handle non-Error exceptions', () => { + it('should handle non-Error rejections', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; - const publishError = 'String error'; - - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); + mockProducer.publish.mockRejectedValueOnce('String error'); - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('String error'); + await expect(failureLoggerProducer.publish(testTopic, testMessage)) + .rejects.toBe('String error'); expect(MockedRunMQLogger.error).toHaveBeenCalledWith( 'Message publishing failed', - { - message: testMessage, - error: JSON.stringify(publishError), + expect.objectContaining({ + topic: testTopic, + correlationId: testMessage.correlationId, + error: JSON.stringify('String error'), stack: undefined, - } + }) ); }); - it('should handle complex message objects in error logging', () => { - const testTopic = 'test.topic'; - const testMessage = MockedRabbitMQMessage; - - const publishError = new Error('Complex message error'); - - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('Complex message error'); - - expect(MockedRunMQLogger.error).toHaveBeenCalledWith( - 'Message publishing failed', - { - message: testMessage, - error: 'Complex message error', - stack: publishError.stack, - } - ); - }); - - it('should handle null/undefined message content in error logging', () => { - const testTopic = 'test.topic'; - const testMessage = MockedRabbitMQMessage; - - const publishError = new Error('Null message error'); - - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('Null message error'); - - expect(MockedRunMQLogger.error).toHaveBeenCalledWith( - 'Message publishing failed', - { - message: testMessage, - error: 'Null message error', - stack: publishError.stack, - } - ); - }); - - it('should preserve original error when rethrowing', () => { + it('should preserve original error when rethrowing', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; const originalError = new Error('Original error'); originalError.name = 'CustomError'; + mockProducer.publish.mockRejectedValueOnce(originalError); - mockProducer.publish.mockImplementation(() => { - throw originalError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow(originalError); + await expect(failureLoggerProducer.publish(testTopic, testMessage)) + .rejects.toBe(originalError); expect(MockedRunMQLogger.error).toHaveBeenCalledWith( 'Message publishing failed', - { - message: testMessage, + expect.objectContaining({ + topic: testTopic, + correlationId: testMessage.correlationId, error: 'Original error', stack: originalError.stack, - } + }) ); }); });