Skip to content

Commit

Permalink
tmp parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
csansoon committed Jun 19, 2024
1 parent 12994ba commit e907c53
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .changeset/cuddly-baboons-help.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@latitude-data/source-manager": patch
"@latitude-data/storage-driver": patch
---

Parquet files will be stored in a temporary path while materialization is still in progress. Only once materialization finishes will the file be moved to its actual path.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { ConnectorType } from '@/types'
import findAndMaterializeQueries from './findAndMaterializeQueries'
import { getStorageDriver } from '@latitude-data/storage-driver'
import { v4 as uuidv4 } from 'uuid'
import path from 'path'
import fs from 'fs'

const QUERIES_DIR = 'queries'
const MATERIALIZED_DIR = 'storage'
Expand Down Expand Up @@ -225,4 +227,61 @@ describe('findAndMaterializeQueries', () => {
],
})
})

it('Does not remove older materialized files if new materialization fails', async () => {
  // A query whose materialization always fails, next to a pre-existing
  // materialized file: the old file must survive the failed run untouched.
  mockFs({
    [QUERIES_DIR]: {
      'query.sql': materializableFailingSql(),
      'source.yml': `type: ${ConnectorType.TestInternal}`,
    },
    [MATERIALIZED_DIR]: {},
  })

  const manager = new SourceManager(`${QUERIES_DIR}`, {
    storage: getStorageDriver({
      type: 'disk',
      path: `/${MATERIALIZED_DIR}`,
    }),
  })

  // Seed the destination path with a previous materialization result.
  const previousContent = 'PARQUET_QUERY_CONTENT'
  const parquetPath = await manager.materializationUrl('query.sql')
  fs.mkdirSync(path.dirname(parquetPath), { recursive: true })
  fs.writeFileSync(parquetPath, previousContent)

  await findAndMaterializeQueries({ sourceManager: manager })

  // The failed materialization must not have clobbered the old file.
  expect(fs.existsSync(parquetPath)).toBe(true)
  expect(fs.readFileSync(parquetPath).toString()).toBe(previousContent)
})

it('Updates older materialized files if new materialization succeeds', async () => {
  // A query that materializes successfully, next to a stale materialized
  // file: a forced run must replace the old content with fresh output.
  mockFs({
    [QUERIES_DIR]: {
      'query.sql': materializableSql(),
      'source.yml': `type: ${ConnectorType.TestInternal}`,
    },
    [MATERIALIZED_DIR]: {},
  })

  const manager = new SourceManager(`${QUERIES_DIR}`, {
    storage: getStorageDriver({
      type: 'disk',
      path: `/${MATERIALIZED_DIR}`,
    }),
  })

  // Seed the destination path with a previous materialization result.
  const previousContent = 'PARQUET_QUERY_CONTENT'
  const parquetPath = await manager.materializationUrl('query.sql')
  fs.mkdirSync(path.dirname(parquetPath), { recursive: true })
  fs.writeFileSync(parquetPath, previousContent)

  await findAndMaterializeQueries({
    sourceManager: manager,
    force: true,
  })

  // The file still exists but now holds the new parquet output.
  expect(fs.existsSync(parquetPath)).toBe(true)
  expect(fs.readFileSync(parquetPath).toString()).not.toBe(
    previousContent,
  )
})
})
13 changes: 10 additions & 3 deletions packages/source_manager/src/manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { createHash } from 'crypto'
import { ParquetWriter } from '@dsnp/parquetjs'
import { WriteStreamMinimal } from '@dsnp/parquetjs/dist/lib/util'
import { buildParquetSchema } from './parquetUtils'
import { v4 as uuidv4 } from 'uuid'

const MATERIALIZED_DIR_IN_STORAGE = 'materialized'

