Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ What's happening here:
### Publish a message

```typescript
runMQ.publish('user.created', {
await runMQ.publish('user.created', {
userId: '123',
email: 'user@example.com',
name: 'John Doe'
Expand All @@ -108,6 +108,8 @@ runMQ.publish('user.created', {

✅ One publish, every subscribed processor receives the message — independently and atomically.

✅ **Confirmed delivery by default.** `runMQ.publish()` returns a promise that resolves only after RabbitMQ has accepted the message; if the broker rejects it (alarm state, mandatory routing failure, etc.), the promise rejects so your code can handle it. Set `usePublisherConfirms: false` in the connection config to opt out and fall back to fire-and-forget publishing if per-publish round-trip latency matters more to you than detecting silent drops.

<br>

## Patterns RunMQ fits naturally
Expand Down
17 changes: 14 additions & 3 deletions src/core/RunMQ.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,26 @@ export class RunMQ {
}

/**
* Publishes a message to the specified topic with an optional correlation ID
* Publishes a message to the specified topic with an optional correlation ID.
*
* If publisher confirms are enabled (`usePublisherConfirms: true` in the
* connection config), the returned promise resolves only after RabbitMQ
* acknowledges the message; if the broker rejects, the promise rejects.
* Otherwise it resolves once the message is flushed to the TCP socket
* (fire-and-forget, no delivery guarantee — same behavior as before
* publisher confirms were introduced).
*
* @param topic The name of the topic to publish the message to
* @param message The message payload to be published
* @param correlationId (Optional) A unique identifier for correlating messages; if not provided, a new UUID will be generated
*/
public publish(topic: string, message: Record<string, any>, correlationId: string = RunMQUtils.generateUUID()): void {
public async publish(topic: string, message: Record<string, any>, correlationId: string = RunMQUtils.generateUUID()): Promise<void> {
if (!this.publisher || !this.publishChannel) {
throw new RunMQException(Exceptions.NOT_INITIALIZED, {});
}
RunMQUtils.assertRecord(message);
const messageId = RunMQUtils.generateUUID();
this.publisher.publish(topic,
await this.publisher.publish(topic,
RabbitMQMessage.from(
message,
this.publishChannel,
Expand Down Expand Up @@ -148,6 +156,9 @@ export class RunMQ {
// Use a dedicated channel for publishes so a setup-time channel close
// (e.g. a precondition_failed on assertQueue) cannot break the publish path.
this.publishChannel = await this.client.getChannel();
if (this.config.usePublisherConfirms !== false) {
await this.publishChannel.confirmSelect();
}
this.publisher = new RunMQPublisherCreator(this.logger).createPublisher();
}
}
9 changes: 6 additions & 3 deletions src/core/clients/RabbitMQClientChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ export class RabbitMQClientChannel implements AMQPChannel {
});
}

publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean {
this.channel.basicPublish({
async publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise<void> {
await this.channel.basicPublish({
exchange,
routingKey,
correlationId: options?.correlationId,
Expand All @@ -124,7 +124,10 @@ export class RabbitMQClientChannel implements AMQPChannel {
userId: options?.userId,
appId: options?.appId,
}, content);
return true;
}

async confirmSelect(): Promise<void> {
await this.channel.confirmSelect();
}

async consume(
Expand Down
13 changes: 7 additions & 6 deletions src/core/consumer/RunMQConsumerCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolic
import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {Exceptions} from "@src/core/exceptions/Exceptions";
import {RunMQUtils} from "@src/core/utils/RunMQUtils";

export class RunMQConsumerCreator {
private ttlPolicyManager: RunMQTTLPolicyManager;
Expand Down Expand Up @@ -69,17 +68,19 @@ export class RunMQConsumerCreator {
});
const DLQPublisher = new RunMQPublisherCreator(this.logger).createPublisher(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME);

// Always enable publisher confirms on the consumer channel: DLQ
// publishes flow through this channel and we cannot tolerate silent
// drops there (issue #19).
await consumerChannel.confirmSelect();

const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT;
await consumerChannel.prefetch(prefetchCount);
await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => {
if (!msg) return;
const rabbitmqMessage = new RabbitMQMessage(
msg.content.toString(),
// Synthesize ids when an external (non-RunMQ) publisher did
// not set the AMQP messageId/correlationId — keeps log tracing
// consistent for cross-tenant queues.
msg.properties.messageId ?? RunMQUtils.generateUUID(),
msg.properties.correlationId ?? RunMQUtils.generateUUID(),
msg.properties.messageId,
msg.properties.correlationId,
consumerChannel,
msg,
msg.properties.headers,
Expand Down
26 changes: 22 additions & 4 deletions src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,23 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
} catch (e: unknown) {
if (this.hasReachedMaxRetries(message)) {
this.logMaxRetriesReached(message);
this.moveToFinalDeadLetter(message);
try {
await this.moveToFinalDeadLetter(message);
} catch (publishError) {
// DLQ publish failed (broker rejected, channel closed, etc.).
// Do NOT ack — that would lose the message. nack(false)
// sends it back through the retry pipeline, where it'll
// come right back here on the next attempt with a natural
// backoff (the retry-delay-queue TTL). If the underlying
// failure is transient, the next attempt's DLQ publish
// will succeed.
this.logger.error('Failed to publish to DLQ — message will be redelivered', {
correlationId: message.correlationId,
cause: publishError instanceof Error ? publishError.message : String(publishError),
});
message.nack(false);
return false;
}
this.acknowledgeMessage(message);
return false;
}
Expand All @@ -47,10 +63,12 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
}

// Republish the original AMQP body verbatim so the envelope (including
// publishedAt) is preserved end-to-end for audit/replay.
private moveToFinalDeadLetter(message: RabbitMQMessage) {
// publishedAt) is preserved end-to-end for audit/replay. The consumer
// channel runs in confirm mode (see RunMQConsumerCreator), so awaiting
// this publish surfaces broker-side rejections instead of dropping them.
private async moveToFinalDeadLetter(message: RabbitMQMessage): Promise<void> {
if (!message.amqpMessage) return;
message.channel.publish(
await message.channel.publish(
Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME,
ConsumerCreatorUtils.getDLQTopicName(this.config.name),
message.amqpMessage.content,
Expand Down
5 changes: 3 additions & 2 deletions src/core/message/RabbitMQMessage.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties";
import {AMQPMessage} from "@src/core/message/AmqpMessage";
import {AMQPChannel} from "@src/types";

export class RabbitMQMessage {
constructor(
readonly message: any,
readonly id: string,
readonly correlationId: string,
readonly id: string = RunMQUtils.generateUUID(),
readonly correlationId: string = RunMQUtils.generateUUID(),
readonly channel: AMQPChannel,
readonly amqpMessage: AMQPMessage = null,
readonly headers: Record<string, any> = {}) {
Expand Down
4 changes: 2 additions & 2 deletions src/core/publisher/producers/RunMQBaseProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class RunMQBaseProducer implements RunMQPublisher {
constructor(private serializer: Serializer, private exchange = Constants.ROUTER_EXCHANGE_NAME) {
}

publish(topic: string, message: RabbitMQMessage): void {
async publish(topic: string, message: RabbitMQMessage): Promise<void> {
const runMQMessage = new RunMQMessage(
message.message,
new RunMQMessageMeta(
Expand All @@ -17,7 +17,7 @@ export class RunMQBaseProducer implements RunMQPublisher {
message.correlationId,
));
const serialized = this.serializer.serialize(runMQMessage);
message.channel.publish(this.exchange, topic, Buffer.from(serialized), {
await message.channel.publish(this.exchange, topic, Buffer.from(serialized), {
correlationId: message.correlationId,
messageId: message.id,
headers: message.headers,
Expand Down
7 changes: 4 additions & 3 deletions src/core/publisher/producers/RunMQFailureLoggerProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ export class RunMQFailureLoggerProducer implements RunMQPublisher {
constructor(private producer: RunMQPublisher, private logger: RunMQLogger) {
}

publish(topic: string, message: RabbitMQMessage): void {
async publish(topic: string, message: RabbitMQMessage): Promise<void> {
try {
this.producer.publish(topic, message);
await this.producer.publish(topic, message);
} catch (e) {
this.logger.error('Message publishing failed', {
message: message,
topic,
correlationId: message.correlationId,
error: e instanceof Error ? e.message : JSON.stringify(e),
stack: e instanceof Error ? e.stack : undefined,
});
Expand Down
31 changes: 29 additions & 2 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,20 @@ export interface AMQPChannel {

/**
* Publishes a message to an exchange.
*
* If `confirmSelect()` was called on this channel, the returned promise
* resolves only after the broker acknowledges the message; if the broker
* rejects, the promise rejects. Otherwise it resolves once the message is
* flushed to the TCP socket.
*/
publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise<void>;

/**
* Enables publisher confirms on this channel. After this is called every
* `publish()` waits for broker acknowledgement before resolving.
* https://www.rabbitmq.com/confirms.html#publisher-confirms
*/
publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean;
confirmSelect(): Promise<void>;

/**
* Starts consuming messages from a queue.
Expand Down Expand Up @@ -221,6 +233,21 @@ export interface RunMQConnectionConfig {
password: string;
};
/**
* Controls publisher confirms on the user-publish path. When enabled
* (default), `runMQ.publish()` resolves only after RabbitMQ acknowledges
* each message, and rejects on broker error (e.g. mandatory routing
* failure, alarm state). Set to `false` to opt out and fall back to
* fire-and-forget — publish resolves once the message is written to the
* TCP socket, with no delivery guarantee.
*
* Trade-off: confirms add a broker round-trip per publish (typically a
* few hundred microseconds). They're the only way to detect silent
* publish failures, so we default to safety. DLQ publishes from the
* consumer chain are *always* confirmed regardless of this setting —
* the message-loss risk there is not negotiable.
*/
usePublisherConfirms?: boolean;
/*
* If true, RunMQ includes the full message payload in info/error log
* lines that mention a message (publish success, per-attempt failure,
* max-retries-reached). Off by default — capturing megabyte-sized
Expand Down Expand Up @@ -361,5 +388,5 @@ export interface RunMQConsumer {


export interface RunMQPublisher {
publish: (topic: string, message: RabbitMQMessage) => void;
publish: (topic: string, message: RabbitMQMessage) => Promise<void>;
}
131 changes: 131 additions & 0 deletions tests/e2e/RunMQ.dlqConfirm.e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {RunMQ} from '@src/core/RunMQ';
import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter";
import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel";
import {Constants} from "@src/core/constants";
import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers";
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample";
import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample";
import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample";
import {MessageTestUtils} from "@tests/helpers/MessageTestUtils";

/**
* Integration tests for issues #19 + #28: publisher confirms.
*
* Before this fix, DLQ publishes from RunMQRetriesCheckerProcessor were
* fire-and-forget — the message was ack'd immediately even if the DLQ
* publish silently failed. That meant a network blip during a
* retry-exhausted message's DLQ handoff resulted in permanent message loss.
*
* The fix: enable publisher confirms on the consumer channel and await the
* DLQ publish. On failure, nack(false) so the message goes back through the
* retry pipeline (with natural backoff via the retry-delay queue's TTL) and
* gets another shot at reaching the DLQ.
*/
describe('RunMQ DLQ Publisher Confirms E2E', () => {
const validConfig = RunMQConnectionConfigExample.valid();

beforeEach(() => {
jest.restoreAllMocks();
});

afterEach(() => {
jest.restoreAllMocks();
});

it('does not lose the message when the DLQ publish fails — message is redelivered', async () => {
const configuration = RunMQProcessorConfigurationExample.simpleNoSchema('dlq_confirm_redeliver');

const testingConnection = new RabbitMQClientAdapter(validConfig);
const channel = await testingConnection.getChannel();
await ChannelTestHelpers.deleteQueue(channel, configuration.name);

const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
let handlerCalls = 0;
await runMQ.process('dlq.confirm', configuration, () => {
handlerCalls++;
throw new Error('handler intentionally fails');
});

// Make the FIRST DLQ publish fail (simulating broker rejection or
// channel closure mid-flight). Subsequent publishes succeed.
// This is the scenario where the old code silently lost messages.
let dlqPublishesAttempted = 0;
const original = RabbitMQClientChannel.prototype.publish;
jest.spyOn(RabbitMQClientChannel.prototype, 'publish')
.mockImplementation(async function (this: RabbitMQClientChannel, exchange: string, routingKey: string, content: Buffer, options) {
if (exchange === Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME) {
dlqPublishesAttempted++;
if (dlqPublishesAttempted === 1) {
throw new Error('simulated broker rejection on first DLQ publish');
}
}
return original.call(this, exchange, routingKey, content, options);
});

try {
// Publish a message. Handler fails on first delivery → DLQ publish
// attempts → first publish fails → message is nacked back into the
// retry pipeline → retry-delay TTL → handler fails again → DLQ
// publish succeeds → message lands in DLQ.
channel.publish(
Constants.ROUTER_EXCHANGE_NAME,
'dlq.confirm',
MessageTestUtils.buffer(RunMQMessageExample.random()),
);

// Default attemptsDelay is 100ms (simpleNoSchema config).
// We need: handler call, DLQ publish fail, retry-delay wait, handler call, DLQ publish succeeds.
await RunMQUtils.delay(2000);

// Both DLQ publish attempts were made (first failed, second succeeded).
expect(dlqPublishesAttempted).toBeGreaterThanOrEqual(2);
// Handler ran multiple times due to redelivery — proves the message
// was nack'd back, NOT silently dropped.
expect(handlerCalls).toBeGreaterThanOrEqual(2);
// Message landed in DLQ on the second attempt.
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1);
// Main queue is empty.
await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0);
} finally {
await runMQ.disconnect();
await testingConnection.disconnect();
}
});

it('confirms DLQ publishes by default — broker ack required before original is acked', async () => {
const configuration = RunMQProcessorConfigurationExample.random(
'dlq_confirm_happy_path',
1,
1, // attempts=1, so first failure → DLQ
100,
);

const testingConnection = new RabbitMQClientAdapter(validConfig);
const channel = await testingConnection.getChannel();
await ChannelTestHelpers.deleteQueue(channel, configuration.name);

const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
await runMQ.process('dlq.happy', configuration, () => {
throw new Error('handler always fails');
});

channel.publish(
Constants.ROUTER_EXCHANGE_NAME,
'dlq.happy',
MessageTestUtils.buffer(RunMQMessageExample.random()),
);

await RunMQUtils.delay(500);

// Message should be in DLQ (with confirm received from broker).
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1);
await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0);
await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0);

await runMQ.disconnect();
await testingConnection.disconnect();
});
});
Loading
Loading