Skip to content

Commit 7920946

Browse files
committed
skip export parquet file if it exists
1 parent de4fb81 commit 7920946

3 files changed

Lines changed: 125 additions & 24 deletions

File tree

packages/core/src/lib/cache-layer/cacheLayerLoader.ts

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,40 @@ export class CacheLayerLoader implements ICacheLayerLoader {
4343
templateName: string,
4444
cache: CacheLayerInfo
4545
): Promise<void> {
46-
const { cacheTableName, sql, profile, indexes } = cache;
46+
const { cacheTableName, sql, profile, indexes, folderSubpath } = cache;
4747
const type = this.options.type!;
4848
const dataSource = this.dataSourceFactory(profile);
4949

5050
// generate directory for cache file path to export
5151
// format => [folderPath]/[schema.templateSource]/[profileName]/[cacheTableName]/[folderSubpath, or a UTC timestamp when it is not set]
52+
const subpath = folderSubpath || moment.utc().format('YYYYMMDDHHmmss');
5253
const directory = path.resolve(
5354
this.options.folderPath!,
5455
templateName,
5556
profile,
5657
cacheTableName,
57-
moment.utc().format('YYYYMMDDHHmmss')
58+
subpath
5859
);
59-
60-
if (!fs.existsSync(directory!))
61-
fs.mkdirSync(directory!, { recursive: true });
62-
63-
// 1. export to cache files according to each schema set the cache value
64-
this.logger.debug(`Start to export to ${type} file in "${directory}"`);
65-
await dataSource.export({
66-
sql,
67-
directory,
68-
profileName: profile,
69-
type,
70-
});
60+
const parquetFiles = this.getParquetFiles(directory);
61+
if (!parquetFiles.length) {
62+
if (!fs.existsSync(directory!)) {
63+
fs.mkdirSync(directory!, { recursive: true });
64+
}
65+
// 1. export to cache files according to each schema set the cache value
66+
this.logger.debug(`Start to export to ${type} file in "${directory}"`);
67+
await dataSource.export({
68+
sql,
69+
directory,
70+
profileName: profile,
71+
type,
72+
});
73+
} else {
74+
this.logger.debug(
75+
`Parquet file \n ${parquetFiles.join(
76+
'\n '
77+
)} found in ${directory}, skip export`
78+
);
79+
}
7180
this.logger.debug(`Start to load ${cacheTableName} in "${directory}"`);
7281
// 2. load the files to cache data source
7382
await this.cacheStorage.import({
@@ -81,4 +90,16 @@ export class CacheLayerLoader implements ICacheLayerLoader {
8190
indexes,
8291
});
8392
}
93+
94+
/**
 * Lists the parquet entries already present in a cache export directory.
 *
 * @param directory - Directory to inspect.
 * @returns The entry names (as returned by `fs.readdirSync`) that end in
 *   `.parquet`; an empty array when `directory` is falsy or does not exist.
 */
private getParquetFiles(directory: string): string[] {
  // Nothing to list when no directory was given or it has not been created yet.
  const isReadable = Boolean(directory) && fs.existsSync(directory);
  if (!isReadable) return [];

  const parquetSuffix = /\.parquet$/;
  return fs
    .readdirSync(directory)
    .filter((entry) => parquetSuffix.test(entry));
}
84105
}

packages/core/src/models/artifact.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ export class CacheLayerInfo {
114114
refreshExpression?: RefreshExpression;
115115
// index key name -> index column
116116
indexes?: Record<string, string>;
117+
// cache folder subpath
118+
folderSubpath?: string;
117119
}
118120

