diff --git a/README.md b/README.md
index 2db2b6c..997d369 100644
--- a/README.md
+++ b/README.md
@@ -99,7 +99,7 @@ What's happening here:
### Publish a message
```typescript
-runMQ.publish('user.created', {
+await runMQ.publish('user.created', {
userId: '123',
email: 'user@example.com',
name: 'John Doe'
@@ -108,6 +108,8 @@ runMQ.publish('user.created', {
✅ One publish, every subscribed processor receives the message — independently and atomically.
+✅ **Confirmed delivery by default.** `runMQ.publish()` returns a promise that resolves only after RabbitMQ has accepted the message; if the broker rejects it (alarm state, mandatory routing failure, etc.), the promise rejects so your code can handle it. Set `usePublisherConfirms: false` in the connection config to opt out and fall back to fire-and-forget publishing if per-publish round-trip latency matters more to you than detecting silent drops.
+
## Patterns RunMQ fits naturally
diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts
index 0fa2113..e71c616 100644
--- a/src/core/RunMQ.ts
+++ b/src/core/RunMQ.ts
@@ -57,18 +57,26 @@ export class RunMQ {
}
/**
- * Publishes a message to the specified topic with an optional correlation ID
+ * Publishes a message to the specified topic with an optional correlation ID.
+ *
+ * If publisher confirms are enabled (`usePublisherConfirms: true` in the
+ * connection config), the returned promise resolves only after RabbitMQ
+ * acknowledges the message; if the broker rejects, the promise rejects.
+ * Otherwise it resolves once the message is flushed to the TCP socket
+ * (fire-and-forget, no delivery guarantee — same behavior as before
+ * publisher confirms were introduced).
+ *
* @param topic The name of the topic to publish the message to
* @param message The message payload to be published
* @param correlationId (Optional) A unique identifier for correlating messages; if not provided, a new UUID will be generated
*/
- public publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): void {
+ public async publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): Promise {
if (!this.publisher || !this.publishChannel) {
throw new RunMQException(Exceptions.NOT_INITIALIZED, {});
}
RunMQUtils.assertRecord(message);
const messageId = RunMQUtils.generateUUID();
- this.publisher.publish(topic,
+ await this.publisher.publish(topic,
RabbitMQMessage.from(
message,
this.publishChannel,
@@ -148,6 +156,9 @@ export class RunMQ {
// Use a dedicated channel for publishes so a setup-time channel close
// (e.g. a precondition_failed on assertQueue) cannot break the publish path.
this.publishChannel = await this.client.getChannel();
+ if (this.config.usePublisherConfirms !== false) {
+ await this.publishChannel.confirmSelect();
+ }
this.publisher = new RunMQPublisherCreator(this.logger).createPublisher();
}
}
\ No newline at end of file
diff --git a/src/core/clients/RabbitMQClientChannel.ts b/src/core/clients/RabbitMQClientChannel.ts
index c710309..eff0f32 100644
--- a/src/core/clients/RabbitMQClientChannel.ts
+++ b/src/core/clients/RabbitMQClientChannel.ts
@@ -106,8 +106,8 @@ export class RabbitMQClientChannel implements AMQPChannel {
});
}
- publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean {
- this.channel.basicPublish({
+ async publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise {
+ await this.channel.basicPublish({
exchange,
routingKey,
correlationId: options?.correlationId,
@@ -124,7 +124,10 @@ export class RabbitMQClientChannel implements AMQPChannel {
userId: options?.userId,
appId: options?.appId,
}, content);
- return true;
+ }
+
+ async confirmSelect(): Promise {
+ await this.channel.confirmSelect();
}
async consume(
diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts
index 19dc8cd..b5e38bd 100644
--- a/src/core/consumer/RunMQConsumerCreator.ts
+++ b/src/core/consumer/RunMQConsumerCreator.ts
@@ -19,7 +19,6 @@ import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolic
import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {Exceptions} from "@src/core/exceptions/Exceptions";
-import {RunMQUtils} from "@src/core/utils/RunMQUtils";
export class RunMQConsumerCreator {
private ttlPolicyManager: RunMQTTLPolicyManager;
@@ -69,17 +68,19 @@ export class RunMQConsumerCreator {
});
const DLQPublisher = new RunMQPublisherCreator(this.logger).createPublisher(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME);
+ // Always enable publisher confirms on the consumer channel: DLQ
+ // publishes flow through this channel and we cannot tolerate silent
+ // drops there (issue #19).
+ await consumerChannel.confirmSelect();
+
const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT;
await consumerChannel.prefetch(prefetchCount);
await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => {
if (!msg) return;
const rabbitmqMessage = new RabbitMQMessage(
msg.content.toString(),
- // Synthesize ids when an external (non-RunMQ) publisher did
- // not set the AMQP messageId/correlationId — keeps log tracing
- // consistent for cross-tenant queues.
- msg.properties.messageId ?? RunMQUtils.generateUUID(),
- msg.properties.correlationId ?? RunMQUtils.generateUUID(),
+ msg.properties.messageId,
+ msg.properties.correlationId,
consumerChannel,
msg,
msg.properties.headers,
diff --git a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts
index 49424c7..7c52e3c 100644
--- a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts
+++ b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts
@@ -21,7 +21,23 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
} catch (e: unknown) {
if (this.hasReachedMaxRetries(message)) {
this.logMaxRetriesReached(message);
- this.moveToFinalDeadLetter(message);
+ try {
+ await this.moveToFinalDeadLetter(message);
+ } catch (publishError) {
+ // DLQ publish failed (broker rejected, channel closed, etc.).
+ // Do NOT ack — that would lose the message. nack(false)
+ // sends it back through the retry pipeline, where it'll
+ // come right back here on the next attempt with a natural
+ // backoff (the retry-delay-queue TTL). If the underlying
+ // failure is transient, the next attempt's DLQ publish
+ // will succeed.
+ this.logger.error('Failed to publish to DLQ — message will be redelivered', {
+ correlationId: message.correlationId,
+ cause: publishError instanceof Error ? publishError.message : String(publishError),
+ });
+ message.nack(false);
+ return false;
+ }
this.acknowledgeMessage(message);
return false;
}
@@ -47,10 +63,12 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
}
// Republish the original AMQP body verbatim so the envelope (including
- // publishedAt) is preserved end-to-end for audit/replay.
- private moveToFinalDeadLetter(message: RabbitMQMessage) {
+ // publishedAt) is preserved end-to-end for audit/replay. The consumer
+ // channel runs in confirm mode (see RunMQConsumerCreator), so awaiting
+ // this publish surfaces broker-side rejections instead of dropping them.
+ private async moveToFinalDeadLetter(message: RabbitMQMessage): Promise {
if (!message.amqpMessage) return;
- message.channel.publish(
+ await message.channel.publish(
Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME,
ConsumerCreatorUtils.getDLQTopicName(this.config.name),
message.amqpMessage.content,
diff --git a/src/core/message/RabbitMQMessage.ts b/src/core/message/RabbitMQMessage.ts
index 15e4ef2..579f75b 100644
--- a/src/core/message/RabbitMQMessage.ts
+++ b/src/core/message/RabbitMQMessage.ts
@@ -1,3 +1,4 @@
+import {RunMQUtils} from "@src/core/utils/RunMQUtils";
import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties";
import {AMQPMessage} from "@src/core/message/AmqpMessage";
import {AMQPChannel} from "@src/types";
@@ -5,8 +6,8 @@ import {AMQPChannel} from "@src/types";
export class RabbitMQMessage {
constructor(
readonly message: any,
- readonly id: string,
- readonly correlationId: string,
+ readonly id: string = RunMQUtils.generateUUID(),
+ readonly correlationId: string = RunMQUtils.generateUUID(),
readonly channel: AMQPChannel,
readonly amqpMessage: AMQPMessage = null,
readonly headers: Record = {}) {
diff --git a/src/core/publisher/producers/RunMQBaseProducer.ts b/src/core/publisher/producers/RunMQBaseProducer.ts
index b220825..d1640cb 100644
--- a/src/core/publisher/producers/RunMQBaseProducer.ts
+++ b/src/core/publisher/producers/RunMQBaseProducer.ts
@@ -8,7 +8,7 @@ export class RunMQBaseProducer implements RunMQPublisher {
constructor(private serializer: Serializer, private exchange = Constants.ROUTER_EXCHANGE_NAME) {
}
- publish(topic: string, message: RabbitMQMessage): void {
+ async publish(topic: string, message: RabbitMQMessage): Promise {
const runMQMessage = new RunMQMessage(
message.message,
new RunMQMessageMeta(
@@ -17,7 +17,7 @@ export class RunMQBaseProducer implements RunMQPublisher {
message.correlationId,
));
const serialized = this.serializer.serialize(runMQMessage);
- message.channel.publish(this.exchange, topic, Buffer.from(serialized), {
+ await message.channel.publish(this.exchange, topic, Buffer.from(serialized), {
correlationId: message.correlationId,
messageId: message.id,
headers: message.headers,
diff --git a/src/core/publisher/producers/RunMQFailureLoggerProducer.ts b/src/core/publisher/producers/RunMQFailureLoggerProducer.ts
index 91e0b2f..c427daf 100644
--- a/src/core/publisher/producers/RunMQFailureLoggerProducer.ts
+++ b/src/core/publisher/producers/RunMQFailureLoggerProducer.ts
@@ -6,12 +6,13 @@ export class RunMQFailureLoggerProducer implements RunMQPublisher {
constructor(private producer: RunMQPublisher, private logger: RunMQLogger) {
}
- publish(topic: string, message: RabbitMQMessage): void {
+ async publish(topic: string, message: RabbitMQMessage): Promise {
try {
- this.producer.publish(topic, message);
+ await this.producer.publish(topic, message);
} catch (e) {
this.logger.error('Message publishing failed', {
- message: message,
+ topic,
+ correlationId: message.correlationId,
error: e instanceof Error ? e.message : JSON.stringify(e),
stack: e instanceof Error ? e.stack : undefined,
});
diff --git a/src/types/index.ts b/src/types/index.ts
index a1ac29b..dd2b7f5 100644
--- a/src/types/index.ts
+++ b/src/types/index.ts
@@ -150,8 +150,20 @@ export interface AMQPChannel {
/**
* Publishes a message to an exchange.
+ *
+ * If `confirmSelect()` was called on this channel, the returned promise
+ * resolves only after the broker acknowledges the message; if the broker
+ * rejects, the promise rejects. Otherwise it resolves once the message is
+ * flushed to the TCP socket.
+ */
+ publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise;
+
+ /**
+ * Enables publisher confirms on this channel. After this is called every
+ * `publish()` waits for broker acknowledgement before resolving.
+ * https://www.rabbitmq.com/confirms.html#publisher-confirms
*/
- publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean;
+ confirmSelect(): Promise;
/**
* Starts consuming messages from a queue.
@@ -221,6 +233,21 @@ export interface RunMQConnectionConfig {
password: string;
};
/**
+ * Controls publisher confirms on the user-publish path. When enabled
+ * (default), `runMQ.publish()` resolves only after RabbitMQ acknowledges
+ * each message, and rejects on broker error (e.g. mandatory routing
+ * failure, alarm state). Set to `false` to opt out and fall back to
+ * fire-and-forget — publish resolves once the message is written to the
+ * TCP socket, with no delivery guarantee.
+ *
+ * Trade-off: confirms add a broker round-trip per publish (typically a
+ * few hundred microseconds). They're the only way to detect silent
+ * publish failures, so we default to safety. DLQ publishes from the
+ * consumer chain are *always* confirmed regardless of this setting —
+ * the message-loss risk there is not negotiable.
+ */
+ usePublisherConfirms?: boolean;
+ /*
* If true, RunMQ includes the full message payload in info/error log
* lines that mention a message (publish success, per-attempt failure,
* max-retries-reached). Off by default — capturing megabyte-sized
@@ -361,5 +388,5 @@ export interface RunMQConsumer {
export interface RunMQPublisher {
- publish: (topic: string, message: RabbitMQMessage) => void;
+ publish: (topic: string, message: RabbitMQMessage) => Promise;
}
\ No newline at end of file
diff --git a/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts b/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts
new file mode 100644
index 0000000..f5a3629
--- /dev/null
+++ b/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts
@@ -0,0 +1,131 @@
+import {RunMQ} from '@src/core/RunMQ';
+import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter";
+import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel";
+import {Constants} from "@src/core/constants";
+import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers";
+import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
+import {RunMQUtils} from "@src/core/utils/RunMQUtils";
+import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
+import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample";
+import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample";
+import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample";
+import {MessageTestUtils} from "@tests/helpers/MessageTestUtils";
+
+/**
+ * Integration tests for issues #19 + #28: publisher confirms.
+ *
+ * Before this fix, DLQ publishes from RunMQRetriesCheckerProcessor were
+ * fire-and-forget — the message was ack'd immediately even if the DLQ
+ * publish silently failed. That meant a network blip during a
+ * retry-exhausted message's DLQ handoff resulted in permanent message loss.
+ *
+ * The fix: enable publisher confirms on the consumer channel and await the
+ * DLQ publish. On failure, nack(false) so the message goes back through the
+ * retry pipeline (with natural backoff via the retry-delay queue's TTL) and
+ * gets another shot at reaching the DLQ.
+ */
+describe('RunMQ DLQ Publisher Confirms E2E', () => {
+ const validConfig = RunMQConnectionConfigExample.valid();
+
+ beforeEach(() => {
+ jest.restoreAllMocks();
+ });
+
+ afterEach(() => {
+ jest.restoreAllMocks();
+ });
+
+ it('does not lose the message when the DLQ publish fails — message is redelivered', async () => {
+ const configuration = RunMQProcessorConfigurationExample.simpleNoSchema('dlq_confirm_redeliver');
+
+ const testingConnection = new RabbitMQClientAdapter(validConfig);
+ const channel = await testingConnection.getChannel();
+ await ChannelTestHelpers.deleteQueue(channel, configuration.name);
+
+ const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
+ let handlerCalls = 0;
+ await runMQ.process('dlq.confirm', configuration, () => {
+ handlerCalls++;
+ throw new Error('handler intentionally fails');
+ });
+
+ // Make the FIRST DLQ publish fail (simulating broker rejection or
+ // channel closure mid-flight). Subsequent publishes succeed.
+ // This is the scenario where the old code silently lost messages.
+ let dlqPublishesAttempted = 0;
+ const original = RabbitMQClientChannel.prototype.publish;
+ jest.spyOn(RabbitMQClientChannel.prototype, 'publish')
+ .mockImplementation(async function (this: RabbitMQClientChannel, exchange: string, routingKey: string, content: Buffer, options) {
+ if (exchange === Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME) {
+ dlqPublishesAttempted++;
+ if (dlqPublishesAttempted === 1) {
+ throw new Error('simulated broker rejection on first DLQ publish');
+ }
+ }
+ return original.call(this, exchange, routingKey, content, options);
+ });
+
+ try {
+ // Publish a message. Handler fails on first delivery → DLQ publish
+ // attempts → first publish fails → message is nacked back into the
+ // retry pipeline → retry-delay TTL → handler fails again → DLQ
+ // publish succeeds → message lands in DLQ.
+ channel.publish(
+ Constants.ROUTER_EXCHANGE_NAME,
+ 'dlq.confirm',
+ MessageTestUtils.buffer(RunMQMessageExample.random()),
+ );
+
+ // Default attemptsDelay is 100ms (simpleNoSchema config).
+ // We need: handler call, DLQ publish fail, retry-delay wait, handler call, DLQ publish succeeds.
+ await RunMQUtils.delay(2000);
+
+ // Both DLQ publish attempts were made (first failed, second succeeded).
+ expect(dlqPublishesAttempted).toBeGreaterThanOrEqual(2);
+ // Handler ran multiple times due to redelivery — proves the message
+ // was nack'd back, NOT silently dropped.
+ expect(handlerCalls).toBeGreaterThanOrEqual(2);
+ // Message landed in DLQ on the second attempt.
+ await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1);
+ // Main queue is empty.
+ await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0);
+ } finally {
+ await runMQ.disconnect();
+ await testingConnection.disconnect();
+ }
+ });
+
+ it('confirms DLQ publishes by default — broker ack required before original is acked', async () => {
+ const configuration = RunMQProcessorConfigurationExample.random(
+ 'dlq_confirm_happy_path',
+ 1,
+ 1, // attempts=1, so first failure → DLQ
+ 100,
+ );
+
+ const testingConnection = new RabbitMQClientAdapter(validConfig);
+ const channel = await testingConnection.getChannel();
+ await ChannelTestHelpers.deleteQueue(channel, configuration.name);
+
+ const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
+ await runMQ.process('dlq.happy', configuration, () => {
+ throw new Error('handler always fails');
+ });
+
+ channel.publish(
+ Constants.ROUTER_EXCHANGE_NAME,
+ 'dlq.happy',
+ MessageTestUtils.buffer(RunMQMessageExample.random()),
+ );
+
+ await RunMQUtils.delay(500);
+
+ // Message should be in DLQ (with confirm received from broker).
+ await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1);
+ await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0);
+ await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0);
+
+ await runMQ.disconnect();
+ await testingConnection.disconnect();
+ });
+});
diff --git a/tests/e2e/RunMQ.e2e.test.ts b/tests/e2e/RunMQ.e2e.test.ts
index a744f51..ba5ce68 100644
--- a/tests/e2e/RunMQ.e2e.test.ts
+++ b/tests/e2e/RunMQ.e2e.test.ts
@@ -165,12 +165,11 @@ describe('RunMQ E2E Tests', () => {
await testingConnection.disconnect();
})
- it("should throw error when publishing invalid message", async () => {
+ it("should reject when publishing invalid message", async () => {
const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
- expect(() => {
- runMQ.publish("user.created", "invalid message" as any);
- }).toThrow(RunMQException);
+ await expect(runMQ.publish("user.created", "invalid message" as any))
+ .rejects.toThrow(RunMQException);
await runMQ.disconnect();
});
diff --git a/tests/mocks/MockedAMQPChannel.ts b/tests/mocks/MockedAMQPChannel.ts
index e46936d..b75ec63 100644
--- a/tests/mocks/MockedAMQPChannel.ts
+++ b/tests/mocks/MockedAMQPChannel.ts
@@ -39,7 +39,7 @@ export class MockedAMQPChannel implements AMQPChannel {
bindQueue = jest.fn, [string, string, string, Record?]>().mockResolvedValue();
- publish = jest.fn().mockReturnValue(true);
+ publish = jest.fn, [string, string, Buffer, AMQPPublishOptions?]>().mockResolvedValue();
consume = jest.fn, [string, (msg: ConsumeMessage | null) => void, AMQPConsumeOptions?]>().mockResolvedValue({
consumerTag: 'test-consumer-tag'
@@ -51,6 +51,8 @@ export class MockedAMQPChannel implements AMQPChannel {
prefetch = jest.fn, [number, boolean?]>().mockResolvedValue();
+ confirmSelect = jest.fn, []>().mockResolvedValue();
+
close = jest.fn, []>().mockResolvedValue();
}
diff --git a/tests/mocks/MockedRabbitMQChannel.ts b/tests/mocks/MockedRabbitMQChannel.ts
index 91af045..bfaa059 100644
--- a/tests/mocks/MockedRabbitMQChannel.ts
+++ b/tests/mocks/MockedRabbitMQChannel.ts
@@ -9,12 +9,13 @@ export class MockedRabbitMQChannel implements AMQPChannel {
deleteExchange = jest.fn();
checkExchange = jest.fn();
assertExchange = jest.fn();
- publish = jest.fn();
+ publish = jest.fn().mockResolvedValue(undefined);
consume = jest.fn();
get = jest.fn();
ack = jest.fn();
nack = jest.fn();
prefetch = jest.fn();
+ confirmSelect = jest.fn().mockResolvedValue(undefined);
close = jest.fn();
connection: Connection = {} as Connection;
on = jest.fn();
diff --git a/tests/mocks/MockedRunMQPublisher.ts b/tests/mocks/MockedRunMQPublisher.ts
index eb31b35..030e926 100644
--- a/tests/mocks/MockedRunMQPublisher.ts
+++ b/tests/mocks/MockedRunMQPublisher.ts
@@ -1,5 +1,5 @@
import {RunMQPublisher} from "@src/types";
export class MockedRabbitMQPublisher implements RunMQPublisher {
- public publish = jest.fn();
+ public publish = jest.fn().mockResolvedValue(undefined);
}
\ No newline at end of file
diff --git a/tests/unit/core/RunMQ.test.ts b/tests/unit/core/RunMQ.test.ts
index 6036933..a20d586 100644
--- a/tests/unit/core/RunMQ.test.ts
+++ b/tests/unit/core/RunMQ.test.ts
@@ -38,7 +38,7 @@ describe('RunMQ Unit Tests', () => {
const setupPublisherMock = () => {
const mockPublisherCreator = RunMQPublisherCreator as jest.MockedClass;
- const mockPublisher = {publish: jest.fn()};
+ const mockPublisher = {publish: jest.fn().mockResolvedValue(undefined)};
mockPublisherCreator.prototype.createPublisher.mockReturnValue(mockPublisher as any);
return {mockPublisherCreator, mockPublisher};
};
@@ -143,13 +143,12 @@ describe('RunMQ Unit Tests', () => {
});
describe('producer', () => {
- it('should throw error if message is not a valid record', async () => {
+ it('should reject if message is not a valid record', async () => {
setupSuccessfulClientMock();
const runMQ = await RunMQ.start(validConfig);
- expect(() => {
- runMQ.publish('test.topic', "invalid message" as any);
- }).toThrow(RunMQException);
+ await expect(runMQ.publish('test.topic', "invalid message" as any))
+ .rejects.toThrow(RunMQException);
});
it('should publish message correctly if valid record', async () => {
@@ -157,7 +156,7 @@ describe('RunMQ Unit Tests', () => {
const {mockPublisher} = setupPublisherMock();
const runMQ = await RunMQ.start(validConfig);
- runMQ.publish('test.topic', MessageExample.person());
+ await runMQ.publish('test.topic', MessageExample.person());
expect(mockPublisher.publish).toHaveBeenCalledWith('test.topic', expect.any(Object));
});
@@ -168,7 +167,7 @@ describe('RunMQ Unit Tests', () => {
(RunMQUtils.generateUUID as jest.Mock).mockReturnValue('msg-uuid');
const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
- runMQ.publish('test.topic', MessageExample.person(), 'corr-1');
+ await runMQ.publish('test.topic', MessageExample.person(), 'corr-1');
expect(MockedRunMQLogger.info).toHaveBeenCalledWith('Published message', {
topic: 'test.topic',
@@ -187,7 +186,7 @@ describe('RunMQ Unit Tests', () => {
MockedRunMQLogger,
);
const payload = MessageExample.person();
- runMQ.publish('test.topic', payload, 'corr-1');
+ await runMQ.publish('test.topic', payload, 'corr-1');
expect(MockedRunMQLogger.info).toHaveBeenCalledWith('Published message', {
topic: 'test.topic',
diff --git a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts
index 7674205..7f99f2a 100644
--- a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts
+++ b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts
@@ -28,11 +28,11 @@ describe('RunMQBaseProducer Unit Tests', () => {
});
describe('publish', () => {
- it('should create RunMQMessage with generated ID and current timestamp', () => {
+ it('should create RunMQMessage with generated ID and current timestamp', async () => {
const testMessage = MockedRabbitMQMessage;
const testTopic = 'test.topic';
- producer.publish(testTopic, testMessage);
+ await producer.publish(testTopic, testMessage);
expect(RunMQMessage).toHaveBeenCalledWith(
testMessage.message,
@@ -44,11 +44,11 @@ describe('RunMQBaseProducer Unit Tests', () => {
);
});
- it('should serialize the RunMQMessage', () => {
+ it('should serialize the RunMQMessage', async () => {
const testMessage = MockedRabbitMQMessage;
const testTopic = 'test.topic';
- producer.publish(testTopic, testMessage);
+ await producer.publish(testTopic, testMessage);
expect(mockedSerializer.serialize).toHaveBeenCalledTimes(1);
expect(mockedSerializer.serialize).toHaveBeenCalledWith(
@@ -63,7 +63,7 @@ describe('RunMQBaseProducer Unit Tests', () => {
);
});
- it('should publish to the correct exchange and routing key', () => {
+ it('should publish to the correct exchange and routing key', async () => {
jest.useFakeTimers().setSystemTime(new Date('2025-10-10T00:00:00Z'));
const message = RunMQMessageExample.person();
const testMessage = mockedRabbitMQMessageWithChannelAndMessage(
@@ -75,7 +75,7 @@ describe('RunMQBaseProducer Unit Tests', () => {
const testTopic = 'test.topic';
const serializedMessage = JSON.stringify(message);
- producer.publish(testTopic, testMessage);
+ await producer.publish(testTopic, testMessage);
expect(mockedChannel.publish).toHaveBeenCalledWith(
Constants.ROUTER_EXCHANGE_NAME,
diff --git a/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts b/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts
index 168370e..4b55b7d 100644
--- a/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts
+++ b/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts
@@ -10,136 +10,83 @@ describe('RunMQFailureLoggerProducer Unit Tests', () => {
MockedRunMQLogger
);
+ beforeEach(() => {
+ jest.clearAllMocks();
+ mockProducer.publish.mockResolvedValue(undefined);
+ });
+
describe('publish', () => {
- it('should delegate to wrapped producer when publish succeeds', () => {
+ it('should delegate to wrapped producer when publish succeeds', async () => {
const testTopic = 'test.topic';
const testMessage = MockedRabbitMQMessage;
- failureLoggerProducer.publish(testTopic, testMessage);
+ await failureLoggerProducer.publish(testTopic, testMessage);
expect(mockProducer.publish).toHaveBeenCalledWith(testTopic, testMessage);
expect(MockedRunMQLogger.error).not.toHaveBeenCalled();
});
- it('should log error and rethrow when publish fails', () => {
+ it('should log error and rethrow when publish rejects', async () => {
const testTopic = 'test.topic';
const testMessage = MockedRabbitMQMessage;
const publishError = new Error('Publish failed');
+ mockProducer.publish.mockRejectedValueOnce(publishError);
- mockProducer.publish.mockImplementation(() => {
- throw publishError;
- });
-
- expect(() => {
- failureLoggerProducer.publish(testTopic, testMessage);
- }).toThrow('Publish failed');
+ await expect(failureLoggerProducer.publish(testTopic, testMessage))
+ .rejects.toThrow('Publish failed');
expect(mockProducer.publish).toHaveBeenCalledWith(testTopic, testMessage);
expect(MockedRunMQLogger.error).toHaveBeenCalledWith(
'Message publishing failed',
- {
- message: testMessage,
+ expect.objectContaining({
+ topic: testTopic,
+ correlationId: testMessage.correlationId,
error: 'Publish failed',
stack: publishError.stack,
- }
+ })
);
});
- it('should handle non-Error exceptions', () => {
+ it('should handle non-Error rejections', async () => {
const testTopic = 'test.topic';
const testMessage = MockedRabbitMQMessage;
- const publishError = 'String error';
-
- mockProducer.publish.mockImplementation(() => {
- throw publishError;
- });
+ mockProducer.publish.mockRejectedValueOnce('String error');
- expect(() => {
- failureLoggerProducer.publish(testTopic, testMessage);
- }).toThrow('String error');
+ await expect(failureLoggerProducer.publish(testTopic, testMessage))
+ .rejects.toBe('String error');
expect(MockedRunMQLogger.error).toHaveBeenCalledWith(
'Message publishing failed',
- {
- message: testMessage,
- error: JSON.stringify(publishError),
+ expect.objectContaining({
+ topic: testTopic,
+ correlationId: testMessage.correlationId,
+ error: JSON.stringify('String error'),
stack: undefined,
- }
+ })
);
});
- it('should handle complex message objects in error logging', () => {
- const testTopic = 'test.topic';
- const testMessage = MockedRabbitMQMessage;
-
- const publishError = new Error('Complex message error');
-
- mockProducer.publish.mockImplementation(() => {
- throw publishError;
- });
-
- expect(() => {
- failureLoggerProducer.publish(testTopic, testMessage);
- }).toThrow('Complex message error');
-
- expect(MockedRunMQLogger.error).toHaveBeenCalledWith(
- 'Message publishing failed',
- {
- message: testMessage,
- error: 'Complex message error',
- stack: publishError.stack,
- }
- );
- });
-
- it('should handle null/undefined message content in error logging', () => {
- const testTopic = 'test.topic';
- const testMessage = MockedRabbitMQMessage;
-
- const publishError = new Error('Null message error');
-
- mockProducer.publish.mockImplementation(() => {
- throw publishError;
- });
-
- expect(() => {
- failureLoggerProducer.publish(testTopic, testMessage);
- }).toThrow('Null message error');
-
- expect(MockedRunMQLogger.error).toHaveBeenCalledWith(
- 'Message publishing failed',
- {
- message: testMessage,
- error: 'Null message error',
- stack: publishError.stack,
- }
- );
- });
-
- it('should preserve original error when rethrowing', () => {
+ it('should preserve original error when rethrowing', async () => {
const testTopic = 'test.topic';
const testMessage = MockedRabbitMQMessage;
const originalError = new Error('Original error');
originalError.name = 'CustomError';
+ mockProducer.publish.mockRejectedValueOnce(originalError);
- mockProducer.publish.mockImplementation(() => {
- throw originalError;
- });
-
- expect(() => {
- failureLoggerProducer.publish(testTopic, testMessage);
- }).toThrow(originalError);
+ await expect(failureLoggerProducer.publish(testTopic, testMessage))
+ .rejects.toBe(originalError);
expect(MockedRunMQLogger.error).toHaveBeenCalledWith(
'Message publishing failed',
- {
- message: testMessage,
+ expect.objectContaining({
+ topic: testTopic,
+ correlationId: testMessage.correlationId,
error: 'Original error',
stack: originalError.stack,
- }
+ })
);
});
});