diff --git a/.github/workflows/test-multi-version.yml b/.github/workflows/test-multi-version.yml index f675cc4..bd25ded 100644 --- a/.github/workflows/test-multi-version.yml +++ b/.github/workflows/test-multi-version.yml @@ -12,7 +12,7 @@ jobs: strategy: fail-fast: false matrix: - node-version: ${{ github.event_name == 'pull_request' && fromJSON('["22"]') || fromJSON('["16", "18", "20", "22", "24"]') }} + node-version: ${{ github.event_name == 'pull_request' && fromJSON('["22"]') || fromJSON('["18", "20", "22", "24"]') }} rabbitmq-version: - "3.9-management" - "3.10-management" diff --git a/package.json b/package.json index eb116d0..b6518b1 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,9 @@ "url": "https://github.com/runmq/queue/issues" }, "homepage": "https://github.com/runmq/queue#readme", + "engines": { + "node": ">=18" + }, "devDependencies": { "@eslint/js": "^9.29.0", "@faker-js/faker": "^9.9.0", @@ -55,10 +58,10 @@ "@semantic-release/git": "^10.0.1", "@semantic-release/github": "^11.0.1", "@semantic-release/release-notes-generator": "^11.0.7", - "@types/jest": "^29.5.12", + "@types/jest": "^30.0.0", "@types/node": "^22.0.0", "eslint": "^9.29.0", - "jest": "^29.7.0", + "jest": "^30.0.3", "patch-package": "^8.0.1", "semantic-release": "^24.2.0", "ts-jest": "^29.4.0", @@ -71,9 +74,6 @@ "ajv": "8.18.0", "rabbitmq-client": "^5.0.0" }, - "engines": { - "node": ">=16" - }, "release": { "branches": [ "main" diff --git a/src/core/management/RabbitMQManagementClient.ts b/src/core/management/RabbitMQManagementClient.ts index 3e4931d..b040b68 100644 --- a/src/core/management/RabbitMQManagementClient.ts +++ b/src/core/management/RabbitMQManagementClient.ts @@ -1,17 +1,7 @@ -import * as http from "node:http"; -import * as https from "node:https"; import {RunMQLogger} from "@src/core/logging/RunMQLogger"; import {RabbitMQManagementConfig} from "@src"; import {RabbitMQOperatorPolicy} from "@src/types"; -interface ManagementResponse { - status: number; - ok: boolean; - body: string; -} - -const REQUEST_TIMEOUT_MS = 10_000; - export class RabbitMQManagementClient { constructor( private config: RabbitMQManagementConfig, @@ -23,69 +13,27 @@ export class RabbitMQManagementClient { return `Basic ${credentials}`; } - private request( - urlString: string, - method: string, - body?: unknown - ): Promise { - const url = new URL(urlString); - const lib = url.protocol === 'https:' ? https : http; - const payload = body !== undefined ? JSON.stringify(body) : undefined; - - const headers: Record = { - 'Authorization': this.getAuthHeader() - }; - if (payload !== undefined) { - headers['Content-Type'] = 'application/json'; - headers['Content-Length'] = Buffer.byteLength(payload).toString(); - } - - return new Promise((resolve, reject) => { - const req = lib.request( - { - protocol: url.protocol, - hostname: url.hostname, - port: url.port || (url.protocol === 'https:' ? 443 : 80), - path: `${url.pathname}${url.search}`, - method, - headers, - timeout: REQUEST_TIMEOUT_MS - }, - (res) => { - const chunks: Buffer[] = []; - res.on('data', (chunk: Buffer) => chunks.push(chunk)); - res.on('end', () => { - const status = res.statusCode ?? 0; - resolve({ - status, - ok: status >= 200 && status < 300, - body: Buffer.concat(chunks).toString('utf8') - }); - }); - res.on('error', reject); - } - ); - req.on('timeout', () => { - req.destroy(new Error(`Request timed out after ${REQUEST_TIMEOUT_MS}ms`)); - }); - req.on('error', reject); - if (payload !== undefined) req.write(payload); - req.end(); - }); - } - public async createOrUpdateOperatorPolicy(vhost: string, policy: RabbitMQOperatorPolicy): Promise { try { const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policy.name)}`; - const response = await this.request(url, 'PUT', { - pattern: policy.pattern, - definition: policy.definition, - priority: policy.priority || 0, - "apply-to": policy["apply-to"] + + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Authorization': this.getAuthHeader() + }, + body: JSON.stringify({ + pattern: policy.pattern, + definition: policy.definition, + priority: policy.priority || 0, + "apply-to": policy["apply-to"] + }) }); if (!response.ok) { - this.logger.error(`Failed to create operator policy: ${response.status} - ${response.body}`); + const error = await response.text(); + this.logger.error(`Failed to create operator policy: ${response.status} - ${error}`); return false; } @@ -100,17 +48,24 @@ export class RabbitMQManagementClient { public async getOperatorPolicy(vhost: string, policyName: string): Promise { try { const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`; - const response = await this.request(url, 'GET'); + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': this.getAuthHeader() + } + }); if (!response.ok) { if (response.status === 404) { return null; } - this.logger.error(`Failed to get operator policy: ${response.status} - ${response.body}`); + const error = await response.text(); + this.logger.error(`Failed to get operator policy: ${response.status} - ${error}`); return null; } - return JSON.parse(response.body); + return await response.json(); } catch (error) { this.logger.error(`Error getting operator policy: ${error}`); return null; @@ -120,10 +75,17 @@ export class RabbitMQManagementClient { public async deleteOperatorPolicy(vhost: string, policyName: string): Promise { try { const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`; - const response = await this.request(url, 'DELETE'); + + const response = await fetch(url, { + method: 'DELETE', + headers: { + 'Authorization': this.getAuthHeader() + } + }); if (!response.ok && response.status !== 404) { - this.logger.error(`Failed to delete operator policy: ${response.status} - ${response.body}`); + const error = await response.text(); + this.logger.error(`Failed to delete operator policy: ${response.status} - ${error}`); return false; } @@ -138,7 +100,14 @@ export class RabbitMQManagementClient { public async checkManagementPluginEnabled(): Promise { try { const url = `${this.config.url}/api/overview`; - const response = await this.request(url, 'GET'); + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': this.getAuthHeader() + } + }); + return response.ok; } catch (error) { this.logger.warn(`Management plugin not accessible: ${error}`); @@ -159,10 +128,19 @@ export class RabbitMQManagementClient { ): Promise { try { const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`; - const response = await this.request(url, 'PUT', {value}); + + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Authorization': this.getAuthHeader() + }, + body: JSON.stringify({value}) + }); if (!response.ok) { - this.logger.error(`Failed to set parameter ${name}: ${response.status} - ${response.body}`); + const error = await response.text(); + this.logger.error(`Failed to set parameter ${name}: ${response.status} - ${error}`); return false; } @@ -184,17 +162,24 @@ export class RabbitMQManagementClient { ): Promise { try { const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`; - const response = await this.request(url, 'GET'); + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': this.getAuthHeader() + } + }); if (!response.ok) { if (response.status === 404) { return null; } - this.logger.error(`Failed to get parameter ${name}: ${response.status} - ${response.body}`); + const error = await response.text(); + this.logger.error(`Failed to get parameter ${name}: ${response.status} - ${error}`); return null; } - const data = JSON.parse(response.body); + const data = await response.json(); return data.value as T; } catch (error) { this.logger.error(`Error getting parameter: ${error}`); @@ -212,10 +197,17 @@ export class RabbitMQManagementClient { ): Promise { try { const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`; - const response = await this.request(url, 'DELETE'); + + const response = await fetch(url, { + method: 'DELETE', + headers: { + 'Authorization': this.getAuthHeader() + } + }); if (!response.ok && response.status !== 404) { - this.logger.error(`Failed to delete parameter ${name}: ${response.status} - ${response.body}`); + const error = await response.text(); + this.logger.error(`Failed to delete parameter ${name}: ${response.status} - ${error}`); return false; } @@ -226,4 +218,4 @@ export class RabbitMQManagementClient { return false; } } -} +} \ No newline at end of file diff --git a/tests/unit/core/management/RabbitMQManagementClient.test.ts b/tests/unit/core/management/RabbitMQManagementClient.test.ts index 8cefb2d..8b7673a 100644 --- a/tests/unit/core/management/RabbitMQManagementClient.test.ts +++ b/tests/unit/core/management/RabbitMQManagementClient.test.ts @@ -7,7 +7,7 @@ import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; describe('RabbitMQManagementClient', () => { let client: RabbitMQManagementClient; let logger: RunMQConsoleLogger; - let requestSpy: jest.SpyInstance; + let fetchSpy: jest.SpyInstance; const MANAGEMENT_URL = "http://localhost:15673/api/operator-policies/%2F/" beforeEach(() => { @@ -17,10 +17,8 @@ describe('RabbitMQManagementClient', () => { logger ); - // Spy on the private node:http/https request helper. The boundary is - // intentionally kept private to the class — tests reach through the - // TS-private modifier (which has no runtime effect) to mock it. - requestSpy = jest.spyOn(client as unknown as { request: () => unknown }, 'request'); + global.fetch = jest.fn(); + fetchSpy = jest.spyOn(global, 'fetch'); }); afterEach(() => { @@ -29,27 +27,41 @@ describe('RabbitMQManagementClient', () => { describe('createOperatorPolicy', () => { it('should successfully create an operator policy', async () => { - requestSpy.mockResolvedValueOnce({ok: true, status: 201, body: ''}); + fetchSpy.mockResolvedValueOnce({ + ok: true, + status: 201, + text: async () => '' + } as Response); const result = await client.createOrUpdateOperatorPolicy('%2F', RabbitMQMessageTTLPolicyExample.validPolicy('test-queue', 5000)); expect(result).toBe(true); - expect(requestSpy).toHaveBeenCalledWith( + expect(fetchSpy).toHaveBeenCalledWith( MANAGEMENT_URL + ConsumerCreatorUtils.getMessageTTLPolicyName('test-queue'), - 'PUT', - { - pattern: 'test-queue', - definition: { - 'message-ttl': 5000 - }, - priority: 1000, - 'apply-to': 'queues' - } + expect.objectContaining({ + method: 'PUT', + headers: expect.objectContaining({ + 'Content-Type': 'application/json', + 'Authorization': expect.stringContaining('Basic') + }), + body: JSON.stringify({ + pattern: 'test-queue', + definition: { + 'message-ttl': 5000 + }, + priority: 1000, + 'apply-to': 'queues' + }) + }) ); }); it('should handle failed policy creation', async () => { - requestSpy.mockResolvedValueOnce({ok: false, status: 400, body: 'Bad Request'}); + fetchSpy.mockResolvedValueOnce({ + ok: false, + status: 400, + text: async () => 'Bad Request' + } as Response); const result = await client.createOrUpdateOperatorPolicy('%2F', RabbitMQMessageTTLPolicyExample.validPolicy()); @@ -59,19 +71,30 @@ describe('RabbitMQManagementClient', () => { describe('checkManagementPluginEnabled', () => { it('should return true when management plugin is accessible', async () => { - requestSpy.mockResolvedValueOnce({ok: true, status: 200, body: ''}); + fetchSpy.mockResolvedValueOnce({ + ok: true, + status: 200 + } as Response); const result = await client.checkManagementPluginEnabled(); expect(result).toBe(true); - expect(requestSpy).toHaveBeenCalledWith( + expect(fetchSpy).toHaveBeenCalledWith( 'http://localhost:15673/api/overview', - 'GET' + expect.objectContaining({ + method: 'GET', + headers: expect.objectContaining({ + 'Authorization': expect.stringContaining('Basic') + }) + }) ); }); it('should return false when management plugin is not accessible', async () => { - requestSpy.mockResolvedValueOnce({ok: false, status: 404, body: ''}); + fetchSpy.mockResolvedValueOnce({ + ok: false, + status: 404 + } as Response); const result = await client.checkManagementPluginEnabled(); @@ -79,7 +102,7 @@ describe('RabbitMQManagementClient', () => { }); it('should return false on network error', async () => { - requestSpy.mockRejectedValueOnce(new Error('Network error')); + fetchSpy.mockRejectedValueOnce(new Error('Network error')); const result = await client.checkManagementPluginEnabled(); @@ -89,23 +112,36 @@ describe('RabbitMQManagementClient', () => { describe('deleteOperatorPolicy', () => { it('should successfully delete an operator policy', async () => { - requestSpy.mockResolvedValueOnce({ok: true, status: 204, body: ''}); + fetchSpy.mockResolvedValueOnce({ + ok: true, + status: 204, + text: async () => '' + } as Response); const result = await client.deleteOperatorPolicy('%2F', 'test-queue'); expect(result).toBe(true); - expect(requestSpy).toHaveBeenCalledWith( + expect(fetchSpy).toHaveBeenCalledWith( MANAGEMENT_URL + 'test-queue', - 'DELETE' + expect.objectContaining({ + method: 'DELETE', + headers: expect.objectContaining({ + 'Authorization': expect.stringContaining('Basic') + }) + }) ); }); it('should handle 404 as success', async () => { - requestSpy.mockResolvedValueOnce({ok: false, status: 404, body: 'Not Found'}); + fetchSpy.mockResolvedValueOnce({ + ok: false, + status: 404, + text: async () => 'Not Found' + } as Response); const result = await client.deleteOperatorPolicy('%2F', 'test-policy'); expect(result).toBe(true); }); }); -}); +}); \ No newline at end of file