Skip to content

Commit ff15ba3

Browse files
committed
chore(extension-driver-duckdb): use Promise.all to parallelize DuckDB queries, and use connection.all to speed them up
1 parent ceac352 commit ff15ba3

5 files changed

Lines changed: 78 additions & 41 deletions

File tree

packages/extension-driver-duckdb/src/lib/duckdbDataSource.ts

Lines changed: 57 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ import {
1414
VulcanExtensionId,
1515
} from '@vulcan-sql/core';
1616
import * as path from 'path';
17-
import { buildSQL } from './sqlBuilder';
17+
import { buildSQL, chunkSize } from './sqlBuilder';
1818
import { DuckDBExtensionLoader } from './duckdbExtensionLoader';
1919

2020
const getType = (value: any) => {
@@ -99,49 +99,74 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
9999
}
100100
const { db, configurationParameters, ...options } =
101101
this.dbMapping.get(profileName)!;
102-
const builtSQL = buildSQL(sql, operations);
102+
const [builtSQL, streamSQL] = buildSQL(sql, operations);
103+
103104
// create new connection for each query
104105
const connection = db.connect();
105106
await this.loadExtensions(connection, configurationParameters);
106-
const statement = connection.prepare(builtSQL);
107107
const parameters = Array.from(bindParams.values());
108108
this.logRequest(builtSQL, parameters, options);
109+
if (streamSQL) this.logRequest(streamSQL, parameters, options);
109110

110-
const result = await statement.stream(...parameters);
111-
const firstChunk = await result.nextChunk();
111+
const [result, asyncIterable] = await Promise.all([
112+
new Promise<duckdb.TableData>((resolve, reject) => {
113+
const c = db.connect();
114+
c.all(
115+
builtSQL,
116+
...parameters,
117+
(err: duckdb.DuckDbError | null, res: duckdb.TableData) => {
118+
if (err) {
119+
reject(err);
120+
}
121+
resolve(res);
122+
}
123+
);
124+
}),
125+
new Promise<duckdb.QueryResult | undefined>((resolve, reject) => {
126+
if (!streamSQL) resolve(undefined);
127+
try {
128+
const c = db.connect();
129+
const result = c.stream(streamSQL, ...parameters);
130+
resolve(result);
131+
} catch (err: any) {
132+
reject(err);
133+
}
134+
}),
135+
]);
136+
const asyncIterableStream = new Readable({
137+
objectMode: true,
138+
read: function () {
139+
for (const row of result) {
140+
this.push(row);
141+
}
142+
this.push(null);
143+
},
144+
});
145+
if (result.length >= chunkSize) {
146+
asyncIterableStream._read = async function () {
147+
if (asyncIterable) {
148+
for await (const row of asyncIterable) {
149+
this.push(row);
150+
}
151+
this.push(null);
152+
}
153+
};
154+
if (result) {
155+
for (const row of result) {
156+
asyncIterableStream.push(row);
157+
}
158+
}
159+
}
112160
return {
113161
getColumns: () => {
114-
if (!firstChunk || firstChunk.length === 0) return [];
115-
return Object.keys(firstChunk[0]).map((columnName) => ({
162+
if (!result || result.length === 0) return [];
163+
return Object.keys(result[0]).map((columnName) => ({
116164
name: columnName,
117-
type: getType(firstChunk[0][columnName as any]),
165+
type: getType(result[0][columnName as any]),
118166
}));
119167
},
120168
getData: () => {
121-
const stream = new Readable({
122-
objectMode: true,
123-
read() {
124-
result.nextChunk().then((chunk) => {
125-
if (!chunk) {
126-
this.push(null);
127-
return;
128-
}
129-
for (const row of chunk) {
130-
this.push(row);
131-
}
132-
});
133-
},
134-
});
135-
// Send the first chunk
136-
if (firstChunk) {
137-
for (const row of firstChunk) {
138-
stream.push(row);
139-
}
140-
} else {
141-
// If there is no data, close the stream.
142-
stream.push(null);
143-
}
144-
return stream;
169+
return asyncIterableStream;
145170
},
146171
};
147172
}

packages/extension-driver-duckdb/src/lib/sqlBuilder.ts

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -26,15 +26,23 @@ export const isNoOP = (
2626
return true;
2727
};
2828

29+
const duckdbExecuteChunkSize =
30+
process.env['DUCKDB_EXECUTE_CHUNK_SIZE'] || '2000';
31+
32+
export const chunkSize = Number(duckdbExecuteChunkSize);
33+
2934
export const buildSQL = (
3035
sql: string,
3136
operations: Partial<Parameterized<SQLClauseOperation>>
32-
): string => {
33-
if (isNoOP(operations)) return sql;
37+
): string[] => {
38+
if (isNoOP(operations) && !/^select/.test(sql.toLowerCase()))
39+
return [sql, ''];
3440
let builtSQL = '';
3541
builtSQL += `SELECT * FROM (${removeEndingSemiColon(sql)})`;
3642
builtSQL = addLimit(builtSQL, operations.limit);
3743
builtSQL = addOffset(builtSQL, operations.offset);
38-
builtSQL += ';';
39-
return builtSQL;
44+
45+
const chunkSql = `SELECT * FROM (${builtSQL}) LIMIT ${chunkSize};`;
46+
const streamSql = `SELECT * FROM (${builtSQL}) OFFSET ${chunkSize};`;
47+
return [chunkSql, streamSql];
4048
};

packages/extension-driver-duckdb/tests/duckdbDataSource.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -206,7 +206,7 @@ it('Should throw error from upstream', async () => {
206206
operations: {} as any,
207207
profileName: 'mocked-profile',
208208
})
209-
).rejects.toThrow(/^Parser Error: syntax error at or near "wrong"/);
209+
).rejects.toThrow(/^Parser Error: syntax error at or near/);
210210
});
211211

