#!/bin/bash
#
# Builds the Dataform CLI with bazel, packs the @dataform/cli and @dataform/core
# npm tarballs, and moves both tarballs into the destination directory.
#
# Optional environment overrides (defaults reproduce the previous hard-coded values):
#   DATAFORM_VERSION       version embedded in the tarball file names (default: 3.0.59)
#   DATAFORM_PACKAGE_DEST  directory that receives the tarballs
#                          (default: ../hephaestus-worker-base/dataform)
#
# -e: stop at the first failing command; -u: treat unset variables as errors;
# pipefail: a failure anywhere in a pipeline fails the pipeline.
set -euo pipefail

VERSION="${DATAFORM_VERSION:-3.0.59}"
DEST_DIR="${DATAFORM_PACKAGE_DEST:-../hephaestus-worker-base/dataform}"

# Fail fast: without this check a missing destination is only discovered by `mv`,
# after the whole install/build/pack sequence has already run.
if [[ ! -d "${DEST_DIR}" ]]; then
  echo "Destination directory does not exist: ${DEST_DIR}" >&2
  exit 1
fi

yarn install

bazel build //packages/@dataform/cli:bin

bazel run packages/@dataform/cli:package.pack
bazel run packages/@dataform/core:package.pack

# NOTE(review): assumes the pack targets drop the tarballs into the current
# working directory, as the original `mv` commands did - confirm.
for package in cli core; do
  tarball="dataform-${package}-${VERSION}.tgz"
  mv "${tarball}" "${DEST_DIR}/${tarball}"
done
/**
 * Returns the BigQuery client to use for a project. The result may be a client or a
 * promise of one, so callers must `await` it.
 */
export type BigQueryClientProvider = (projectId?: string) => BigQuery | Promise<BigQuery>;

/**
 * Creates a provider that lazily builds, and then reuses, one BigQuery client per project.
 *
 * When `credentials.impersonateServiceAccount` is set, the client authenticates as that
 * service account through an `Impersonated` auth client whose source client is built from
 * the supplied credentials; otherwise the supplied credentials are used directly.
 *
 * @param credentials BigQuery profile: default project, location, optional JSON
 *   credentials string and optional service account to impersonate.
 * @returns A provider mapping an optional project ID (defaulting to
 *   `credentials.projectId`) to a promise of that project's cached client.
 */
