Users supervision tree #175

Merged 25 commits, Feb 21, 2024
25 commits
620fb31
Optimise user supervision
NelsonVides Dec 2, 2023
4348817
Support interarrival of zero
NelsonVides Dec 6, 2023
487217e
Have amoc_user stop itself encapsulated in proc_lib as well
NelsonVides Dec 15, 2023
42067cd
Pool user supervisors
NelsonVides Dec 16, 2023
d5770ae
Optimise starting many users immediately
NelsonVides Dec 22, 2023
77f92d0
Rename top supervisors and worker supervisors consistently
NelsonVides Jan 15, 2024
ed06eaf
Ensure proper order of init and terminate sup trees
NelsonVides Jan 15, 2024
6b03cd4
Remove support for OTP24 as we use new funs from maps and rand
NelsonVides Jan 15, 2024
c942862
Explain and add specs to amoc_users_sup
NelsonVides Jan 15, 2024
6d6a7f1
Fix issue with stop_children in worker_sup
NelsonVides Jan 15, 2024
921c03e
Keep track of worker_sup counts in the atomics array
NelsonVides Jan 15, 2024
037dc05
Ensure positive assignments to user sups
NelsonVides Jan 15, 2024
392f452
Adapt integration test to the new internal representation
NelsonVides Jan 15, 2024
326fd4b
Update copyrights
NelsonVides Jan 15, 2024
a1bf272
Document that user removal is asynchronous
NelsonVides Feb 20, 2024
ae280fb
Fix supervision flags for the top supervisor
NelsonVides Feb 20, 2024
865cbf7
Ensure better indexes for amoc_users_sup
NelsonVides Feb 20, 2024
6e0c5de
Improve encapsulation of amoc_users_worker_sup
NelsonVides Feb 20, 2024
cf64003
Simplify amoc_users_sup init
NelsonVides Feb 20, 2024
1bf7b3f
Reenable OTP24
NelsonVides Feb 20, 2024
8d53e23
Revert amoc_user stop
NelsonVides Feb 20, 2024
1487478
Store lists, counts and indexes in the amoc_users_sup pt state
NelsonVides Feb 21, 2024
cf9a6e5
Rework removal assignments more efficiently
NelsonVides Feb 21, 2024
11ffbba
Add more tests to the number of users being removed
NelsonVides Feb 21, 2024
623a89f
Apply review to tests and users sup init
NelsonVides Feb 21, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Expand Up @@ -11,12 +11,12 @@ jobs:
name: ${{ matrix.test-type }} test on OTP ${{matrix.otp_vsn}}
strategy:
matrix:
otp_vsn: ['26.1', '25.3', '24.3']
otp_vsn: ['26.2', '25.3', '24.3']
rebar_vsn: ['3.22.0']
test-type: ['regular', 'integration']
runs-on: 'ubuntu-22.04'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.otp_vsn }}
Expand Down
2 changes: 1 addition & 1 deletion elvis.config
Expand Up @@ -22,7 +22,7 @@
{elvis_style, invalid_dynamic_call, #{ignore => [amoc_code_server_SUITE]}},
{elvis_style, dont_repeat_yourself, #{min_complexity => 50}},
{elvis_style, no_debug_call, disable},
{elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE]}},
{elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE, controller_SUITE]}},
{elvis_style, no_throw, disable},
{elvis_style, no_import, disable}
]},
Expand Down
6 changes: 4 additions & 2 deletions guides/distributed-run.md
Expand Up @@ -36,14 +36,16 @@ amoc_dist:remove(50, Force).
```elixir
:amoc_dist.remove(50, force).
```
Remove 50 sessions.
Remove 50 sessions.

Where ``Force`` is a boolean of value:

* ``true`` - to kill the user processes using ``supervisor:terminate_child/2`` function
* ``false`` - to send an ``exit(User,shutdown)`` signal to the user process (the user process can ignore it)

All the users are `temporary` children of the `simple_one_for_one` supervisor with the `shutdown` key set to `2000`.
All the users are `temporary` children of a `simple_one_for_one` supervisor with the `shutdown` key set to `2000`.

Note that the removal operation is asynchronous: if `amoc_controller:remove_users/2` is called twice in a row, the second call may select the same users for removal.

Also, all the user processes trap exits.
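
The difference between the two `Force` values follows from how exit signals interact with `trap_exit`. The following is a minimal, self-contained sketch, not amoc code: `trap_exit_demo` and `user_loop/0` are hypothetical names, and `exit(Pid, kill)` stands in for the forced path, since an untrappable `kill` is what a supervisor falls back to once its `shutdown` timeout expires.

```erlang
-module(trap_exit_demo).
-export([run/0]).

%% Hypothetical stand-in for a user process; not amoc's actual user loop.
user_loop() ->
    receive
        {'EXIT', _From, shutdown} ->
            %% The exit signal arrived as a plain message and is ignored.
            user_loop();
        {ping, From} ->
            From ! {pong, self()},
            user_loop()
    end.

run() ->
    Parent = self(),
    Pid = spawn(fun() ->
                        process_flag(trap_exit, true),
                        Parent ! {ready, self()},
                        user_loop()
                end),
    %% Wait until trap_exit is set; before that the signal would be fatal.
    receive {ready, Pid} -> ok end,
    %% Polite request: a process that traps exits may ignore it.
    exit(Pid, shutdown),
    Pid ! {ping, self()},
    Survived = receive {pong, Pid} -> true after 1000 -> false end,
    %% Untrappable request: the process dies regardless of trap_exit.
    Ref = erlang:monitor(process, Pid),
    exit(Pid, kill),
    Reason = receive
                 {'DOWN', Ref, process, Pid, R} -> R
             after 1000 -> timeout
             end,
    {Survived, Reason}.
```

Calling `trap_exit_demo:run()` should return `{true, killed}`: the process survives `shutdown` and is only brought down by `kill`.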

Expand Down
4 changes: 4 additions & 0 deletions guides/local-run.md
Expand Up @@ -28,6 +28,10 @@ amoc:remove(10, true).
:amoc.remove(10, true).
```

Note that the removal operation is asynchronous: if `amoc_controller:remove_users/2` is called twice in a row, the second call may select the same users for removal.

Also note that all the user processes trap exits.
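
Because removal is asynchronous, a caller that wants two removals to act on different users could wait for the running count to drop before issuing the second one. The sketch below is an assumption about how such a wait might look, not part of amoc: `async_removal_demo` and `wait_until/3` are hypothetical, and a `counters` reference decremented by a background process simulates the users going away.

```erlang
-module(async_removal_demo).
-export([wait_until/3, run/0]).

%% Polls CountFun until it reports at most Target, or gives up after
%% Retries attempts spaced 10 ms apart.
-spec wait_until(fun(() -> integer()), integer(), non_neg_integer()) ->
    ok | {error, timeout}.
wait_until(_CountFun, _Target, 0) ->
    {error, timeout};
wait_until(CountFun, Target, Retries) ->
    case CountFun() =< Target of
        true ->
            ok;
        false ->
            timer:sleep(10),
            wait_until(CountFun, Target, Retries - 1)
    end.

run() ->
    %% Simulated "running users" count, starting at 10.
    Counter = counters:new(1, []),
    counters:add(Counter, 1, 10),
    %% Simulated asynchronous removal of 3 users, finishing a bit later.
    spawn(fun() -> timer:sleep(30), counters:sub(Counter, 1, 3) end),
    %% Reading the count right away most likely still shows the old value.
    Immediately = counters:get(Counter, 1),
    ok = wait_until(fun() -> counters:get(Counter, 1) end, 7, 100),
    {Immediately, counters:get(Counter, 1)}.
```

In a real run the count function would presumably read `currently_running_users` from the controller status; how that value is fetched is left out here on purpose.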

#### Many independent Amoc nodes

Sometimes a need arises to run several Amoc nodes independently from each other.
Expand Down
2 changes: 1 addition & 1 deletion integration_test/extra_code_paths/path1/dummy_helper.erl
Expand Up @@ -66,7 +66,7 @@ test_amoc_dist() ->
get_users_info(SlaveNodes) ->
Users = [{Node, Id} ||
Node <- SlaveNodes,
{Id, _Pid} <- rpc:call(Node, ets, tab2list, [amoc_users])],
{_Pid, Id} <- rpc:call(Node, amoc_users_sup, get_all_children, [])],
Ids = lists:usort([Id || {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, _} <- Users]),
N = length(Ids),
Expand Down
145 changes: 67 additions & 78 deletions src/amoc_controller.erl
@@ -1,4 +1,4 @@
%% @copyright 2023 Erlang Solutions Ltd.
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Main controller of a node, responsible for the scenario and the users
%%
%% Note that this module should be rarely used, APIs are fully exposed by `amoc' and `amoc_dist'
Expand All @@ -8,16 +8,14 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(USERS_TABLE, amoc_users).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, positive_integer, 1},
verification => {?MODULE, non_neg_integer, 1},
description => "a delay between creating the processes for two "
"consecutive users (ms, def: 50ms)",
update => {?MODULE, maybe_update_interarrival_timer, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
no_of_users = 0 :: user_count(),
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
Expand Down Expand Up @@ -67,7 +65,9 @@
%% ------------------------------------------------------------------
%% Parameters verification functions
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, positive_integer/1]).
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).

-export([zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand All @@ -77,6 +77,7 @@
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec start_link() -> {ok, pid()}.
start_link() ->
Expand Down Expand Up @@ -121,17 +122,24 @@ disable() ->
gen_server:call(?SERVER, disable).

%% @private
-spec positive_integer(any()) -> boolean().
positive_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival > 0.
-spec non_neg_integer(any()) -> boolean().
non_neg_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival >= 0.

%% @private
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).

%% @private
-spec zero_users_running() -> ok.
zero_users_running() ->
gen_server:cast(?SERVER, zero_users_running).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec init([]) -> {ok, state()}.
init([]) ->
Expand Down Expand Up @@ -174,6 +182,9 @@ handle_call(_Request, _From, State) ->
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
handle_cast(_Msg, State) ->
{noreply, State}.

Expand All @@ -182,8 +193,8 @@ handle_cast(_Msg, State) ->
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info({'DOWN', _, process, Pid, _}, State) ->
NewSate = handle_stop_user(Pid, State),
handle_info(start_all_users, State) ->
NewSate = handle_start_all_users(State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.
Expand All @@ -209,12 +220,15 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
{ok, State#state{status = terminating}};
case amoc_users_sup:count_no_of_users() of
0 ->
terminate_scenario(State),
{ok, State#state{status = finished}};
_ ->
amoc_users_sup:terminate_all_children(),
{ok, State#state{status = terminating}}
end;
handle_stop_scenario(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

Expand All @@ -240,29 +254,25 @@ handle_add(StartId, EndId, #state{last_user_id = LastId,
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef,
last_user_id = EndId}};
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
handle_add(_StartId, _EndId, #state{status = running} = State) ->
{{error, invalid_range}, State};
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
amoc_telemetry:execute([controller, users], #{count => Count},
CountRemove = amoc_users_sup:stop_children(Count, ForceRemove),
amoc_telemetry:execute([controller, users], #{count => CountRemove},
#{scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
end,
amoc_users_sup:stop_children(Pids, ForceRemove),
{ok, length(Pids)};
{ok, CountRemove};
handle_remove(_Count, _ForceRemove, #state{status = Status}) ->
{error, {invalid_status, Status}}.

-spec handle_status(state()) -> amoc_status().
handle_status(#state{status = running, scenario = Scenario,
no_of_users = N, last_user_id = LastId}) ->
last_user_id = LastId}) ->
N = amoc_users_sup:count_no_of_users(),
{running, #{scenario => Scenario, currently_running_users => N, highest_user_id => LastId}};
handle_status(#state{status = terminating, scenario = Scenario}) ->
{terminating, Scenario};
Expand All @@ -279,33 +289,26 @@ handle_disable(#state{status = Status} = State) ->

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
no_of_users = N,
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
start_user(Scenario, UserId, ScenarioState),
State#state{create_users = T, no_of_users = N + 1};
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_stop_user(pid(), state()) -> state().
handle_stop_user(Pid, State) ->
case ets:match(?USERS_TABLE, {'$1', Pid}, 1) of
{[[UserId]], _} ->
ets:delete(?USERS_TABLE, UserId),
dec_no_of_users(State);
_ ->
State
end.
-spec handle_start_all_users(state()) -> state().
handle_start_all_users(#state{create_users = AllUsers,
scenario = Scenario,
scenario_state = ScenarioState,
tref = TRef} = State) ->
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
-spec start_tables() -> ok.
start_tables() -> %% ETS creation
?USERS_TABLE = ets:new(?USERS_TABLE, [named_table,
ordered_set,
protected,
{read_concurrency, true}]),
amoc_config_utils:create_amoc_config_ets(),
ok.

Expand All @@ -321,11 +324,12 @@ init_scenario(Scenario, Settings) ->
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
amoc_scenario:terminate(Scenario, ScenarioState).

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
TRef;
maybe_start_timer(TRef) -> TRef.
-spec handle_zero_users_running(state()) -> state().
handle_zero_users_running(#state{status = terminating} = State) ->
terminate_scenario(State),
State#state{status = finished};
handle_zero_users_running(State) ->
State.

-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
maybe_stop_timer(undefined) ->
Expand All @@ -334,43 +338,28 @@ maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
start_user(Scenario, Id, ScenarioState) ->
{ok, Pid} = supervisor:start_child(amoc_users_sup, [Scenario, Id, ScenarioState]),
ets:insert(?USERS_TABLE, {Id, Pid}),
erlang:monitor(process, Pid),
ok.

-spec terminate_all_users() -> any().
terminate_all_users() ->
%stop all the users
Match = ets:match_object(?USERS_TABLE, '$1', 200),
terminate_all_users(Match).

%% ets:continuation/0 type is unfortunately not exported from the ets module.
-spec terminate_all_users({tuple(), term()} | '$end_of_table') -> ok.
terminate_all_users({Objects, Continuation}) ->
Pids = [Pid || {_Id, Pid} <- Objects],
amoc_users_sup:stop_children(Pids, true),
Match = ets:match_object(Continuation),
terminate_all_users(Match);
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.

-spec interarrival() -> interarrival().
interarrival() ->
-spec get_interarrival() -> interarrival().
get_interarrival() ->
amoc_config:get(interarrival).

-spec maybe_update_interarrival_timer(state()) -> state().
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
State;
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
{ok, cancel} = timer:cancel(TRef),
{ok, NewTRef} = timer:send_interval(interarrival(), start_user),
Value = get_interarrival(),
NewTRef = do_interarrival(Value),
State#state{tref = NewTRef}.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
Value = get_interarrival(),
do_interarrival(Value);
maybe_start_timer(TRef) -> TRef.

do_interarrival(0) ->
self() ! start_all_users,
undefined;
do_interarrival(Value) ->
{ok, NewTRef} = timer:send_interval(Value, start_user),
NewTRef.
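
The zero-interarrival path above can be tried in isolation. This is a minimal sketch that mirrors the shape of `non_neg_integer/1` and `do_interarrival/1` from the diff; the module name `interarrival_demo`, and the functions `schedule/1` and `run/0`, are hypothetical and nothing here talks to `amoc_config` or the controller.

```erlang
-module(interarrival_demo).
-export([non_neg_integer/1, schedule/1, run/0]).

%% Same check as the verification function in the diff: zero is accepted.
non_neg_integer(Value) ->
    is_integer(Value) andalso Value >= 0.

%% Zero means "start everyone now": one message, no timer to keep track of.
schedule(0) ->
    self() ! start_all_users,
    undefined;
%% Any positive value keeps the periodic one-user-per-tick behaviour.
schedule(Ms) ->
    {ok, TRef} = timer:send_interval(Ms, start_user),
    TRef.

run() ->
    undefined = schedule(0),
    First = receive start_all_users -> start_all_users after 100 -> timeout end,
    TRef = schedule(10),
    Second = receive start_user -> start_user after 1000 -> timeout end,
    {ok, cancel} = timer:cancel(TRef),
    {First, Second}.
```

Returning `undefined` for the zero case lets the caller treat "no timer running" and "all users already scheduled" the same way, which matches how `tref` is handled in the state record.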
9 changes: 3 additions & 6 deletions src/amoc_sup.erl
@@ -1,5 +1,5 @@
%% @private
%% @copyright 2023 Erlang Solutions Ltd.
%% @copyright 2024 Erlang Solutions Ltd.
-module(amoc_sup).

-behaviour(supervisor).
Expand All @@ -26,12 +26,9 @@ start_link() ->
%% Supervisor callbacks
%% ===================================================================
-spec init(term()) ->
{ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
MaxR :: non_neg_integer(), MaxT :: pos_integer()},
[ChildSpec :: supervisor:child_spec()]
}}.
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
{ok, {{one_for_one, 5, 10},
{ok, {#{strategy => one_for_all, intensity => 0},
[
?SUP(amoc_users_sup, supervisor),
?SUP(amoc_throttle_sup, supervisor),
Expand Down
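
For readers unfamiliar with the map form of the supervisor flags, here is a minimal sketch of what `one_for_all` with `intensity => 0` implies: the first child crash exceeds the restart budget, so the supervisor stops all children and then itself. The module `one_for_all_demo` and its throwaway workers are hypothetical and unrelated to the real `amoc_sup` children.

```erlang
-module(one_for_all_demo).
-behaviour(supervisor).
-export([start_link/0, init/1, start_worker/0, run/0]).

start_link() ->
    supervisor:start_link(?MODULE, []).

init([]) ->
    %% Same flags as in the diff; period is left at its default.
    SupFlags = #{strategy => one_for_all, intensity => 0},
    Child = fun(Id) -> #{id => Id, start => {?MODULE, start_worker, []}} end,
    {ok, {SupFlags, [Child(a), Child(b)]}}.

%% Throwaway worker that idles until told to stop; runs linked to the supervisor.
start_worker() ->
    {ok, spawn_link(fun() -> receive stop -> ok end end)}.

run() ->
    %% Trap exits so the calling process outlives the supervisor it is linked to.
    process_flag(trap_exit, true),
    {ok, Sup} = start_link(),
    [{_Id, Pid, _Type, _Mods} | _] = supervisor:which_children(Sup),
    %% Crash one child: with intensity 0 no restart is allowed.
    exit(Pid, crash),
    receive
        {'EXIT', Sup, Reason} -> Reason
    after 1000 -> timeout
    end.
```

`one_for_all_demo:run()` should return `shutdown`, the reason a supervisor exits with when its restart intensity is exceeded. Expect error reports in the log while it runs.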
6 changes: 3 additions & 3 deletions src/users/amoc_user.erl
@@ -1,5 +1,5 @@
%% @private
%% @copyright 2023 Erlang Solutions Ltd.
%% @copyright 2024 Erlang Solutions Ltd.
-module(amoc_user).

%% API
Expand All @@ -10,15 +10,15 @@
-type state() :: term().

-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
{ok, pid()}.
{ok, pid()} | {error, term()}.
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

-spec stop() -> no_return().
stop() ->
stop(self(), false).

-spec stop(pid(), boolean()) -> no_return() | ok | {error, any()}.
-spec stop(pid(), boolean()) -> ok.
stop(Pid, Force) when is_pid(Pid) ->
amoc_users_sup:stop_child(Pid, Force).
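
The widened `start_link/3` spec reflects that `proc_lib:start_link/3` returns whatever the new process passes to `proc_lib:init_ack/2`. The sketch below illustrates that mechanism only; `proc_lib_demo` and its `init/2` clauses are hypothetical and do not reproduce `amoc_user:init/4`, which is not shown in this diff.

```erlang
-module(proc_lib_demo).
-export([start_link/1, init/2, run/0]).

start_link(Arg) ->
    %% Blocks until the spawned process acknowledges, then returns the ack value.
    proc_lib:start_link(?MODULE, init, [self(), Arg]).

%% Successful start: acknowledge with {ok, Pid} and keep running.
init(Parent, ok) ->
    proc_lib:init_ack(Parent, {ok, self()}),
    receive stop -> ok end;
%% Failed start: acknowledge with an error tuple and terminate normally.
init(Parent, fail) ->
    proc_lib:init_ack(Parent, {error, refused}).

run() ->
    {ok, Pid} = start_link(ok),
    Started = is_pid(Pid),
    Pid ! stop,
    {Started, start_link(fail)}.
```

`proc_lib_demo:run()` should return `{true, {error, refused}}`, showing both shapes allowed by the `{ok, pid()} | {error, term()}` return type.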

Expand Down