Skip to content

Commit

Permalink
Proper testing for pool config
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Nov 28, 2024
1 parent b1c3f76 commit 98676e8
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 37 deletions.
64 changes: 27 additions & 37 deletions src/throttle/amoc_throttle_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,52 +66,42 @@ pool_config(Rate, 0) ->
Config = #{max_n => Rate, delay => 0, status => inactive, pid => undefined},
PoolConfig = #{1 := First} = maps:from_keys(lists:seq(1, no_of_processes()), Config),
PoolConfig#{1 := First#{status => active}};
pool_config(Rate, Interval) ->
pool_config(Rate, Interval) when ?POS_INT(Rate), ?POS_INT(Interval) ->
NoOfProcesses = no_of_processes(),
RatePerMinutePerProcess = (60000 * Rate div Interval) div NoOfProcesses,
DelayPerProcess = (NoOfProcesses * Interval) div Rate,
Rem = ((60000 * Rate div Interval) rem NoOfProcesses)
+ ((NoOfProcesses * Interval) rem Rate),
calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem).
BaseRatePerMinute = round((60000 / Interval) * Rate),
RatesPerProcess = calculate_rate_per_process(NoOfProcesses, BaseRatePerMinute, []),
#{} = lists:foldl(fun assign_process/2, #{}, RatesPerProcess).

calculate_rate_per_process(1, Rate, Acc) ->
[{1, Rate} | Acc];
calculate_rate_per_process(N, Rate, Acc) when is_integer(N), N > 1 ->
ProcessRate = Rate div N,
calculate_rate_per_process(N - 1, Rate - ProcessRate, [{N, ProcessRate} | Acc]).

assign_process({N, 0}, Config) ->
Config#{N => #{max_n => 0,
delay => infinity,
status => inactive,
pid => undefined}};
assign_process({N, RatePerProcess}, Config) ->
Delay = case {60000 div RatePerProcess, 60000 rem RatePerProcess} of
{DelayBetweenExecutions, 0} -> DelayBetweenExecutions;
{DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1
end,
Config#{N => #{max_n => RatePerProcess,
delay => Delay,
status => active,
pid => undefined}}.

-spec process_pool_config(pid(), pool_config()) -> pool_config().
process_pool_config(PoolSup, PoolConfig) ->
Workers = amoc_throttle_pool:get_workers(PoolSup),
Fun1 = fun(N, Config) -> Config#{pid => maps:get(N, Workers)} end,
maps:map(Fun1, PoolConfig).

