Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processing_engine): Allow async plugin execution. #25994

Merged
merged 1 commit into the base branch from the source branch
on Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ pub struct TriggerConfig {
/// Create trigger in disabled state
#[clap(long)]
disabled: bool,
#[clap(long)]
run_asynchronous: bool,
/// Name for the new trigger
trigger_name: String,
}
Expand Down Expand Up @@ -351,14 +353,14 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification,
trigger_arguments,
disabled,
run_asynchronous,
}) => {
let trigger_arguments: Option<HashMap<String, String>> = trigger_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});

//println!("does this work?");
match client
.api_v3_configure_processing_engine_trigger_create(
database_name,
Expand All @@ -367,6 +369,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification.string_rep(),
trigger_arguments,
disabled,
run_asynchronous,
)
.await
{
Expand Down
9 changes: 7 additions & 2 deletions influxdb3_catalog/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::DistinctCacheDefinition;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef, PluginType, TriggerDefinition};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, LastCacheValueColumnsDef, PluginType,
TriggerDefinition, TriggerFlag,
};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
use schema::TIME_DATA_TIMEZONE;
Expand Down Expand Up @@ -153,6 +155,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
flags: trigger.flags,
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
database_name: trigger.database_name,
Expand Down Expand Up @@ -223,6 +226,7 @@ struct ProcessingEngineTriggerSnapshot {
pub plugin_filename: String,
pub database_name: String,
pub trigger_specification: String,
pub flags: Vec<TriggerFlag>,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
Expand Down Expand Up @@ -471,6 +475,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: trigger.database_name.to_string(),
flags: trigger.flags.clone(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
trigger_arguments: trigger.trigger_arguments.clone(),
Expand Down
1 change: 1 addition & 0 deletions influxdb3_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ iox_query_params.workspace = true

# Local deps
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_wal = { path = "../influxdb3_wal" }

# crates.io dependencies
bytes.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use url::Url;

use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
use influxdb3_wal::TriggerFlag;

/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -456,6 +457,7 @@ impl Client {
}

/// Make a request to `POST /api/v3/configure/processing_engine_trigger`
#[allow(clippy::too_many_arguments)]
pub async fn api_v3_configure_processing_engine_trigger_create(
&self,
db: impl Into<String> + Send,
Expand All @@ -464,7 +466,13 @@ impl Client {
trigger_spec: impl Into<String> + Send,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
execute_async: bool,
) -> Result<()> {
let flags = if execute_async {
vec![TriggerFlag::ExecuteAsynchronously]
} else {
vec![]
};
let _bytes = self
.send_json_get_bytes(
Method::POST,
Expand All @@ -474,6 +482,7 @@ impl Client {
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
flags,
trigger_arguments,
disabled,
}),
Expand Down
1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes.workspace = true
chrono.workspace = true
cron.workspace = true
data_types.workspace = true
futures-util.workspace = true
humantime.workspace = true
hashbrown.workspace = true
hyper.workspace = true
Expand Down
32 changes: 21 additions & 11 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use influxdb3_types::http::{
use influxdb3_wal::PluginType;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition,
TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp,
TriggerFlag, TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents,
WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
Expand Down Expand Up @@ -345,6 +346,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
Expand All @@ -360,6 +362,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger_name,
plugin_filename,
trigger: trigger_specification,
flags,
trigger_arguments,
disabled,
database_name: db_name.to_string(),
Expand Down Expand Up @@ -449,7 +452,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
query_executor,
sys_event_store: Arc::clone(&self.sys_event_store),
};
let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?;
let plugin_code = Arc::new(self.read_plugin_code(&trigger.plugin_filename).await?);
match trigger.trigger.plugin_type() {
PluginType::WalRows => {
let rec = self
Expand Down Expand Up @@ -642,13 +645,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();

let res =
let res = tokio::task::spawn_blocking(move || {
plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request)
.unwrap_or_else(|e| WalPluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
});
})
})
.await?;

return Ok(res);
}
Expand All @@ -674,13 +679,16 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();

let res = plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
let res = tokio::task::spawn_blocking(move || {
plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
})
.await?
.unwrap_or_else(|e| SchedulePluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
Expand Down Expand Up @@ -850,6 +858,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
Expand Down Expand Up @@ -935,6 +944,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
true,
Expand Down
4 changes: 3 additions & 1 deletion influxdb3_processing_engine/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_wal::{TriggerFlag, TriggerSpecificationDefinition};
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -57,11 +57,13 @@ pub enum ProcessingEngineError {
///
#[async_trait::async_trait]
pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
#[allow(clippy::too_many_arguments)]
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
Expand Down
Loading