Skip to content

Commit dc792f0

Browse files
committed
add exp-backoff, handle nexttoken, remove destroy
1 parent 0309539 commit dc792f0

3 files changed

Lines changed: 27 additions & 10 deletions

File tree

packages/extension-driver-redshift/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
},
2424
"license": "Apache-2.0",
2525
"dependencies": {
26-
"@aws-sdk/client-redshift-data": "^3.405.0"
26+
"@aws-sdk/client-redshift-data": "^3.405.0",
27+
"exponential-backoff": "^3.1.1"
2728
},
2829
"peerDependencies": {
2930
"@vulcan-sql/core": "~0.9.1-0"

packages/extension-driver-redshift/src/lib/redshiftDataSource.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
GetStatementResultCommand,
2323
SqlParameter,
2424
} from '@aws-sdk/client-redshift-data';
25+
import { backOff } from 'exponential-backoff';
2526

2627
export type RedshiftOptions = RedshiftDataClientConfig & Omit<ExecuteStatementCommandInput, "Sql" | "Parameters">;
2728

@@ -89,7 +90,6 @@ export class RedShiftDataSource extends DataSource<any, RedshiftOptions> {
8990
this.logger.debug(
9091
`Errors occurred, release connection from ${profileName}`
9192
);
92-
redshiftClient.destroy();
9393
throw e;
9494
}
9595
}
@@ -114,7 +114,9 @@ export class RedShiftDataSource extends DataSource<any, RedshiftOptions> {
114114
const statementCommandResult = await redshiftClient.send(executeStatementCommand);
115115
return await this.getResultFromExecuteStatement(statementCommandResult, redshiftClient);
116116
} catch (e) {
117-
redshiftClient.destroy();
117+
this.logger.debug(
118+
`Errors occurred, release connection from ${profileName}`
119+
);
118120
throw e;
119121
}
120122
}
@@ -132,7 +134,7 @@ export class RedShiftDataSource extends DataSource<any, RedshiftOptions> {
132134
// https://github.com/aws/aws-sdk-js-v3/blob/29056f4ca545f7e5cf951b915bb52178305fc305/clients/client-redshift-data/src/models/models_0.ts#L604
133135
while (!describeStatementResponse || describeStatementResponse.Status !== 'FINISHED') {
134136
const describeStatementCommand = new DescribeStatementCommand(describeStatementRequestInput);
135-
describeStatementResponse = await redshiftClient.send(describeStatementCommand);
137+
describeStatementResponse = await backOff(() =>redshiftClient.send(describeStatementCommand));
136138

137139
if (
138140
describeStatementResponse.Status === 'ABORTED' ||
@@ -142,15 +144,26 @@ export class RedShiftDataSource extends DataSource<any, RedshiftOptions> {
142144
}
143145
}
144146

145-
const getStatementResultCommandParams: GetStatementResultCommandInput = {
147+
let getStatementResultCommandParams: GetStatementResultCommandInput = {
146148
"Id": describeStatementResponse.Id
147149
};
148-
const getStatementResultCommand = new GetStatementResultCommand(getStatementResultCommandParams);
149-
const getStatementResultResponse = await redshiftClient.send(getStatementResultCommand);
150+
let getStatementResultCommand = new GetStatementResultCommand(getStatementResultCommandParams);
151+
let getStatementResultResponse = await redshiftClient.send(getStatementResultCommand);
152+
const records = getStatementResultResponse.Records! || [];
153+
const columns = getStatementResultResponse.ColumnMetadata || [];
154+
155+
while (getStatementResultResponse.NextToken) {
156+
getStatementResultCommandParams = {
157+
"Id": describeStatementResponse.Id,
158+
"NextToken": getStatementResultResponse.NextToken,
159+
};
160+
getStatementResultCommand = new GetStatementResultCommand(getStatementResultCommandParams);
161+
getStatementResultResponse = await redshiftClient.send(getStatementResultCommand);
162+
records.push(...(getStatementResultResponse.Records! || []));
163+
}
150164

151165
return {
152166
getColumns: () => {
153-
const columns = getStatementResultResponse.ColumnMetadata || [];
154167
return columns.map((column) => ({
155168
name: column.name || '',
156169
type: mapFromRedShiftTypeId(column.typeName?.toLowerCase() || ''),
@@ -159,8 +172,6 @@ export class RedShiftDataSource extends DataSource<any, RedshiftOptions> {
159172
getData: () => new Readable({
160173
objectMode: true,
161174
read() {
162-
const records = getStatementResultResponse.Records! || [];
163-
const columns = getStatementResultResponse.ColumnMetadata || [];
164175
for (const record of records) {
165176
const row: RedShiftDataRow = {};
166177
for (const [i, recordField] of record.entries()) {

packages/extension-driver-redshift/yarn.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,11 @@ bowser@^2.11.0:
768768
resolved "https://registry.yarnpkg.com/bowser/-/bowser-2.11.0.tgz#5ca3c35757a7aa5771500c70a73a9f91ef420a8f"
769769
integrity sha512-AlcaJBi/pqqJBIQ8U9Mcpc9i8Aqxn88Skv5d+xBX006BY5u8N3mGLHa5Lgppa7L/HfwgwLgZ6NYs+Ag6uUmJRA==
770770

771+
exponential-backoff@^3.1.1:
772+
version "3.1.1"
773+
resolved "https://registry.yarnpkg.com/exponential-backoff/-/exponential-backoff-3.1.1.tgz#64ac7526fe341ab18a39016cd22c787d01e00bf6"
774+
integrity sha512-dX7e/LHVJ6W3DE1MHWi9S1EYzDESENfLrYohG2G++ovZrYOkm4Knwa0mc1cn84xJOR4KEU0WSchhLbd0UklbHw==
775+
771776
fast-xml-parser@4.2.5:
772777
version "4.2.5"
773778
resolved "https://registry.yarnpkg.com/fast-xml-parser/-/fast-xml-parser-4.2.5.tgz#a6747a09296a6cb34f2ae634019bf1738f3b421f"

0 commit comments

Comments
 (0)