Skip to content

Commit

Permalink
Some adjustments for Storage Driver and SourceManager to run on a nodejs
Browse files Browse the repository at this point in the history
server
We want to use some methods from these 2 packages in our Latitude cloud
and we have to make it work
  • Loading branch information
andresgutgon committed Jun 19, 2024
1 parent 1b25797 commit 4c129f2
Show file tree
Hide file tree
Showing 18 changed files with 189 additions and 55 deletions.
6 changes: 6 additions & 0 deletions .changeset/wet-bikes-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@latitude-data/source-manager": patch
"@latitude-data/storage-driver": patch
---

Dry resolve secrets helper function
11 changes: 9 additions & 2 deletions apps/server/scripts.rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ const materializeScript = {
format: 'esm',
sourcemap: true,
},
external: ['fs', 'ora', '@latitude-data/source-manager', 'node:util'],
external: [
'fs',
'ora',
'@latitude-data/storage-driver',
'@latitude-data/source-manager',
'node:util',
],
...common,
}

Expand All @@ -25,11 +31,12 @@ const runScript = {
sourcemap: true,
},
external: [
'@latitude-data/display_table',
'node:util',
'path',
'@latitude-data/source-manager',
'@latitude-data/storage-driver',
'@latitude-data/custom_types',
'@latitude-data/display_table',
'fs',
],
...common,
Expand Down
4 changes: 3 additions & 1 deletion apps/server/scripts/materialize_queries/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
FailedMaterializationInfo,
findAndMaterializeQueries,
MaterializationInfo,
} from '@latitude-data/source-manager'
Expand Down Expand Up @@ -113,7 +114,8 @@ function successMaterializationToTable(
function failedMaterializationToTable(materializations: MaterializationInfo[]) {
const table = materializations
.filter((info) => info.cached === false && info.success === false)
.map((info) => {
.map((i) => {
const info = i as FailedMaterializationInfo
return {
queryPath: info.queryPath,
error: info.error.message,
Expand Down
15 changes: 7 additions & 8 deletions apps/server/src/lib/server/storageDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
StorageType,
getStorageDriver,
} from '@latitude-data/storage-driver'
import { resolveSecrets } from '@latitude-data/source-manager'

const DEFAULT_STORAGE_CONFIG = {
type: StorageType.disk,
Expand All @@ -17,14 +18,12 @@ function readStorageConfig(): StorageDriverConfig {
}

const file = fs.readFileSync(APP_CONFIG_PATH, 'utf8')
try {
const latitudeJson = JSON.parse(file)
const storageConfig = latitudeJson?.storage ?? {}
if (!storageConfig.type) return DEFAULT_STORAGE_CONFIG
return storageConfig
} catch (e) {
return DEFAULT_STORAGE_CONFIG
}
const latitudeJson = resolveSecrets({ unresolvedSecrets: JSON.parse(file) })
const storageConfig = latitudeJson?.storage

if (!storageConfig) return DEFAULT_STORAGE_CONFIG

return storageConfig
}

const storageDriver = getStorageDriver(readStorageConfig())
Expand Down
9 changes: 8 additions & 1 deletion apps/server/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ export default defineConfig({
},
build: {
rollupOptions: {
external: ['@latitude-data/source-manager', 'path', 'child_process'],
external: [
'path',
'child_process',
'@latitude-data/source-manager',
'@latitude-data/storage-driver',
'@latitude-data/display_table',
'@latitude-data/custome_types',
],
},
},
})
1 change: 1 addition & 0 deletions packages/cli/display_table/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export default {
external: [
'react',
'ink',
'v8',
'ink-spinner',
'chokidar',
'@latitude-data/source-manager',
Expand Down
1 change: 1 addition & 0 deletions packages/source_manager/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export default [
'fs',
'path',
'crypto',
'stream',
'uuid',
'dotenv/config',
'@latitude-data/sql-compiler',
Expand Down
1 change: 1 addition & 0 deletions packages/source_manager/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './types'
export { default as SourceManager } from './manager'
export * from './source'
export * from './baseConnector'
export * from './utils'
export { CONNECTOR_PACKAGES } from './baseConnector/connectorFactory'
export { default as TestConnectorInternal } from './testConnector'
export {
Expand Down
5 changes: 4 additions & 1 deletion packages/source_manager/src/manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { createHash } from 'crypto'
import { ParquetWriter } from '@dsnp/parquetjs'
import { WriteStreamMinimal } from '@dsnp/parquetjs/dist/lib/util'
import { buildParquetSchema } from './parquetUtils'
import { Writable } from 'stream'

const MATERIALIZED_DIR_IN_STORAGE = 'materialized'

Expand Down Expand Up @@ -201,7 +202,9 @@ export default class SourceManager {
const startTime = performance.now()
const compiled = await source.compileQuery({ queryPath, params: {} })

const stream = await this.materializedStorage.createWriteStream(filename)
const stream = (await this.materializedStorage.createWriteStream(
filename,
)) as Writable

let writer: ParquetWriter
const ROW_GROUP_SIZE = 4096 // How many rows are in the ParquetWriter file buffer at a time
Expand Down
16 changes: 4 additions & 12 deletions packages/source_manager/src/source/readConfig.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import fs from 'fs'
import yaml from 'yaml'
import { SourceSchema, SourceFileNotFoundError } from '@/types'
import { resolveSecrets } from '@/utils'

export class InvalidSourceConfigError extends Error {
constructor(message: string) {
Expand All @@ -15,18 +16,9 @@ export default function readSourceConfig(sourcePath: string): SourceSchema {
}

const file = fs.readFileSync(sourcePath, 'utf8')
const config = yaml.parse(file, (_, value) => {
// if key starts with 'LATITUDE__', replace it with the environment variable
if (typeof value === 'string' && value.startsWith('LATITUDE__')) {
if (process.env[value]) return process.env[value]

throw new Error(`
Invalid configuration. Environment variable ${value} was not found in the environment. You can review how to set up secret source credentials in the documentation: https://docs.latitude.so/sources/credentials
`)
} else {
return value
}
})
const config = resolveSecrets({
unresolvedSecrets: yaml.parse(file),
}) as unknown as SourceSchema

// Validation requirements
if (!config?.type) {
Expand Down
2 changes: 1 addition & 1 deletion packages/source_manager/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ interface SuccessMaterializationInfo extends IMissMaterializationInfo {
time: number
}

interface FailedMaterializationInfo extends IMissMaterializationInfo {
export interface FailedMaterializationInfo extends IMissMaterializationInfo {
cached: false
success: false
error: Error
Expand Down
54 changes: 54 additions & 0 deletions packages/source_manager/src/utils/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { describe, expect, it } from 'vitest'
import { resolveSecrets } from './index'

describe('resolveSecrets', () => {
it('should resolve secrets', () => {
const config = {
secret1: 'LATITUDE__SECRET1',
secret2: 'LATITUDE__SECRET2',
}

// eslint-disable-next-line turbo/no-undeclared-env-vars
process.env['LATITUDE__SECRET1'] = 'supersecret1'
// eslint-disable-next-line turbo/no-undeclared-env-vars
process.env['LATITUDE__SECRET2'] = 'supersecret2'

const resolvedSecrets = resolveSecrets({ unresolvedSecrets: config })

expect(resolvedSecrets.secret1).toBe('supersecret1')
expect(resolvedSecrets.secret2).toBe('supersecret2')
})

it('should resolve nested secrets', () => {
const config = {
secret: {
secret: 'LATITUDE__SECRET1',
notASecret: 33,
},
}
// eslint-disable-next-line turbo/no-undeclared-env-vars
process.env['LATITUDE__SECRET1'] = 'supersecret1'
const resolvedSecrets = resolveSecrets({ unresolvedSecrets: config })
const firstLevel = resolvedSecrets.secret as {
secret: string
notASecret: number
}

expect(firstLevel.secret).toBe('supersecret1')
expect(firstLevel.notASecret).toBe(33)
})

it('should throw error when environment variable is not found', () => {
const config = {
secret3: 'LATITUDE__SECRET3',
}

expect(() => {
resolveSecrets({ unresolvedSecrets: config })
}).toThrowError(
new Error(
`Invalid configuration. Environment variable LATITUDE__SECRET3 was not found in the environment.`,
),
)
})
})
31 changes: 31 additions & 0 deletions packages/source_manager/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export function resolveSecrets({
unresolvedSecrets,
}: {
unresolvedSecrets: Record<string, unknown>
}) {
return Object.entries(unresolvedSecrets).reduce(
(acc, [key, value]) => {
if (typeof value === 'object') {
acc[key] = resolveSecrets({
unresolvedSecrets: value as Record<string, unknown>,
})
return acc
}

if (typeof value === 'string' && value.startsWith('LATITUDE__')) {
if (process.env[value]) {
acc[key] = process.env[value]
return acc
}

throw new Error(
`Invalid configuration. Environment variable ${value} was not found in the environment.`,
)
}

acc[key] = value
return acc
},
{} as Record<string, unknown>,
)
}
3 changes: 3 additions & 0 deletions packages/storage_driver/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ export default {
],
external: [
'fs',
'@aws-sdk/client-s3',
'@aws-sdk/lib-storage',
'stream',
'path',
],
}
8 changes: 7 additions & 1 deletion packages/storage_driver/src/drivers/base.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { FileStat } from '$/types'
import { FileStat, StorageType } from '$/types'
import { Writable } from 'stream'

export abstract class StorageDriver {
constructor() {}

public abstract type: StorageType

get isS3Driver(): boolean {
return this.type === StorageType.s3
}

abstract resolveUrl(path: string): Promise<string>

abstract listFiles(path: string, recursive: boolean): Promise<string[]>
Expand Down
3 changes: 2 additions & 1 deletion packages/storage_driver/src/drivers/disk/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FileStat } from '$/types'
import { FileStat, StorageType } from '$/types'
import { StorageDriver } from '$/drivers/base'
import fs from 'fs'
import path from 'path'
Expand All @@ -9,6 +9,7 @@ export type DiskDriverConfig = {
}

export class DiskDriver extends StorageDriver {
public type = StorageType.disk
private root: string

constructor(config: DiskDriverConfig) {
Expand Down
3 changes: 2 additions & 1 deletion packages/storage_driver/src/drivers/s3/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FileStat } from '$/types'
import { FileStat, StorageType } from '$/types'
import { StorageDriver } from '$/drivers/base'
import { Writable, Readable } from 'stream'
import {
Expand All @@ -20,6 +20,7 @@ export type S3DriverConfig = {
}

export class S3Driver extends StorageDriver {
public type = StorageType.s3
private client: S3Client
private bucket: string

Expand Down
Loading

0 comments on commit 4c129f2

Please sign in to comment.