Skip to content

Commit 0309539

Browse files
committed
add redshift extension
1 parent de0c747 commit 0309539

18 files changed

Lines changed: 1586 additions & 3 deletions
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"extends": ["../../.eslintrc.json"],
3+
"ignorePatterns": ["!**/*"],
4+
"overrides": [
5+
{
6+
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
7+
"rules": {}
8+
},
9+
{
10+
"files": ["*.ts", "*.tsx"],
11+
"rules": {}
12+
},
13+
{
14+
"files": ["*.js", "*.jsx"],
15+
"rules": {}
16+
}
17+
]
18+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# extension-driver-redshift
2+
3+
[@aws-sdk/client-redshift-data](https://www.npmjs.com/package/@aws-sdk/client-redshift-data) driver for VulcanSQL.
4+
5+
reference: https://github.com/aws/aws-sdk-js-v3/tree/main/clients/client-redshift-data
6+
7+
## Install
8+
9+
1. Install package
10+
11+
```bash
12+
npm i @vulcan-sql/extension-driver-redshift
13+
```
14+
15+
2. Update `vulcan.yaml`, enable the extension.
16+
17+
```yaml
18+
extensions:
19+
redshift: '@vulcan-sql/extension-driver-redshift'
20+
```
21+
22+
3. Create a new profile in `profiles.yaml` or in your profiles' paths.
23+
24+
```yaml
25+
- name: redshift # profile name
26+
type: redshift
27+
allow: "*"
28+
connection:
29+
# please see the type definition of RedshiftDataClientConfig
30+
# https://github.com/aws/aws-sdk-js-v3/blob/29056f4ca545f7e5cf951b915bb52178305fc305/clients/client-redshift-data/src/RedshiftDataClient.ts#L253C18-L253C42
31+
credentials:
32+
accessKeyId: <REDSHIFT_ACCESS_KEY_ID>
33+
secretAccessKey: <REDSHIFT_SECRET_ACCESS_KEY>
34+
# please see the type definition of ExecuteStatementCommandInput (omitting Sql and Parameters)
35+
# https://github.com/aws/aws-sdk-js-v3/blob/29056f4ca545f7e5cf951b915bb52178305fc305/clients/client-redshift-data/src/models/models_0.ts#L805C18-L805C39
36+
Database: <REDSHIFT_DATABASE>
37+
WorkgroupName: <AWS_REDSHIFT_WORKGROUP_NAME>
38+
```
39+
40+
## Testing
41+
42+
```bash
43+
nx test extension-driver-redshift
44+
```
45+
46+
This library was generated with [Nx](https://nx.dev).
47+
48+
To run the tests, the following environment variables are required:
49+
50+
- AWS_ACCESS_KEY_ID
51+
- AWS_SECRET_ACCESS_KEY
52+
- AWS_REDSHIFT_DATABASE
53+
- AWS_REDSHIFT_WORKGROUP_NAME
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Jest configuration for the extension-driver-redshift package.
module.exports = {
2+
displayName: 'extension-driver-redshift',
3+
preset: '../../jest.preset.ts',
4+
globals: {
5+
'ts-jest': {
6+
// ts-jest is pointed at the spec-specific tsconfig of this package.
tsconfig: '<rootDir>/tsconfig.spec.json',
7+
},
8+
},
9+
transform: {
10+
// Files whose names end in ".ts" or ".js" are transformed by ts-jest.
'^.+\\.[tj]s$': 'ts-jest',
11+
},
12+
moduleFileExtensions: ['ts', 'js', 'html'],
13+
// Coverage reports are written under the workspace-level coverage folder.
coverageDirectory: '../../coverage/packages/extension-driver-redshift',
14+
};
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "@vulcan-sql/extension-driver-redshift",
3+
"description": "Redshift driver for VulcanSQL",
4+
"version": "0.9.1",
5+
"type": "commonjs",
6+
"publishConfig": {
7+
"access": "public"
8+
},
9+
"keywords": [
10+
"vulcan",
11+
"vulcan-sql",
12+
"data",
13+
"sql",
14+
"database",
15+
"data-warehouse",
16+
"data-lake",
17+
"api-builder",
18+
"redshift"
19+
],
20+
"repository": {
21+
"type": "git",
22+
"url": "https://github.com/Canner/vulcan.git"
23+
},
24+
"license": "Apache-2.0",
25+
"dependencies": {
26+
"@aws-sdk/client-redshift-data": "^3.405.0"
27+
},
28+
"peerDependencies": {
29+
"@vulcan-sql/core": "~0.9.1-0"
30+
}
31+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
{
2+
"root": "packages/extension-driver-redshift",
3+
"sourceRoot": "packages/extension-driver-redshift/src",
4+
"targets": {
5+
"build": {
6+
"executor": "@nrwl/workspace:run-commands",
7+
"options": {
8+
"command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-redshift"
9+
},
10+
"dependsOn": [
11+
{
12+
"projects": "self",
13+
"target": "tsc"
14+
},
15+
{
16+
"projects": "self",
17+
"target": "install-dependencies"
18+
}
19+
]
20+
},
21+
"tsc": {
22+
"executor": "@nrwl/js:tsc",
23+
"outputs": ["{options.outputPath}"],
24+
"options": {
25+
"outputPath": "dist/packages/extension-driver-redshift",
26+
"main": "packages/extension-driver-redshift/src/index.ts",
27+
"tsConfig": "packages/extension-driver-redshift/tsconfig.lib.json",
28+
"assets": ["packages/extension-driver-redshift/*.md"],
29+
"buildableProjectDepsInPackageJsonType": "dependencies"
30+
},
31+
"dependsOn": [
32+
{
33+
"projects": "dependencies",
34+
"target": "build"
35+
},
36+
{
37+
"projects": "self",
38+
"target": "install-dependencies"
39+
}
40+
]
41+
},
42+
"lint": {
43+
"executor": "@nrwl/linter:eslint",
44+
"outputs": ["{options.outputFile}"],
45+
"options": {
46+
"lintFilePatterns": ["packages/extension-driver-redshift/**/*.ts"]
47+
}
48+
},
49+
"test": {
50+
"executor": "@nrwl/jest:jest",
51+
"outputs": ["coverage/packages/extension-driver-redshift"],
52+
"options": {
53+
"jestConfig": "packages/extension-driver-redshift/jest.config.ts",
54+
"passWithNoTests": true
55+
},
56+
"dependsOn": [
57+
{
58+
"projects": "self",
59+
"target": "install-dependencies"
60+
}
61+
]
62+
},
63+
"publish": {
64+
"executor": "@nrwl/workspace:run-commands",
65+
"options": {
66+
"command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}",
67+
"cwd": "dist/packages/extension-driver-redshift"
68+
},
69+
"dependsOn": [
70+
{
71+
"projects": "self",
72+
"target": "build"
73+
}
74+
]
75+
},
76+
"install-dependencies": {
77+
"executor": "@nrwl/workspace:run-commands",
78+
"options": {
79+
"command": "yarn",
80+
"cwd": "packages/extension-driver-redshift"
81+
}
82+
}
83+
},
84+
"tags": []
85+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// Entry point of the package.
import { RedShiftDataSource } from './lib/redshiftDataSource';

// Re-export everything the data source module exposes as named exports.
export * from './lib/redshiftDataSource';

// The default export is an array holding the data source class.
export default [RedShiftDataSource];
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import {
2+
DataSource,
3+
DataResult,
4+
ExecuteOptions,
5+
InternalError,
6+
RequestParameter,
7+
VulcanExtensionId,
8+
} from '@vulcan-sql/core';
9+
import { Readable } from 'stream';
10+
import { buildSQL } from './sqlBuilder';
11+
import { mapFromRedShiftTypeId } from './typeMapper';
12+
import {
13+
RedshiftDataClient,
14+
RedshiftDataClientConfig,
15+
ExecuteStatementCommand,
16+
ExecuteStatementCommandInput,
17+
ExecuteStatementCommandOutput,
18+
DescribeStatementCommandInput,
19+
DescribeStatementResponse,
20+
DescribeStatementCommand,
21+
GetStatementResultCommandInput,
22+
GetStatementResultCommand,
23+
SqlParameter,
24+
} from '@aws-sdk/client-redshift-data';
25+
26+
/**
 * Connection settings of a redshift profile: the Redshift Data client
 * configuration combined with the statement-level inputs of
 * `ExecuteStatementCommandInput`, except `Sql` and `Parameters`.
 */
export type RedshiftOptions = RedshiftDataClientConfig &
  Omit<ExecuteStatementCommandInput, 'Sql' | 'Parameters'>;

/** One result row, keyed by column name. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type RedShiftDataRow = Record<string, any>;
31+
32+
/**
 * VulcanSQL data source that runs statements through the Amazon Redshift
 * Data API.
 *
 * One `RedshiftDataClient` is created per profile in `onActivate` and is
 * reused by every `execute` call made for that profile.
 */
@VulcanExtensionId('redshift')
export class RedShiftDataSource extends DataSource<any, RedshiftOptions> {
  private logger = this.getLogger();

  /** Delay between two DescribeStatement polls, in milliseconds. */
  private readonly statementPollIntervalMs = 200;

  /** Client and connection options of every activated profile, keyed by profile name. */
  private redshiftClientMapping = new Map<
    string,
    {
      redshiftClient: RedshiftDataClient;
      options: RedshiftOptions;
    }
  >();

  /**
   * Creates a client for every profile and verifies it with a `select 1`.
   *
   * @throws InternalError when a profile has no `connection` settings.
   * @throws whatever the connection test throws when the profile cannot reach Redshift.
   */
  public override async onActivate() {
    for (const profile of this.getProfiles().values()) {
      this.logger.debug(
        `Initializing profile: ${profile.name} using redshift driver`
      );

      // Fail early with a clear message instead of a TypeError when the
      // Database / WorkgroupName are read later on.
      if (!profile.connection) {
        throw new InternalError(
          `Profile ${profile.name} has no connection settings for the redshift driver`
        );
      }

      const redshiftClient = new RedshiftDataClient(profile.connection);
      this.redshiftClientMapping.set(profile.name, {
        redshiftClient,
        options: profile.connection,
      });

      await this.testConnection(profile.name);
      this.logger.debug(`Profile ${profile.name} initialized`);
    }
  }

  /**
   * Builds the final SQL, submits it with its bind parameters and returns the
   * columns and rows once the statement has finished.
   *
   * @throws InternalError when the profile is unknown or the statement ends
   *   in the FAILED / ABORTED state.
   */
  public async execute({
    statement: sql,
    bindParams,
    profileName,
    operations,
  }: ExecuteOptions): Promise<DataResult> {
    this.checkProfileExist(profileName);
    const { redshiftClient, options } =
      this.redshiftClientMapping.get(profileName)!;

    try {
      const sqlParams: SqlParameter[] = [];
      bindParams.forEach((value, key) => {
        // Parameter names are sent without the ':' prefix used in the SQL text.
        sqlParams.push({ name: key.replace(':', ''), value: String(value) });
      });

      const input = this.buildStatementInput(
        buildSQL(sql, operations),
        options,
        sqlParams
      );
      const statementCommandResult = await redshiftClient.send(
        new ExecuteStatementCommand(input)
      );
      return await this.getResultFromExecuteStatement(
        statementCommandResult,
        redshiftClient
      );
    } catch (e: unknown) {
      // The client is shared by every request of this profile, so it must
      // NOT be destroyed here: a single failing statement would otherwise
      // tear down the sockets used by other in-flight requests.
      this.logger.debug(
        `Errors occurred while executing statement on ${profileName}`
      );
      throw e;
    }
  }

  /**
   * Returns the placeholder used for a bind parameter in the SQL text.
   */
  public async prepare({ parameterIndex }: RequestParameter) {
    // see the section of Running SQL statements with parameters when calling the Amazon Redshift Data API
    // https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html
    return `:${parameterIndex}`;
  }

  /**
   * Runs `select 1` against the profile to verify its settings; the client is
   * destroyed when the check fails.
   */
  private async testConnection(
    profileName: string
  ): Promise<DataResult | undefined> {
    const { redshiftClient, options } =
      this.redshiftClientMapping.get(profileName)!;
    const input = this.buildStatementInput('select 1', options);

    try {
      const statementCommandResult = await redshiftClient.send(
        new ExecuteStatementCommand(input)
      );
      return await this.getResultFromExecuteStatement(
        statementCommandResult,
        redshiftClient
      );
    } catch (e: unknown) {
      redshiftClient.destroy();
      throw e;
    }
  }

  /** Builds the ExecuteStatement input shared by `execute` and `testConnection`. */
  private buildStatementInput(
    sql: string,
    options: RedshiftOptions,
    parameters: SqlParameter[] = []
  ): ExecuteStatementCommandInput {
    const input: ExecuteStatementCommandInput = {
      Sql: sql,
      Database: options.Database,
      WorkgroupName: options.WorkgroupName,
    };
    // Parameters is only attached when there is at least one bind value.
    if (parameters.length) {
      input.Parameters = parameters;
    }
    return input;
  }

  /**
   * Polls DescribeStatement until the statement reaches the FINISHED state.
   *
   * @throws InternalError when the statement ends as ABORTED or FAILED.
   */
  private async waitForStatement(
    statementId: string | undefined,
    redshiftClient: RedshiftDataClient
  ): Promise<DescribeStatementResponse> {
    const input: DescribeStatementCommandInput = { Id: statementId };

    // definition of DescribeStatementResponse.Status
    // https://github.com/aws/aws-sdk-js-v3/blob/29056f4ca545f7e5cf951b915bb52178305fc305/clients/client-redshift-data/src/models/models_0.ts#L604
    for (;;) {
      const response: DescribeStatementResponse = await redshiftClient.send(
        new DescribeStatementCommand(input)
      );

      if (response.Status === 'FINISHED') {
        return response;
      }
      if (response.Status === 'ABORTED' || response.Status === 'FAILED') {
        // `Error` is a plain string; wrap it so callers receive a real Error.
        throw new InternalError(
          response.Error ||
            `Redshift statement ${response.Id} ended with status ${response.Status}`
        );
      }

      // Wait before polling again instead of issuing requests back-to-back.
      await new Promise<void>((resolve) =>
        setTimeout(resolve, this.statementPollIntervalMs)
      );
    }
  }

  /**
   * Waits for the submitted statement to finish, downloads every page of its
   * result and exposes it as a `DataResult`.
   */
  private async getResultFromExecuteStatement(
    statementCommandResult: ExecuteStatementCommandOutput,
    redshiftClient: RedshiftDataClient
  ): Promise<DataResult> {
    const finished = await this.waitForStatement(
      statementCommandResult.Id,
      redshiftClient
    );

    const fetchPage = (nextToken?: string) => {
      const input: GetStatementResultCommandInput = {
        Id: finished.Id,
        NextToken: nextToken,
      };
      return redshiftClient.send(new GetStatementResultCommand(input));
    };

    const firstPage = await fetchPage();
    const columns = firstPage.ColumnMetadata || [];
    const records = [...(firstPage.Records || [])];

    // Results are paginated: keep following NextToken until it is absent,
    // otherwise only the first page of rows would be returned.
    let nextToken = firstPage.NextToken;
    while (nextToken) {
      const page = await fetchPage(nextToken);
      for (const record of page.Records || []) {
        records.push(record);
      }
      nextToken = page.NextToken;
    }

    return {
      getColumns: () =>
        columns.map((column) => ({
          name: column.name || '',
          type: mapFromRedShiftTypeId(column.typeName?.toLowerCase() || ''),
        })),
      getData: () =>
        new Readable({
          objectMode: true,
          read() {
            for (const record of records) {
              const row: RedShiftDataRow = {};
              for (const [i, recordField] of record.entries()) {
                // A field holds exactly one typed member; `{ isNull: true }`
                // denotes SQL NULL and must not surface as the boolean `true`.
                row[columns[i]?.name || ''] = recordField.isNull
                  ? null
                  : Object.values(recordField)[0];
              }
              this.push(row);
            }
            this.push(null);
          },
          // automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16
          autoDestroy: true,
        }),
    };
  }

  /**
   * @throws InternalError when no client was registered for the profile.
   */
  private checkProfileExist(profileName: string): void {
    if (!this.redshiftClientMapping.has(profileName)) {
      throw new InternalError(`Profile instance ${profileName} not found`);
    }
  }
}

0 commit comments

Comments
 (0)