Skip to content

Commit 6d2c805

Browse files
committed
feat(extension-driver-ksqldb): add ksqldb driver handling
1 parent 5316570 commit 6d2c805

7 files changed

Lines changed: 362 additions & 11 deletions

File tree

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
export * from './lib/extension-driver-ksqldb';
1+
export * from './lib/ksqldbDataSource';
2+
import { KSQLDBDataSource } from './lib/ksqldbDataSource';
3+
export default [KSQLDBDataSource];

packages/extension-driver-ksqldb/src/lib/extension-driver-ksqldb.spec.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

packages/extension-driver-ksqldb/src/lib/extension-driver-ksqldb.ts

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import {
2+
DataColumn,
3+
DataResult,
4+
DataSource,
5+
ExecuteOptions,
6+
InternalError,
7+
RequestParameter,
8+
VulcanExtensionId,
9+
} from '@vulcan-sql/core';
10+
import { Stream } from 'stream';
11+
import { buildSQL, convertSchemaToColumns } from './sqlBuilder';
12+
import { mapFromKsqlDbType } from './typeMapper';
13+
import {
14+
RestfulClient,
15+
RestfulClientOptions,
16+
QueryResponse,
17+
Header,
18+
Row,
19+
FinalMessage,
20+
} from './restfulClient';
21+
22+
/**
 * VulcanSQL data source that executes statements against ksqlDB through its
 * REST API.
 *
 * One RestfulClient is created per configured profile during activation and
 * looked up by profile name at execution time.
 */
@VulcanExtensionId('ksqldb')
export class KSQLDBDataSource extends DataSource<any, any> {
  private logger = this.getLogger();
  // REST clients (and the options they were built from) keyed by profile name.
  private clientMapping = new Map<
    string,
    { client: RestfulClient; options?: RestfulClientOptions }
  >();

  /**
   * Creates a client for every profile and verifies that the server behind it
   * is reachable. Rejects with whatever `checkConnection()` rejects with.
   */
  public override async onActivate() {
    const profiles = this.getProfiles().values();
    for (const profile of profiles) {
      this.logger.debug(
        `Initializing profile: ${profile.name} using ksqldb driver`
      );
      const options = {
        ...profile.connection!,
      };
      const client = new RestfulClient(options);
      this.clientMapping.set(profile.name, { client, options });

      // Testing connection
      await client.checkConnection();
      this.logger.debug(`Profile ${profile.name} initialized`);
    }
  }

  /**
   * Returns the positional placeholder (`$1`, `$2`, ...) used for a bound
   * request parameter.
   */
  public async prepare({ parameterIndex }: RequestParameter) {
    return `$${parameterIndex}`;
  }

  /**
   * Builds the final statement, sends it to the profile's client and adapts
   * the response to a DataResult.
   *
   * @throws InternalError when no client exists for `profileName` or when the
   *   response cannot be interpreted; any client error is re-thrown as is.
   */
  public async execute({
    statement: sql,
    bindParams,
    profileName,
    operations,
  }: ExecuteOptions): Promise<DataResult> {
    this.checkProfileExist(profileName);
    const { client } = this.clientMapping.get(profileName)!;

    const params = Object.fromEntries(bindParams);
    try {
      const builtSQL = buildSQL(sql, operations);
      const data = await client.query({
        query: builtSQL,
        query_params: params,
      });
      return await this.getResultFromRestfulResponse(data);
    } catch (e) {
      this.logger.debug(
        `Errors occurred, release connection from ${profileName}`
      );
      throw e;
    }
  }

  /**
   * Converts a fully buffered query response (header, rows, optional final
   * message) into the column metadata and object-mode row stream expected by
   * VulcanSQL.
   */
  private async getResultFromRestfulResponse(data: QueryResponse[]) {
    const dataRowStream = new Stream.Readable({
      objectMode: true,
      read: () => null,
      // automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16
      autoDestroy: true,
    });

    // The first element must be the header carrying the schema; fail with a
    // descriptive error instead of a TypeError when it is missing.
    const headerData = (data?.[0] as Header | undefined)?.header;
    if (!headerData) {
      throw new InternalError(
        'Unexpected ksqldb response: the header message is missing'
      );
    }
    const columns: DataColumn[] = convertSchemaToColumns(headerData.schema);

    // format the ksqldb table response to VulcanSQL Data API
    // https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/query-endpoint/#example-table-response
    for (const innerData of data) {
      const row = (innerData as Row).row;
      // Header and final-message elements carry no row payload.
      if (!row) continue;

      const outputData: Record<string, unknown> = {};
      row.columns.forEach((value, index) => {
        const column = columns[index];
        if (!column) {
          throw new InternalError(
            `Unexpected ksqldb response: row has ${row.columns.length} values but the schema declares ${columns.length} columns`
          );
        }
        outputData[column.name] = value;
      });
      dataRowStream.push(outputData);
    }

    // The whole response is already in memory, so always signal the end of
    // the stream here. Waiting for a `finalMessage` element would leave
    // consumers hanging whenever the response does not contain one.
    dataRowStream.push(null);

    return {
      getColumns: () => {
        return columns.map((column) => ({
          name: column.name || '',
          // Convert KsqlDb type to FieldDataType supported by VulcanSQL for generating the response schema in the specification, see: https://github.com/Canner/vulcan-sql/pull/78#issuecomment-1621532674
          type: mapFromKsqlDbType(column.type || ''),
        }));
      },
      getData: () => dataRowStream,
    };
  }

