Skip to content

Commit

Permalink
S3 storage driver (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
csansoon authored Jun 18, 2024
1 parent 4f12d38 commit 12994ba
Showing 24 changed files with 596 additions and 484 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-swans-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latitude-data/server": patch
---

Added more information when materializing queries with the --debug option
5 changes: 5 additions & 0 deletions .changeset/hip-colts-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latitude-data/duckdb-connector": minor
---

Added new S3 options to the DuckDB source configuration so that DuckDB queries can access files stored in S3. Also upgraded DuckDB from 0.9.2 to 0.10.0, which this feature requires.
5 changes: 5 additions & 0 deletions .changeset/hungry-adults-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latitude-data/display_table": patch
---

Added a new debug heap measurement in the bottom-right corner.
5 changes: 5 additions & 0 deletions .changeset/ten-masks-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latitude-data/source-manager": patch
---

Added URL to returned MaterializationInfo when materializing queries
5 changes: 5 additions & 0 deletions .changeset/tender-years-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latitude-data/storage-driver": minor
---

Added new S3 driver
5 changes: 5 additions & 0 deletions .changeset/wise-lions-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latitude-data/cli": minor
---

Updated schema to allow new S3 storage configuration.
32 changes: 24 additions & 8 deletions apps/server/scripts/materialize_queries/index.ts
Original file line number Diff line number Diff line change
@@ -54,16 +54,17 @@ function humanizeFileSize(bytes: number) {
}

/**
 * Formats a duration given in milliseconds as a short human-readable string.
 *
 * - below 60 ms: milliseconds with three decimals (`"12.500 ms"`)
 * - below one minute: seconds with two decimals (`"1.50 s"`)
 * - below one hour: `m:ss min`
 * - otherwise: `h:mm h`
 *
 * @param time - Duration in milliseconds.
 * @returns The formatted duration.
 */
function humanizeTime(time: number) {
  if (time < 60) return `${time.toFixed(3)} ms`
  if (time < 60 * 1000) return `${(time / 1000).toFixed(2)} s`

  // Clock-style components must be truncated, not rounded: rounding turns
  // 90 s into "2:30 min" and 119.5 s into "2:60 min".
  const totalSeconds = Math.floor(time / 1000)
  const totalMinutes = Math.floor(totalSeconds / 60)

  if (time < 60 * 60 * 1000) {
    const paddedSeconds = String(totalSeconds % 60).padStart(2, '0')
    return `${totalMinutes}:${paddedSeconds} min`
  }

  const totalHours = Math.floor(totalMinutes / 60)
  const paddedMinutes = String(totalMinutes % 60).padStart(2, '0')
  return `${totalHours}:${paddedMinutes} h`
}

function removeIndex<Key extends string, T extends Record<Key, unknown>>(
@@ -79,6 +80,7 @@ function removeIndex<Key extends string, T extends Record<Key, unknown>>(

function successMaterializationToTable(
materializations: MaterializationInfo[],
debug: boolean,
) {
const table = materializations
.filter((info) => info.cached || info.success)
@@ -87,6 +89,7 @@ function successMaterializationToTable(
return {
queryPath: info.queryPath,
cached: true,
...(debug ? { url: info.url } : {}),
rows: undefined,
fileSize: undefined,
time: undefined,
@@ -96,6 +99,7 @@ function successMaterializationToTable(
return {
queryPath: info.queryPath,
cached: false,
...(debug ? { url: info.url } : {}),
rows: info.rows,
fileSize: humanizeFileSize(info.fileSize),
time: humanizeTime(info.time),
@@ -126,12 +130,18 @@ async function materializeQueries({
let spinner = ora().start('Starting materialization process')
let currentText = ''
let debugMessage = ''
let maxHeap = 0
let totalHeap = 0
let heapCount = 0
const updateProgress = () => {
spinner.text = [currentText, debugMessage].filter(Boolean).join(' - ')
}
const measureHeap = () => {
if (!debug) return
const heap = process.memoryUsage().heapUsed
if (heap > maxHeap) maxHeap = heap
totalHeap += heap
heapCount++
debugMessage = `Memory: ${humanizeFileSize(heap)}`
updateProgress()
}
@@ -173,7 +183,7 @@ async function materializeQueries({

if (result.materializations.some((info) => info.cached || info.success)) {
console.log('Materialized queries:')
console.table(successMaterializationToTable(result.materializations))
console.table(successMaterializationToTable(result.materializations, debug))
}

if (result.materializations.some((info) => !info.cached && !info.success)) {
@@ -183,7 +193,13 @@ async function materializeQueries({

console.log('Summary:')
console.table({
'Total time': `⏰ ${humanizeTime(result.totalTime)}`,
'Total time': `⏱️ ${humanizeTime(result.totalTime)}`,
...(debug
? {
'Max heap': `🔥 ${humanizeFileSize(maxHeap)}`,
'Mean heap': `🌡️ ${humanizeFileSize(totalHeap / heapCount)}`,
}
: {}),
'Materialized queries': result.materializations.filter(
(info) => info.cached || info.success,
).length,
2 changes: 1 addition & 1 deletion apps/server/vite.config.ts
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ export default defineConfig({
},
build: {
rollupOptions: {
external: ['@latitude-data/source-manager', 'path'],
external: ['@latitude-data/source-manager', 'path', 'child_process'],
},
},
})
2 changes: 2 additions & 0 deletions packages/cli/core/src/commands/run/index.ts
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import setup from '$src/lib/decorators/setup'
import spawn from '$src/lib/spawn'
import syncQueries from '$src/lib/sync/syncQueries'
import tracked from '$src/lib/decorators/tracked'
import syncLatitudeJson from '$src/lib/sync/syncLatitudeJson'

async function run(
queryName: string,
@@ -19,6 +20,7 @@ async function run(
typeof opts?.param === 'string' ? [opts.param] : opts?.param ?? []

await syncQueries({ watch: true })
await syncLatitudeJson({ watch })

const args = [
'run',
24 changes: 24 additions & 0 deletions packages/cli/core/src/lib/v1.schema.json
Original file line number Diff line number Diff line change
@@ -36,6 +36,30 @@
"description": "Path to the storage directory."
}
}
},
{
"properties": {
"type": {
"type": "string",
"enum": ["s3"]
},
"bucket": {
"type": "string",
"description": "S3 bucket name."
},
"region": {
"type": "string",
"description": "S3 region."
},
"accessKeyId": {
"type": "string",
"description": "S3 access key ID."
},
"secretAccessKey": {
"type": "string",
"description": "S3 secret access key."
}
}
}
]
}
37 changes: 36 additions & 1 deletion packages/cli/display_table/src/app.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import React, { useCallback, useEffect, useState } from 'react'
import React, { useCallback, useEffect, useMemo, useState } from 'react'
import type QueryResult from '@latitude-data/query_result'
import { Box, Spacer, Text, useInput } from 'ink'
import type {
@@ -11,6 +11,7 @@ import Table from './components/Table.js'
import ErrorDisplay from './components/ErrorDisplay.js'
import CompiledQueryDisplay from './components/CompiledQueryDisplay.js'
import { formatElapsedTime } from './utils.js'
import v8 from 'v8'

export type AppProps = {
queriesDir: string
@@ -21,6 +22,16 @@ export type AppProps = {
debug?: boolean
}

/**
 * Renders a byte count using binary (1024-based) units.
 *
 * Values under 1024 are printed as-is with a `B` suffix; larger values are
 * scaled to KB, MB or GB and shown with two decimals.
 *
 * @param bytes - Size in bytes.
 * @returns The formatted size, e.g. `"1.50 KB"`.
 */
function humanizeFileSize(bytes: number) {
  const KILO = 1024
  const MEGA = KILO * 1024
  const GIGA = MEGA * 1024

  if (bytes < KILO) return `${bytes} B`

  const scaled = (divisor: number, unit: string) =>
    `${(bytes / divisor).toFixed(2)} ${unit}`

  if (bytes < MEGA) return scaled(KILO, 'KB')
  return bytes < GIGA ? scaled(MEGA, 'MB') : scaled(GIGA, 'GB')
}

export default function App({
queriesDir,
sourceManager,
@@ -34,10 +45,30 @@ export default function App({
const [error, setError] = useState<Error | null>(null)
const [isLoading, setIsLoading] = useState(true)
const [showCompiledQuery, setShowCompiledQuery] = useState(false)
const [heap, setHeap] = useState(0)
const [availableMemory, setAvailableMemory] = useState(0)
const heapTextColor = useMemo(() => {
const usedMemoryPercentage = heap / availableMemory
if (usedMemoryPercentage > 0.8) return 'red'
if (usedMemoryPercentage > 0.5) return 'yellow'
return 'green'
}, [heap, availableMemory])

const [startTime, setStartTime] = useState<number>(Date.now())
const [elapsedTime, setElapsedTime] = useState<number>(0)

useEffect(() => {
const heapLimit = v8.getHeapStatistics().heap_size_limit
setAvailableMemory(heapLimit)

const interval = setInterval(() => {
const currentHeap = v8.getHeapStatistics().used_heap_size
setHeap(currentHeap)
}, 1000)

return () => clearInterval(interval)
}, [])

const refresh = useCallback(async () => {
setIsLoading(true)
setStartTime(Date.now())
@@ -133,6 +164,10 @@ export default function App({
</Text>
)}
<Text inverse>Esc / Ctrl+C / Q to exit </Text>
<Spacer />
<Text color={heapTextColor}>
{humanizeFileSize(heap)} / {humanizeFileSize(availableMemory)}
</Text>
</Box>
</Box>
)
2 changes: 1 addition & 1 deletion packages/connectors/duckdb/package.json
Original file line number Diff line number Diff line change
@@ -32,6 +32,6 @@
"dependencies": {
"@latitude-data/query_result": "workspace:^",
"@latitude-data/source-manager": "workspace:^",
"duckdb-async": "^0.9.2"
"duckdb-async": "^0.10.0"
}
}
32 changes: 31 additions & 1 deletion packages/connectors/duckdb/src/index.ts
Original file line number Diff line number Diff line change
@@ -20,19 +20,28 @@ import {
TableData,
} from 'duckdb-async'

/**
 * Credentials used to register an S3 secret in DuckDB. The three fields are
 * interpolated into the `CREATE SECRET (TYPE S3, ...)` statement as
 * `KEY_ID`, `SECRET` and `REGION` respectively.
 */
type S3Credentials = {
accessKeyId: string
secretAccessKey: string
region: string
}

/**
 * Connection parameters accepted by the DuckDB connector.
 */
export type ConnectionParams = {
// Database location; the connector falls back to ':memory:' when omitted.
url?: string
// Optional S3 credentials; when present the connector sets up S3 access
// right after opening the database.
s3?: S3Credentials
}

/**
 * Thrown when a referenced query has no materialized file in the
 * materialized storage.
 */
export class MaterializedFileNotFoundError extends Error {}

export default class DuckdbConnector extends BaseConnector<ConnectionParams> {
private client?: Database
private url: string
private s3?: S3Credentials

constructor(options: ConnectorOptions<ConnectionParams>) {
super(options)
this.url = options.connectionParams.url || ':memory:'
this.s3 = options.connectionParams.s3
}

end(): Promise<void> {
@@ -109,7 +118,9 @@ export default class DuckdbConnector extends BaseConnector<ConnectionParams> {
const materializedStorage = this.source.manager.materializedStorage
const materializedPath =
await this.source.manager.localMaterializationPath(fullSubQueryPath)
if (!(await materializedStorage.exists(materializedPath))) {
const materializedFileExists =
await materializedStorage.exists(materializedPath)
if (!materializedFileExists) {
throw new MaterializedFileNotFoundError(
`Query '${fullSubQueryPath}' is not materialized. Run 'latitude materialize' to materialize it.`,
)
@@ -134,6 +145,25 @@ export default class DuckdbConnector extends BaseConnector<ConnectionParams> {
this.url,
this.url === ':memory:' ? OPEN_READWRITE : OPEN_READONLY,
)
if (this.s3) {
await this.connectS3(this.s3)
}
}

/**
 * Installs and loads DuckDB's `httpfs` extension and registers an S3 secret
 * built from the given credentials.
 *
 * @param credentials - S3 access key, secret key and region. When undefined,
 *   nothing is executed.
 * @returns A promise that resolves once the statements have been executed.
 */
private async connectS3(credentials: ConnectionParams['s3']): Promise<void> {
  if (!credentials) return

  // SQL string literals are single-quoted, with embedded single quotes
  // doubled. JSON.stringify would instead emit a double-quoted,
  // backslash-escaped value, which is not a SQL string literal.
  const sqlString = (value: string) =>
    `'${String(value).replace(/'/g, "''")}'`

  const { accessKeyId, secretAccessKey, region } = credentials
  const query = `
    INSTALL httpfs;
    LOAD httpfs;
    CREATE SECRET (
      TYPE S3,
      KEY_ID ${sqlString(accessKeyId)},
      SECRET ${sqlString(secretAccessKey)},
      REGION ${sqlString(region)}
    );`

  // NOTE(review): assumes the caller has already opened the database so that
  // `this.client` is set — confirm against the connect path.
  await this.client!.exec(query)
}

async runQuery(compiledQuery: CompiledQuery): Promise<QueryResult> {
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ describe('findAndMaterializeQueries', () => {
materializations: [
{
queryPath: 'query.sql',
url: expect.any(String),
cached: false,
success: true,
fileSize: expect.any(Number),
@@ -65,6 +66,7 @@ describe('findAndMaterializeQueries', () => {
},
{
queryPath: 'subdir/query2.sql',
url: expect.any(String),
cached: false,
success: true,
fileSize: expect.any(Number),
@@ -95,6 +97,7 @@ describe('findAndMaterializeQueries', () => {
materializations: [
{
queryPath: 'subdir/query2.sql',
url: expect.any(String),
cached: false,
success: true,
fileSize: expect.any(Number),
@@ -134,6 +137,7 @@ describe('findAndMaterializeQueries', () => {
},
{
queryPath: 'subdir/query2',
url: expect.any(String),
cached: false,
success: true,
fileSize: expect.any(Number),
@@ -190,6 +194,7 @@ describe('findAndMaterializeQueries', () => {
materializations: [
{
queryPath: 'query.sql',
url: expect.any(String),
cached: true,
},
],
@@ -214,6 +219,7 @@ describe('findAndMaterializeQueries', () => {
materializations: [
{
queryPath: 'query.sql',
url: expect.any(String),
cached: true,
},
],
Loading

0 comments on commit 12994ba

Please sign in to comment.