From e99b317eb4e23cc717f32e050b07fdeed8cbd8aa Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 27 Feb 2025 08:54:15 +1300 Subject: [PATCH] Integrate `async-container-supervisor`. --- bake/falcon/supervisor.rb | 4 +- examples/benchmark/small.js | 21 +++++ examples/dungeons/Modelfile | 8 ++ examples/dungeons/characters/cat.model | 37 ++++++++ examples/hello-event/scheduler.rb | 89 ++++++++++++++++++ examples/hello-event/server.rb | 33 +++++++ examples/hello/falcon.rb | 2 +- falcon.gemspec | 2 +- gems.rb | 1 + lib/falcon/command/supervisor.rb | 73 --------------- lib/falcon/command/top.rb | 2 - lib/falcon/environment/supervisor.rb | 26 +----- lib/falcon/service/server.rb | 4 + lib/falcon/service/supervisor.rb | 123 ------------------------- releases.md | 21 +++++ test/falcon/service/supervisor.rb | 13 +-- 16 files changed, 228 insertions(+), 231 deletions(-) create mode 100644 examples/benchmark/small.js create mode 100644 examples/dungeons/Modelfile create mode 100644 examples/dungeons/characters/cat.model create mode 100644 examples/hello-event/scheduler.rb create mode 100755 examples/hello-event/server.rb delete mode 100644 lib/falcon/command/supervisor.rb delete mode 100644 lib/falcon/service/supervisor.rb diff --git a/bake/falcon/supervisor.rb b/bake/falcon/supervisor.rb index 1aad062..ad73c6c 100644 --- a/bake/falcon/supervisor.rb +++ b/bake/falcon/supervisor.rb @@ -4,7 +4,5 @@ # Copyright, 2020-2025, by Samuel Williams. def restart - require_relative "../../lib/falcon/command/supervisor" - - Falcon::Command::Supervisor["restart"].call + context.lookup("async:container:supervisor:restart").call end diff --git a/examples/benchmark/small.js b/examples/benchmark/small.js new file mode 100644 index 0000000..38d70fe --- /dev/null +++ b/examples/benchmark/small.js @@ -0,0 +1,21 @@ +export const options = { + stages: [ + // Warmup: Gradually ramp up: + {duration: '10s', target: 64}, + + // Main test: Sustained load: + {duration: '1m', target: 64}, + ], +}; + +import http from 'k6/http'; +import { check, sleep } from 'k6'; + +export default function () { + const res = http.get('http://localhost:9292/small'); + + check(res, { + 'is status 200': (r) => r.status === 200, + 'response time < 200ms': (r) => r.timings.duration < 200, + }); +} \ No newline at end of file diff --git a/examples/dungeons/Modelfile b/examples/dungeons/Modelfile new file mode 100644 index 0000000..db51d9b --- /dev/null +++ b/examples/dungeons/Modelfile @@ -0,0 +1,8 @@ +FROM llama2 +# sets the temperature to 1 [higher is more creative, lower is more coherent] +PARAMETER temperature 1 +# sets the context window size to 4096, this controls how many tokens the LLM can use as context to generate the next token +PARAMETER num_ctx 4096 + +# sets a custom system message to specify the behavior of the chat assistant +SYSTEM You find yourself at the entrance to a dungeon. What do you do next? \ No newline at end of file diff --git a/examples/dungeons/characters/cat.model b/examples/dungeons/characters/cat.model new file mode 100644 index 0000000..1a30060 --- /dev/null +++ b/examples/dungeons/characters/cat.model @@ -0,0 +1,37 @@ +FROM llama2:13b + +PARAMETER num_ctx 16384 + +SYSTEM """ +You are an entity in a role playing game. You will receive input from the game world and must respond with actions. Input from the game world will include nearby events and dialogue. You can perform one or more actions include speaking, moving and using things in the world. You must ONLY respond with actions in the specified format otherwise the game will respond with ERROR to inform you that the input was not understood, and you should try again using one of the defined actions. + +Inputs will be formatted in the following way: + +TIME [time] + The game world has advanced to the specified time. +SEE [entity] [distance] [action] + You see something. +HEAR [entity] [distance] [dialogue] + You hear something. +ARRIVE [entity] + You arrive somewhere. +ERROR [message] + Your action was not understood (perhaps try again). + +Actions must be formatted in the following way: + +MOVE TOWARDS [entity] + You will begin moving towards the named entity. +SPEAK "dialogue" + You will speak the specified dialogue. +USE [entity] + You will use the named entity. +LOOK + You will look around and the game will inform you of nearby entities. +PASS + Do nothing. + +Before you try to interact with things, ensure you LOOK to see what is available. Then, MOVE TOWARDS the entity. Finally the game world will inform you when you ARRIVE at the entity. You may perform more than one action at a time if you list them on separate lines. + +The entity you are simulating is a cat. +""" diff --git a/examples/hello-event/scheduler.rb b/examples/hello-event/scheduler.rb new file mode 100644 index 0000000..de31bdf --- /dev/null +++ b/examples/hello-event/scheduler.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2021-2023, by Samuel Williams. + +$LOAD_PATH << File.expand_path("../../lib", __dir__) +$LOAD_PATH << File.expand_path("../../ext", __dir__) + +require "io/event" + +require "socket" +require "fiber" + +class Scheduler + def initialize(selector = nil) + @fiber = Fiber.current + @selector = selector || IO::Event::Selector.new(@fiber) + @pending = [] + @waiting = {} + + unless @selector.respond_to?(:io_close) + instance_eval{undef io_close} + end + + @mutex = Mutex.new + end + + def block(blocker, timeout) + raise NotImplementedError + end + + def unblock(blocker, fiber) + raise NotImplementedError + end + + def io_wait(io, events, timeout) + fiber = Fiber.current + @waiting[fiber] = io + @selector.io_wait(fiber, io, events) + ensure + @waiting.delete(fiber) + end + + def io_close(io) + @selector.io_close(io) + end + + def kernel_sleep(duration) + @selector.defer + end + + def close + while @selector.ready? || @waiting.any? + begin + @selector.select(nil) + rescue Errno::EINTR + # Ignore. + end + end + rescue Interrupt + # Exit. + end + + def fiber(&block) + fiber = Fiber.new(&block) + + @selector.resume(fiber) + + return fiber + end +end + +class DirectScheduler < Scheduler + def io_read(io, buffer, length) + fiber = Fiber.current + @waiting[fiber] = io + result = @selector.io_read(fiber, io, buffer, length) + ensure + @waiting.delete(fiber) + end + + def io_write(io, buffer, length) + fiber = Fiber.current + @waiting[fiber] = io + @selector.io_write(fiber, io, buffer, length) + ensure + @waiting.delete(fiber) + end +end diff --git a/examples/hello-event/server.rb b/examples/hello-event/server.rb new file mode 100755 index 0000000..4582244 --- /dev/null +++ b/examples/hello-event/server.rb @@ -0,0 +1,33 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2021-2023, by Samuel Williams. + +require_relative "scheduler" +require "io/nonblock" + +#scheduler = DirectScheduler.new +scheduler = Scheduler.new +Fiber.set_scheduler(scheduler) + +port = Integer(ARGV.pop || 3020) + +RESPONSE = "HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World" + +Fiber.schedule do + server = TCPServer.new("localhost", port) + server.listen(Socket::SOMAXCONN) + + loop do + peer, address = server.accept + + Fiber.schedule do + while request_line = peer.readpartial(1024) rescue nil + peer.write(RESPONSE) + end + ensure + peer.close + end + end +end diff --git a/examples/hello/falcon.rb b/examples/hello/falcon.rb index b8318e3..ea6e757 100755 --- a/examples/hello/falcon.rb +++ b/examples/hello/falcon.rb @@ -21,7 +21,7 @@ # append preload "preload.rb" - + include Async::Container::Supervisor::Supervised end service "supervisor" do diff --git a/falcon.gemspec b/falcon.gemspec index a31d9ca..e18d555 100644 --- a/falcon.gemspec +++ b/falcon.gemspec @@ -28,13 +28,13 @@ Gem::Specification.new do |spec| spec.add_dependency "async" spec.add_dependency "async-container", "~> 0.20" + spec.add_dependency "async-container-supervisor", "~> 0.4.0" spec.add_dependency "async-http", "~> 0.75" spec.add_dependency "async-http-cache", "~> 0.4" spec.add_dependency "async-service", "~> 0.10" spec.add_dependency "bundler" spec.add_dependency "localhost", "~> 1.1" spec.add_dependency "openssl", "~> 3.0" - spec.add_dependency "process-metrics", "~> 0.2" spec.add_dependency "protocol-http", "~> 0.31" spec.add_dependency "protocol-rack", "~> 0.7" spec.add_dependency "samovar", "~> 2.3" diff --git a/gems.rb b/gems.rb index b9258d6..9a77f1b 100644 --- a/gems.rb +++ b/gems.rb @@ -11,6 +11,7 @@ # gem "async-http", path: "../async-http-native-io" # gem "openssl", git: "https://github.com/ruby/openssl.git" # gem "async-container", path: "../async-container" +gem "async-container-supervisor", path: "../async-container-supervisor" # gem "async-websocket", path: "../async-websocket" # gem "async-http", path: "../async-http" # gem "async-http-cache", path: "../async-http-cache" diff --git a/lib/falcon/command/supervisor.rb b/lib/falcon/command/supervisor.rb deleted file mode 100644 index 99d6924..0000000 --- a/lib/falcon/command/supervisor.rb +++ /dev/null @@ -1,73 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2019-2024, by Samuel Williams. - -require "samovar" -require "async" -require "json" - -require "io/endpoint/unix_endpoint" -require "io/stream" - -module Falcon - module Command - # Implements the `falcon supervisor` command. - # - # Talks to an instance of the supervisor to issue commands and print results. - class Supervisor < Samovar::Command - self.description = "Control and query a specific supervisor." - - # The command line options. - # @attribute [Samovar::Options] - options do - option "--path ", "The control IPC path.", default: "supervisor.ipc" - end - - # Implements the `falcon supervisor restart` command. - class Restart < Samovar::Command - self.description = "Restart the process group." - - # Send the restart message to the supervisor. - def call(stream) - stream.puts({please: "restart"}.to_json, separator: "\0") - end - end - - # Implements the `falcon supervisor metrics` command. - class Metrics < Samovar::Command - self.description = "Show metrics about the falcon processes." - - # Send the metrics message to the supervisor and print the results. - def call(stream) - stream.puts({please: "metrics"}.to_json, separator: "\0", chomp: true) - response = JSON.parse(stream.read_until("\0"), symbolize_names: true) - - $stdout.puts response - end - end - - # The nested command to execute. - # @name nested - # @attribute [Command] - nested :command, { - "restart" => Restart, - "metrics" => Metrics, - }, default: "metrics" - - # The endpoint the supervisor is bound to. - def endpoint - ::IO::Endpoint.unix(@options[:path]) - end - - # Connect to the supervisor and execute the requested command. - def call - Sync do - endpoint.connect do |socket| - @command.call(IO::Stream(socket)) - end - end - end - end - end -end diff --git a/lib/falcon/command/top.rb b/lib/falcon/command/top.rb index 6d7ae17..44bdc31 100644 --- a/lib/falcon/command/top.rb +++ b/lib/falcon/command/top.rb @@ -8,7 +8,6 @@ require_relative "virtual" require_relative "proxy" require_relative "redirect" -require_relative "supervisor" require_relative "../version" @@ -38,7 +37,6 @@ class Top < Samovar::Command "virtual" => Virtual, "proxy" => Proxy, "redirect" => Redirect, - "supervisor" => Supervisor, }, default: "serve" # Whether verbose logging is enabled. diff --git a/lib/falcon/environment/supervisor.rb b/lib/falcon/environment/supervisor.rb index c10f3b0..2bc16e7 100644 --- a/lib/falcon/environment/supervisor.rb +++ b/lib/falcon/environment/supervisor.rb @@ -3,26 +3,15 @@ # Released under the MIT License. # Copyright, 2019-2024, by Samuel Williams. -require_relative "../service/supervisor" require_relative "../environment" -require "io/endpoint/unix_endpoint" +require "async/container/supervisor" module Falcon module Environment # Provides an environment for hosting a supervisor which can monitor multiple applications. module Supervisor - # The service class to use for the supervisor. - # @returns [Class] - def service_class - ::Falcon::Service::Supervisor - end - - # The name of the supervisor - # @returns [String] - def name - "supervisor" - end + include Async::Container::Supervisor::Environment # The IPC path to use for communication with the supervisor. # @returns [String] @@ -30,15 +19,8 @@ def ipc_path ::File.expand_path("supervisor.ipc", root) end - # The endpoint the supervisor will bind to. - # @returns [::IO::Endpoint::Generic] - def endpoint - ::IO::Endpoint.unix(ipc_path) - end - - # Options to use when creating the container. - def container_options - {restart: true, count: 1, health_check_timeout: 30} + def monitors + [Async::Container::Supervisor::MemoryMonitor.new(interval: 10)] end end diff --git a/lib/falcon/service/server.rb b/lib/falcon/service/server.rb index 42b064d..02b4d23 100644 --- a/lib/falcon/service/server.rb +++ b/lib/falcon/service/server.rb @@ -58,6 +58,10 @@ def setup(container) evaluator = @environment.evaluator Async do |task| + if @environment.implements?(Async::Container::Supervisor::Supervised) + evaluator.make_supervised_worker(instance).run + end + server = evaluator.make_server(@bound_endpoint) server.run diff --git a/lib/falcon/service/supervisor.rb b/lib/falcon/service/supervisor.rb deleted file mode 100644 index 7c3421a..0000000 --- a/lib/falcon/service/supervisor.rb +++ /dev/null @@ -1,123 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2019-2024, by Samuel Williams. - -require "process/metrics" -require "json" - -require "async/service/generic" - -require "io/endpoint/bound_endpoint" -require "io/stream" - -module Falcon - module Service - # Implements a host supervisor which can restart the host services and provide various metrics about the running processes. - class Supervisor < Async::Service::Generic - # Initialize the supervisor using the given environment. - # @parameter environment [Build::Environment] - def initialize(...) - super - - @bound_endpoint = nil - end - - # The endpoint which the supervisor will bind to. - # Typically a unix pipe in the same directory as the host. - def endpoint - @evaluator.endpoint - end - - # Restart the process group that the supervisor belongs to. - def do_restart(message) - # Tell the parent of this process group to spin up a new process group/container. - # Wait for that to start accepting new connections. - # Stop accepting connections. - # Wait for existing connnections to drain. - # Terminate this process group. - signal = message[:signal] || :INT - - Process.kill(signal, Process.ppid) - end - - # Capture process metrics relating to the process group that the supervisor belongs to. - def do_metrics(message) - Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid) - end - - # Handle an incoming request. - # @parameter message [Hash] The decoded message. - def handle(message) - case message[:please] - when "restart" - self.do_restart(message) - when "metrics" - self.do_metrics(message) - end - end - - # Bind the supervisor to the specified endpoint. - def start - Console.logger.info(self) {"Binding to #{self.endpoint}..."} - - @bound_endpoint = Sync{self.endpoint.bound} - - super - end - - # Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages. - # @parameter container [Async::Container::Generic] - def setup(container) - container_options = @evaluator.container_options - health_check_timeout = container_options[:health_check_timeout] - - container.run(name: self.name, **container_options) do |instance| - Async do - @bound_endpoint.accept do |peer| - stream = ::IO::Stream(peer) - - while message = stream.read_until("\0") - response = handle(JSON.parse(message, symbolize_names: true)) - stream.puts(response.to_json, separator: "\0") - end - end - - instance.ready! - - if health_check_timeout - Async(transient: true) do - while true - sleep(health_check_timeout / 2) - instance.ready! - end - end - end - end - end - - super - end - - # Release the bound endpoint. - def stop - @bound_endpoint&.close - @bound_endpoint = nil - - super - end - - def invoke(command) - @bound_endpoint.local_address_endpoint.connect do |peer| - stream = ::IO::Stream(peer) - - stream.puts(command.to_json, separator: "\0") - - response = JSON.parse(stream.read_until("\0"), symbolize_names: true) - - return response - end - end - end - end -end diff --git a/releases.md b/releases.md index 310d350..1fa1993 100644 --- a/releases.md +++ b/releases.md @@ -4,6 +4,27 @@ - Introduce {ruby Falcon::Environment::Server#make_server} which gives you full control over the server creation process. +### Introduce `Async::Container::Supervisor`. + +`Async::Container::Supervisor` is a new supervisor implementation that replaces Falcon's own supervisor. This allows you to use the same supervisor for all your services, and provides a more consistent interface for managing services. The supervisor is now a separate gem, `async-container-supervisor`. + +By default, the supervisor does not perform any monitoring, but you may add monitoring by defining them in the service definition. For example: + +``` ruby +service "supervisor" do + include Async::Container::Supervisor::Environment + + monitors do + [ + # Limit total memory usage to 512MiB: + Async::Container::Supervisor::MemoryMonitor.new(interval: 10, limit: 1024 * 1024 * 512), + ] + end +end +``` + +We retain the `falcon:supervisor:restart` task, but you may prefer to use `async:container:supervisor:restart` directly. + ## v0.50.0 - Add {ruby Falcon::Environment::Server#endpoint_options} to allow configuration of the endpoint options more easily. diff --git a/test/falcon/service/supervisor.rb b/test/falcon/service/supervisor.rb index 18c8d84..d023b0c 100644 --- a/test/falcon/service/supervisor.rb +++ b/test/falcon/service/supervisor.rb @@ -35,14 +35,15 @@ expect(container.group.running).to have_attributes(size: be == 1) - response = supervisor.invoke({please: "metrics"}) + response = supervisor.invoke(do: "status") - expect(response).to be_a(Hash) + expect(response).to be_a(Array) + expect(response.size).to be == 1 - # The supervisor should report itself: - expect(response.values).to have_value(have_keys( - command: be == "supervisor" - )) + first = response.first + expect(first).to have_keys( + memory_monitor: be_a(Hash), + ) ensure supervisor.stop container.stop