  /**
   * Ensures a client was registered for the profile.
   *
   * @throws InternalError when the profile is unknown.
   */
  private checkProfileExist(profileName: string) {
    if (!this.clientMapping.has(profileName)) {
      throw new InternalError(`Profile instance ${profileName} not found`);
    }
  }
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import * as http2 from 'http2';
2+
3+
/** Headers sent with every request; asks for the ksqlDB v1 JSON media type. */
const BASIC_HEADERS = {
  Accept: 'application/vnd.ksql.v1+json',
};

/** Request paths of the REST endpoints, keyed by purpose. */
const RESTFUL_API = {
  // Server information, used for the connection check.
  INFO: '/info',
  // Statement endpoint.
  KQL: '/ksql',
  // Query endpoint.
  QUERY: '/query',
};
12+
13+
/** Body returned by the `/info` endpoint. */
interface KsqlInfoResponse {
  KsqlServerInfo: {
    version: string;
    kafkaClusterId: string;
    ksqlServiceId: string;
    /** Compared against `'RUNNING'` by the connection check. */
    serverStatus: string;
  };
}

/** First element of a query response: query id plus the result schema. */
export type Header = {
  header: {
    queryId: string;
    /** Column list as one string, e.g. "`PROFILEID` STRING, `LATITUDE` DOUBLE". */
    schema: string;
  };
};

/** One result row; `columns` holds the values in schema order. */
export type Row = {
  row: {
    columns: any[];
  };
};

/** Element that marks the end of the query result. */
export type FinalMessage = {
  finalMessage: string;
};

/**
 * An element of the query response array. Example response:
 * [
 *   {
 *     "header": {
 *       "queryId": "transient_RIDERLOCATIONS_356492705638097482",
 *       "schema": "`PROFILEID` STRING, `LATITUDE` DOUBLE, `LONGITUDE` DOUBLE"
 *     }
 *   },
 *   { "row": {"columns": ["c2309eec",37.7877,-122.4205]} },
 *   { ...more rows },
 *
 *   { "finalMessage": "Query Completed"}
 * ]
 */
export type QueryResponse = Header | Row | FinalMessage;

/** Connection settings for the REST client. */
export interface RestfulClientOptions {
  /** Authority passed to `http2.connect`. */
  host: string;
}
59+
60+
/**
 * Minimal HTTP/2 client for the ksqlDB REST API.
 *
 * A new HTTP/2 session is opened for every request and destroyed once the
 * request has finished or failed.
 */
export class RestfulClient {
  private options: RestfulClientOptions;
  public client?: http2.ClientHttp2Session;
  public connected = false;
  // Replaced by connect(); opens the session used by the next request.
  private startSession: () => null | Promise<void> = () => null;

  constructor(options: RestfulClientOptions) {
    this.options = options;
    this.connect();
  }

  /**
   * Installs the session factory. No network activity happens here; the
   * session is opened lazily when a request is made.
   */
  public connect() {
    this.startSession = () =>
      new Promise<void>((resolve, reject) => {
        this.client = http2.connect(this.options.host);

        this.client.on('connect', () => {
          this.connected = true;
          resolve();
        });

        this.client.on('error', (error: any) => {
          reject(error);
        });
      });
  }

  /**
   * Destroys the current session, if it is not destroyed already.
   * Rejects when no session was ever created.
   */
  public close(): Promise<void> {
    return new Promise((resolve, reject) => {
      if (this.client) {
        !this.client.destroyed && this.client.destroy();
        this.connected = false;
        resolve();
      } else {
        reject(new Error('Client is not initialized'));
      }
    });
  }

