From e907c5361faeacfb720958d3003da55143c009a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Sans=C3=B3n?= Date: Tue, 18 Jun 2024 18:16:44 +0200 Subject: [PATCH] tmp parquet files --- .changeset/cuddly-baboons-help.md | 6 ++ .../manager/findAndMaterializeQueries.test.ts | 59 +++++++++++++++++++ packages/source_manager/src/manager/index.ts | 13 +++- packages/storage_driver/src/drivers/base.ts | 3 + .../src/drivers/disk/DiskDriver.test.ts | 24 ++++++++ .../storage_driver/src/drivers/disk/index.ts | 16 +++++ .../storage_driver/src/drivers/s3/index.ts | 19 ++++++ 7 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 .changeset/cuddly-baboons-help.md diff --git a/.changeset/cuddly-baboons-help.md b/.changeset/cuddly-baboons-help.md new file mode 100644 index 00000000..0179e51b --- /dev/null +++ b/.changeset/cuddly-baboons-help.md @@ -0,0 +1,6 @@ +--- +"@latitude-data/source-manager": patch +"@latitude-data/storage-driver": patch +--- + +Parquet files will be stored in a temporary path while materialization is still in progress. Only when it finishes will the file be moved to the actual path. 
diff --git a/packages/source_manager/src/manager/findAndMaterializeQueries.test.ts b/packages/source_manager/src/manager/findAndMaterializeQueries.test.ts index fa15e222..bee664b2 100644 --- a/packages/source_manager/src/manager/findAndMaterializeQueries.test.ts +++ b/packages/source_manager/src/manager/findAndMaterializeQueries.test.ts @@ -5,6 +5,8 @@ import { ConnectorType } from '@/types' import findAndMaterializeQueries from './findAndMaterializeQueries' import { getStorageDriver } from '@latitude-data/storage-driver' import { v4 as uuidv4 } from 'uuid' +import path from 'path' +import fs from 'fs' const QUERIES_DIR = 'queries' const MATERIALIZED_DIR = 'storage' @@ -225,4 +227,61 @@ describe('findAndMaterializeQueries', () => { ], }) }) + + it('Does not remove older materialized files if new materialization fails', async () => { + mockFs({ + [QUERIES_DIR]: { + 'query.sql': materializableFailingSql(), + 'source.yml': `type: ${ConnectorType.TestInternal}`, + }, + [MATERIALIZED_DIR]: {}, + }) + + const storage = getStorageDriver({ + type: 'disk', + path: `/${MATERIALIZED_DIR}`, + }) + const customManager = new SourceManager(`${QUERIES_DIR}`, { storage }) + + const materializedFile = await customManager.materializationUrl('query.sql') + const originalContent = 'PARQUET_QUERY_CONTENT' + fs.mkdirSync(path.dirname(materializedFile), { recursive: true }) + fs.writeFileSync(materializedFile, originalContent) + + await findAndMaterializeQueries({ sourceManager: customManager }) + + expect(fs.existsSync(materializedFile)).toBe(true) + expect(fs.readFileSync(materializedFile).toString()).toBe(originalContent) + }) + + it('Updates older materialized files if new materialization succeeds', async () => { + mockFs({ + [QUERIES_DIR]: { + 'query.sql': materializableSql(), + 'source.yml': `type: ${ConnectorType.TestInternal}`, + }, + [MATERIALIZED_DIR]: {}, + }) + + const storage = getStorageDriver({ + type: 'disk', + path: `/${MATERIALIZED_DIR}`, + }) + const customManager = new 
SourceManager(`${QUERIES_DIR}`, { storage }) + + const materializedFile = await customManager.materializationUrl('query.sql') + const originalContent = 'PARQUET_QUERY_CONTENT' + fs.mkdirSync(path.dirname(materializedFile), { recursive: true }) + fs.writeFileSync(materializedFile, originalContent) + + await findAndMaterializeQueries({ + sourceManager: customManager, + force: true, + }) + + expect(fs.existsSync(materializedFile)).toBe(true) + expect(fs.readFileSync(materializedFile).toString()).not.toBe( + originalContent, + ) + }) }) diff --git a/packages/source_manager/src/manager/index.ts b/packages/source_manager/src/manager/index.ts index 46b03268..86aadf78 100644 --- a/packages/source_manager/src/manager/index.ts +++ b/packages/source_manager/src/manager/index.ts @@ -19,6 +19,7 @@ import { createHash } from 'crypto' import { ParquetWriter } from '@dsnp/parquetjs' import { WriteStreamMinimal } from '@dsnp/parquetjs/dist/lib/util' import { buildParquetSchema } from './parquetUtils' +import { v4 as uuidv4 } from 'uuid' const MATERIALIZED_DIR_IN_STORAGE = 'materialized' @@ -179,6 +180,8 @@ export default class SourceManager { force?: boolean batchSize?: number }): Promise { + const tmpParquetFilepath = `${MATERIALIZED_DIR_IN_STORAGE}/.tmp/${uuidv4()}.parquet` + try { const source = await this.loadFromQuery(queryPath) const { config } = await source.getMetadataFromQuery(queryPath) @@ -199,8 +202,8 @@ export default class SourceManager { const startTime = performance.now() const compiled = await source.compileQuery({ queryPath, params: {} }) - - const stream = await this.materializedStorage.createWriteStream(filename) + const stream = + await this.materializedStorage.createWriteStream(tmpParquetFilepath) let writer: ParquetWriter const ROW_GROUP_SIZE = 4096 // How many rows are in the ParquetWriter file buffer at a time @@ -237,12 +240,14 @@ export default class SourceManager { .catch(reject) }) - const endTime = performance.now() + await 
this.materializedStorage.move(tmpParquetFilepath, filename) const fileSize = await this.materializedStorage .stat(filename) .then((stat: FileStat) => stat.size) + const endTime = performance.now() + return { queryPath, cached: false, @@ -253,6 +258,8 @@ export default class SourceManager { time: endTime - startTime, } } catch (error) { + this.materializedStorage.delete(tmpParquetFilepath) + return { queryPath, cached: false, diff --git a/packages/storage_driver/src/drivers/base.ts b/packages/storage_driver/src/drivers/base.ts index 3e8dac84..67b3ff19 100644 --- a/packages/storage_driver/src/drivers/base.ts +++ b/packages/storage_driver/src/drivers/base.ts @@ -17,5 +17,8 @@ export abstract class StorageDriver { encoding?: T, ): Promise + abstract move(from: string, to: string): Promise + abstract delete(path: string): Promise + abstract createWriteStream(path: string): Promise } diff --git a/packages/storage_driver/src/drivers/disk/DiskDriver.test.ts b/packages/storage_driver/src/drivers/disk/DiskDriver.test.ts index b6de2610..6ddb15e8 100644 --- a/packages/storage_driver/src/drivers/disk/DiskDriver.test.ts +++ b/packages/storage_driver/src/drivers/disk/DiskDriver.test.ts @@ -121,4 +121,28 @@ describe('DiskDriver', () => { const content = fs.readFileSync('/mocked/path/file.txt') expect(content.toString()).toBe('hello world') }) + + it('should move a file', async () => { + const fromPath = '/mocked/path/from.txt' + const toPath = '/mocked/path/to.txt' + mockFs({ + '/mocked/path': { + 'from.txt': 'content', + }, + }) + await driver.move(fromPath, toPath) + expect(fs.existsSync(toPath)).toBe(true) + expect(fs.existsSync(fromPath)).toBe(false) + }) + + it('should not move a file if it does not exist', async () => { + const fromPath = '/mocked/path/from.txt' + const toPath = '/mocked/path/to.txt' + mockFs({ + '/mocked/path': {}, + }) + await driver.move(fromPath, toPath) + expect(fs.existsSync(toPath)).toBe(false) + expect(fs.existsSync(fromPath)).toBe(false) + }) }) 
diff --git a/packages/storage_driver/src/drivers/disk/index.ts b/packages/storage_driver/src/drivers/disk/index.ts index d91a56e6..986c2872 100644 --- a/packages/storage_driver/src/drivers/disk/index.ts +++ b/packages/storage_driver/src/drivers/disk/index.ts @@ -84,4 +84,20 @@ export class DiskDriver extends StorageDriver { await this.createDirIfNotExists(filepath) return fs.createWriteStream(await this.resolveUrl(filepath)) } + + async move(from: string, to: string): Promise { + const fromPath = await this.resolveUrl(from) + if (!fs.existsSync(fromPath)) return + + await this.createDirIfNotExists(to) + const toPath = await this.resolveUrl(to) + + fs.renameSync(fromPath, toPath) + } + + async delete(filepath: string): Promise { + const filePath = await this.resolveUrl(filepath) + if (!fs.existsSync(filePath)) return + fs.unlinkSync(filePath) + } } diff --git a/packages/storage_driver/src/drivers/s3/index.ts b/packages/storage_driver/src/drivers/s3/index.ts index d8a9c817..be3ad983 100644 --- a/packages/storage_driver/src/drivers/s3/index.ts +++ b/packages/storage_driver/src/drivers/s3/index.ts @@ -7,6 +7,8 @@ import { GetObjectCommand, PutObjectCommand, HeadObjectCommand, + CopyObjectCommand, + DeleteObjectCommand, } from '@aws-sdk/client-s3' import { Upload } from '@aws-sdk/lib-storage' import { PassThrough } from 'stream' @@ -132,6 +134,23 @@ export class S3Driver extends StorageDriver { return buffer as T extends undefined ? Buffer : string } + async move(from: string, to: string): Promise { + await this.client.send( + new CopyObjectCommand({ + Bucket: this.bucket, + Key: to, + CopySource: this.bucket + '/' + from, + }), + ) + await this.delete(from) + } + + async delete(filepath: string): Promise { + await this.client.send( + new DeleteObjectCommand({ Bucket: this.bucket, Key: filepath }), + ) + } + async createWriteStream(filepath: string): Promise { const pass = new PassThrough()