Skip to content

Commit e6c563a

Browse files
Merge pull request #234 from Canner/feature/support-data-lake-api
feature: support reading data from data lake
2 parents c7bb458 + 728d635 commit e6c563a

5 files changed

Lines changed: 261 additions & 9 deletions

File tree

packages/extension-driver-duckdb/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,15 @@
2929
log-queries: false
3030
# Optional: Whether log query requests' parameters, please be aware that query parameters might contain sensitive data (default: false)
3131
log-parameters: false
32+
# Optional: Contains the configuration parameters DuckDB need (ex: for duckdb extension "httpfs", it will needs region, accessKeyId, ...)
33+
# You can read more in the [duckdb extension page](https://duckdb.org/docs/extensions/overview)
34+
configuration-parameters:
35+
region?: string
36+
accessKeyId?: string
37+
secretAccessKey?: string
38+
# alternative option for accessKeyId and secretAccessKey
39+
sessionToken?: string
40+
endpoint?: string
41+
url_style?: string
42+
use_ssl?: boolean
3243
```

packages/extension-driver-duckdb/src/lib/duckdbDataSource.ts

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
} from '@vulcan-sql/core';
1616
import * as path from 'path';
1717
import { buildSQL } from './sqlBuilder';
18+
import { DuckDBExtensionLoader } from './duckdbExtensionLoader';
1819

1920
const getType = (value: any) => {
2021
const jsType = typeof value;
@@ -28,18 +29,36 @@ const getType = (value: any) => {
2829
}
2930
};
3031

32+
// read more configuration in DuckDB document: https://duckdb.org/docs/extensions/httpfs
33+
export interface ConfigurationParameters {
34+
region?: string;
35+
accessKeyId?: string;
36+
secretAccessKey?: string;
37+
// sessionToken: alternative option for accessKeyId and secretAccessKey
38+
sessionToken?: string;
39+
endpoint?: string;
40+
url_style?: string;
41+
use_ssl?: boolean;
42+
}
43+
3144
export interface DuckDBOptions {
3245
'persistent-path'?: string;
3346
'log-queries'?: boolean;
3447
'log-parameters'?: boolean;
48+
'configuration-parameters'?: ConfigurationParameters;
3549
}
3650

3751
@VulcanExtensionId('duckdb')
3852
export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
3953
// Use protected method for unit test
4054
protected dbMapping = new Map<
4155
string,
42-
{ db: duckdb.Database; logQueries: boolean; logParameters: boolean }
56+
{
57+
db: duckdb.Database;
58+
logQueries: boolean;
59+
logParameters: boolean;
60+
configurationParameters: ConfigurationParameters;
61+
}
4362
>();
4463
private logger = this.getLogger();
4564

@@ -56,13 +75,15 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
5675
// Only reuse db instance when not using in-memory only db
5776
let db = dbPath !== ':memory:' ? dbByPath.get(dbPath) : undefined;
5877
if (!db) {
59-
db = new duckdb.Database(dbPath);
78+
db = await this.initDatabase(dbPath);
6079
dbByPath.set(dbPath, db);
6180
}
6281
this.dbMapping.set(profile.name, {
6382
db,
6483
logQueries: profile.connection?.['log-queries'] || false,
6584
logParameters: profile.connection?.['log-parameters'] || false,
85+
configurationParameters:
86+
profile.connection?.['configuration-parameters'] || {},
6687
});
6788
}
6889
}
@@ -76,10 +97,12 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
7697
if (!this.dbMapping.has(profileName)) {
7798
throw new InternalError(`Profile instance ${profileName} not found`);
7899
}
79-
const { db, ...options } = this.dbMapping.get(profileName)!;
100+
const { db, configurationParameters, ...options } =
101+
this.dbMapping.get(profileName)!;
80102
const builtSQL = buildSQL(sql, operations);
81103
// create new connection for each query
82104
const connection = db.connect();
105+
await this.loadExtensions(connection, configurationParameters);
83106
const statement = connection.prepare(builtSQL);
84107
const parameters = Array.from(bindParams.values());
85108
this.logRequest(builtSQL, parameters, options);
@@ -153,9 +176,10 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
153176
if (!fs.existsSync(directory!))
154177
throw new InternalError(`The directory ${directory} not exists`);
155178

156-
const { db } = this.dbMapping.get(profileName)!;
179+
const { db, configurationParameters } = this.dbMapping.get(profileName)!;
157180
// create new connection for export
158181
const connection = db.connect();
182+
await this.loadExtensions(connection, configurationParameters);
159183
const formatTypeMapper = {
160184
[CacheLayerStoreFormatType.parquet.toString()]: 'parquet',
161185
};
@@ -221,4 +245,34 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
221245
);
222246
});
223247
}
248+
249+
private async initDatabase(dbPath: string) {
250+
const db = new duckdb.Database(dbPath);
251+
const conn = db.connect();
252+
await this.installExtensions(conn);
253+
return db;
254+
}
255+
256+
private async installExtensions(
257+
connection: duckdb.Connection
258+
): Promise<void> {
259+
// allows DuckDB to read remote/write remote files
260+
return await new Promise((resolve, reject) => {
261+
connection.run('INSTALL httpfs', (err: any) => {
262+
if (err) reject(err);
263+
this.logger.debug('Installed httpfs extension');
264+
resolve();
265+
});
266+
});
267+
}
268+
269+
private async loadExtensions(
270+
connection: duckdb.Connection,
271+
configurationParameters: ConfigurationParameters
272+
) {
273+
const configurationLoader = new DuckDBExtensionLoader(
274+
configurationParameters
275+
);
276+
await configurationLoader.loadExtension(connection, 'httpfs');
277+
}
224278
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import * as duckdb from 'duckdb';
2+
import { getLogger } from '@vulcan-sql/core';
3+
import { ConfigurationParameters } from './duckdbDataSource';
4+
5+
// DuckDB parameter name : configuration name
6+
export const HTTPFS_CONFIGURATIONS = Object.freeze({
7+
s3_region: 'region',
8+
s3_access_key_id: 'accessKeyId',
9+
s3_secret_access_key: 'secretAccessKey',
10+
// sessionToken= alternative option for accessKeyId and secretAccessKey
11+
s3_session_token: 'sessionToken',
12+
s3_endpoint: 'endpoint',
13+
s3_url_style: 'url_style',
14+
s3_use_ssl: 'use_ssl',
15+
});
16+
17+
export class DuckDBExtensionLoader {
18+
private configurations: ConfigurationParameters;
19+
private configurationMap: ReadonlyMap<string, Record<string, string>> =
20+
new Map([['httpfs', HTTPFS_CONFIGURATIONS]]);
21+
22+
private logger = getLogger({ scopeName: 'CORE' });
23+
24+
constructor(configurations: ConfigurationParameters) {
25+
this.configurations = configurations;
26+
}
27+
28+
public async loadExtension(
29+
conn: duckdb.Connection,
30+
extensionName: string
31+
): Promise<void> {
32+
const extensionConfigurations = this.configurationMap.get(extensionName);
33+
if (!extensionConfigurations) {
34+
this.logger.debug(
35+
`Can not find duckdb extension ${extensionName} or no configuration need to be set.`
36+
);
37+
return;
38+
}
39+
try {
40+
conn.run(`LOAD ${extensionName}`);
41+
} catch (error) {
42+
this.logger.debug(`Error when loading extension:${extensionName}`);
43+
throw error;
44+
}
45+
46+
Object.entries(extensionConfigurations).forEach(
47+
([dbParameterName, configurationKey]) => {
48+
const configurationValue =
49+
this.configurations[
50+
configurationKey as keyof ConfigurationParameters
51+
];
52+
// if configuration is not undefined
53+
if (configurationValue !== undefined) {
54+
conn.run(
55+
`SET ${dbParameterName}='${configurationValue}'`,
56+
(err: any) => {
57+
if (err) throw err;
58+
this.logger.debug(
59+
`Configuration error "${dbParameterName}": ${err}`
60+
);
61+
}
62+
);
63+
this.logger.debug(`Configuration parameter "${dbParameterName}" set`);
64+
} else {
65+
this.logger.debug(
66+
`Configuration "${dbParameterName}" has not been set`
67+
);
68+
}
69+
}
70+
);
71+
}
72+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import * as duckdb from 'duckdb';
2+
import {
3+
DuckDBExtensionLoader,
4+
HTTPFS_CONFIGURATIONS,
5+
} from '../src/lib/duckdbExtensionLoader';
6+
7+
const getQueryResults = (conn: duckdb.Connection, sql: string) =>
8+
new Promise<any[]>((resolve, reject) => {
9+
conn.all(sql, (err: any, result: any[]) => {
10+
err ? reject(err) : resolve(result);
11+
});
12+
});
13+
14+
it('Given Configurations should been set into DuckDB session after loadExtension', async () => {
15+
// Arrange
16+
const db = new duckdb.Database(':memory:');
17+
const connection = db.connect();
18+
connection.run('INSTALL httpfs');
19+
const configurations = {
20+
use_ssl: false,
21+
region: 'us-east-1',
22+
accessKeyId: 'accessKeyId',
23+
secretAccessKey: 'secretAccessKey',
24+
endpoint: 'http://localhost:4566',
25+
url_style: 'path',
26+
sessionToken: 'sessionToken',
27+
};
28+
const configurationLoader = new DuckDBExtensionLoader(configurations);
29+
30+
// Act
31+
await configurationLoader.loadExtension(connection, 'httpfs');
32+
33+
// Assert
34+
let dbConfigurations = await getQueryResults(
35+
connection,
36+
'SELECT * FROM duckdb_settings();'
37+
);
38+
dbConfigurations = dbConfigurations.filter(({ name }) =>
39+
Object.keys(HTTPFS_CONFIGURATIONS).includes(name)
40+
);
41+
42+
Object.entries(HTTPFS_CONFIGURATIONS).forEach(
43+
([dbParameterName, configurationKey]) => {
44+
if (Object.keys(configurations).includes(configurationKey)) {
45+
const dbValue = dbConfigurations.find(
46+
({ name }) => name === dbParameterName
47+
).value;
48+
const expectedValue =
49+
configurations[configurationKey as keyof typeof configurations];
50+
if (configurationKey === 'use_ssl') {
51+
expect(dbValue).toEqual('false');
52+
} else {
53+
expect(dbValue).toEqual(expectedValue);
54+
}
55+
}
56+
}
57+
);
58+
}, 500000);
59+
60+
it('Unprovided Configurations should not been set into DuckDB session after loadExtension', async () => {
61+
// Arrange
62+
const db = new duckdb.Database(':memory:');
63+
const connection = db.connect();
64+
connection.run('INSTALL httpfs');
65+
const configurations = {
66+
use_ssl: false,
67+
region: 'us-east-1',
68+
// accessKeyId: 'accessKeyId', // not provided, default is ''
69+
// secretAccessKey: 'secretAccessKey', // not provided, default is ''
70+
endpoint: 'http://localhost:4566',
71+
url_style: 'path',
72+
sessionToken: 'sessionToken',
73+
};
74+
const unprovidedConfigurations = ['accessKeyId', 'secretAccessKey'];
75+
const configurationLoader = new DuckDBExtensionLoader(configurations);
76+
77+
// Act
78+
await configurationLoader.loadExtension(connection, 'httpfs');
79+
80+
// Assert
81+
let dbConfigurations = await getQueryResults(
82+
connection,
83+
'SELECT * FROM duckdb_settings();'
84+
);
85+
dbConfigurations = dbConfigurations.filter(({ name }) =>
86+
unprovidedConfigurations.includes(name)
87+
);
88+
dbConfigurations.forEach(({ name, value }) => {
89+
if (unprovidedConfigurations.includes(name)) {
90+
expect(value).toEqual('');
91+
}
92+
});
93+
}, 500000);
94+
95+
it('HTTPFS_CONFIGURATIONS key should be a DuckDB parameter', async () => {
96+
// Arrange
97+
const db = new duckdb.Database(':memory:');
98+
const connection = db.connect();
99+
// duckdb will have the configuration after install extension
100+
await new Promise((resolve, reject) => {
101+
connection.run('INSTALL httpfs');
102+
connection.run('LOAD httpfs', (err: any) => {
103+
if (err) reject(err);
104+
resolve(true);
105+
});
106+
});
107+
108+
// Assert
109+
const dbConfigurations = await getQueryResults(
110+
connection,
111+
'SELECT * FROM duckdb_settings();'
112+
);
113+
const dbConfigurationNames = dbConfigurations.map(({ name }) => name);
114+
Object.keys(HTTPFS_CONFIGURATIONS).forEach((configurationKey) => {
115+
expect(dbConfigurationNames).toContain(configurationKey);
116+
});
117+
}, 500000);

packages/extension-driver-duckdb/tests/duckdbDataSource.spec.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,7 @@ it('Should print queries without binding when log-queries = true', async () => {
275275
profileName: 'mocked-profile',
276276
});
277277
// Assert
278-
expect(logs[0].length).toBe(1);
279-
expect(logs[0][0]).toBe(`select $1::INTEGER as test`);
278+
expect(logs.slice(-1)[0][0]).toBe(`select $1::INTEGER as test`);
280279
});
281280

282281
it('Should print queries with binding when log-queries = true and log-parameters = true', async () => {
@@ -317,9 +316,8 @@ it('Should print queries with binding when log-queries = true and log-parameters
317316
profileName: 'mocked-profile',
318317
});
319318
// Assert
320-
expect(logs[0].length).toBe(2);
321-
expect(logs[0][0]).toBe(`select $1::INTEGER as test`);
322-
expect(logs[0][1]).toEqual([1234]);
319+
expect(logs.slice(-1)[0][0]).toBe(`select $1::INTEGER as test`);
320+
expect(logs.slice(-1)[0][1]).toEqual([1234]);
323321
});
324322

325323
it('Should share db instances for same path besides in-memory only db', async () => {

0 commit comments

Comments
 (0)