119121
export class APISchema {

packages/core/test/cache-layer/cacheLayerLoader.spec.ts

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import * as fs from 'fs';
2+
import * as duckdb from 'duckdb';
23
import * as sinon from 'ts-sinon';
4+
import * as path from 'path';
35
import {
46
CacheLayerInfo,
57
CacheLayerLoader,
@@ -9,7 +11,7 @@ import {
911
vulcanCacheSchemaName,
1012
} from '@vulcan-sql/core';
1113
import { MockDataSource, getQueryResults } from './mockDataSource';
12-
14+
const db = new duckdb.Database(':memory:');
1315
describe('Test cache layer loader', () => {
1416
const folderPath = 'loader-test-exported-parquets';
1517
const profiles = [
@@ -52,13 +54,70 @@ describe('Test cache layer loader', () => {
5254
fs.rmSync(folderPath, { recursive: true, force: true });
5355
});
5456

57+
// it.each([
58+
// {
59+
// templateName: 'template-1',
60+
// cache: {
61+
// cacheTableName: 'employees',
62+
// sql: sinon.default.stub() as any,
63+
// profile: profiles[0].name,
64+
// } as CacheLayerInfo,
65+
// },
66+
// {
67+
// templateName: 'template-1',
68+
// cache: {
69+
// cacheTableName: 'departments',
70+
// sql: sinon.default.stub() as any,
71+
// profile: profiles[1].name,
72+
// } as CacheLayerInfo,
73+
// },
74+
// {
75+
// templateName: 'template-2',
76+
// cache: {
77+
// cacheTableName: 'jobs',
78+
// sql: sinon.default.stub() as any,
79+
// profile: profiles[2].name,
80+
// } as CacheLayerInfo,
81+
// },
82+
// ])(
83+
// 'Should export and load $cache.cacheTableName successful when start loader with cache settings for $templateName',
84+
// async ({ templateName, cache }) => {
85+
// // Arrange
86+
// // Act
87+
// const loader = new CacheLayerLoader(options, stubFactory as any);
88+
// await loader.load(templateName, cache);
89+
90+
// // Assert
91+
// const actual = (
92+
// await getQueryResults(
93+
// "select * from information_schema.tables where table_schema = 'vulcan'"
94+
// )
95+
// ).map((row) => {
96+
// return {
97+
// table: row['table_name'],
98+
// schema: row['table_schema'],
99+
// };
100+
// });
101+
// expect(actual).toEqual(
102+
// expect.arrayContaining([
103+
// {
104+
// table: cache.cacheTableName,
105+
// schema: vulcanCacheSchemaName,
106+
// },
107+
// ])
108+
// );
109+
// },
110+
// // Set 50s timeout to test cache loader export and load data
111+
// 50 * 10000
112+
// );
55113
it.each([
56114
{
57115
templateName: 'template-1',
58116
cache: {
59117
cacheTableName: 'employees',
60118
sql: sinon.default.stub() as any,
61119
profile: profiles[0].name,
120+
folderSubpath: '2023',
62121
} as CacheLayerInfo,
63122
},
64123
{
@@ -67,20 +126,22 @@ describe('Test cache layer loader', () => {
67126
cacheTableName: 'departments',
68127
sql: sinon.default.stub() as any,
69128
profile: profiles[1].name,
70-
} as CacheLayerInfo,
71-
},
72-
{
73-
templateName: 'template-2',
74-
cache: {
75-
cacheTableName: 'jobs',
76-
sql: sinon.default.stub() as any,
77-
profile: profiles[2].name,
129+
folderSubpath: '2023',
78130
} as CacheLayerInfo,
79131
},
80132
])(
81-
'Should export and load $cache.cacheTableName successful when start loader with cache settings for $templateName',
133+
'Should use existed parquet to load cache table: $cache.cacheTableName',
82134
async ({ templateName, cache }) => {
83135
// Arrange
136+
const { profile, cacheTableName, folderSubpath } = cache;
137+
const dir = path.resolve(
138+
folderPath,
139+
templateName,
140+
profile,
141+
cacheTableName,
142+
folderSubpath!
143+
);
144+
await createParquetFile(dir, cacheTableName);
84145
// Act
85146
const loader = new CacheLayerLoader(options, stubFactory as any);
86147
await loader.load(templateName, cache);
@@ -109,3 +170,20 @@ describe('Test cache layer loader', () => {
109170
50 * 10000
110171
);
111172
});
173+
174+
/**
 * Test fixture helper: writes a one-row parquet file to
 * `<path>/<fileName>.parquet` using the module-level DuckDB database `db`.
 *
 * The target directory is created (recursively) when it does not exist.
 * The statements run one after another, and a failure in any of them rejects
 * the returned promise instead of being dropped.
 *
 * @param path - Directory the parquet file is written into.
 * @param fileName - File name without the `.parquet` extension.
 * @returns A promise resolving to `true` once the COPY statement has finished.
 */
async function createParquetFile(
  path: string,
  fileName: string
): Promise<boolean> {
  if (!fs.existsSync(path)) {
    fs.mkdirSync(path, { recursive: true });
  }
  await runDuckDbStatement(`CREATE OR REPLACE TABLE parquet_table (i integer)`);
  await runDuckDbStatement(`INSERT INTO parquet_table (i) VALUES (1)`);
  await runDuckDbStatement(
    `COPY (SELECT * FROM parquet_table) TO '${path}/${fileName}.parquet' (FORMAT PARQUET);`
  );
  return true;
}

/** Runs one SQL statement on `db` and settles exactly once: rejects on error, resolves otherwise. */
function runDuckDbStatement(sql: string): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    db.run(sql, (err: unknown) => {
      if (err) {
        reject(err);
        return;
      }
      resolve();
    });
  });
}

0 commit comments

Comments
 (0)