Skip to content

Commit

Permalink
Rework unlock as just a wrapper around change_rate
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Nov 28, 2024
1 parent 3df276c commit 6e2b022
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 28 deletions.
10 changes: 5 additions & 5 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,18 @@ start(Name, Rate) when is_integer(Rate) ->
pause(Name) ->
amoc_throttle_controller:pause(Name).

%% @doc Unlocks executions for the given `Name' as if `Rate' was set to `infinity'.
-spec unlock(name()) -> ok | {error, any()}.
unlock(Name) ->
amoc_throttle_controller:unlock(Name).

%% @doc Resumes the executions for the given `Name', to their original configuration value.
%%
%% It is the counterpart to the `pause/1' API, resuming the execution of what that mechanism paused.
-spec resume(name()) -> ok | {error, any()}.
resume(Name) ->
amoc_throttle_controller:resume(Name).

%% @doc Unlocks executions for the given `Name' by setting `Rate' to `infinity'.
-spec unlock(name()) -> ok | {error, any()}.
unlock(Name) ->
change_rate(Name, #{rate => infinity, interval => 0}).

%% @doc Sets the throttle `Config' for `Name' according to the given values.
-spec change_rate(name(), t() | rate()) -> ok | {error, any()}.
change_rate(Name, #{} = Config) ->
Expand Down
21 changes: 6 additions & 15 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
%% API
-export([start_link/0,
ensure_throttle_processes_started/2,
pause/1, resume/1, unlock/1, stop/1, get_info/1,
pause/1, resume/1, stop/1, get_info/1,
change_rate/2, change_rate_gradually/2,
pg_scope/0,
get_throttle_process/1,
Expand Down Expand Up @@ -46,7 +46,7 @@
-type change_rate() :: {change_rate, name(), amoc_throttle_config:config()}.
-type change_rate_gradually() ::
{change_rate_gradually, name(), amoc_throttle_config:gradual_plan()}.
-type operation() :: {pause | resume | unlock | stop, name()}.
-type operation() :: {pause | resume | stop, name()}.

%%%===================================================================
%%% API
Expand Down Expand Up @@ -94,10 +94,6 @@ pause(Name) ->
resume(Name) ->
gen_server:call(?MASTER_SERVER, {resume, Name}).

-spec unlock(name()) -> ok | {error, any()}.
unlock(Name) ->
gen_server:call(?MASTER_SERVER, {unlock, Name}).

-spec get_info(name()) -> #{_ := _} | {error, any()}.
get_info(Name) ->
gen_server:call(?MASTER_SERVER, {get_info, Name}).
Expand Down Expand Up @@ -172,7 +168,7 @@ handle_call({change_rate_gradually, Name, GradualChangeRate}, _From, State) ->
{reply, {error, {no_throttle_by_name, Name}}, State}
end;
handle_call({Op, Name}, _From, State)
when stop =:= Op; pause =:= Op; unlock =:= Op; resume =:= Op ->
when stop =:= Op; pause =:= Op; resume =:= Op ->
case State of
#{Name := Info} ->
do_run_op(Op, Name, Info, State);
Expand Down Expand Up @@ -280,19 +276,14 @@ consume_all_timer_ticks(Msg) ->
do_run_op(stop, Name, #throttle_info{pool_sup = PoolSup}, State) ->
ok = amoc_throttle_pooler:stop_pool(PoolSup),
{reply, ok, maps:remove(Name, State)};
do_run_op(pause, Name, #throttle_info{pool_config = PoolConfig} = Info, State) ->
do_run_op(pause, Name, #throttle_info{pool_config = PoolConfig, active = true} = Info, State) ->
Fun = fun(_, #{pid := Pid}) ->
amoc_throttle_process:update(Pid, 0, infinity)
end,
maps:foreach(Fun, PoolConfig),
{reply, ok, State#{Name => Info#throttle_info{active = false}}};
do_run_op(unlock, Name, #throttle_info{pool_config = PoolConfig} = Info, State) ->
Fun = fun(_, #{pid := Pid}) ->
amoc_throttle_process:update(Pid, infinity, 0)
end,
maps:foreach(Fun, PoolConfig),
{reply, ok, State#{Name => Info#throttle_info{active = true}}};
do_run_op(resume, Name, #throttle_info{pool_config = PoolConfig} = Info, State) ->

do_run_op(resume, Name, #throttle_info{pool_config = PoolConfig, active = false} = Info, State) ->
Fun = fun(_, #{max_n := MaxN, delay := Delay, pid := Pid}) ->
amoc_throttle_process:update(Pid, MaxN, Delay)
end,
Expand Down
10 changes: 2 additions & 8 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ groups() ->
wait_for_process_to_die_sends_a_kill,
async_runner_dies_while_waiting_raises_exit,
async_runner_dies_when_throttler_dies,
pause_and_resume_and_unlock,
pause_and_resume,
get_state
]}
].
Expand Down Expand Up @@ -375,7 +375,7 @@ async_runner_dies_when_throttler_dies(_) ->
amoc_throttle:stop(?FUNCTION_NAME),
?assertMatch(ok, ?RECV({'EXIT', _, {throttler_worker_died, _, _}}, 100)).

pause_and_resume_and_unlock(_) ->
pause_and_resume(_) ->
%% Start a 10-per-ms throttle
Description = #{rate => 600000},
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)),
Expand All @@ -388,12 +388,6 @@ pause_and_resume_and_unlock(_) ->
%% After resume the message is then received
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
?assertMatch(ok, ?RECV(receive_this, 200)),
%% If unlocked, all messages are always received
?assertMatch(ok, amoc_throttle:unlock(?FUNCTION_NAME)),
amoc_throttle:send(?FUNCTION_NAME, receive_this_too),
?assertMatch(ok, ?RECV(receive_this_too, 200)),
%% From unlock it can resume
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
State = get_throttle_info(?FUNCTION_NAME),
?assertMatch(#{rate := 600000}, State).

Expand Down

0 comments on commit 6e2b022

Please sign in to comment.