Skip to content

Commit

Permalink
Fix interval and parallelism according to review
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Nov 25, 2024
1 parent cf7f2b4 commit 012f282
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 22 deletions.
33 changes: 19 additions & 14 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
%% - infinity for effectively pausing the throttle,
%% - zero for effectively unlocking all throttling.

-type interval() :: pos_integer().
-type interval() :: non_neg_integer().
%% In milliseconds, defaults to 60000 (one minute).
%%
%% Note that an interval of zero means effectively allowing `t:rate/0' number of executions in
%% parallel. It might be expected for this to be always `infinity' as a result of the limit when
%% dividing by zero, but this needs to be made explicit in the `t:rate/0' by setting it to infinity.

-type t() :: #{rate := rate(), interval => interval()} |
#{interarrival := interarrival()}.
Expand All @@ -31,13 +35,13 @@
-type gradual_rate_config() :: #{from_rate := non_neg_integer(),
to_rate := non_neg_integer(),
interval => interval(),
step_interval => interval(),
step_interval => pos_integer(),
step_size => pos_integer(),
step_count => pos_integer(),
duration => pos_integer()} |
#{from_interarrival := interarrival(),
to_interarrival := interarrival(),
step_interval => interval(),
step_interval => pos_integer(),
step_size => pos_integer(),
step_count => pos_integer(),
duration => pos_integer()}.
Expand Down Expand Up @@ -109,7 +113,7 @@ change_rate_gradually(Name, Config) ->
%% `Fn' is executed in the context of a new process spawned on the same node on which
%% the process executing `run/2' runs, so a call to `run/2' is non-blocking.
%%
%% <a href="https://sequencediagram.org/index.html#initialData=C4S2BsFMAIEEFsD2BjaATEBnYAnEAjAV2EjQCgAHAQx1GRGoDtgBzHRQi6aAKh4GVwVAG4xGiNJD5lu3arRD0mwaAFVMkHDNny6DKsziYAno1Q5CjRprKRG5XYv3M2HLnwCyVbJujjJ0rLQjkoGKgAqABbswBAwFOzIkJiYtvZk+IgAHtBQAGYqiHlqGjgAXNAgZtA4kACONVQkZGTqvgC0AHy8sCbVFla+FQBili1tONBd0FExccGJyZjQFQDemMiRpIRQADRGpuaW1pMACgCSACIAvhnZNSAskYXFs4ixUAsoSyuV-fWNZpkAC8wOA0Xe4HAVRY6EgQmMoJabw+8UWKSm3V6hxqx00FQ2WzQO1ILUyOXyLwO-TxkwqVVQkCykHMTUgZCo5OgiFEk2xNMGdOgo0YAAoAJQc5CgYRs6lHQV3HJ4J5UlHzBLfDH06pMlmA9mSKjSkCykjy3GK-kKk6Y9WfTVJFJlADklwA8gB1AByLrIkmw7GMFoGJzIQA" target="_blank">Diagram</a>
%% <a href="https://sequencediagram.org/index.html#initialData=C4S2BsFMAIEEFsD2BjaATEBnYAnEAjAV2EjQCgyAHAQx1GRBoDtgBzHRQy6aAKl4DK4agDcYTRGkj8yPHjTogGzYNACqmSDllyF9RtRZxMATyaochJky1lITcnqUGW7Tt34BZati3QJUjJy0E7KhqoAKgAWHMAQMJQcyJCYmHYOZPiIAB7QUABmqoj56po4AFzQOJAAjoQpqiRQ8JC4JtCQYiwUGn4AtAB8fLCm5lVWNjjQlQBiVj1l0IPQ0bHxIUkpmNPQAN6YyFGkhFAANMZmFhN+AAoAkgAiAL6ZOVUgrFFFJauIcVAbFBbHbVOoNaBNSAtNodLrACgAXgRwBif3A4BATFY6EgwhMSIov3+CU2qSWQxGl3G1i0lQORzQJ1Ir1yeE+3xWqOJgOSZMqkGykGQxBAiCYENxUNaOHanXs8MwNGSSwArCy8pBCtBihcxpYaVN+YLhaAxRLmtLZXCyNQsrlEGIppS9ddDdB6ccoGhoNRkKamDa-SARNQSLqrgayIrfZAyFJfaAQ2HnRHJmQU9TJuSietEkDUuUAOQPADyAHUAHKFuMNDjtDP6yZAA" target="_blank">Diagram</a>
%% showing function execution flow in distributed environment.
%% ```
%% title Amoc distributed
Expand All @@ -120,21 +124,22 @@ change_rate_gradually(Name, Config) ->
%% participantgroup **Master node**
%% participant Throttle process
%% end
%% box left of User: inc req rate
%%
%% box left of User: request telemetry event
%%
%% User -> *Async runner : Fun
%%
%%
%% User -> Throttle process : {schedule, Async runner PID}
%% box right of Throttle process : inc req rate
%%
%% box right of Throttle process : request telemetry event
%%
%% ==throtlling delay==
%%
%%
%% Throttle process -> Async runner: scheduled
%%
%% box left of Async runner : inc exec rate
%% abox over Async runner : Fun()
%% box right of Throttle process : execution telemetry event
%% space -5
%% box left of Async runner : execution telemetry event
%% abox over Async runner : scheduled action
%% activate Async runner
%% box right of Throttle process : inc exec rate
%% space
%% deactivate Async runner
%% Async runner ->Throttle process:'DOWN'
%% destroy Async runner
Expand Down
11 changes: 10 additions & 1 deletion src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
-define(DEFAULT_STEP_SIZE, 1).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)).
-define(NON_NEG_INT(N), (is_integer(N) andalso N >= 0)).
-define(POS_INT(N), (is_integer(N) andalso N > 0)).

-record(throttle_info, {
Expand Down Expand Up @@ -343,12 +344,20 @@ pool_config(infinity, _) ->
pool_config(0, _) ->
Config = #{max_n => 0, delay => infinity, status => active, pid => undefined},
maps:from_keys(lists:seq(1, no_of_processes()), Config);
pool_config(Rate, 0) ->
Config = #{max_n => Rate, delay => infinity, status => inactive, pid => undefined},
PoolConfig = #{1 := First} = maps:from_keys(lists:seq(1, no_of_processes()), Config),
PoolConfig#{1 := First#{status => active}};
pool_config(Rate, Interval) ->
NoOfProcesses = no_of_processes(),
RatePerMinutePerProcess = (60000 * Rate div Interval) div NoOfProcesses,
DelayPerProcess = (NoOfProcesses * Interval) div Rate,
Rem = ((60000 * Rate div Interval) rem NoOfProcesses)
+ ((NoOfProcesses * Interval) rem Rate),
calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem).

-spec calculate_availability(integer(), integer(), pos_integer(), integer()) -> pool_config().
calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem) ->
Fun = fun(N, {Acc, R}) ->
case {RatePerMinutePerProcess < NoOfProcesses, R} of
{true, 0} ->
Expand Down Expand Up @@ -441,7 +450,7 @@ verify_config(#{interarrival := Interarrival} = Config)
when 1 =:= map_size(Config), ?POS_INT(Interarrival) ->
#{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL};
verify_config(#{rate := Rate, interval := Interval} = Config)
when 2 =:= map_size(Config), ?TIMEOUT(Rate), ?POS_INT(Interval) ->
when 2 =:= map_size(Config), ?TIMEOUT(Rate), ?NON_NEG_INT(Interval) ->
Config;
verify_config(#{rate := Rate} = Config)
when 1 =:= map_size(Config), ?TIMEOUT(Rate) ->
Expand Down
18 changes: 13 additions & 5 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ start_link(Name, MaxN, Delay) ->
run(Pid, RunnerPid) ->
gen_server:cast(Pid, {schedule, RunnerPid}).

%% @doc See `initial_state/1'.
%%
%% Setting the delay to infinity results in the effective pausing of the process.
-spec update(pid(), amoc_throttle:rate(), timeout()) -> ok.
update(Pid, MaxN, Delay) ->
gen_server:cast(Pid, {update, MaxN, Delay}).
Expand Down Expand Up @@ -110,11 +113,16 @@ format_status(#{state := State} = FormatStatus) ->
%% internal functions
%%------------------------------------------------------------------------------

initial_state(Name, infinity, 0) ->
#state{name = Name, max_n = infinity, delay_between_executions = 0};
initial_state(Name, MaxN, infinity) when is_integer(MaxN) ->
#state{name = Name, max_n = 0, delay_between_executions = infinity};
initial_state(Name, MaxN, Delay) when is_integer(MaxN), is_integer(Delay) ->
%% - If `Delay' is infinity, we mean to pause the process, see how at `maybe_start_timer/1'
%% a delay of infinity will set `can_run_fn = false'.
%%
%% - If `MaxN' is infinity and `Delay' is a number, we mean no limits to throttling,
%% see how `maybe_start_timer/1' will not actually start any timer
%% and `maybe_run_fn/1' with `max_n = infinity' will loop without pause.
%%
%% - If both `MaxN' and `Delay' are numbers, this will be the actual rate/interval.
-spec initial_state(Name :: atom(), MaxN :: amoc_throttle:rate(), Delay :: timeout()) -> state().
initial_state(Name, MaxN, Delay) ->
#state{name = Name, max_n = MaxN, delay_between_executions = Delay}.

merge_state(#state{delay_between_executions = D, max_n = MaxN}, #state{} = OldState) ->
Expand Down
19 changes: 17 additions & 2 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ groups() ->
low_interval_does_not_get_remapped,
start_and_stop,
change_rate,
change_rate_triggers_paralellism,
interval_equal_zero_limits_parallelism,
change_rate_to_interval_zero_limits_parallelism,
change_rate_triggers_parallelism,
change_rate_gradually,
change_interarrival_gradually,
change_rate_gradually_verify_descriptions,
Expand Down Expand Up @@ -147,7 +149,20 @@ change_rate(_) ->
?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E3)),
?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, 200)).

change_rate_triggers_paralellism(_) ->
interval_equal_zero_limits_parallelism(_) ->
E1 = #{rate => 36, interval => 0},
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, E1)),
#{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME),
?assertEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0))).

change_rate_to_interval_zero_limits_parallelism(_) ->
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)),
E1 = #{rate => 100, interval => 0},
?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E1)),
#{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME),
?assertEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0))).

change_rate_triggers_parallelism(_) ->
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1)),
#{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME),
?assertEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0))),
Expand Down

0 comments on commit 012f282

Please sign in to comment.