diff --git a/src/throttle/amoc_throttle_config.erl b/src/throttle/amoc_throttle_config.erl
index f25974ac..c4faf8e4 100644
--- a/src/throttle/amoc_throttle_config.erl
+++ b/src/throttle/amoc_throttle_config.erl
@@ -66,13 +66,32 @@ pool_config(Rate, 0) ->
     Config = #{max_n => Rate, delay => 0, status => inactive, pid => undefined},
     PoolConfig = #{1 := First} = maps:from_keys(lists:seq(1, no_of_processes()), Config),
     PoolConfig#{1 := First#{status => active}};
-pool_config(Rate, Interval) ->
+pool_config(Rate, Interval) when ?POS_INT(Rate), ?POS_INT(Interval) ->
     NoOfProcesses = no_of_processes(),
-    RatePerMinutePerProcess = (60000 * Rate div Interval) div NoOfProcesses,
-    DelayPerProcess = (NoOfProcesses * Interval) div Rate,
-    Rem = ((60000 * Rate div Interval) rem NoOfProcesses)
-        + ((NoOfProcesses * Interval) rem Rate),
-    calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem).
+    BaseRatePerMinute = round((60000 / Interval) * Rate),
+    RatesPerProcess = calculate_rate_per_process(NoOfProcesses, BaseRatePerMinute, []),
+    #{} = lists:foldl(fun assign_process/2, #{}, RatesPerProcess).
+
+calculate_rate_per_process(1, Rate, Acc) ->
+    [{1, Rate} | Acc];
+calculate_rate_per_process(N, Rate, Acc) when is_integer(N), N > 1 ->
+    ProcessRate = Rate div N,
+    calculate_rate_per_process(N - 1, Rate - ProcessRate, [{N, ProcessRate} | Acc]).
+
+assign_process({N, 0}, Config) ->
+    Config#{N => #{max_n => 0,
+                   delay => infinity,
+                   status => inactive,
+                   pid => undefined}};
+assign_process({N, RatePerProcess}, Config) ->
+    Delay = case {60000 div RatePerProcess, 60000 rem RatePerProcess} of
+                {DelayBetweenExecutions, 0} -> DelayBetweenExecutions;
+                {DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1
+            end,
+    Config#{N => #{max_n => RatePerProcess,
+                   delay => Delay,
+                   status => active,
+                   pid => undefined}}.
 
 -spec process_pool_config(pid(), pool_config()) -> pool_config().
 process_pool_config(PoolSup, PoolConfig) ->
@@ -80,38 +99,9 @@ process_pool_config(PoolSup, PoolConfig) ->
     Fun1 = fun(N, Config) -> Config#{pid => maps:get(N, Workers)} end,
     maps:map(Fun1, PoolConfig).
 
--spec calculate_availability(integer(), integer(), pos_integer(), integer()) -> pool_config().
-calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem) ->
-    Fun = fun(N, {Acc, R}) ->
-                  case {RatePerMinutePerProcess < NoOfProcesses, R} of
-                      {true, 0} ->
-                          Config = #{max_n => RatePerMinutePerProcess,
-                                     delay => DelayPerProcess + 1,
-                                     status => inactive, pid => undefined},
-                          {Acc#{N => Config}, R};
-                      {true, R} ->
-                          Config = #{max_n => RatePerMinutePerProcess,
-                                     delay => DelayPerProcess,
-                                     status => active, pid => undefined},
-                          {Acc#{N => Config}, R - 1};
-                      {false, 0} ->
-                          Config = #{max_n => RatePerMinutePerProcess,
-                                     delay => DelayPerProcess,
-                                     status => active, pid => undefined},
-                          {Acc#{N => Config}, R};
-                      {false, R} ->
-                          Config = #{max_n => RatePerMinutePerProcess,
-                                     delay => DelayPerProcess + 1,
-                                     status => active, pid => undefined},
-                          {Acc#{N => Config}, R - 1}
-                  end
-          end,
-    {#{} = PoolConfig, _} = lists:foldl(Fun, {#{}, Rem}, lists:seq(1, NoOfProcesses)),
-    PoolConfig.
-
 -spec no_of_processes() -> non_neg_integer().
 no_of_processes() ->
-    3 * erlang:system_info(schedulers_online).
+    36.
 
 -spec do_verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan().
 do_verify_gradual_config(
@@ -159,4 +149,4 @@ do_verify_gradual_config(
 calculate_step(N, N, _, _, To) -> To;
 calculate_step(0, _, _, From, _) -> From;
 calculate_step(N, _, StepRate, From, _) ->
-    From + floor(StepRate * N).
+    From + round(StepRate * N).
diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl
index e819d4f6..f4e44a5c 100644
--- a/test/throttle_SUITE.erl
+++ b/test/throttle_SUITE.erl
@@ -36,6 +36,16 @@ groups() ->
         change_interarrival_gradually,
         change_rate_gradually_verify_descriptions,
         change_rate_gradually_verify_descriptions_properties,
+        % These tests depend on the available level of concurrency.
+        % Note that the smallest delay possible for a process is 1ms (receive operations),
+        % hence if we give for example 10 workers 1ms delays, we get 600_000 ticks per minute,
+        % and if we give for example 48 workers 1ms delays, we get 2_880_000 ticks per minute.
+        % That means that this is realistically the maximum rate we could possibly manage
+        % with a static pool of that many workers.
+        pool_config_is_precise_for_low_rates,
+        pool_config_is_precise_for_medium_rates,
+        % TODO: introduce dynamically sized pools in order to manage higher rates.
+        %pool_config_is_precise_for_high_rates,
         just_wait,
         wait_for_process_to_die_sends_a_kill,
         async_runner_dies_while_waiting_raises_exit,
@@ -353,6 +363,25 @@ change_rate_gradually_verify_descriptions_properties(_) ->
                        Fun(From, To, Interval, StepInterval, StepCount)),
     run_prop(?FUNCTION_NAME, Prop, 1 bsl 16, 3).
 
+%% From 1/min to 60000/min
+pool_config_is_precise_for_low_rates(_) ->
+    pool_config_property_tests(integer(1, 60000),
+                               timer:minutes(1),
+                               100_000).
+
+%% From 65536/min to (1 bsl (16 + Concurrency))/min
+pool_config_is_precise_for_medium_rates(_) ->
+    Concurrency = erlang:system_info(schedulers_online),
+    pool_config_property_tests(integer(1 bsl 16, 1 bsl (16 + Concurrency)),
+                               timer:minutes(1),
+                               200_000).
+
+pool_config_is_precise_for_high_rates(_) ->
+    Concurrency = erlang:system_info(schedulers_online),
+    pool_config_property_tests(integer(1 bsl (16 + Concurrency), 1 bsl (16 + 8*Concurrency)),
+                               timer:minutes(1),
+                               300_000).
+
 just_wait(_) ->
     %% it fails if the throttle wasn't started yet
     ?assertMatch({error, no_throttle_process_registered},
@@ -431,6 +460,39 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) ->
             end,
     ?assert(lists:any(IsLowRateEventFn, TelemetryEvents)).
 
+pool_config_property_tests(RateGen, IntervalGen, Ops) ->
+    Fun = fun(Rate, Interval) ->
+                  R1 = amoc_throttle_config:pool_config(Rate, Interval),
+                  max_n_and_delays_are_positive(Rate, Interval, R1)
+                      andalso accumulated_is_requested(Rate, Interval, R1)
+          end,
+    Prop = ?FORALL({Rate, Interval}, {RateGen, IntervalGen}, Fun(Rate, Interval)),
+    run_prop(?FUNCTION_NAME, Prop, Ops, 1).
+
+max_n_and_delays_are_positive(Rate, Interval, Res) ->
+    Pred = fun(_N, #{max_n := N, delay := D}) ->
+                   0 > N orelse 0 > D orelse ((Interval / Rate) > D)
+           end,
+    Filter = maps:filter(Pred, Res),
+    0 =:= map_size(Filter) orelse throw(Filter).
+
+-define(TOL, 1.0). %% Error is less than 1%.
+accumulated_is_requested(Rate, Interval, Res) ->
+    %% An active process will accept 60k/D actions per minute
+    Fold = fun(_N, #{status := active, max_n := N, delay := D}, Acc) ->
+                   Acc + (N / D);
+              (_, _, Acc) ->
+                   Acc
+           end,
+    Returned = maps:fold(Fold, +0.0, Res),
+    Expected = Rate / Interval,
+    Error = abs((Returned / 1000) - Expected),
+    ?TOL >= Error orelse throw(#{throttle => #{rate => Rate, interval => Interval},
+                                 expected_rate_per_minute => Expected,
+                                 returned_aggregated_rate_per_minute => Returned,
+                                 error_percentage => Error,
+                                 config => Res}).
+
 run_prop(PropName, Property, NumTests, WorkersPerScheduler) ->
     Opts = [noshrink, {start_size, 1}, {numtests, NumTests},
             {numworkers, WorkersPerScheduler * erlang:system_info(schedulers_online)}],
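
For a quick sanity check of the new distribution (an illustrative sketch, not part of the patch): calculate_rate_per_process/3 hands worker N the share Rate div N of whatever is still unassigned, so integer division never drops events. The standalone helper split/2 below is a hypothetical name that mirrors that recursion; with the fixed pool of 36 workers, 90000 events per minute split into 36 equal shares of 2500, and assign_process/2 then derives a 24 ms delay (60000 div 2500, remainder 0) for every worker.

    %% Illustrative sketch only: the same top-down split performed by
    %% calculate_rate_per_process/3, total-preserving because each worker
    %% takes `Rate div N` of the remainder and the base clause keeps the rest.
    split(1, Rate) -> [Rate];
    split(N, Rate) when is_integer(N), N > 1 ->
        PerProcess = Rate div N,
        [PerProcess | split(N - 1, Rate - PerProcess)].

    %% 1> lists:sum(split(36, 100)).
    %% 100
    %% 2> lists:usort(split(36, 90000)).
    %% [2500]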