-spec calculate_availability(integer(), integer(), pos_integer(), integer()) -> pool_config().
calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem) ->
Fun = fun(N, {Acc, R}) ->
case {RatePerMinutePerProcess < NoOfProcesses, R} of
{true, 0} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess + 1,
status => inactive, pid => undefined},
{Acc#{N => Config}, R};
{true, R} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess,
status => active, pid => undefined},
{Acc#{N => Config}, R - 1};
{false, 0} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess,
status => active, pid => undefined},
{Acc#{N => Config}, R};
{false, R} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess + 1,
status => active, pid => undefined},
{Acc#{N => Config}, R - 1}
end
end,
{#{} = PoolConfig, _} = lists:foldl(Fun, {#{}, Rem}, lists:seq(1, NoOfProcesses)),
PoolConfig.

-spec no_of_processes() -> non_neg_integer().
no_of_processes() ->
3 * erlang:system_info(schedulers_online).
36.

-spec do_verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan().
do_verify_gradual_config(
Expand Down Expand Up @@ -159,4 +149,4 @@ do_verify_gradual_config(
calculate_step(N, N, _, _, To) -> To;
calculate_step(0, _, _, From, _) -> From;
calculate_step(N, _, StepRate, From, _) ->
From + floor(StepRate * N).
From + round(StepRate * N).
62 changes: 62 additions & 0 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ groups() ->
change_interarrival_gradually,
change_rate_gradually_verify_descriptions,
change_rate_gradually_verify_descriptions_properties,
% These tests depend on the available level of concurrency.
% Note that the smallest delay possible for a process is 1ms (receive operations),
% hence if we give for example 10 workers 1ms delays, we get 600_000 ticks per minute.
% and if we give for example 48 workers 1ms delays, we get 2_880_000 ticks per minute.
% That means, that is realistically the maximum rate we could possibly manage
% with a static pool of such number of workers.
pool_config_is_precise_for_low_rates,
pool_config_is_precise_for_medium_rates,
% TODO: introduce dynamically sized pools in order to manage higher rates.
%pool_config_is_precise_for_high_rates,
just_wait,
wait_for_process_to_die_sends_a_kill,
async_runner_dies_while_waiting_raises_exit,
Expand Down Expand Up @@ -353,6 +363,25 @@ change_rate_gradually_verify_descriptions_properties(_) ->
Fun(From, To, Interval, StepInterval, StepCount)),
run_prop(?FUNCTION_NAME, Prop, 1 bsl 16, 3).

%% From 1/min to 60000/min
pool_config_is_precise_for_low_rates(_) ->
pool_config_property_tests(integer(1, 60000),
timer:minutes(1),
100_000).

%% From 60000/min to 65536
pool_config_is_precise_for_medium_rates(_) ->
Concurrency = erlang:system_info(schedulers_online),
pool_config_property_tests(integer(1 bsl 16, 1 bsl (16 + Concurrency)),
timer:minutes(1),
200_000).

pool_config_is_precise_for_high_rates(_) ->
Concurrency = erlang:system_info(schedulers_online),
pool_config_property_tests(integer(1 bsl (16 + Concurrency), 1 bsl (16 + 8*Concurrency)),
timer:minutes(1),
300_000).

just_wait(_) ->
%% it fails if the throttle wasn't started yet
?assertMatch({error, no_throttle_process_registered},
Expand Down Expand Up @@ -431,6 +460,39 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) ->
end,
?assert(lists:any(IsLowRateEventFn, TelemetryEvents)).

pool_config_property_tests(RateGen, IntervalGen, Ops) ->
Fun = fun(Rate, Interval) ->
R1 = amoc_throttle_config:pool_config(Rate, Interval),
max_n_and_delays_are_positive(Rate, Interval, R1)
andalso accumulated_is_requested(Rate, Interval, R1)
end,
Prop = ?FORALL({Rate, Interval}, {RateGen, IntervalGen}, Fun(Rate, Interval)),
run_prop(?FUNCTION_NAME, Prop, Ops, 1).

max_n_and_delays_are_positive(Rate, Interval, Res) ->
Pred = fun(_N, #{max_n := N, delay := D}) ->
0 > N orelse 0 > D orelse ((Interval / Rate) > D)
end,
Filter = maps:filter(Pred, Res),
0 =:= map_size(Filter) orelse throw(Filter).

-define(TOL, 1.0). %% Error is less than 1%.
accumulated_is_requested(Rate, Interval, Res) ->
%% An active process will accept 60k/D actions per minute
Fold = fun(_N, #{status := active, max_n := N, delay := D}, Acc) ->
Acc + (N / D);
(_, _, Acc) ->
Acc
end,
Returned = maps:fold(Fold, +0.0, Res),
Expected = Rate / Interval,
Error = abs((Returned / 1000) - Expected),
?TOL >= Error orelse throw(#{throttle => #{rate => Rate, interval => Interval},
expected_rate_per_minute => Expected,
returned_aggregated_rate_per_minute => Returned,
error_percentage => Error,
config => Res}).

run_prop(PropName, Property, NumTests, WorkersPerScheduler) ->
Opts = [noshrink, {start_size, 1}, {numtests, NumTests},
{numworkers, WorkersPerScheduler * erlang:system_info(schedulers_online)}],
Expand Down

0 comments on commit 98676e8

Please sign in to comment.