diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 532bf9989cc..97cdba006a8 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -22,6 +22,13 @@ class LogStash::Agent include LogStash::Util::Loggable STARTED_AT = Time.now.freeze + # Only one agent object should be active at a time + # This invariant is mostly here for tests, to ensure we don't accidentally start two + # We record the backtrace of every agent start to make it possible to locate the source + # of the conflict, since it will be the preceeding instantiation that is the likely leak + CURRENT = java.util.concurrent.atomic.AtomicReference.new + CURRENT_INITIALIZED_BACKTRACE = java.util.concurrent.atomic.AtomicReference.new + attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipelines, :pipeline_bus attr_accessor :logger @@ -31,6 +38,14 @@ class LogStash::Agent # :auto_reload [Boolean] - enable reloading of pipelines # :reload_interval [Integer] - reload pipelines every X seconds def initialize(settings = LogStash::SETTINGS, source_loader = nil) + + # Check that we are the sole live instance + if (CURRENT.compare_and_set(nil, self)) + CURRENT_INITIALIZED_BACKTRACE.set(Kernel.caller); + else + raise "Only one agent may be active at a time! Cannot start a new one, old instance initialized in #{CURRENT_INITIALIZED_BACKTRACE.get.inspect}" + end + @logger = self.class.logger @settings = settings @auto_reload = setting("config.reload.automatic") @@ -178,6 +193,11 @@ def shutdown transition_to_stopped converge_result = shutdown_pipelines converge_result + + # Allow other agents to be created again + if (!CURRENT.compare_and_set(self, nil)) + raise "Current agent is not the one that is shutting down, #{CURRENT.get.object_id} should be shut down. This should never happen" + end end def id diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 6588347ae7b..a23501252ac 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -50,11 +50,8 @@ end after :each do - subject.shutdown - LogStash::SETTINGS.reset - FileUtils.rm(config_file) - FileUtils.rm_rf(subject.id_path) + LogStash::SETTINGS.reset end it "fallback to hostname when no name is provided" do @@ -120,7 +117,6 @@ Stud.stop!(t) t.join - subject.shutdown end end @@ -141,7 +137,6 @@ Stud.stop!(t) t.join - subject.shutdown end end @@ -162,7 +157,6 @@ Stud.stop!(t) t.join - subject.shutdown end end @@ -185,7 +179,6 @@ Stud.stop!(t) t.join - subject.shutdown end end @@ -206,7 +199,6 @@ Stud.stop!(t) t.join - subject.shutdown end end end @@ -229,10 +221,6 @@ subject { described_class.new(mock_settings(agent_args), source_loader) } - after do - subject.shutdown - end - context "environment variable templating" do before :each do @foo_content = ENV["FOO"] @@ -273,18 +261,13 @@ expect(subject.converge_state_and_update).to be_a_successful_converge end - after(:each) do - # new pipelines will be created part of the upgrade process so we need - # to close any initialized pipelines - subject.shutdown - end - context "when the upgrade fails" do it "leaves the state untouched" do expect(subject.converge_state_and_update).not_to be_a_successful_converge expect(subject.get_pipeline(default_pipeline_id).config_str).to eq(pipeline_config) end + # TODO(ph): This valid? xcontext "and current state is empty" do it "should not start a pipeline" do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 30ab077c047..313ad0ba314 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -52,6 +52,17 @@ def puts(payload) example.run end end + + c.after(:each) do + # Sometime tests instantiate an agent but don't shut it down + # We conveniently handle this, to prevent tests from polluting each other + current_agent = ::LogStash::Agent::CURRENT.get + if current_agent + current_agent.shutdown + + FileUtils.rm_rf(current_agent.id_path) + end + end end def installed_plugins