Skip to content

Commit

Permalink
Integrate async-container-supervisor.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Feb 27, 2025
1 parent be54958 commit f322ffb
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 131 deletions.
2 changes: 1 addition & 1 deletion falcon.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ Gem::Specification.new do |spec|

spec.add_dependency "async"
spec.add_dependency "async-container", "~> 0.20"
spec.add_dependency "async-container-supervisor", "~> 0.3.0"
spec.add_dependency "async-http", "~> 0.75"
spec.add_dependency "async-http-cache", "~> 0.4"
spec.add_dependency "async-service", "~> 0.10"
spec.add_dependency "bundler"
spec.add_dependency "localhost", "~> 1.1"
spec.add_dependency "openssl", "~> 3.0"
spec.add_dependency "process-metrics", "~> 0.2"
spec.add_dependency "protocol-http", "~> 0.31"
spec.add_dependency "protocol-rack", "~> 0.7"
spec.add_dependency "samovar", "~> 2.3"
Expand Down
21 changes: 5 additions & 16 deletions lib/falcon/environment/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,28 @@
require_relative "../service/supervisor"
require_relative "../environment"

require "io/endpoint/unix_endpoint"
require "async/container/supervisor"

module Falcon
module Environment
# Provides an environment for hosting a supervisor which can monitor multiple applications.
module Supervisor
include Async::Container::Supervisor::Environment

# The service class to use for the supervisor.
# @returns [Class]
def service_class
::Falcon::Service::Supervisor
end

# The name of the supervisor
# @returns [String]
def name
"supervisor"
end

# The IPC path to use for communication with the supervisor.
# @returns [String]
def ipc_path
::File.expand_path("supervisor.ipc", root)
end

# The endpoint the supervisor will bind to.
# @returns [::IO::Endpoint::Generic]
def endpoint
::IO::Endpoint.unix(ipc_path)
end

# Options to use when creating the container.
def container_options
{restart: true, count: 1, health_check_timeout: 30}
def monitors
[Async::Container::Supervisor::MemoryMonitor.new(interval: 10)]
end
end

Expand Down
116 changes: 8 additions & 108 deletions lib/falcon/service/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,119 +3,19 @@
# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.

require "process/metrics"
require "json"

require "async/service/generic"

require "io/endpoint/bound_endpoint"
require "io/stream"
require "async/container/supervisor/service"

module Falcon
module Service
# Implements a host supervisor which can restart the host services and provide various metrics about the running processes.
class Supervisor < Async::Service::Generic
# Initialize the supervisor using the given environment.
# @parameter environment [Build::Environment]
def initialize(...)
super

@bound_endpoint = nil
end

# The endpoint which the supervisor will bind to.
# Typically a unix pipe in the same directory as the host.
def endpoint
@evaluator.endpoint
end

# Restart the process group that the supervisor belongs to.
def do_restart(message)
# Tell the parent of this process group to spin up a new process group/container.
# Wait for that to start accepting new connections.
# Stop accepting connections.
# Wait for existing connnections to drain.
# Terminate this process group.
signal = message[:signal] || :INT

Process.kill(signal, Process.ppid)
end

# Capture process metrics relating to the process group that the supervisor belongs to.
def do_metrics(message)
Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid)
end

# Handle an incoming request.
# @parameter message [Hash] The decoded message.
def handle(message)
case message[:please]
when "restart"
self.do_restart(message)
when "metrics"
self.do_metrics(message)
end
end

# Bind the supervisor to the specified endpoint.
def start
Console.logger.info(self) {"Binding to #{self.endpoint}..."}

@bound_endpoint = Sync{self.endpoint.bound}

super
end

# Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages.
# @parameter container [Async::Container::Generic]
def setup(container)
container_options = @evaluator.container_options
health_check_timeout = container_options[:health_check_timeout]

container.run(name: self.name, **container_options) do |instance|
Async do
@bound_endpoint.accept do |peer|
stream = ::IO::Stream(peer)

while message = stream.read_until("\0")
response = handle(JSON.parse(message, symbolize_names: true))
stream.puts(response.to_json, separator: "\0")
end
end

instance.ready!

if health_check_timeout
Async(transient: true) do
while true
sleep(health_check_timeout / 2)
instance.ready!
end
end
end
end
end

super
end

# Release the bound endpoint.
def stop
@bound_endpoint&.close
@bound_endpoint = nil

super
end

def invoke(command)
@bound_endpoint.local_address_endpoint.connect do |peer|
stream = ::IO::Stream(peer)

stream.puts(command.to_json, separator: "\0")
class Supervisor < Async::Container::Supervisor::Service
def invoke(message)
Sync do
client = Async::Container::Supervisor::Client.new(endpoint: self.endpoint)

response = JSON.parse(stream.read_until("\0"), symbolize_names: true)

return response
client.connect do |connection|
connection.call(**message)
end
end
end
end
Expand Down
13 changes: 7 additions & 6 deletions test/falcon/service/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@

expect(container.group.running).to have_attributes(size: be == 1)

response = supervisor.invoke({please: "metrics"})
response = supervisor.invoke(do: "status")

expect(response).to be_a(Hash)
expect(response).to be_a(Array)
expect(response.size).to be == 1

# The supervisor should report itself:
expect(response.values).to have_value(have_keys(
command: be == "supervisor"
))
first = response.first
expect(first).to have_keys(
memory_monitor: be_a(Hash),
)
ensure
supervisor.stop
container.stop
Expand Down

0 comments on commit f322ffb

Please sign in to comment.