  /**
   * Calls the info endpoint and verifies that the server reports `RUNNING`.
   *
   * @returns the parsed info response.
   * @throws Error when the reported status is anything else.
   */
  public async checkConnection() {
    const res = await this.request<KsqlInfoResponse>(RESTFUL_API.INFO, 'GET');
    if (res.KsqlServerInfo['serverStatus'] !== 'RUNNING') {
      throw new Error('KSQLDB server is not running');
    }
    return res;
  }

  /**
   * Substitutes the positional placeholders (`$1`, `$2`, ...) in `query` with
   * the values of `query_params` (in insertion order) and posts the statement
   * to the query endpoint.
   *
   * String values are emitted as single-quoted ksql string literals with
   * embedded quotes doubled; every other value is emitted via `String()`.
   * A placeholder without a bound value is left untouched rather than being
   * replaced with the text "undefined".
   */
  public async query({
    query,
    query_params = {},
  }: {
    query: string;
    query_params?: Record<string, any>;
  }) {
    const values = Object.values(query_params);

    // replace the parameterized placeholder to values
    const ksql = query.replace(/\$(\d+)/g, (placeholder, index: string) => {
      const valueIndex = parseInt(index, 10) - 1;
      if (valueIndex < 0 || valueIndex >= values.length) return placeholder;
      return RestfulClient.toKsqlLiteral(values[valueIndex]);
    });

    const buffer = Buffer.from(JSON.stringify({ ksql }));
    const res = await this.request(RESTFUL_API.QUERY, 'POST', buffer);
    return res;
  }

  /** Renders a bound value as text that can be spliced into a statement. */
  private static toKsqlLiteral(value: unknown): string {
    if (typeof value === 'string') {
      // Quote strings and double embedded single quotes so the value cannot
      // terminate the literal early.
      return `'${value.replace(/'/g, "''")}'`;
    }
    return String(value);
  }

