@@ -99,20 +99,84 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
9999 }
100100 const { db, configurationParameters, ...options } =
101101 this . dbMapping . get ( profileName ) ! ;
102- const [ builtSQL , streamSQL ] = buildSQL ( sql , operations ) ;
102+ const [ firstDataSQL , restDataSQL ] = buildSQL ( sql , operations ) ;
103103
104104 // create new connection for each query
105+ const parameters = Array . from ( bindParams . values ( ) ) ;
106+ this . logRequest ( firstDataSQL , parameters , options ) ;
105107 const connection = db . connect ( ) ;
106108 await this . loadExtensions ( connection , configurationParameters ) ;
107- const parameters = Array . from ( bindParams . values ( ) ) ;
108- this . logRequest ( builtSQL , parameters , options ) ;
109- if ( streamSQL ) this . logRequest ( streamSQL , parameters , options ) ;
109+ if ( restDataSQL ) this . logRequest ( restDataSQL , parameters , options ) ;
110+ const [ firstData , restDataStream ] = await this . acquireData (
111+ firstDataSQL ,
112+ restDataSQL ,
113+ parameters ,
114+ db
115+ ) ;
116+ const readable = this . createReadableStream ( firstData , restDataStream ) ;
117+ return {
118+ getColumns : ( ) => {
119+ if ( ! firstData || firstData . length === 0 ) return [ ] ;
120+ return Object . keys ( firstData [ 0 ] ) . map ( ( columnName ) => ( {
121+ name : columnName ,
122+ type : getType ( firstData [ 0 ] [ columnName as any ] ) ,
123+ } ) ) ;
124+ } ,
125+ getData : ( ) => {
126+ return readable ;
127+ } ,
128+ } ;
129+ }
130+
131+ public async prepare ( { parameterIndex } : RequestParameter ) {
132+ return `$${ parameterIndex } ` ;
133+ }
134+
135+ private createReadableStream (
136+ firstData : duckdb . TableData ,
137+ restDataStream : duckdb . QueryResult | undefined
138+ ) {
139+ const readable = new Readable ( {
140+ objectMode : true ,
141+ read : function ( ) {
142+ for ( const row of firstData ) {
143+ this . push ( row ) ;
144+ }
145+ this . push ( null ) ;
146+ } ,
147+ } ) ;
148+ if ( firstData . length >= chunkSize ) {
149+ readable . _read = async function ( ) {
150+ if ( restDataStream ) {
151+ for await ( const row of restDataStream ) {
152+ this . push ( row ) ;
153+ }
154+ this . push ( null ) ;
155+ }
156+ } ;
157+ if ( firstData ) {
158+ for ( const row of firstData ) {
159+ readable . push ( row ) ;
160+ }
161+ }
162+ }
163+ return readable ;
164+ }
110165
111- const [ result , asyncIterable ] = await Promise . all ( [
166+ private async acquireData (
167+ firstDataSql : string ,
168+ restDataSql : string | undefined ,
169+ parameters : any [ ] ,
170+ db : duckdb . Database
171+ ) {
172+ // conn.all() is faster than stream.checkChunk().
173+ // For the small size data we use conn.all() to get the data at once
174+ // To limit memory use and prevent server crashes, we will use conn.all() to acquire the initial chunk of data, then conn.stream() to receive the remainder of the data.
175+ return await Promise . all ( [
112176 new Promise < duckdb . TableData > ( ( resolve , reject ) => {
113177 const c = db . connect ( ) ;
114178 c . all (
115- builtSQL ,
179+ firstDataSql ,
116180 ...parameters ,
117181 ( err : duckdb . DuckDbError | null , res : duckdb . TableData ) => {
118182 if ( err ) {
@@ -123,56 +187,16 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
123187 ) ;
124188 } ) ,
125189 new Promise < duckdb . QueryResult | undefined > ( ( resolve , reject ) => {
126- if ( ! streamSQL ) resolve ( undefined ) ;
190+ if ( ! restDataSql ) resolve ( undefined ) ;
127191 try {
128192 const c = db . connect ( ) ;
129- const result = c . stream ( streamSQL , ...parameters ) ;
193+ const result = c . stream ( restDataSql , ...parameters ) ;
130194 resolve ( result ) ;
131195 } catch ( err : any ) {
132196 reject ( err ) ;
133197 }
134198 } ) ,
135199 ] ) ;
136- const asyncIterableStream = new Readable ( {
137- objectMode : true ,
138- read : function ( ) {
139- for ( const row of result ) {
140- this . push ( row ) ;
141- }
142- this . push ( null ) ;
143- } ,
144- } ) ;
145- if ( result . length >= chunkSize ) {
146- asyncIterableStream . _read = async function ( ) {
147- if ( asyncIterable ) {
148- for await ( const row of asyncIterable ) {
149- this . push ( row ) ;
150- }
151- this . push ( null ) ;
152- }
153- } ;
154- if ( result ) {
155- for ( const row of result ) {
156- asyncIterableStream . push ( row ) ;
157- }
158- }
159- }
160- return {
161- getColumns : ( ) => {
162- if ( ! result || result . length === 0 ) return [ ] ;
163- return Object . keys ( result [ 0 ] ) . map ( ( columnName ) => ( {
164- name : columnName ,
165- type : getType ( result [ 0 ] [ columnName as any ] ) ,
166- } ) ) ;
167- } ,
168- getData : ( ) => {
169- return asyncIterableStream ;
170- } ,
171- } ;
172- }
173-
174- public async prepare ( { parameterIndex } : RequestParameter ) {
175- return `$${ parameterIndex } ` ;
176200 }
177201
178202 private logRequest (
@@ -271,9 +295,10 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
271295 } ) ;
272296 }
273297
274- // set duckdb thread to number
298+ // The default duckdb thread count is 16.
299+ // Setting the thread count below your CPU core count may improve performance, according to our observations.
275300 private async setThread ( db : duckdb . Database ) {
276- const thread = process . env [ 'THREADS ' ] ;
301+ const thread = process . env [ 'DUCKDB_THREADS ' ] ;
277302
278303 if ( ! thread ) return ;
279304 await new Promise ( ( resolve , reject ) => {
0 commit comments