feat: Persist retried hog function logs #29193

Merged (29 commits) on Feb 26, 2025

4 changes: 3 additions & 1 deletion frontend/src/lib/api.ts
@@ -2008,7 +2008,9 @@ const api = {
data: {
configuration: Record<string, any>
mock_async_functions: boolean
globals: any
globals?: any
clickhouse_event?: any
invocation_id?: string
}
): Promise<HogFunctionTestInvocationResult> {
return await new ApiRequest().hogFunction(id).withAction('invocations').create({ data })
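With globals now optional, a caller can instead send the raw clickhouse_event row plus an explicit invocation_id. A minimal sketch of a retry-style call under those assumptions (hogFunctionId, rawEvent, and instanceId are placeholder names, not values from this PR):

// Sketch only: exercises the new optional fields of createTestInvocation.
const result = await api.hogFunctions.createTestInvocation(hogFunctionId, {
    configuration: { filters: {} }, // filters are irrelevant when replaying a single event
    mock_async_functions: false,
    clickhouse_event: rawEvent, // raw row loaded via HogQL; no pre-built globals needed
    invocation_id: instanceId, // must be a valid UUID; reuse it so new logs join the original group
})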
hogFunctionLogsLogic.ts
@@ -6,42 +6,47 @@ import { delay } from 'lib/utils'

import { HogQLQuery, NodeKind } from '~/queries/schema/schema-general'
import { hogql } from '~/queries/utils'
import { HogFunctionInvocationGlobals, LogEntryLevel } from '~/types'
import { LogEntryLevel } from '~/types'

import type { hogFunctionLogsLogicType } from './hogFunctionLogsLogicType'
import { GroupedLogEntry, logsViewerLogic, LogsViewerLogicProps } from './logsViewerLogic'

export type RetryInvocationState = 'pending' | 'success' | 'failure'

const loadEventGlobals = async (eventId: string): Promise<Pick<HogFunctionInvocationGlobals, 'event' | 'person'>> => {
// TODO: Do we have a better type?
const loadClickhouseEvent = async (eventId: string): Promise<any> => {
const query: HogQLQuery = {
kind: NodeKind.HogQLQuery,
query: hogql`
select uuid, distinct_id, event, timestamp, properties, person.id, person.properties
select uuid, distinct_id, event, timestamp, properties, elements_chain, person.id, person.properties, person.created_at
from events
where uuid = ${eventId}
limit 1`,
}

const response = await api.query(query, undefined, undefined, true)
const [uuid, distinct_id, event, timestamp, properties, person_id, person_properties] = response.results[0]
const [
uuid,
distinct_id,
event,
timestamp,
properties,
elements_chain,
person_id,
person_properties,
person_created_at,
] = response.results[0]

return {
event: {
uuid,
distinct_id,
event,
properties: JSON.parse(properties),
timestamp,
url: '',
elements_chain: '',
},
person: {
id: person_id,
properties: JSON.parse(person_properties),
url: '',
name: '',
},
uuid,
event,
distinct_id,
person_id,
timestamp,
properties,
elements_chain,
person_created_at,
person_properties,
}
}
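loadClickhouseEvent now returns the raw row rather than pre-built invocation globals, hence the any return type and the TODO above. A sketch of the assumed shape, with every column treated as a string (the JSON parsing that used to happen here is left to the plugin server):

// Assumed row shape (illustrative only; the function is deliberately typed as any):
type RawClickhouseEventRow = {
    uuid: string
    event: string
    distinct_id: string
    person_id: string
    timestamp: string
    properties: string // JSON string, no longer parsed client-side
    elements_chain: string
    person_created_at: string
    person_properties: string // JSON string
}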

@@ -93,12 +98,16 @@ export const hogFunctionLogsLogic = kea<hogFunctionLogsLogicType>([
await delay(1000)

try {
const globals = await loadEventGlobals(eventId)
const clickhouseEvent = await loadClickhouseEvent(eventId)

const res = await api.hogFunctions.createTestInvocation(props.sourceId, {
globals,
clickhouse_event: clickhouseEvent,
mock_async_functions: false,
configuration: {},
configuration: {
// For retries we don't care about filters
filters: {},
},
invocation_id: groupedLogEntry.instanceId,
})

const existingLogGroup = values.logs.find((x) => x.instanceId === groupedLogEntry.instanceId)
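Passing invocation_id: groupedLogEntry.instanceId is what makes the retried run's logs persist into the existing entry: server-side log entries are written with instance_id taken from the invocation's id (see the log production code that moves into the monitoring service below), so reusing the original instance ID lets the lookup on the line above find and extend the same group. In outline:

// Sketch of the relationship this relies on (names taken from this diff):
// - client retry sends: invocation_id = groupedLogEntry.instanceId
// - server stamps: invocation.id = invocationID, and log entries get instance_id = invocation.id
// - client then matches: values.logs.find((x) => x.instanceId === groupedLogEntry.instanceId)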

34 changes: 30 additions & 4 deletions plugin-server/src/cdp/cdp-api.ts
@@ -4,12 +4,13 @@ import { DateTime } from 'luxon'

import { Hub, PluginServerService } from '../types'
import { status } from '../utils/status'
import { delay, UUIDT } from '../utils/utils'
import { delay, UUID, UUIDT } from '../utils/utils'
import { HogTransformerService } from './hog-transformations/hog-transformer.service'
import { createCdpRedisPool } from './redis'
import { FetchExecutorService } from './services/fetch-executor.service'
import { HogExecutorService, MAX_ASYNC_STEPS } from './services/hog-executor.service'
import { HogFunctionManagerService } from './services/hog-function-manager.service'
import { HogFunctionMonitoringService } from './services/hog-function-monitoring.service'
import { HogWatcherService, HogWatcherState } from './services/hog-watcher.service'
import { HOG_FUNCTION_TEMPLATES } from './templates'
import {
@@ -19,20 +20,23 @@
HogFunctionType,
LogEntry,
} from './types'
import { convertToHogFunctionInvocationGlobals } from './utils'

export class CdpApi {
private hogExecutor: HogExecutorService
private hogFunctionManager: HogFunctionManagerService
private fetchExecutor: FetchExecutorService
private hogWatcher: HogWatcherService
private hogTransformer: HogTransformerService
private hogFunctionMonitoringService: HogFunctionMonitoringService

constructor(private hub: Hub) {
this.hogFunctionManager = new HogFunctionManagerService(hub)
this.hogExecutor = new HogExecutorService(hub, this.hogFunctionManager)
this.fetchExecutor = new FetchExecutorService(hub)
this.hogWatcher = new HogWatcherService(hub, createCdpRedisPool(hub))
this.hogTransformer = new HogTransformerService(hub)
this.hogFunctionMonitoringService = new HogFunctionMonitoringService(hub)
}

public get service(): PluginServerService {
@@ -114,12 +118,16 @@ export class CdpApi {
private postFunctionInvocation = async (req: express.Request, res: express.Response): Promise<any> => {
try {
const { id, team_id } = req.params
const { globals, mock_async_functions, configuration } = req.body
const { clickhouse_event, mock_async_functions, configuration, invocation_id } = req.body
let { globals } = req.body

status.info('⚡️', 'Received invocation', { id, team_id, body: req.body })

if (!globals || !globals.event) {
res.status(400).json({ error: 'Missing event' })
const invocationID = invocation_id ?? new UUIDT().toString()

// Check the invocationId is a valid UUID
if (!UUID.validateString(invocationID)) {
res.status(400).json({ error: 'Invalid invocation ID' })
return
}

@@ -134,6 +142,19 @@
return res.status(404).json({ error: 'Team not found' })
}

globals = clickhouse_event
? convertToHogFunctionInvocationGlobals(
clickhouse_event,
team,
this.hub.SITE_URL ?? 'http://localhost:8000'
)
: globals

if (!globals || !globals.event) {
res.status(400).json({ error: 'Missing event' })
return
}

// NOTE: We allow the hog function to be null if it is a "new" hog function
// The real security happens at the django layer so this is more of a sanity check
if (!isNewFunction && (!hogFunction || hogFunction.team_id !== team.id)) {
@@ -189,6 +210,7 @@
for (const _invocation of invocations) {
let count = 0
let invocation = _invocation
invocation.id = invocationID

while (!lastResponse || !lastResponse.finished) {
if (count > MAX_ASYNC_STEPS * 2) {
@@ -240,6 +262,8 @@
if (response.error) {
errors.push(response.error)
}

await this.hogFunctionMonitoringService.processInvocationResults([response])
}
}
} else if (compoundConfiguration.type === 'transformation') {
@@ -276,6 +300,8 @@
} catch (e) {
console.error(e)
res.status(500).json({ errors: [e.message] })
} finally {
await this.hogFunctionMonitoringService.produceQueuedMessages()
}
}
}
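Taken together, the handler's new flow is roughly the following (a condensed summary of the changes above, not a verbatim excerpt):

// 1. Read either pre-built globals or a raw clickhouse_event from the body, plus an optional invocation_id.
// 2. Default the invocation ID to a fresh UUIDT and reject anything that is not a valid UUID.
// 3. If a raw event was supplied, build globals server-side with convertToHogFunctionInvocationGlobals(event, team, SITE_URL).
// 4. Run the invocation loop, stamping invocation.id with the chosen ID and passing each response to
//    hogFunctionMonitoringService.processInvocationResults().
// 5. In the finally block, flush with produceQueuedMessages() so logs and metrics are persisted even if the request errors.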
137 changes: 5 additions & 132 deletions plugin-server/src/cdp/consumers/cdp-base.consumer.ts
@@ -1,33 +1,22 @@
import { Message } from 'node-rdkafka'
import { Counter, Gauge, Histogram } from 'prom-client'

import { KAFKA_APP_METRICS_2, KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_LOG_ENTRIES } from '../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
import { KafkaProducerWrapper } from '../../kafka/producer'
import { addSentryBreadcrumbsEventListeners } from '../../main/ingestion-queues/kafka-metrics'
import { runInstrumentedFunction } from '../../main/utils'
import { AppMetric2Type, Hub, PluginServerService, TeamId, TimestampFormat } from '../../types'
import { safeClickhouseString } from '../../utils/db/utils'
import { Hub, PluginServerService, TeamId } from '../../types'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'
import { CdpRedis, createCdpRedisPool } from '../redis'
import { FetchExecutorService } from '../services/fetch-executor.service'
import { GroupsManagerService } from '../services/groups-manager.service'
import { HogExecutorService } from '../services/hog-executor.service'
import { HogFunctionManagerService } from '../services/hog-function-manager.service'
import { HogFunctionMonitoringService } from '../services/hog-function-monitoring.service'
import { HogMaskerService } from '../services/hog-masker.service'
import { HogWatcherService } from '../services/hog-watcher.service'
import {
HogFunctionAppMetric,
HogFunctionInvocationResult,
HogFunctionLogEntrySerialized,
HogFunctionMessageToProduce,
HogFunctionType,
HogFunctionTypeType,
} from '../types'
import { fixLogDeduplication } from '../utils'
import { convertToCaptureEvent } from '../utils'
import { HogFunctionTypeType } from '../types'

// Metrics that were at the top of the file
export const histogramKafkaBatchSize = new Histogram({
@@ -80,7 +69,7 @@ export abstract class CdpConsumerBase {
hogMasker: HogMaskerService
groupsManager: GroupsManagerService
isStopping = false
messagesToProduce: HogFunctionMessageToProduce[] = []
hogFunctionMonitoringService: HogFunctionMonitoringService
redis: CdpRedis

protected hogTypes: HogFunctionTypeType[] = []
@@ -97,6 +86,7 @@
this.hogExecutor = new HogExecutorService(this.hub, this.hogFunctionManager)
this.fetchExecutor = new FetchExecutorService(this.hub)
this.groupsManager = new GroupsManagerService(this.hub)
this.hogFunctionMonitoringService = new HogFunctionMonitoringService(this.hub)
}

public get service(): PluginServerService {
@@ -127,123 +117,6 @@
return results
}

protected async produceQueuedMessages() {
const messages = [...this.messagesToProduce]
this.messagesToProduce = []

await this.kafkaProducer!.queueMessages(
messages.map((x) => ({
topic: x.topic,
messages: [
{
value: safeClickhouseString(JSON.stringify(x.value)),
key: x.key,
},
],
}))
).catch((reason) => {
status.error('⚠️', `failed to produce message: ${reason}`)
})
}

protected produceAppMetric(metric: HogFunctionAppMetric) {
const appMetric: AppMetric2Type = {
app_source: 'hog_function',
...metric,
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}

this.messagesToProduce.push({
topic: KAFKA_APP_METRICS_2,
value: appMetric,
key: appMetric.app_source_id,
})

counterFunctionInvocation.inc({ outcome: appMetric.metric_name }, appMetric.count)
}

protected produceLogs(result: HogFunctionInvocationResult) {
const logs = fixLogDeduplication(
result.logs.map((logEntry) => ({
...logEntry,
team_id: result.invocation.hogFunction.team_id,
log_source: 'hog_function',
log_source_id: result.invocation.hogFunction.id,
instance_id: result.invocation.id,
}))
)

logs.forEach((logEntry) => {
this.messagesToProduce.push({
topic: KAFKA_LOG_ENTRIES,
value: logEntry,
key: logEntry.instance_id,
})
})
}

protected logFilteringError(item: HogFunctionType, error: string) {
const logEntry: HogFunctionLogEntrySerialized = {
team_id: item.team_id,
log_source: 'hog_function',
log_source_id: item.id,
instance_id: new UUIDT().toString(), // random UUID, like it would be for an invocation
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
level: 'error',
message: error,
}

this.messagesToProduce.push({
topic: KAFKA_LOG_ENTRIES,
value: logEntry,
key: logEntry.instance_id,
})
}

protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise<void> {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.produceResults`,
func: async () => {
await this.hogWatcher.observeResults(results)

await Promise.all(
results.map(async (result) => {
if (result.finished || result.error) {
this.produceAppMetric({
team_id: result.invocation.teamId,
app_source_id: result.invocation.hogFunction.id,
metric_kind: result.error ? 'failure' : 'success',
metric_name: result.error ? 'failed' : 'succeeded',
count: 1,
})
}

this.produceLogs(result)

// Clear the logs so we don't pass them on to the next invocation
result.logs = []

// PostHog capture events
const capturedEvents = result.capturedPostHogEvents
delete result.capturedPostHogEvents

for (const event of capturedEvents ?? []) {
const team = await this.hub.teamManager.fetchTeam(event.team_id)
if (!team) {
continue
}
this.messagesToProduce.push({
topic: KAFKA_EVENTS_PLUGIN_INGESTION,
value: convertToCaptureEvent(event, team),
key: `${team!.api_token}:${event.distinct_id}`,
})
}
})
)
},
})
}
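The helpers removed above (produceQueuedMessages, produceAppMetric, produceLogs, logFilteringError, processInvocationResults) appear to have been extracted into the new HogFunctionMonitoringService that this consumer and cdp-api.ts now instantiate. A rough sketch of the surface this diff relies on, inferred from the call sites rather than from the service's actual source:

// Inferred interface (sketch): the real class in hog-function-monitoring.service.ts may expose more.
interface HogFunctionMonitoringLike {
    // Turns each result into app metrics, log entries and captured events, buffering them for Kafka.
    processInvocationResults(results: HogFunctionInvocationResult[]): Promise<void>
    // Flushes everything buffered above in one batch.
    produceQueuedMessages(): Promise<void>
}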

protected async startKafkaConsumer(options: {
topic: string
groupId: string