Expand Down Expand Up @@ -179,6 +180,8 @@ export default class SourceManager {
force?: boolean
batchSize?: number
}): Promise<MaterializationInfo> {
const tmpParquetFilepath = `${MATERIALIZED_DIR_IN_STORAGE}/.tmp/${uuidv4()}.parquet`

try {
const source = await this.loadFromQuery(queryPath)
const { config } = await source.getMetadataFromQuery(queryPath)
Expand All @@ -199,8 +202,8 @@ export default class SourceManager {

const startTime = performance.now()
const compiled = await source.compileQuery({ queryPath, params: {} })

const stream = await this.materializedStorage.createWriteStream(filename)
const stream =
await this.materializedStorage.createWriteStream(tmpParquetFilepath)

let writer: ParquetWriter
const ROW_GROUP_SIZE = 4096 // How many rows are in the ParquetWriter file buffer at a time
Expand Down Expand Up @@ -237,12 +240,14 @@ export default class SourceManager {
.catch(reject)
})

const endTime = performance.now()
await this.materializedStorage.move(tmpParquetFilepath, filename)

const fileSize = await this.materializedStorage
.stat(filename)
.then((stat: FileStat) => stat.size)

const endTime = performance.now()

return {
queryPath,
cached: false,
Expand All @@ -253,6 +258,8 @@ export default class SourceManager {
time: endTime - startTime,
}
} catch (error) {
this.materializedStorage.delete(tmpParquetFilepath)

return {
queryPath,
cached: false,
Expand Down
3 changes: 3 additions & 0 deletions packages/storage_driver/src/drivers/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ export abstract class StorageDriver {
encoding?: T,
): Promise<T extends undefined ? Buffer : string>

abstract move(from: string, to: string): Promise<void>
abstract delete(path: string): Promise<void>

abstract createWriteStream(path: string): Promise<Writable>
}
24 changes: 24 additions & 0 deletions packages/storage_driver/src/drivers/disk/DiskDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,28 @@ describe('DiskDriver', () => {
const content = fs.readFileSync('/mocked/path/file.txt')
expect(content.toString()).toBe('hello world')
})

it('should move a file', async () => {
  // One file on disk; after move() it must exist only at the destination.
  mockFs({
    '/mocked/path': {
      'from.txt': 'content',
    },
  })
  const source = '/mocked/path/from.txt'
  const destination = '/mocked/path/to.txt'

  await driver.move(source, destination)

  expect(fs.existsSync(destination)).toBe(true)
  expect(fs.existsSync(source)).toBe(false)
})

it('should not move a file if it does not exist', async () => {
  // Moving a missing source is a silent no-op: nothing appears at the
  // destination and no error is thrown.
  mockFs({
    '/mocked/path': {},
  })
  const source = '/mocked/path/from.txt'
  const destination = '/mocked/path/to.txt'

  await driver.move(source, destination)

  expect(fs.existsSync(destination)).toBe(false)
  expect(fs.existsSync(source)).toBe(false)
})
})
16 changes: 16 additions & 0 deletions packages/storage_driver/src/drivers/disk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,20 @@ export class DiskDriver extends StorageDriver {
await this.createDirIfNotExists(filepath)
return fs.createWriteStream(await this.resolveUrl(filepath))
}

/**
 * Moves a file within the driver's root via an atomic rename.
 * Moving a non-existent source is a deliberate no-op (no error).
 */
async move(from: string, to: string): Promise<void> {
  const source = await this.resolveUrl(from)
  if (!fs.existsSync(source)) return

  // Ensure the destination's parent directory exists before renaming.
  await this.createDirIfNotExists(to)
  fs.renameSync(source, await this.resolveUrl(to))
}

async delete(filepath: string): Promise<void> {
const filePath = await this.resolveUrl(filepath)
if (!fs.existsSync(filePath)) return
fs.unlinkSync(filePath)
}
}
19 changes: 19 additions & 0 deletions packages/storage_driver/src/drivers/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
GetObjectCommand,
PutObjectCommand,
HeadObjectCommand,
CopyObjectCommand,
DeleteObjectCommand,
} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { PassThrough } from 'stream'
Expand Down Expand Up @@ -132,6 +134,23 @@ export class S3Driver extends StorageDriver {
return buffer as T extends undefined ? Buffer : string
}

/**
 * Moves an object within the bucket. S3 has no native rename, so this is
 * implemented as copy-then-delete.
 *
 * Fix: the S3 `CopySource` parameter must be URL-encoded. The previous
 * raw concatenation broke for keys containing spaces, '+', or other
 * reserved characters. Each path segment is encoded individually so the
 * '/' separators between bucket and key segments are preserved.
 */
async move(from: string, to: string): Promise<void> {
  const encodedSource = [this.bucket, ...from.split('/')]
    .map(encodeURIComponent)
    .join('/')

  await this.client.send(
    new CopyObjectCommand({
      Bucket: this.bucket,
      Key: to,
      CopySource: encodedSource,
    }),
  )

  // NOTE(review): CopyObject only supports objects up to 5 GiB — larger
  // parquet files would require a multipart copy. Confirm expected sizes.
  await this.delete(from)
}

/**
 * Deletes an object from the bucket. S3 DeleteObject succeeds even when
 * the key does not exist, so no existence check is needed.
 */
async delete(filepath: string): Promise<void> {
  const command = new DeleteObjectCommand({
    Bucket: this.bucket,
    Key: filepath,
  })
  await this.client.send(command)
}

async createWriteStream(filepath: string): Promise<Writable> {
const pass = new PassThrough()

Expand Down

0 comments on commit e907c53

Please sign in to comment.