From dc154302b5e6230408b06739716dd81c61b1326e Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Wed, 12 Feb 2025 15:28:55 -0800 Subject: [PATCH] feat(processing_engine): Allow async plugin execution. --- Cargo.lock | 3 + influxdb3/src/commands/create.rs | 5 +- influxdb3_catalog/src/serialize.rs | 9 +- influxdb3_client/Cargo.toml | 1 + influxdb3_client/src/lib.rs | 9 + influxdb3_processing_engine/Cargo.toml | 1 + influxdb3_processing_engine/src/lib.rs | 32 ++- influxdb3_processing_engine/src/manager.rs | 4 +- influxdb3_processing_engine/src/plugins.rs | 252 +++++++++++++-------- influxdb3_py_api/src/system_py.rs | 2 +- influxdb3_server/src/http.rs | 2 + influxdb3_types/Cargo.toml | 1 + influxdb3_types/src/http.rs | 5 +- influxdb3_wal/src/lib.rs | 7 + 14 files changed, 220 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab98e780d77..caae57f7406 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2919,6 +2919,7 @@ dependencies = [ "bytes", "hashbrown 0.15.2", "influxdb3_types", + "influxdb3_wal", "iox_query_params", "mockito", "reqwest 0.11.27", @@ -3007,6 +3008,7 @@ dependencies = [ "cron 0.15.0", "data_types", "datafusion_util", + "futures-util", "hashbrown 0.15.2", "humantime", "hyper 0.14.32", @@ -3205,6 +3207,7 @@ dependencies = [ "hashbrown 0.15.2", "hyper 0.14.32", "influxdb3_cache", + "influxdb3_wal", "iox_http", "iox_query_params", "serde", diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 31876a980ce..7b95884670a 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -229,6 +229,8 @@ pub struct TriggerConfig { /// Create trigger in disabled state #[clap(long)] disabled: bool, + #[clap(long)] + run_asynchronous: bool, /// Name for the new trigger trigger_name: String, } @@ -351,6 +353,7 @@ pub async fn command(config: Config) -> Result<(), Box> { trigger_specification, trigger_arguments, disabled, + run_asynchronous, }) => { let trigger_arguments: Option> = trigger_arguments.map(|a| { a.into_iter() @@ -358,7 +361,6 @@ pub async fn command(config: Config) -> Result<(), Box> { .collect::>() }); - //println!("does this work?"); match client .api_v3_configure_processing_engine_trigger_create( database_name, @@ -367,6 +369,7 @@ pub async fn command(config: Config) -> Result<(), Box> { trigger_specification.string_rep(), trigger_arguments, disabled, + run_asynchronous, ) .await { diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index 3ec4bcb2a06..5f8bea6c393 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -10,8 +10,10 @@ use influxdb3_id::ColumnId; use influxdb3_id::DbId; use influxdb3_id::SerdeVecMap; use influxdb3_id::TableId; -use influxdb3_wal::DistinctCacheDefinition; -use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef, PluginType, TriggerDefinition}; +use influxdb3_wal::{ + DistinctCacheDefinition, LastCacheDefinition, LastCacheValueColumnsDef, PluginType, + TriggerDefinition, TriggerFlag, +}; use schema::InfluxColumnType; use schema::InfluxFieldType; use schema::TIME_DATA_TIMEZONE; @@ -153,6 +155,7 @@ impl From for DatabaseSchema { trigger_name: trigger.trigger_name, plugin_filename: trigger.plugin_filename, trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), + flags: trigger.flags, trigger_arguments: trigger.trigger_arguments, disabled: trigger.disabled, database_name: trigger.database_name, @@ -223,6 +226,7 @@ struct ProcessingEngineTriggerSnapshot { pub plugin_filename: String, pub database_name: String, pub trigger_specification: String, + pub flags: Vec, pub trigger_arguments: Option>, pub disabled: bool, } @@ -471,6 +475,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot { trigger_name: trigger.trigger_name.to_string(), plugin_filename: trigger.plugin_filename.to_string(), database_name: trigger.database_name.to_string(), + flags: trigger.flags.clone(), trigger_specification: serde_json::to_string(&trigger.trigger) .expect("should be able to serialize trigger specification"), trigger_arguments: trigger.trigger_arguments.clone(), diff --git a/influxdb3_client/Cargo.toml b/influxdb3_client/Cargo.toml index b746af201b1..118630e8caf 100644 --- a/influxdb3_client/Cargo.toml +++ b/influxdb3_client/Cargo.toml @@ -11,6 +11,7 @@ iox_query_params.workspace = true # Local deps influxdb3_types = { path = "../influxdb3_types" } +influxdb3_wal = { path = "../influxdb3_wal" } # crates.io dependencies bytes.workspace = true diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 701e90be0f8..513aa3dfada 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -12,6 +12,7 @@ use url::Url; use influxdb3_types::http::*; pub use influxdb3_types::write::Precision; +use influxdb3_wal::TriggerFlag; /// Primary error type for the [`Client`] #[derive(Debug, thiserror::Error)] @@ -456,6 +457,7 @@ impl Client { } /// Make a request to `POST /api/v3/configure/processing_engine_trigger` + #[allow(clippy::too_many_arguments)] pub async fn api_v3_configure_processing_engine_trigger_create( &self, db: impl Into + Send, @@ -464,7 +466,13 @@ impl Client { trigger_spec: impl Into + Send, trigger_arguments: Option>, disabled: bool, + execute_async: bool, ) -> Result<()> { + let flags = if execute_async { + vec![TriggerFlag::ExecuteAsynchronously] + } else { + vec![] + }; let _bytes = self .send_json_get_bytes( Method::POST, @@ -474,6 +482,7 @@ impl Client { trigger_name: trigger_name.into(), plugin_filename: plugin_filename.into(), trigger_specification: trigger_spec.into(), + flags, trigger_arguments, disabled, }), diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml index 5f666e60410..83156b35750 100644 --- a/influxdb3_processing_engine/Cargo.toml +++ b/influxdb3_processing_engine/Cargo.toml @@ -15,6 +15,7 @@ bytes.workspace = true chrono.workspace = true cron.workspace = true data_types.workspace = true +futures-util.workspace = true humantime.workspace = true hashbrown.workspace = true hyper.workspace = true diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 9f5ab61e02a..f2a886bfa30 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -19,7 +19,8 @@ use influxdb3_types::http::{ use influxdb3_wal::PluginType; use influxdb3_wal::{ CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition, - TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp, + TriggerFlag, TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents, + WalFileNotifier, WalOp, }; use influxdb3_write::WriteBuffer; use iox_time::TimeProvider; @@ -345,6 +346,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { db_name: &str, trigger_name: String, plugin_filename: String, + flags: Vec, trigger_specification: TriggerSpecificationDefinition, trigger_arguments: Option>, disabled: bool, @@ -360,6 +362,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { trigger_name, plugin_filename, trigger: trigger_specification, + flags, trigger_arguments, disabled, database_name: db_name.to_string(), @@ -449,7 +452,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { query_executor, sys_event_store: Arc::clone(&self.sys_event_store), }; - let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?; + let plugin_code = Arc::new(self.read_plugin_code(&trigger.plugin_filename).await?); match trigger.trigger.plugin_type() { PluginType::WalRows => { let rec = self @@ -642,13 +645,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { let code = self.read_plugin_code(&request.filename).await?; let code_string = code.code().to_string(); - let res = + let res = tokio::task::spawn_blocking(move || { plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request) .unwrap_or_else(|e| WalPluginTestResponse { log_lines: vec![], database_writes: Default::default(), errors: vec![e.to_string()], - }); + }) + }) + .await?; return Ok(res); } @@ -674,13 +679,16 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { let code = self.read_plugin_code(&request.filename).await?; let code_string = code.code().to_string(); - let res = plugins::run_test_schedule_plugin( - now, - catalog, - query_executor, - code_string, - request, - ) + let res = tokio::task::spawn_blocking(move || { + plugins::run_test_schedule_plugin( + now, + catalog, + query_executor, + code_string, + request, + ) + }) + .await? .unwrap_or_else(|e| SchedulePluginTestResponse { log_lines: vec![], database_writes: Default::default(), @@ -850,6 +858,7 @@ mod tests { "foo", "test_trigger".to_string(), file_name, + vec![], TriggerSpecificationDefinition::AllTablesWalWrite, None, false, @@ -935,6 +944,7 @@ mod tests { "foo", "test_trigger".to_string(), file_name, + vec![], TriggerSpecificationDefinition::AllTablesWalWrite, None, true, diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs index 8397cccd008..ab55fc1fb29 100644 --- a/influxdb3_processing_engine/src/manager.rs +++ b/influxdb3_processing_engine/src/manager.rs @@ -7,7 +7,7 @@ use influxdb3_types::http::{ SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest, WalPluginTestResponse, }; -use influxdb3_wal::TriggerSpecificationDefinition; +use influxdb3_wal::{TriggerFlag, TriggerSpecificationDefinition}; use influxdb3_write::WriteBuffer; use std::fmt::Debug; use std::sync::Arc; @@ -57,11 +57,13 @@ pub enum ProcessingEngineError { /// #[async_trait::async_trait] pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { + #[allow(clippy::too_many_arguments)] async fn insert_trigger( &self, db_name: &str, trigger_name: String, plugin_filename: String, + flags: Vec, trigger_specification: TriggerSpecificationDefinition, trigger_arguments: Option>, disabled: bool, diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index 2057c9313cb..c18dfbe77f7 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -81,12 +81,15 @@ pub enum PluginError { #[error("error reading file from Github: {0} {1}")] FetchingFromGithub(reqwest::StatusCode, String), + + #[error("Join error, please report: {0}")] + JoinError(#[from] tokio::task::JoinError), } #[cfg(feature = "system-py")] pub(crate) fn run_wal_contents_plugin( db_name: String, - plugin_code: PluginCode, + plugin_code: Arc, trigger_definition: TriggerDefinition, context: PluginContext, plugin_receiver: mpsc::Receiver, @@ -117,7 +120,7 @@ pub struct ProcessingEngineEnvironmentManager { #[cfg(feature = "system-py")] pub(crate) fn run_schedule_plugin( db_name: String, - plugin_code: PluginCode, + plugin_code: Arc, trigger_definition: TriggerDefinition, time_provider: Arc, context: PluginContext, @@ -158,7 +161,7 @@ pub(crate) fn run_schedule_plugin( #[cfg(feature = "system-py")] pub(crate) fn run_request_plugin( db_name: String, - plugin_code: PluginCode, + plugin_code: Arc, trigger_definition: TriggerDefinition, context: PluginContext, plugin_receiver: mpsc::Receiver, @@ -190,10 +193,10 @@ pub(crate) struct PluginContext { } #[cfg(feature = "system-py")] -#[derive(Debug)] +#[derive(Debug, Clone)] struct TriggerPlugin { trigger_definition: TriggerDefinition, - plugin_code: PluginCode, + plugin_code: Arc, db_name: String, write_buffer: Arc, query_executor: Arc, @@ -207,6 +210,8 @@ mod python_plugin { use chrono::{DateTime, Duration, Utc}; use cron::{OwnedScheduleIterator, Schedule as CronSchedule}; use data_types::NamespaceName; + use futures_util::stream::FuturesUnordered; + use futures_util::StreamExt; use humantime::{format_duration, parse_duration}; use hyper::http::HeaderValue; use hyper::{Body, Response, StatusCode}; @@ -215,7 +220,7 @@ mod python_plugin { execute_python_with_batch, execute_request_trigger, execute_schedule_trigger, PluginReturnState, ProcessingEngineLogger, }; - use influxdb3_wal::{WalContents, WalOp}; + use influxdb3_wal::{TriggerFlag, WalContents, WalOp}; use influxdb3_write::Precision; use iox_time::Time; use observability_deps::tracing::{info, warn}; @@ -229,26 +234,37 @@ mod python_plugin { mut receiver: Receiver, ) -> Result<(), PluginError> { info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting wal contents plugin"); - + let run_in_dedicated_task = self + .trigger_definition + .flags + .contains(&TriggerFlag::ExecuteAsynchronously); + let mut futures = FuturesUnordered::new(); loop { - let event = match receiver.recv().await { - Some(event) => event, - None => { - warn!(?self.trigger_definition, "trigger plugin receiver closed"); - break; + tokio::select! { + event = receiver.recv() => { + match event { + Some(WalEvent::WriteWalContents(wal_contents)) => { + if run_in_dedicated_task { + let clone = self.clone(); + futures.push(tokio::task::spawn(async move { + clone.process_wal_contents(wal_contents).await + })); + } else if let Err(e) = self.process_wal_contents(wal_contents).await { + error!(?self.trigger_definition, "error processing wal contents: {}", e); + } + } + Some(WalEvent::Shutdown(sender)) => { + sender.send(()).map_err(|_| PluginError::FailedToShutdown)?; + break; + } + None => {break;} + } } - }; - - match event { - WalEvent::WriteWalContents(wal_contents) => { - if let Err(e) = self.process_wal_contents(wal_contents).await { + Some(result) = futures.next() => { + if let Err(e) = result { error!(?self.trigger_definition, "error processing wal contents: {}", e); } } - WalEvent::Shutdown(sender) => { - sender.send(()).map_err(|_| PluginError::FailedToShutdown)?; - break; - } } } @@ -261,6 +277,11 @@ mod python_plugin { mut runner: ScheduleTriggerRunner, time_provider: Arc, ) -> Result<(), PluginError> { + let run_in_dedicated_task = self + .trigger_definition + .flags + .contains(&TriggerFlag::ExecuteAsynchronously); + let mut futures = FuturesUnordered::new(); loop { let Some(next_run_instant) = runner.next_run_time() else { break; @@ -271,7 +292,21 @@ mod python_plugin { let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else { return Err(PluginError::MissingDb); }; - runner.run_at_time(self, schema).await?; + + let Some(trigger_time) = runner.next_trigger_time else { + return Err(anyhow!("running a cron trigger that is finished.").into()); + }; + + runner.advance_time(); + if run_in_dedicated_task { + let trigger =self.clone(); + let fut = async move {tokio::task::spawn_blocking(move || { + ScheduleTriggerRunner::run_at_time(trigger, trigger_time, schema)}).await + }; + futures.push(fut); + } else { + ScheduleTriggerRunner::run_at_time(self.clone(), trigger_time, schema).await?; + } } event = receiver.recv() => { match event { @@ -285,6 +320,11 @@ mod python_plugin { } } } + Some(result) = futures.next() => { + if let Err(e) = result { + error!(?self.trigger_definition, "error running async scheduled: {}", e); + } + } } } @@ -310,19 +350,27 @@ mod python_plugin { error!(?self.trigger_definition, "missing db schema"); return Err(PluginError::MissingDb); }; - let result = execute_request_trigger( - self.plugin_code.code().as_ref(), - Arc::clone(&schema), - Arc::clone(&self.query_executor), - Some(ProcessingEngineLogger::new( - Arc::clone(&self.sys_event_store), - self.trigger_definition.trigger_name.as_str(), - )), - &self.trigger_definition.trigger_arguments, - request.query_params, - request.headers, - request.body, - ); + let plugin_code = self.plugin_code.code(); + let query_executor = Arc::clone(&self.query_executor); + let logger = Some(ProcessingEngineLogger::new( + Arc::clone(&self.sys_event_store), + self.trigger_definition.trigger_name.as_str(), + )); + let trigger_arguments = self.trigger_definition.trigger_arguments.clone(); + + let result = tokio::task::spawn_blocking(move || { + execute_request_trigger( + plugin_code.as_ref(), + schema, + query_executor, + logger, + &trigger_arguments, + request.query_params, + request.headers, + request.body, + ) + }) + .await?; // produce the HTTP response let response = match result { @@ -387,7 +435,7 @@ mod python_plugin { return Err(PluginError::MissingDb); }; - for wal_op in &wal_contents.ops { + for (op_index, wal_op) in wal_contents.ops.iter().enumerate() { match wal_op { WalOp::Write(write_batch) => { // determine if this write batch is for this database @@ -397,46 +445,60 @@ mod python_plugin { continue; } let table_filter = match &self.trigger_definition.trigger { - TriggerSpecificationDefinition::AllTablesWalWrite => { - // no filter - None - } - TriggerSpecificationDefinition::SingleTableWalWrite { - table_name, - } => { - let table_id = schema - .table_name_to_id(table_name.as_ref()) - .context("table not found")?; - Some(table_id) - } - // This should not occur - TriggerSpecificationDefinition::Schedule { - schedule - } => { - return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into()) - } - TriggerSpecificationDefinition::Every { - duration, - } => { - return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into()) - } - TriggerSpecificationDefinition::RequestPath { path } => { - return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into()) - } - }; + TriggerSpecificationDefinition::AllTablesWalWrite => { + // no filter + None + } + TriggerSpecificationDefinition::SingleTableWalWrite { + table_name, + } => { + let table_id = schema + .table_name_to_id(table_name.as_ref()) + .context("table not found")?; + Some(table_id) + } + // This should not occur + TriggerSpecificationDefinition::Schedule { + schedule + } => { + return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into()) + } + TriggerSpecificationDefinition::Every { + duration, + } => { + return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into()) + } + TriggerSpecificationDefinition::RequestPath { path } => { + return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into()) + } + }; - let result = execute_python_with_batch( - self.plugin_code.code().as_ref(), - write_batch, - Arc::clone(&schema), - Arc::clone(&self.query_executor), - Some(ProcessingEngineLogger::new( - Arc::clone(&self.sys_event_store), - self.trigger_definition.trigger_name.as_str(), - )), - table_filter, - &self.trigger_definition.trigger_arguments, - )?; + let logger = Some(ProcessingEngineLogger::new( + Arc::clone(&self.sys_event_store), + self.trigger_definition.trigger_name.as_str(), + )); + let plugin_code = Arc::clone(&self.plugin_code.code()); + let query_executor = Arc::clone(&self.query_executor); + let schema_clone = Arc::clone(&schema); + let trigger_arguments = self.trigger_definition.trigger_arguments.clone(); + let wal_contents_clone = Arc::clone(&wal_contents); + + let result = tokio::task::spawn_blocking(move || { + let write_batch = match &wal_contents_clone.ops[op_index] { + WalOp::Write(wb) => wb, + _ => unreachable!("Index was checked."), + }; + execute_python_with_batch( + plugin_code.as_ref(), + write_batch, + schema_clone, + query_executor, + logger, + table_filter, + &trigger_arguments, + ) + }) + .await??; let errors = self.handle_return_state(result).await; // TODO: here is one spot we'll pick up errors to put into the plugin system table @@ -572,34 +634,34 @@ mod python_plugin { } async fn run_at_time( - &mut self, - plugin: &TriggerPlugin, + plugin: TriggerPlugin, + trigger_time: DateTime, db_schema: Arc, ) -> Result<(), PluginError> { - let Some(trigger_time) = self.next_trigger_time else { - return Err(anyhow!("running a cron trigger that is finished.").into()); - }; - - let result = execute_schedule_trigger( - plugin.plugin_code.code().as_ref(), - trigger_time, - Arc::clone(&db_schema), - Arc::clone(&plugin.query_executor), - Some(ProcessingEngineLogger::new( - Arc::clone(&plugin.sys_event_store), - plugin.trigger_definition.trigger_name.as_str(), - )), - &plugin.trigger_definition.trigger_arguments, - )?; + let plugin_code = plugin.plugin_code.code(); + let query_executor = Arc::clone(&plugin.query_executor); + let logger = Some(ProcessingEngineLogger::new( + Arc::clone(&plugin.sys_event_store), + plugin.trigger_definition.trigger_name.as_str(), + )); + let trigger_arguments = plugin.trigger_definition.trigger_arguments.clone(); + let result = tokio::task::spawn_blocking(move || { + execute_schedule_trigger( + plugin_code.as_ref(), + trigger_time, + db_schema, + query_executor, + logger, + &trigger_arguments, + ) + }) + .await??; let errors = plugin.handle_return_state(result).await; // TODO: here is one spot we'll pick up errors to put into the plugin system table for error in errors { error!(?plugin.trigger_definition, "error running schedule plugin: {}", error); } - - self.advance_time(); - Ok(()) } diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 506777ba51e..5260120a0cf 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -665,7 +665,7 @@ pub fn execute_schedule_trigger( logger.log( LogLevel::Info, format!( - "finished execution in {:?}", + "finished execution in {}", format_duration(runtime.unwrap_or_default()) ), ); diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index e98ef6c99bc..bcb0e9fbc3a 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -1002,6 +1002,7 @@ where db, plugin_filename, trigger_name, + flags, trigger_specification, trigger_arguments, disabled, @@ -1025,6 +1026,7 @@ where db.as_str(), trigger_name.clone(), plugin_filename, + flags, trigger_spec, trigger_arguments, disabled, diff --git a/influxdb3_types/Cargo.toml b/influxdb3_types/Cargo.toml index 30c705424cc..067f16425b0 100644 --- a/influxdb3_types/Cargo.toml +++ b/influxdb3_types/Cargo.toml @@ -12,6 +12,7 @@ iox_query_params.workspace = true # Local deps influxdb3_cache = { path = "../influxdb3_cache" } +influxdb3_wal = { path = "../influxdb3_wal" } # crates.io dependencies serde.workspace = true diff --git a/influxdb3_types/src/http.rs b/influxdb3_types/src/http.rs index c75d2f5e0eb..76749171467 100644 --- a/influxdb3_types/src/http.rs +++ b/influxdb3_types/src/http.rs @@ -1,13 +1,13 @@ +use crate::write::Precision; use hashbrown::HashMap; use hyper::header::ACCEPT; use hyper::http::HeaderValue; use hyper::HeaderMap; use influxdb3_cache::distinct_cache::MaxCardinality; +use influxdb3_wal::TriggerFlag; use iox_query_params::StatementParams; use serde::{Deserialize, Serialize}; -use crate::write::Precision; - #[derive(Debug, thiserror::Error)] pub enum Error { #[error("invalid mime type ({0})")] @@ -158,6 +158,7 @@ pub struct ProcessingEngineTriggerCreateRequest { pub db: String, pub plugin_filename: String, pub trigger_name: String, + pub flags: Vec, pub trigger_specification: String, pub trigger_arguments: Option>, pub disabled: bool, diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index cab415f0e47..45d90e3f9f4 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -642,10 +642,17 @@ pub struct TriggerDefinition { pub plugin_filename: String, pub database_name: String, pub trigger: TriggerSpecificationDefinition, + pub flags: Vec, pub trigger_arguments: Option>, pub disabled: bool, } +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum TriggerFlag { + ExecuteAsynchronously, +} + #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub struct DeleteTriggerDefinition { pub trigger_name: String,