Skip to content

Commit 9dfb7e9

Browse files
committed
feat(extension-driver-ksqldb): add testing for ksqldb
1 parent 6d2c805 commit 9dfb7e9

13 files changed

Lines changed: 617 additions & 23 deletions
Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,36 @@
11
# extension-driver-ksqldb
22

3-
This library was generated with [Nx](https://nx.dev).
3+
This is the KSqlDb driver for VulcanSQL, provided by [Canner](https://cannerdata.com/).
44

5-
## Building
5+
## Installation
66

7-
Run `nx build extension-driver-ksqldb` to build the library.
7+
1. Install the package:
88

9-
## Running unit tests
9+
```bash
10+
npm i @vulcan-sql/extension-driver-ksqldb
11+
```
1012

11-
Run `nx test extension-driver-ksqldb` to execute the unit tests via [Jest](https://jestjs.io).
13+
2. Update your `vulcan.yaml` file to enable the extension:
14+
15+
```yaml
16+
extensions:
17+
ksqldb: '@vulcan-sql/extension-driver-ksqldb'
18+
```
19+
20+
3. Create a new profile in your `profiles.yaml` file or in the designated profile paths. For more information, please refer to the [KsqlDb documentation](https://ksqldb.io/) for the available arguments.
21+
22+
```yaml
23+
- name: ksql # Profile name
24+
type: ksqldb
25+
connection:
26+
# Optional: KSqlDb instance URL. Default is http://localhost:8088.
27+
host: 'www.example.com:8088'
28+
```
29+
30+
## Testing
31+
32+
To run tests for the `extension-driver-ksqldb` module, use the following command:
33+
34+
```bash
35+
nx test extension-driver-ksqldb
36+
```

packages/extension-driver-ksqldb/jest.config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
module.exports = {
22
displayName: 'extension-driver-ksqldb',
33
preset: '../../jest.preset.ts',
4+
// Use node environment to avoid facing "TypeError: The "listener" argument must be of type function. Received an instance of Object" error
5+
// when using ksqldb client executing query in the jest environment.
6+
testEnvironment: 'node',
47
globals: {
58
'ts-jest': {
69
tsconfig: '<rootDir>/tsconfig.spec.json',

packages/extension-driver-ksqldb/project.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@
2525
"options": {
2626
"jestConfig": "packages/extension-driver-ksqldb/jest.config.ts",
2727
"passWithNoTests": true
28-
}
28+
},
29+
"dependsOn": [
30+
{
31+
"projects": "self",
32+
"target": "install-dependencies"
33+
}
34+
]
2935
}
3036
},
3137
"tags": []

packages/extension-driver-ksqldb/src/lib/ksqldbDataSource.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ export class KSQLDBDataSource extends DataSource<any, any> {
3939
this.clientMapping.set(profile.name, { client, options });
4040

4141
// Testing connection
42-
await client.checkConnection();
42+
const isRunning = await client.checkConnectionRunning();
43+
if (!isRunning) {
44+
throw new Error('KsqlDb server is not running');
45+
}
46+
4347
this.logger.debug(`Profile ${profile.name} initialized`);
4448
}
4549
}

packages/extension-driver-ksqldb/src/lib/restfulClient.ts

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const BASIC_HEADERS = {
66

77
const RESTFUL_API = {
88
INFO: '/info',
9-
KQL: '/ksql',
9+
KSQL: '/ksql',
1010
QUERY: '/query',
1111
};
1212

@@ -96,32 +96,68 @@ export class RestfulClient {
9696
});
9797
}
9898

99-
public async checkConnection() {
100-
const res = await this.request<KsqlInfoResponse>(RESTFUL_API.INFO, 'GET');
101-
if (res.KsqlServerInfo['serverStatus'] !== 'RUNNING') {
102-
throw new Error('KSQLDB server is not running');
99+
public async checkConnection(): Promise<
100+
KsqlInfoResponse['KsqlServerInfo']['serverStatus']
101+
> {
102+
try {
103+
const res = await this.request<KsqlInfoResponse>(RESTFUL_API.INFO, 'GET');
104+
return res.KsqlServerInfo['serverStatus'];
105+
} catch (e) {
106+
throw new Error('KsqlDb server is not ready');
107+
}
108+
}
109+
110+
public async checkConnectionRunning(): Promise<boolean> {
111+
try {
112+
const status = await this.checkConnection();
113+
const isRunning = status === 'RUNNING';
114+
return isRunning;
115+
} catch (e) {
116+
return false;
103117
}
104-
return res;
105118
}
106119

107120
public async query({
108121
query,
109122
query_params = {},
110123
}: {
111124
query: string;
112-
query_params: Record<string, any>;
125+
query_params?: Record<string, any>;
113126
}) {
127+
// bind query parameters
128+
const ksql = this.bindParams(query, query_params);
129+
130+
const buffer = Buffer.from(JSON.stringify({ ksql }));
131+
const res = await this.request(RESTFUL_API.QUERY, 'POST', buffer);
132+
return res;
133+
}
134+
135+
public async exec({
136+
query,
137+
query_params = {},
138+
}: {
139+
query: string;
140+
query_params?: Record<string, any>;
141+
}) {
142+
// bind query parameters
143+
const ksql = this.bindParams(query, query_params);
144+
145+
const buffer = Buffer.from(JSON.stringify({ ksql }));
146+
const res = await this.request(RESTFUL_API.KSQL, 'POST', buffer);
147+
return res;
148+
}
149+
150+
private bindParams(query: string, query_params: Record<string, any>) {
114151
const values = Object.values(query_params);
115152

116153
// replace the parameterized placeholder to values
117154
const ksql = query.replace(/\$(\d+)/g, (_, index) => {
118155
const valueIndex = parseInt(index) - 1;
119-
return values[valueIndex];
156+
const paramValue = values[valueIndex];
157+
return typeof paramValue === 'string' ? `'${paramValue}'` : paramValue;
120158
});
121159

122-
const buffer = Buffer.from(JSON.stringify({ ksql }));
123-
const res = await this.request(RESTFUL_API.QUERY, 'POST', buffer);
124-
return res;
160+
return ksql;
125161
}
126162

127163
private async request<R = QueryResponse[]>(
@@ -151,11 +187,17 @@ export class RestfulClient {
151187
});
152188

153189
req.on('end', () => {
154-
const jsonData = JSON.parse(data);
190+
let responseData: any = data;
191+
try {
192+
responseData = JSON.parse(data);
193+
} catch (e) {
194+
responseData = data;
195+
}
196+
155197
if (status === 200) {
156-
resolve(jsonData);
198+
resolve(responseData);
157199
} else {
158-
reject(jsonData);
200+
reject(responseData);
159201
}
160202
this.close();
161203
});
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import * as Docker from 'dockerode';
2+
import * as path from 'path';
3+
import * as jsYaml from 'js-yaml';
4+
import { promises as fs } from 'fs';
5+
6+
const docker = new Docker();
7+
8+
interface ComposeConfig {
9+
image: string;
10+
hostname: string;
11+
ports: string[];
12+
environment: Record<string, any>;
13+
expose: string[];
14+
entrypoint: string | string[];
15+
tty: boolean;
16+
networks: string[];
17+
}
18+
19+
interface ComposeJson {
20+
services: Record<string, ComposeConfig>;
21+
networks: Record<string, any>;
22+
}
23+
24+
export class Compose {
25+
public network?: Docker.Network;
26+
public containers = new Map<string, Docker.Container>();
27+
public ksqldbName = 'ksqldb-server';
28+
public ksqldbContainer?: Docker.Container;
29+
30+
public async up() {
31+
const composeJson = await this.getComposeJson();
32+
await docker.pruneContainers();
33+
await docker.pruneNetworks();
34+
const network = Object.keys(composeJson.networks)[0];
35+
this.network = await docker.createNetwork({
36+
Name: network,
37+
});
38+
39+
for (const serviceName in composeJson.services) {
40+
const service = composeJson.services[serviceName];
41+
const pullStream = await docker.pull(service.image);
42+
// https://github.com/apocas/dockerode/issues/647
43+
await new Promise((res) => docker.modem.followProgress(pullStream, res));
44+
45+
const containerConfig: Docker.ContainerCreateOptions =
46+
this.convertContainerConfig(service, network);
47+
48+
this.containers.set(
49+
serviceName,
50+
await docker.createContainer(containerConfig)
51+
);
52+
53+
await this.containers.get(serviceName)!.start();
54+
}
55+
}
56+
57+
public async down() {
58+
for (const container of this.containers.values()) {
59+
await container.stop();
60+
}
61+
await docker.pruneContainers();
62+
await docker.pruneNetworks();
63+
}
64+
65+
private async getComposeJson(): Promise<ComposeJson> {
66+
const content = await fs.readFile(
67+
path.resolve(__dirname, 'docker-compose.yml'),
68+
'utf-8'
69+
);
70+
const composeJson = jsYaml.load(content) as ComposeJson;
71+
return composeJson;
72+
}
73+
74+
private convertContainerConfig(service: ComposeConfig, network: string) {
75+
return {
76+
Image: service['image'],
77+
name: service['hostname'],
78+
Hostname: service['hostname'],
79+
HostConfig: {
80+
PortBindings: (service['ports'] || []).reduce((acc, port) => {
81+
const [hostPort, containerPort] = port.split(':');
82+
return {
83+
...acc,
84+
[`${containerPort}/tcp`]: [{ HostPort: hostPort }],
85+
};
86+
}, {}),
87+
NetworkMode: network,
88+
},
89+
Env: Object.keys(service['environment'] || {}).map(
90+
(key) => `${key}=${service['environment'][key]}`
91+
),
92+
ExposedPorts: (service['expose'] || []).reduce(
93+
(acc, port) => ({ ...acc, [`${port}/tcp`]: {} }),
94+
{}
95+
),
96+
Entrypoint: service['entrypoint'] || [],
97+
Tty: service['tty'] || false,
98+
};
99+
}
100+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
version: '2'
2+
services:
3+
zookeeper:
4+
image: confluentinc/cp-zookeeper:7.4.0
5+
hostname: zookeeper
6+
ports:
7+
- "2181:2181"
8+
expose:
9+
- "2181"
10+
environment:
11+
ZOOKEEPER_CLIENT_PORT: 2181
12+
ZOOKEEPER_TICK_TIME: 2000
13+
networks:
14+
- ksqldb_default
15+
16+
broker:
17+
image: confluentinc/cp-kafka:7.4.0
18+
hostname: broker
19+
depends_on:
20+
- zookeeper
21+
ports:
22+
- "29092:29092"
23+
expose:
24+
- "29092"
25+
environment:
26+
KAFKA_BROKER_ID: 1
27+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
28+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
29+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
30+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
31+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
32+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
33+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
34+
networks:
35+
- ksqldb_default
36+
37+
ksqldb-server:
38+
image: confluentinc/ksqldb-server:0.29.0
39+
hostname: ksqldb-server
40+
depends_on:
41+
- broker
42+
ports:
43+
- "8088:8088"
44+
expose:
45+
- "8088"
46+
environment:
47+
KSQL_LISTENERS: http://0.0.0.0:8088
48+
KSQL_BOOTSTRAP_SERVERS: broker:9092
49+
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
50+
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
51+
networks:
52+
- ksqldb_default
53+
54+
ksqldb-cli:
55+
image: confluentinc/ksqldb-cli:0.29.0
56+
hostname: ksqldb-cli
57+
depends_on:
58+
- broker
59+
- ksqldb-server
60+
entrypoint: /bin/sh
61+
tty: true
62+
networks:
63+
- ksqldb_default
64+
65+
networks:
66+
ksqldb_default:
67+
driver: bridge

0 commit comments

Comments
 (0)