Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Persist retried hog function logs #29193

Merged
merged 29 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,7 @@
"KAFKA_HOSTS": "localhost:9092",
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"PLUGIN_SERVER_MODE": "all-v2",
"HOG_TRANSFORMATIONS_ENABLED": "True",
"HOG_TRANSFORMATIONS_COMPARISON_PERCENTAGE": "1"
"HOG_HOOK_URL": "http://localhost:3300/hoghook"
},
"presentation": {
"group": "main"
Expand Down
4 changes: 3 additions & 1 deletion frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2008,7 +2008,9 @@ const api = {
data: {
configuration: Record<string, any>
mock_async_functions: boolean
globals: any
globals?: any
clickhouse_event?: any
invocation_id?: string
}
): Promise<HogFunctionTestInvocationResult> {
return await new ApiRequest().hogFunction(id).withAction('invocations').create({ data })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,46 @@ import { delay } from 'lib/utils'

import { HogQLQuery, NodeKind } from '~/queries/schema/schema-general'
import { hogql } from '~/queries/utils'
import { HogFunctionInvocationGlobals, LogEntryLevel } from '~/types'
import { LogEntryLevel } from '~/types'

import type { hogFunctionLogsLogicType } from './hogFunctionLogsLogicType'
import { GroupedLogEntry, logsViewerLogic, LogsViewerLogicProps } from './logsViewerLogic'

export type RetryInvocationState = 'pending' | 'success' | 'failure'

const loadEventGlobals = async (eventId: string): Promise<Pick<HogFunctionInvocationGlobals, 'event' | 'person'>> => {
const loadClickhouseEvent = async (eventId: string): Promise<any> => {
const query: HogQLQuery = {
kind: NodeKind.HogQLQuery,
query: hogql`
select uuid, distinct_id, event, timestamp, properties, person.id, person.properties
select uuid, distinct_id, event, timestamp, properties, elements_chain, person.id, person.properties, person.created_at
from events
where uuid = ${eventId}
limit 1`,
}

const response = await api.query(query, undefined, undefined, true)
const [uuid, distinct_id, event, timestamp, properties, person_id, person_properties] = response.results[0]
const [
uuid,
distinct_id,
event,
timestamp,
properties,
elements_chain,
person_id,
person_properties,
person_created_at,
] = response.results[0]

return {
event: {
uuid,
distinct_id,
event,
properties: JSON.parse(properties),
timestamp,
url: '',
elements_chain: '',
},
person: {
id: person_id,
properties: JSON.parse(person_properties),
url: '',
name: '',
},
uuid,
event,
distinct_id,
person_id,
timestamp,
properties,
elements_chain,
person_created_at,
person_properties,
}
}

Expand Down Expand Up @@ -93,12 +97,14 @@ export const hogFunctionLogsLogic = kea<hogFunctionLogsLogicType>([
await delay(1000)

try {
const globals = await loadEventGlobals(eventId)

const res = await api.hogFunctions.createTestInvocation(props.sourceId, {
globals,
clickhouse_event: await loadClickhouseEvent(eventId),
mock_async_functions: false,
configuration: {},
configuration: {
// For retries we don't care about filters
filters: {},
},
invocation_id: groupedLogEntry.instanceId,
})

const existingLogGroup = values.logs.find((x) => x.instanceId === groupedLogEntry.instanceId)
Expand Down
48 changes: 18 additions & 30 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,28 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
: null
const sharedCapabilities = !isTestEnv() ? { http: true } : {}

const singleProcessCapabilities: PluginServerCapabilities = {
mmdb: true,
ingestion: true,
ingestionOverflow: true,
ingestionHistorical: true,
eventsIngestionPipelines: true, // with null PluginServerMode we run all of them
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
sessionRecordingBlobIngestionV2: true,
sessionRecordingBlobIngestionV2Overflow: config.SESSION_RECORDING_OVERFLOW_ENABLED,
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpCyclotronWorker: true,
cdpCyclotronWorkerPlugins: true,
cdpApi: true,
syncInlinePlugins: true,
...sharedCapabilities,
}

switch (mode) {
case null:
return {
...singleProcessCapabilities,
}
case PluginServerMode.all_v2:
return {
...singleProcessCapabilities,
mmdb: true,
ingestionV2Combined: true,
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
sessionRecordingBlobIngestionV2: true,
sessionRecordingBlobIngestionV2Overflow: config.SESSION_RECORDING_OVERFLOW_ENABLED,
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpCyclotronWorker: true,
cdpCyclotronWorkerPlugins: true,
cdpApi: true,
syncInlinePlugins: true,
...sharedCapabilities,
}

case PluginServerMode.ingestion_v2:
Expand Down
34 changes: 30 additions & 4 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import { DateTime } from 'luxon'

import { Hub, PluginServerService } from '../types'
import { status } from '../utils/status'
import { delay, UUIDT } from '../utils/utils'
import { delay, UUID, UUIDT } from '../utils/utils'
import { HogTransformerService } from './hog-transformations/hog-transformer.service'
import { createCdpRedisPool } from './redis'
import { FetchExecutorService } from './services/fetch-executor.service'
import { HogExecutorService, MAX_ASYNC_STEPS } from './services/hog-executor.service'
import { HogFunctionManagerService } from './services/hog-function-manager.service'
import { HogFunctionMonitoringService } from './services/hog-function-monitoring.service'
import { HogWatcherService, HogWatcherState } from './services/hog-watcher.service'
import { HOG_FUNCTION_TEMPLATES } from './templates'
import {
Expand All @@ -19,20 +20,23 @@ import {
HogFunctionType,
LogEntry,
} from './types'
import { convertToHogFunctionInvocationGlobals } from './utils'

export class CdpApi {
private hogExecutor: HogExecutorService
private hogFunctionManager: HogFunctionManagerService
private fetchExecutor: FetchExecutorService
private hogWatcher: HogWatcherService
private hogTransformer: HogTransformerService
private hogFunctionMonitoringService: HogFunctionMonitoringService

constructor(private hub: Hub) {
this.hogFunctionManager = new HogFunctionManagerService(hub)
this.hogExecutor = new HogExecutorService(hub, this.hogFunctionManager)
this.fetchExecutor = new FetchExecutorService(hub)
this.hogWatcher = new HogWatcherService(hub, createCdpRedisPool(hub))
this.hogTransformer = new HogTransformerService(hub)
this.hogFunctionMonitoringService = new HogFunctionMonitoringService(hub)
}

public get service(): PluginServerService {
Expand Down Expand Up @@ -114,12 +118,16 @@ export class CdpApi {
private postFunctionInvocation = async (req: express.Request, res: express.Response): Promise<any> => {
try {
const { id, team_id } = req.params
const { globals, mock_async_functions, configuration } = req.body
const { clickhouse_event, mock_async_functions, configuration, invocation_id } = req.body
let { globals } = req.body

status.info('⚡️', 'Received invocation', { id, team_id, body: req.body })

if (!globals || !globals.event) {
res.status(400).json({ error: 'Missing event' })
const invocationID = invocation_id ?? new UUIDT().toString()

// Check the invocationId is a valid UUID
if (!UUID.validateString(invocationID)) {
res.status(400).json({ error: 'Invalid invocation ID' })
return
}

Expand All @@ -134,6 +142,19 @@ export class CdpApi {
return res.status(404).json({ error: 'Team not found' })
}

globals = clickhouse_event
? convertToHogFunctionInvocationGlobals(
clickhouse_event,
team,
this.hub.SITE_URL ?? 'http://localhost:8000'
)
: globals

if (!globals || !globals.event) {
res.status(400).json({ error: 'Missing event' })
return
}

// NOTE: We allow the hog function to be null if it is a "new" hog function
// The real security happens at the django layer so this is more of a sanity check
if (!isNewFunction && (!hogFunction || hogFunction.team_id !== team.id)) {
Expand Down Expand Up @@ -189,6 +210,7 @@ export class CdpApi {
for (const _invocation of invocations) {
let count = 0
let invocation = _invocation
invocation.id = invocationID

while (!lastResponse || !lastResponse.finished) {
if (count > MAX_ASYNC_STEPS * 2) {
Expand Down Expand Up @@ -240,6 +262,8 @@ export class CdpApi {
if (response.error) {
errors.push(response.error)
}

await this.hogFunctionMonitoringService.processInvocationResults([response])
}
}
} else if (compoundConfiguration.type === 'transformation') {
Expand Down Expand Up @@ -276,6 +300,8 @@ export class CdpApi {
} catch (e) {
console.error(e)
res.status(500).json({ errors: [e.message] })
} finally {
await this.hogFunctionMonitoringService.produceQueuedMessages()
}
}
}
Loading
Loading