212212
it('Should return empty data and column with zero result', async () => {
@@ -275,7 +275,7 @@ it('Should print queries without binding when log-queries = true', async () => {
275275
profileName: 'mocked-profile',
276276
});
277277
// Assert
278-
expect(logs.slice(-1)[0][0]).toBe(`select $1::INTEGER as test`);
278+
expect(/select \$1::INTEGER as test/.test(logs.slice(-1)[0][0])).toBe(true);
279279
});
280280

281281
it('Should print queries with binding when log-queries = true and log-parameters = true', async () => {
@@ -316,7 +316,7 @@ it('Should print queries with binding when log-queries = true and log-parameters
316316
profileName: 'mocked-profile',
317317
});
318318
// Assert
319-
expect(logs.slice(-1)[0][0]).toBe(`select $1::INTEGER as test`);
319+
expect(/select \$1::INTEGER as test/.test(logs.slice(-1)[0][0])).toBe(true);
320320
expect(logs.slice(-1)[0][1]).toEqual([1234]);
321321
});
322322

packages/extension-driver-duckdb/tests/sqlBuilder.spec.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ it('BuildSQL function should build sql with operations', async () => {
6767
// Act
6868
const result = builder.buildSQL(statement, { limit: '$1', offset: '$2' });
6969
// Arrange
70-
expect(result).toBe(
71-
'SELECT * FROM (SELECT * FROM users) LIMIT $1 OFFSET $2;'
70+
expect(result[0]).toBe(
71+
`SELECT * FROM (SELECT * FROM (SELECT * FROM users) LIMIT $1 OFFSET $2) LIMIT ${builder.chunkSize};`
72+
);
73+
expect(result[1]).toBe(
74+
`SELECT * FROM (SELECT * FROM (SELECT * FROM users) LIMIT $1 OFFSET $2) OFFSET ${builder.chunkSize};`
7275
);
7376
});

types/duckdb.d.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ declare module 'duckdb' {
4040
sql: string,
4141
...params: [...any, Callback<Statement>] | []
4242
): Statement;
43+
stream(sql: any, ...args: any[]): QueryResult;
4344
}
4445

4546
export class Database {

0 commit comments

Comments
 (0)