  /**
   * Opens a session, performs one request and resolves with the parsed JSON
   * body when the status is 200. Any other status rejects with the parsed
   * body; transport errors and unparsable bodies reject with an Error.
   */
  private async request<R = QueryResponse[]>(
    path: string,
    method: string,
    buffer?: Buffer
  ): Promise<R> {
    await this.startSession();

    return new Promise<R>((resolve, reject) => {
      // NOTE(review): the session lives on `this.client`, so overlapping
      // requests on one instance appear to share/close each other's session.
      const session = this.client;
      if (!session) {
        reject(new Error('Client is not initialized'));
        return;
      }

      const config = {
        ...BASIC_HEADERS,
        ':method': method,
        ':path': path,
      };
      const req = session.request(config);
      req.setEncoding('utf-8');

      let status: number | null = null;
      req.on('response', (headers) => {
        status = headers[':status'] ?? null;
      });

      let data = '';
      req.on('data', (chunk) => {
        data += chunk;
      });

      req.on('end', () => {
        try {
          const jsonData = JSON.parse(data);
          if (status === 200) {
            resolve(jsonData);
          } else {
            reject(jsonData);
          }
        } catch (error) {
          // An empty or non-JSON body must settle the promise; throwing
          // inside the event handler would leave the caller waiting forever.
          const reason = error instanceof Error ? error.message : String(error);
          reject(
            new Error(
              `Failed to parse ksqldb response (status ${status}): ${reason}`
            )
          );
        } finally {
          void this.close();
        }
      });

      req.on('error', (error) => {
        reject(error);
        void this.close();
      });

      buffer && req.write(buffer);
      req.end();
    });
  }
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { Parameterized, SQLClauseOperation } from '@vulcan-sql/core';
2+
import { isNull, isUndefined, startCase, lowerCase } from 'lodash';
3+
4+
/** Returns true when `value` is `null` or `undefined`, false for anything else. */
const isNullOrUndefine = (value: any) => {
  if (isUndefined(value)) return true;
  return isNull(value);
};
5+
6+
/**
 * Removes one trailing semicolon, together with any whitespace that follows
 * it (spaces, tabs, `\n`, `\r\n`, ...), from the end of a statement.
 * Semicolons elsewhere in the text are left alone.
 *
 * @param sql statement text.
 * @returns the statement without its terminating semicolon.
 */
export const removeEndingSemiColon = (sql: string) => {
  return sql.replace(/;\s*$/, '');
};
9+
10+
/**
 * Appends a `LIMIT` clause to the statement.
 *
 * @param sql statement text without a terminating semicolon.
 * @param limit value placed after `LIMIT`; when null or undefined the
 *   statement is returned unchanged.
 */
export const addLimit = (sql: string, limit?: string | null) => {
  return isNullOrUndefine(limit) ? sql : `${sql} LIMIT ${limit}`;
};
14+
15+
/**
 * Tells whether the request carries no pagination operation, i.e. neither
 * `limit` nor `offset` is set.
 */
export const isNoOP = (
  operations: Partial<Parameterized<SQLClauseOperation>>
): boolean => {
  const hasLimit = !isNullOrUndefine(operations.limit);
  if (hasLimit) return false;
  return isNullOrUndefine(operations.offset);
};
23+
24+
/**
 * Produces the statement that is sent to ksqlDB for the given operations.
 *
 * Without limit/offset the statement is returned untouched. Otherwise the
 * trailing semicolon is stripped, a LIMIT clause is appended when a limit is
 * present, and the statement is terminated with `;` again.
 */
export const buildSQL = (
  sql: string,
  operations: Partial<Parameterized<SQLClauseOperation>>
): string => {
  if (isNoOP(operations)) {
    return sql;
  }
  // ksqlDB currently does not support subquery
  // OFFSET is not supported, LIMIT syntax is supported >= ksqldb 0.24.0
  // https://github.com/confluentinc/ksql/issues/745
  const unterminated = removeEndingSemiColon(sql);
  const limited = addLimit(unterminated, operations.limit);
  return `${limited};`;
};
38+
39+
/**
 * Splits `text` at every character accepted by `isSeparator`, ignoring
 * separators that sit inside `<...>`, `(...)` or a backtick-quoted name.
 * Empty pieces are dropped and the remaining pieces are trimmed.
 */
const splitOutsideNesting = (
  text: string,
  isSeparator: (char: string) => boolean
): string[] => {
  const pieces: string[] = [];
  let depth = 0;
  let quoted = false;
  let current = '';
  for (const char of text) {
    if (char === '`') {
      quoted = !quoted;
    } else if (!quoted) {
      if (char === '<' || char === '(') depth += 1;
      if (char === '>' || char === ')') depth = Math.max(0, depth - 1);
    }
    if (depth === 0 && !quoted && isSeparator(char)) {
      pieces.push(current);
      current = '';
    } else {
      current += char;
    }
  }
  pieces.push(current);
  return pieces.map((piece) => piece.trim()).filter((piece) => piece !== '');
};

/**
 * Parses the schema string of a query response header into column
 * descriptors.
 *
 * Columns are separated by top-level commas only, so parameterised and nested
 * types such as `DECIMAL(10, 2)`, `MAP<STRING, INT>` or `STRUCT<...>` stay in
 * one piece and do not produce extra columns. Within a column the first token
 * is the name and the second token is the type; anything after the type is
 * ignored. Backticks are removed from both.
 *
 * @param schema e.g. "`PROFILEID` STRING, `LATITUDE` DOUBLE, `LONGITUDE` DOUBLE"
 * @returns one `{ name, type }` entry per column, where `name` is the
 *   start-cased lower-cased column name; `type` is '' when the column
 *   declares none.
 */
export const convertSchemaToColumns = (schema: string) => {
  const stripBackticks = (text: string) => text.replace(/`/g, '');

  return splitOutsideNesting(schema, (char) => char === ',').map(
    (columnString) => {
      const [name, type] = splitOutsideNesting(columnString, (char) =>
        /\s/.test(char)
      );
      return {
        name: startCase(lowerCase(stripBackticks(name || ''))),
        type: stripBackticks(type || ''),
      };
    }
  );
};
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// ksqlDB base type name (upper case) -> field type name used by VulcanSQL.
const typeMapping = new Map<string, string>();

const register = (ksqlType: string, type: string) => {
  typeMapping.set(ksqlType, type);
};

// Reference
// https://docs.ksqldb.io/en/latest/reference/sql/data-types/
register('BOOLEAN', 'boolean');
register('INT', 'number');
// Same integer type under its long spelling.
register('INTEGER', 'number');
register('BIGINT', 'number');
register('DOUBLE', 'number');
register('DECIMAL', 'number');

/**
 * Maps a ksqlDB column type to the field type name used by VulcanSQL.
 *
 * Only the base type name is looked up: surrounding whitespace, letter case
 * and any parameters or trailing modifiers are ignored, so `DECIMAL(10, 2)`
 * and `decimal` both resolve like `DECIMAL`.
 *
 * @param ksqlType type text taken from the response schema.
 * @returns the mapped type name, or `'string'` for every type without a
 *   registered mapping (including the empty string).
 */
export const mapFromKsqlDbType = (ksqlType: string) => {
  const baseType = ksqlType.trim().toUpperCase().split(/[\s(<]/, 1)[0];
  return typeMapping.get(baseType) ?? 'string';
};

0 commit comments

Comments
 (0)