Partial reload of pipeline #16183

Draft · wants to merge 3 commits into base: main
24 changes: 21 additions & 3 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -62,6 +62,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
# @ready requires thread safety since it is typically polled from outside the pipeline thread
@ready = Concurrent::AtomicBoolean.new(false)
@running = Concurrent::AtomicBoolean.new(false)
@partialReloading = Concurrent::AtomicBoolean.new(false)
@flushing = java.util.concurrent.atomic.AtomicBoolean.new(false)
@flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@@ -201,7 +202,12 @@ def run
transition_to_running
start_flusher # Launches a non-blocking thread for flush events
begin
monitor_inputs_and_workers
loop do
monitor_inputs_and_workers
break if @partialReloading.false?
start_workers
@partialReloading.make_false
end
ensure
transition_to_stopped

@@ -273,7 +279,7 @@ def start_workers
"pipeline.batch.delay" => batch_delay,
"pipeline.max_inflight" => max_inflight,
"pipeline.sources" => pipeline_source_details)
@logger.info("Starting pipeline", pipeline_log_params)
@logger.info("Starting pipeline workers", pipeline_log_params)

if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
@@ -317,7 +323,7 @@ def start_workers
# Finally inputs should be started last, after all workers have been initialized and started

begin
start_inputs
start_inputs if @partialReloading.false?
rescue => e
# if there is any exception in starting inputs, make sure we shutdown workers.
# exception will already be logged in start_inputs
@@ -331,6 +337,17 @@
end
end

def partial_reload(new_pipeline_config)
if reload_workers(new_pipeline_config)
@partialReloading.make_true
shutdown_workers
@shutdownRequested.set(false)
true
else
false
end
end

def resolve_cluster_uuids
outputs.each_with_object(Set.new) do |output, cluster_uuids|
if LogStash::PluginMetadata.exists?(output.id)
@@ -349,6 +366,7 @@ def monitor_inputs_and_workers

if @input_threads.delete(terminated_thread).nil?
# this is an abnormal worker thread termination, we need to terminate the pipeline
break if @partialReloading

@worker_threads.delete(terminated_thread)

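Note on the `run` loop change above: instead of returning once `monitor_inputs_and_workers` ends, the pipeline thread now loops, and when a partial reload has been flagged it restarts only the worker threads, leaving inputs and the queue untouched. A minimal, self-contained Java sketch of that control flow, assuming placeholder names (`PartialReloadLoop`, `startWorkers`, `monitorWorkers`, `stopWorkers`) rather than Logstash APIs:

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: mirrors the control flow of JavaPipeline#run after this change.
final class PartialReloadLoop {
    private final AtomicBoolean partialReloading = new AtomicBoolean(false);

    void run() {
        startWorkers();
        while (true) {
            monitorWorkers();                  // blocks until all workers have terminated
            if (!partialReloading.get()) {
                break;                         // normal shutdown: leave the supervision loop
            }
            startWorkers();                    // partial reload: new workers, inputs stay up
            partialReloading.set(false);
        }
    }

    // Called from another thread, as JavaPipeline#partial_reload does: flag the reload,
    // then ask the current workers to shut down so the loop above can restart them.
    void requestPartialReload() {
        partialReloading.set(true);
        stopWorkers();
    }

    private void startWorkers()   { /* spawn worker threads for the current config */ }
    private void monitorWorkers() { /* join worker threads until they exit */ }
    private void stopWorkers()    { /* signal worker threads to drain and exit */ }
}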
19 changes: 10 additions & 9 deletions logstash-core/lib/logstash/pipeline_action/reload.rb
@@ -63,15 +63,16 @@ def execute(agent, pipelines_registry)
# important NOT to explicitly return from block here
# the block must emit a success boolean value

# First shutdown old pipeline
old_pipeline.shutdown

# Then create a new pipeline
new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
success = new_pipeline.start # block until the pipeline is correctly started or crashed

# return success and new_pipeline to registry reload_pipeline
[success, new_pipeline]
if old_pipeline.partial_reload(@pipeline_config)
[true, old_pipeline]
else
old_pipeline.shutdown
new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
success = new_pipeline.start # block until the pipeline is correctly started or crashed

# return success and new_pipeline to registry reload_pipeline
[success, new_pipeline]
end
end

LogStash::ConvergeResult::ActionResult.create(self, success)
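The reload action above now tries the in-place path first and only falls back to the existing stop-and-recreate path when the running pipeline declines. A hedged Java sketch of that fallback strategy, with `ReloadablePipeline` and `ReloadAction` as placeholder names, not Logstash classes:

import java.util.function.Function;

// Illustrative only: the try-partial-then-full-reload strategy used by PipelineAction::Reload.
interface ReloadablePipeline {
    boolean partialReload(Object newConfig);  // true when filters/outputs were swapped in place
    void shutdown();
}

final class ReloadAction {
    static boolean execute(ReloadablePipeline oldPipeline,
                           Object newConfig,
                           Function<Object, Boolean> startFreshPipeline) {
        if (oldPipeline.partialReload(newConfig)) {
            return true;                            // old pipeline object is kept in the registry
        }
        oldPipeline.shutdown();                     // unchanged behaviour: full stop...
        return startFreshPipeline.apply(newConfig); // ...then start a brand-new pipeline
    }
}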
logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
@@ -79,17 +79,19 @@ public final class CompiledPipeline {
/**
* Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
*/
private final Map<String, AbstractFilterDelegatorExt> filters;
private Map<String, AbstractFilterDelegatorExt> filters;

/**
* Configured outputs.
*/
private final Map<String, AbstractOutputDelegatorExt> outputs;
private Map<String, AbstractOutputDelegatorExt> outputs;

/**
* Parsed pipeline configuration graph.
*/
private final PipelineIR pipelineIR;
public final PipelineIR pipelineIR;

private ConfigVariableExpander cve;

/**
* Ruby plugin factory instance.
@@ -110,9 +112,10 @@ public CompiledPipeline(
{
this.pipelineIR = pipelineIR;
this.pluginFactory = pluginFactory;
try (ConfigVariableExpander cve = new ConfigVariableExpander(
this.cve = new ConfigVariableExpander(
secretStore,
EnvironmentVariableProvider.defaultProvider())) {
EnvironmentVariableProvider.defaultProvider());
try {
inputs = setupInputs(cve);
filters = setupFilters(cve);
outputs = setupOutputs(cve);
@@ -121,6 +124,12 @@
}
}

public void resetFilterOutputLIR(PipelineIR newPipelineIR) {
this.pipelineIR.updateFilterOutputLIR(newPipelineIR);
this.filters = setupFilters(cve);
this.outputs = setupOutputs(cve);
}

public Collection<AbstractOutputDelegatorExt> outputs() {
return Collections.unmodifiableCollection(outputs.values());
}
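The CompiledPipeline hunks above make `filters` and `outputs` rebuildable and keep the `ConfigVariableExpander` as a field instead of closing it in a constructor-scoped try-with-resources, so `setupFilters`/`setupOutputs` can be re-run on partial reload; one consequence is that `cve` is presumably no longer closed automatically and needs an explicit lifecycle elsewhere. A small Java sketch of that pattern, with placeholder names rather than the real Logstash classes:

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative pattern only: retain a formerly constructor-scoped resource so the
// filter/output maps can be rebuilt later (names are placeholders, not Logstash classes).
final class RebuildableStages implements Closeable {
    private final Closeable expander;          // was try-with-resources, now kept for rebuilds
    private Map<String, String> filters;       // no longer final: replaced on partial reload
    private Map<String, String> outputs;

    RebuildableStages(Closeable expander) {
        this.expander = expander;
        rebuild();                             // initial setup, same work as before
    }

    void rebuild() {                           // analogous to resetFilterOutputLIR(newPipelineIR)
        this.filters = setup("filter");
        this.outputs = setup("output");
    }

    private Map<String, String> setup(String section) {
        return new HashMap<>();                // stand-in for setupFilters(cve)/setupOutputs(cve)
    }

    @Override
    public void close() throws IOException {
        expander.close();                      // someone must now close the expander explicitly
    }
}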
31 changes: 29 additions & 2 deletions logstash-core/src/main/java/org/logstash/config/ir/PipelineIR.java
@@ -35,13 +35,21 @@ public Graph getGraph() {
return graph;
}

public Graph getInputGraph() {
return inputGraph;
}

public QueueVertex getQueue() {
return queue;
}

private final Graph graph;
private Graph graph;

private final QueueVertex queue;
private final Graph inputGraph;
private final Graph filterGraph;
private final Graph outputGraph;

private QueueVertex queue;

// Temporary until we have LIR execution
// Then we will no longer need this property here
@@ -54,6 +62,10 @@ public PipelineIR(Graph inputSection, Graph filterSection, Graph outputSection)
public PipelineIR(Graph inputSection, Graph filterSection, Graph outputSection, String originalSource) throws InvalidIRException {
this.originalSource = originalSource;

this.inputGraph = inputSection.copy(); // useful for checking if partial reload is possible
this.filterGraph = filterSection.copy(); // useful for partial reload
this.outputGraph = outputSection.copy(); // useful for partial reload

Graph tempGraph = inputSection.copy(); // The input section are our roots, so we can import that wholesale

// Connect all the input vertices out to the queue
@@ -145,4 +157,19 @@ private static QueueVertex selectQueueVertex(final Graph graph, final QueueVerte
return tempQueue;
}
}

public void updateFilterOutputLIR(PipelineIR newPipelineIR) {
try {
this.graph = inputGraph.copy();
QueueVertex tempQueueVertex = new QueueVertex();
this.graph = this.graph.chain(tempQueueVertex);
this.graph = this.graph.chain(newPipelineIR.filterGraph.copy());
this.graph = this.graph.chain(new SeparatorVertex("filter_to_output"));
this.graph = this.graph.chain(newPipelineIR.outputGraph.copy());
this.queue = selectQueueVertex(this.graph, tempQueueVertex);
this.graph.validate();
} catch (InvalidIRException e) {
throw new RuntimeException(e);
}
}
}
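The new `updateFilterOutputLIR` above rebuilds the whole execution graph by re-chaining the sections: the saved copy of the original input graph is reused, and everything downstream of the queue comes from the new config. A toy Java sketch of that ordering, assuming a simplified `ToyGraph`/`chain` rather than the real `org.logstash.config.ir.graph` API:

import java.util.ArrayList;
import java.util.List;

// Toy sketch of the rebuild order in updateFilterOutputLIR; not the Logstash Graph API.
final class ToyGraph {
    private final List<String> stages = new ArrayList<>();

    ToyGraph chain(String stage) {           // appends a section, loosely like Graph#chain
        stages.add(stage);
        return this;
    }

    static ToyGraph rebuild(String keptInputSection, String newFilterSection, String newOutputSection) {
        // Same order as updateFilterOutputLIR: inputs are reused, the rest is from the new config.
        return new ToyGraph()
                .chain(keptInputSection)      // copy of the original input graph
                .chain("queue")               // fresh QueueVertex
                .chain(newFilterSection)      // filter graph from the new config
                .chain("filter_to_output")    // SeparatorVertex between filters and outputs
                .chain(newOutputSection);     // output graph from the new config
    }

    @Override public String toString() { return String.join(" -> ", stages); }

    public static void main(String[] args) {
        System.out.println(rebuild("inputs(v1)", "filters(v2)", "outputs(v2)"));
        // inputs(v1) -> queue -> filters(v2) -> filter_to_output -> outputs(v2)
    }
}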
logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
@@ -162,6 +162,8 @@ public class AbstractPipelineExt extends RubyBasicObject {
private @SuppressWarnings("rawtypes") RubyArray inputs;
private @SuppressWarnings("rawtypes") RubyArray filters;
private @SuppressWarnings("rawtypes") RubyArray outputs;
private PluginMetricsFactoryExt metricsFactoryExt;
private ExecutionContextFactoryExt executionContextFactoryExt;

public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
@@ -171,16 +173,18 @@ public AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject[] args)
public AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject[] args)
throws IncompleteSourceWithMetadataException, NoSuchAlgorithmException {
initialize(context, args[0], args[1], args[2]);
this.metricsFactoryExt = new PluginMetricsFactoryExt(
context.runtime, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS
).initialize(context, pipelineId(), metric());
this.executionContextFactoryExt = new ExecutionContextFactoryExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
).initialize(context, args[3], this, dlqWriter(context));
lirExecution = new CompiledPipeline(
lir,
new PluginFactoryExt(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init(
lir,
new PluginMetricsFactoryExt(
context.runtime, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS
).initialize(context, pipelineId(), metric()),
new ExecutionContextFactoryExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
).initialize(context, args[3], this, dlqWriter(context)),
metricsFactoryExt,
executionContextFactoryExt,
RubyUtil.FILTER_DELEGATOR_CLASS
),
getSecretStore(context)
@@ -407,6 +411,29 @@ public final IRubyObject closeDlqWriter(final ThreadContext context) {
return context.nil;
}

@JRubyMethod(name = "reload_workers")
public final IRubyObject reloadWorkers(final ThreadContext context,
final IRubyObject pipelineConfig) {

PipelineIR new_lir;
PipelineConfig newPipelineConfig = pipelineConfig.toJava(PipelineConfig.class);
boolean supportEscapes = getSetting(context, "config.support_escapes").isTrue();
try (ConfigVariableExpander cve = new ConfigVariableExpander(getSecretStore(context), EnvironmentVariableProvider.defaultProvider())) {
new_lir = ConfigCompiler.configToPipelineIR(newPipelineConfig.getConfigParts(), supportEscapes, cve);
} catch (InvalidIRException iirex) {
throw new IllegalArgumentException(iirex);
}
if (new_lir.getInputGraph().sourceComponentEquals(lir.getInputGraph())) {
LOGGER.info("WOOHOOO WE CAN PARTIALLY RELOAD");
lirExecution.resetFilterOutputLIR(new_lir);
this.lir = lirExecution.pipelineIR;
this.pipelineSettings = pipelineConfig;
return context.tru;
} else {
return context.fals;
}
}

@JRubyMethod
public final PipelineReporterExt reporter() {
return reporter;
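`reload_workers` above compiles the new config to IR and only accepts a partial reload when the new input section is source-equivalent to the running one (`sourceComponentEquals`); otherwise the caller falls back to a full reload. A hedged Java sketch of that gate, using a stand-in `Ir` record instead of the real `PipelineIR`/`Graph` types:

import java.util.List;
import java.util.Objects;

// Illustrative gate only: partial reload is accepted only when the input section of the
// new config matches the running one (placeholder types, not Logstash APIs).
final class PartialReloadGate {
    record Ir(List<String> inputSources, List<String> filterSources, List<String> outputSources) {}

    // Stand-in for Graph#sourceComponentEquals: compare by config source, not object identity.
    static boolean inputsUnchanged(Ir running, Ir candidate) {
        return Objects.equals(running.inputSources(), candidate.inputSources());
    }

    static boolean tryPartialReload(Ir running, Ir candidate) {
        if (!inputsUnchanged(running, candidate)) {
            return false;                     // inputs changed: caller must do a full reload
        }
        // here the real code swaps filters/outputs in place via resetFilterOutputLIR(...)
        return true;
    }

    public static void main(String[] args) {
        Ir running   = new Ir(List.of("stdin {}"), List.of("mutate {}"), List.of("stdout {}"));
        Ir candidate = new Ir(List.of("stdin {}"), List.of("grok {}"),   List.of("stdout {}"));
        System.out.println(tryPartialReload(running, candidate));   // true: only filters changed
    }
}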