Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
set -e

yarn install

bazel build //packages/@dataform/cli:bin

bazel run packages/@dataform/cli:package.pack
bazel run packages/@dataform/core:package.pack

mv dataform-cli-3.0.59.tgz ../hephaestus-worker-base/dataform/dataform-cli-3.0.59.tgz
mv dataform-core-3.0.59.tgz ../hephaestus-worker-base/dataform/dataform-core-3.0.59.tgz
1 change: 0 additions & 1 deletion cli/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ ts_library(
"@npm//@types/yargs",
"@npm//chokidar",
"@npm//glob",
"@npm//parse-duration",
"@npm//readline-sync",
"@npm//untildify",
"@npm//yargs",
Expand Down
1 change: 1 addition & 0 deletions cli/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ts_library(
"@npm//deepmerge",
"@npm//fs-extra",
"@npm//glob",
"@npm//google-auth-library",
"@npm//google-sql-syntax-ts",
"@npm//js-beautify",
"@npm//js-yaml",
Expand Down
66 changes: 45 additions & 21 deletions cli/api/dbadapters/bigquery.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery";
import { GoogleAuth, Impersonated } from "google-auth-library";
import Long from "long";
import { PromisePoolExecutor } from "promise-pool-executor";

Expand All @@ -17,7 +18,9 @@ import { coerceAsError } from "df/common/errors/errors";
import { retry } from "df/common/promises";
import { dataform } from "df/protos/ts";

const GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
const EXTRA_GOOGLE_SCOPES = ["https://www.googleapis.com/auth/drive"];
const IMPERSONATION_GOOGLE_SCOPES = [GOOGLE_CLOUD_PLATFORM_SCOPE, ...EXTRA_GOOGLE_SCOPES];

const BIGQUERY_DATE_RELATED_FIELDS = [
"BigQueryDate",
Expand All @@ -37,24 +40,40 @@ export interface IBigQueryExecutionOptions {
reservation?: string;
}

export type BigQueryClientProvider = (projectId?: string) => BigQuery;
export type BigQueryClientProvider = (projectId?: string) => BigQuery | Promise<BigQuery>;

export function createBigQueryClientProvider(
credentials: dataform.IBigQuery
): BigQueryClientProvider {
const clients = new Map<string, BigQuery>();
return (projectId?: string) => {
return async (projectId?: string) => {
projectId = projectId || credentials.projectId;
if (!clients.has(projectId)) {
clients.set(
const clientConfig: any = {
projectId,
new BigQuery({
scopes: EXTRA_GOOGLE_SCOPES,
location: credentials.location
};

if (credentials.impersonateServiceAccount) {
const sourceAuth = new GoogleAuth({
projectId,
scopes: EXTRA_GOOGLE_SCOPES,
location: credentials.location,
scopes: IMPERSONATION_GOOGLE_SCOPES,
credentials: credentials.credentials && JSON.parse(credentials.credentials)
})
);
});

const authClient = await sourceAuth.getClient();

clientConfig.authClient = new Impersonated({
sourceClient: authClient,
targetPrincipal: credentials.impersonateServiceAccount,
targetScopes: IMPERSONATION_GOOGLE_SCOPES
});
} else {
clientConfig.credentials = credentials.credentials && JSON.parse(credentials.credentials);
}

clients.set(projectId, new BigQuery(clientConfig));
}
return clients.get(projectId);
};
Expand Down Expand Up @@ -143,7 +162,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
return this.pool
.addSingleTask({
generator: async () => {
const [rows, , apiResponse] = await this.getClient().query({
const [rows, , apiResponse] = await (await this.getClient()).query({
...this.prepareQueryOptions(statement, options.rowLimit, options.bigquery, options.params),
skipParsing: true
} as any);
Expand All @@ -166,8 +185,8 @@ export class BigQueryDbAdapter implements IDbAdapter {
try {
await this.pool
.addSingleTask({
generator: () =>
this.getClient().query({
generator: async () =>
(await this.getClient()).query({
useLegacySql: false,
query,
dryRun: true
Expand All @@ -193,11 +212,12 @@ export class BigQueryDbAdapter implements IDbAdapter {

public async tables(database: string, schema?: string): Promise<dataform.ITableMetadata[]> {
const datasetIds = schema ? [schema] : await this.schemas(database);
const client = await this.getClient(database);
const tablesMetadata: dataform.ITableMetadata[] = [];

await Promise.all(
datasetIds.map(async datasetId => {
const [tables] = await this.getClient(database)
const [tables] = await client
.dataset(datasetId)
.getTables({ autoPaginate: true, maxResults: 1000 });
await Promise.all(
Expand Down Expand Up @@ -290,14 +310,17 @@ export class BigQueryDbAdapter implements IDbAdapter {
}

public async deleteTable(target: dataform.ITarget): Promise<void> {
await this.getClient(target.database)
await (await this.getClient(target.database))
.dataset(target.schema)
.table(target.name)
.delete({ ignoreNotFound: true });
}

public async schemas(database: string): Promise<string[]> {
const data = await this.getClient(database).getDatasets({ autoPaginate: true, maxResults: 1000 });
const data = await (await this.getClient(database)).getDatasets({
autoPaginate: true,
maxResults: 1000
});
return data[0].map(dataset => dataset.id);
}

Expand All @@ -317,7 +340,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
metadata.schema.fields
);

await this.getClient(target.database)
await (await this.getClient(target.database))
.dataset(target.schema)
.table(target.name)
.setMetadata({
Expand All @@ -329,7 +352,7 @@ export class BigQueryDbAdapter implements IDbAdapter {

private async getMetadata(target: dataform.ITarget): Promise<TableMetadata> {
try {
const table = await this.getClient(target.database)
const table = await (await this.getClient(target.database))
.dataset(target.schema)
.table(target.name)
.getMetadata();
Expand All @@ -344,8 +367,8 @@ export class BigQueryDbAdapter implements IDbAdapter {
}
}

private getClient(projectId?: string) {
return this.clientProvider(projectId);
private async getClient(projectId?: string) {
Comment thread
kolina marked this conversation as resolved.
return await this.clientProvider(projectId);
}

private async runQuery(
Expand All @@ -355,12 +378,12 @@ export class BigQueryDbAdapter implements IDbAdapter {
byteLimit?: number,
location?: string
): Promise<IExecutionResult> {
const results = await new Promise<any[]>((resolve, reject) => {
const results = await new Promise<any[]>(async (resolve, reject) => {
const allRows = new LimitedResultSet({
rowLimit,
byteLimit
});
const stream = this.getClient().createQueryStream({
const stream = (await this.getClient()).createQueryStream({
query,
params,
location
Expand Down Expand Up @@ -416,7 +439,8 @@ export class BigQueryDbAdapter implements IDbAdapter {
return retry(
async () => {
try {
const job = await this.getClient().createQueryJob(
const client = await this.getClient();
const job = await client.createQueryJob(
this.prepareQueryOptions(
query,
rowLimit,
Expand Down
29 changes: 25 additions & 4 deletions cli/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as chokidar from "chokidar";
import * as fs from "fs";
import * as glob from "glob";
import parseDuration from "parse-duration";
import * as path from "path";
import yargs from "yargs";

Expand All @@ -28,6 +27,7 @@ import {
actuallyResolve,
assertPathExists,
compiledGraphHasErrors,
parseCliDuration,
promptForIcebergConfig,
} from "df/cli/util";
import { createYargsCli, INamedOption } from "df/cli/yargswrapper";
Expand Down Expand Up @@ -174,7 +174,7 @@ const timeoutOption: INamedOption<yargs.Options> = {
type: "string",
default: null,
coerce: (rawTimeoutString: string | null) =>
rawTimeoutString ? parseDuration(rawTimeoutString) : null
rawTimeoutString ? parseCliDuration(rawTimeoutString) : null
}
};

Expand Down Expand Up @@ -207,6 +207,13 @@ const bigqueryJobLabelsOption: INamedOption<yargs.Options> = {
}
};

const impersonateServiceAccountOption: INamedOption<yargs.Options> = {
name: "impersonate-service-account",
option: {
describe: "Service account email to impersonate during authentication.",
type: "string"
}
};
const quietCompileOption: INamedOption<yargs.Options> = {
name: "quiet",
option: {
Expand Down Expand Up @@ -503,7 +510,13 @@ export function runCli() {
format: `test [${projectDirMustExistOption.name}]`,
description: "Run the dataform project's unit tests.",
positionalOptions: [projectDirMustExistOption],
options: [credentialsOption, timeoutOption, jsonOutputOption, ...ProjectConfigOptions.allYargsOptions],
options: [
credentialsOption,
impersonateServiceAccountOption,
timeoutOption,
jsonOutputOption,
...ProjectConfigOptions.allYargsOptions
],
processFn: async argv => {
if (!argv[jsonOutputOption.name]) {
print("Compiling...\n");
Expand All @@ -523,6 +536,10 @@ export function runCli() {
const readCredentials = credentials.read(
getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name])
);
if (argv[impersonateServiceAccountOption.name]) {
(readCredentials as any).impersonateServiceAccount =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend dataform.IBigQuery with your new option to avoid dynamic casts breaking static typing?

argv[impersonateServiceAccountOption.name];
}

if (!compiledGraph.tests.length) {
printError("No unit tests found.");
Expand Down Expand Up @@ -574,10 +591,10 @@ export function runCli() {
},
actionsOption,
credentialsOption,
impersonateServiceAccountOption,
fullRefreshOption,
includeDepsOption,
includeDependentsOption,
credentialsOption,
jsonOutputOption,
timeoutOption,
tagsOption,
Expand Down Expand Up @@ -610,6 +627,10 @@ export function runCli() {
const readCredentials = credentials.read(
getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name])
);
if (argv[impersonateServiceAccountOption.name]) {
(readCredentials as any).impersonateServiceAccount =
argv[impersonateServiceAccountOption.name];
}

const dbadapter = new BigQueryDbAdapter(readCredentials);
const executionGraph = await build(
Expand Down
Loading
Loading