export function createBigQueryClientProvider(
  credentials: dataform.IBigQuery
): BigQueryClientProvider {
  // The in-flight promise is cached rather than the resolved client. Building an
  // impersonated client awaits the source auth client, so caching only after that await
  // would let concurrent callers for the same project each build their own client.
  const clients = new Map<string, Promise<BigQuery>>();

  // Builds a fresh client for one project. Being async, a JSON.parse failure on malformed
  // credentials surfaces as a rejected promise, exactly as it did before.
  const createClient = async (projectId: string): Promise<BigQuery> => {
    const parsedCredentials = credentials.credentials && JSON.parse(credentials.credentials);
    const clientConfig: any = {
      projectId,
      scopes: EXTRA_GOOGLE_SCOPES,
      location: credentials.location
    };

    if (credentials.impersonateServiceAccount) {
      const sourceAuth = new GoogleAuth({
        projectId,
        scopes: IMPERSONATION_GOOGLE_SCOPES,
        credentials: parsedCredentials
      });

      clientConfig.authClient = new Impersonated({
        sourceClient: await sourceAuth.getClient(),
        targetPrincipal: credentials.impersonateServiceAccount,
        targetScopes: IMPERSONATION_GOOGLE_SCOPES
      });
    } else {
      clientConfig.credentials = parsedCredentials;
    }

    return new BigQuery(clientConfig);
  };

  return (projectId?: string) => {
    const resolvedProjectId = projectId || credentials.projectId;

    let client = clients.get(resolvedProjectId);
    if (!client) {
      const pendingClient = createClient(resolvedProjectId);
      clients.set(resolvedProjectId, pendingClient);
      // Evict a failed attempt so a later call can retry, matching the old behaviour of
      // never caching on failure. The caller still receives the original rejection.
      pendingClient.catch(() => {
        if (clients.get(resolvedProjectId) === pendingClient) {
          clients.delete(resolvedProjectId);
        }
      });
      client = pendingClient;
    }
    return client;
  };
}
query, dryRun: true @@ -193,11 +212,12 @@ export class BigQueryDbAdapter implements IDbAdapter { public async tables(database: string, schema?: string): Promise { const datasetIds = schema ? [schema] : await this.schemas(database); + const client = await this.getClient(database); const tablesMetadata: dataform.ITableMetadata[] = []; await Promise.all( datasetIds.map(async datasetId => { - const [tables] = await this.getClient(database) + const [tables] = await client .dataset(datasetId) .getTables({ autoPaginate: true, maxResults: 1000 }); await Promise.all( @@ -290,14 +310,17 @@ export class BigQueryDbAdapter implements IDbAdapter { } public async deleteTable(target: dataform.ITarget): Promise { - await this.getClient(target.database) + await (await this.getClient(target.database)) .dataset(target.schema) .table(target.name) .delete({ ignoreNotFound: true }); } public async schemas(database: string): Promise { - const data = await this.getClient(database).getDatasets({ autoPaginate: true, maxResults: 1000 }); + const data = await (await this.getClient(database)).getDatasets({ + autoPaginate: true, + maxResults: 1000 + }); return data[0].map(dataset => dataset.id); } @@ -317,7 +340,7 @@ export class BigQueryDbAdapter implements IDbAdapter { metadata.schema.fields ); - await this.getClient(target.database) + await (await this.getClient(target.database)) .dataset(target.schema) .table(target.name) .setMetadata({ @@ -329,7 +352,7 @@ export class BigQueryDbAdapter implements IDbAdapter { private async getMetadata(target: dataform.ITarget): Promise { try { - const table = await this.getClient(target.database) + const table = await (await this.getClient(target.database)) .dataset(target.schema) .table(target.name) .getMetadata(); @@ -344,8 +367,8 @@ export class BigQueryDbAdapter implements IDbAdapter { } } - private getClient(projectId?: string) { - return this.clientProvider(projectId); + private async getClient(projectId?: string) { + return await 
this.clientProvider(projectId); } private async runQuery( @@ -355,12 +378,12 @@ export class BigQueryDbAdapter implements IDbAdapter { byteLimit?: number, location?: string ): Promise { - const results = await new Promise((resolve, reject) => { + const results = await new Promise(async (resolve, reject) => { const allRows = new LimitedResultSet({ rowLimit, byteLimit }); - const stream = this.getClient().createQueryStream({ + const stream = (await this.getClient()).createQueryStream({ query, params, location @@ -416,7 +439,8 @@ export class BigQueryDbAdapter implements IDbAdapter { return retry( async () => { try { - const job = await this.getClient().createQueryJob( + const client = await this.getClient(); + const job = await client.createQueryJob( this.prepareQueryOptions( query, rowLimit, diff --git a/cli/index.ts b/cli/index.ts index 452d71f5f..45c056797 100644 --- a/cli/index.ts +++ b/cli/index.ts @@ -1,7 +1,6 @@ import * as chokidar from "chokidar"; import * as fs from "fs"; import * as glob from "glob"; -import parseDuration from "parse-duration"; import * as path from "path"; import yargs from "yargs"; @@ -28,6 +27,7 @@ import { actuallyResolve, assertPathExists, compiledGraphHasErrors, + parseCliDuration, promptForIcebergConfig, } from "df/cli/util"; import { createYargsCli, INamedOption } from "df/cli/yargswrapper"; @@ -174,7 +174,7 @@ const timeoutOption: INamedOption = { type: "string", default: null, coerce: (rawTimeoutString: string | null) => - rawTimeoutString ? parseDuration(rawTimeoutString) : null + rawTimeoutString ? 
parseCliDuration(rawTimeoutString) : null } }; @@ -207,6 +207,13 @@ const bigqueryJobLabelsOption: INamedOption = { } }; +const impersonateServiceAccountOption: INamedOption = { + name: "impersonate-service-account", + option: { + describe: "Service account email to impersonate during authentication.", + type: "string" + } +}; const quietCompileOption: INamedOption = { name: "quiet", option: { @@ -503,7 +510,13 @@ export function runCli() { format: `test [${projectDirMustExistOption.name}]`, description: "Run the dataform project's unit tests.", positionalOptions: [projectDirMustExistOption], - options: [credentialsOption, timeoutOption, jsonOutputOption, ...ProjectConfigOptions.allYargsOptions], + options: [ + credentialsOption, + impersonateServiceAccountOption, + timeoutOption, + jsonOutputOption, + ...ProjectConfigOptions.allYargsOptions + ], processFn: async argv => { if (!argv[jsonOutputOption.name]) { print("Compiling...\n"); @@ -523,6 +536,10 @@ export function runCli() { const readCredentials = credentials.read( getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) ); + if (argv[impersonateServiceAccountOption.name]) { + (readCredentials as any).impersonateServiceAccount = + argv[impersonateServiceAccountOption.name]; + } if (!compiledGraph.tests.length) { printError("No unit tests found."); @@ -574,10 +591,10 @@ export function runCli() { }, actionsOption, credentialsOption, + impersonateServiceAccountOption, fullRefreshOption, includeDepsOption, includeDependentsOption, - credentialsOption, jsonOutputOption, timeoutOption, tagsOption, @@ -610,6 +627,10 @@ export function runCli() { const readCredentials = credentials.read( getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) ); + if (argv[impersonateServiceAccountOption.name]) { + (readCredentials as any).impersonateServiceAccount = + argv[impersonateServiceAccountOption.name]; + } const dbadapter = new BigQueryDbAdapter(readCredentials); const 
/**
 * Milliseconds per supported duration unit, keyed by lower-case unit name.
 */
const DURATION_UNITS_IN_MILLIS: { [unit: string]: number } = {
  ms: 1,
  msec: 1,
  msecs: 1,
  millisecond: 1,
  milliseconds: 1,
  s: 1000,
  sec: 1000,
  secs: 1000,
  second: 1000,
  seconds: 1000,
  m: 60 * 1000,
  min: 60 * 1000,
  mins: 60 * 1000,
  minute: 60 * 1000,
  minutes: 60 * 1000,
  h: 60 * 60 * 1000,
  hr: 60 * 60 * 1000,
  hrs: 60 * 60 * 1000,
  hour: 60 * 60 * 1000,
  hours: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
  days: 24 * 60 * 60 * 1000,
  w: 7 * 24 * 60 * 60 * 1000,
  week: 7 * 24 * 60 * 60 * 1000,
  weeks: 7 * 24 * 60 * 60 * 1000
};

/**
 * Parses a duration given on the command line into milliseconds.
 *
 * Accepted forms (case-insensitive, surrounding whitespace ignored):
 *  - a bare signed decimal number, interpreted as milliseconds: `"1500"`;
 *  - one or more `<number><unit>` segments, optionally separated by spaces, whose values
 *    are summed: `"1h30m"`, `"1.5m"`, `"1 week 2 days"`.
 *
 * @param rawDuration The raw duration string.
 * @returns The total duration in milliseconds.
 * @throws Error "Duration cannot be empty." when the input is missing or blank.
 * @throws Error "Invalid duration: ..." when a segment lacks a well-formed number or a unit.
 * @throws Error "Unsupported duration unit: ..." when a unit is not a supported one.
 */
export function parseCliDuration(rawDuration: string): number {
  const normalizedDuration = rawDuration?.trim().toLowerCase();
  if (!normalizedDuration) {
    throw new Error("Duration cannot be empty.");
  }

  if (isCliDurationNumber(normalizedDuration)) {
    return Number(normalizedDuration);
  }

  // The input is non-empty and trimmed here, so the loop below either consumes at least
  // one complete segment or throws; no separate "nothing matched" check is needed.
  let totalDurationMillis = 0;
  let cursor = 0;

  while (cursor < normalizedDuration.length) {
    cursor = skipSpaces(normalizedDuration, cursor);
    if (cursor >= normalizedDuration.length) {
      break;
    }

    const numberStart = cursor;
    const numberEnd = scanSignedDecimal(normalizedDuration, numberStart);
    if (numberEnd < 0) {
      throw new Error(`Invalid duration: ${rawDuration}`);
    }

    const unitStart = skipSpaces(normalizedDuration, numberEnd);
    let unitEnd = unitStart;
    while (isAsciiLetter(normalizedDuration[unitEnd])) {
      unitEnd++;
    }
    if (unitEnd === unitStart) {
      throw new Error(`Invalid duration: ${rawDuration}`);
    }

    const durationUnit = normalizedDuration.slice(unitStart, unitEnd);
    // Own-property check: a plain index lookup would also find inherited members such as
    // "constructor", which previously yielded NaN instead of an error.
    if (!Object.prototype.hasOwnProperty.call(DURATION_UNITS_IN_MILLIS, durationUnit)) {
      throw new Error(`Unsupported duration unit: ${durationUnit}`);
    }

    const durationValue = Number(normalizedDuration.slice(numberStart, numberEnd));
    totalDurationMillis += durationValue * DURATION_UNITS_IN_MILLIS[durationUnit];
    cursor = unitEnd;
  }

  return totalDurationMillis;
}

/**
 * Returns whether the whole string is a signed decimal number (`[+-]digits[.digits]`).
 */
function isCliDurationNumber(value: string): boolean {
  return scanSignedDecimal(value, 0) === value.length;
}

/**
 * Scans a signed decimal number (`[+-]digits[.digits]`) starting at `start`.
 * Returns the index just past the number, or -1 if no well-formed number starts there
 * (no integer digits, or a decimal point without fraction digits).
 */
function scanSignedDecimal(text: string, start: number): number {
  let cursor = start;
  if (text[cursor] === "+" || text[cursor] === "-") {
    cursor++;
  }

  const integerStart = cursor;
  while (isAsciiDigit(text[cursor])) {
    cursor++;
  }
  if (cursor === integerStart) {
    return -1;
  }

  if (text[cursor] === ".") {
    cursor++;
    const fractionStart = cursor;
    while (isAsciiDigit(text[cursor])) {
      cursor++;
    }
    if (cursor === fractionStart) {
      return -1;
    }
  }

  return cursor;
}

/**
 * Returns the index of the first character at or after `start` that is not a space.
 */
function skipSpaces(text: string, start: number): number {
  let cursor = start;
  while (text[cursor] === " ") {
    cursor++;
  }
  return cursor;
}

/**
 * Returns whether `value` is a single ASCII digit. `undefined` (an index past the end of
 * a string) is never a digit.
 */
function isAsciiDigit(value: string | undefined): boolean {
  return value !== undefined && value >= "0" && value <= "9";
}

/**
 * Returns whether `value` is a single lower-case ASCII letter. `undefined` (an index past
 * the end of a string) is never a letter.
 */
function isAsciiLetter(value: string | undefined): boolean {
  return value !== undefined && value >= "a" && value <= "z";
}
validateIcebergConfigTableFolderSubpath, @@ -35,6 +36,30 @@ suite('format bytes in human readable format', () => { }); }); +suite("parse cli duration", () => { + test("parses numeric durations as milliseconds", () => { + expect(parseCliDuration("1500")).equals(1500); + }); + + test("parses single-unit durations", () => { + expect(parseCliDuration("1s")).equals(1000); + expect(parseCliDuration("10m")).equals(600000); + expect(parseCliDuration("2 hours")).equals(7200000); + }); + + test("parses compound and fractional durations", () => { + expect(parseCliDuration("1h30m")).equals(5400000); + expect(parseCliDuration("1.5m")).equals(90000); + expect(parseCliDuration("1 week 2 days")).equals(777600000); + }); + + test("rejects invalid durations", () => { + expect(() => parseCliDuration("")).to.throw("Duration cannot be empty."); + expect(() => parseCliDuration("tomorrow")).to.throw("Invalid duration: tomorrow"); + expect(() => parseCliDuration("1fortnight")).to.throw("Unsupported duration unit: fortnight"); + }); +}); + suite('Iceberg Config Validation', () => { suite('validateIcebergConfigBucketName', () => { test('valid bucket names do not throw errors', () => { diff --git a/package.json b/package.json index f102c21c4..47ae198c5 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "estraverse": "^5.1.0", "fs-extra": "^9.0.0", "glob": "13.0.6", + "google-auth-library": "^10.0.0-rc.1", "google-sql-syntax-ts": "^1.0.3", "js-beautify": "^1.10.2", "js-yaml": "^4.1.1", @@ -45,7 +46,6 @@ "minimist": "^1.2.6", "moo": "^0.5.0", "object-sizeof": "^1.6.1", - "parse-duration": "^1.0.0", "prettier": "^1.14.2", "promise-pool-executor": "^1.1.1", "protobufjs": "^7.5.8", diff --git a/packages/@dataform/cli/BUILD b/packages/@dataform/cli/BUILD index 7a1d7085c..60ca46549 100644 --- a/packages/@dataform/cli/BUILD +++ b/packages/@dataform/cli/BUILD @@ -33,12 +33,12 @@ externals = [ "deepmerge", "fs-extra", "glob", + "google-auth-library", "google-sql-syntax-ts", "js-beautify", 
"js-yaml", "moo", "object-sizeof", - "parse-duration", "promise-pool-executor", "protobufjs", "readline-sync", diff --git a/protos/profiles.proto b/protos/profiles.proto index 09eabcdb8..37b982ac9 100644 --- a/protos/profiles.proto +++ b/protos/profiles.proto @@ -11,6 +11,8 @@ message BigQuery { string credentials = 3; // Options are listed here: https://cloud.google.com/bigquery/docs/locations string location = 4; + // Service account email to impersonate during authentication + string impersonate_service_account = 5; reserved 2; } diff --git a/yarn.lock b/yarn.lock index e9a1d126a..44cb73285 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3027,7 +3027,7 @@ mkdirp@^0.5.1, mkdirp@~0.5.1: mkdirp@^1.0.4: version "1.0.4" resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-1.0.4.tgz#3eb5ed62622756d79a5f0e2a221dfebad75c2f7e" - integrity "sha1-PrXtYmInVteaXw4qIh3+utdcL34= sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==" + integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== monaco-editor@^0.44.0: version "0.44.0" @@ -3236,11 +3236,6 @@ package-json-from-dist@^1.0.0: resolved "https://registry.yarnpkg.com/package-json-from-dist/-/package-json-from-dist-1.0.1.tgz#4f1471a010827a86f94cfd9b0727e36d267de505" integrity sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw== -parse-duration@^1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/parse-duration/-/parse-duration-1.0.0.tgz#8605651745f61088f6fb14045c887526c291858c" - integrity "sha1-hgVlF0X2EIj2+xQEXIh1JsKRhYw= sha512-X4kUkCTHU1N/kEbwK9FpUJ0UZQa90VzeczfS704frR30gljxDG0pSziws06XlK+CGRSo/1wtG1mFIdBFQTMQNw==" - parse-semver@^1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/parse-semver/-/parse-semver-1.1.1.tgz#9a4afd6df063dc4826f93fba4a99cf223f666cb8"