Skip to content

Commit

Permalink
Accept ranges of users in amoc_coordinator
Browse files Browse the repository at this point in the history
Sometimes we want to add some variance to the test and we want the coordinator
to choose groups of roughly N users instead of uniformly choosing all groups to
be of the exact same size. For this, we could give the coordinator a range of
the type {Min, Max} were every group will be chosen to have N in the interval
[Min, Max]. `rand:uniform/1` can be used to ensure that the distribution will be
pretty uniform, and in a sufficiently large scale, converge to N.

As a good to have, the coordinator could keep statistics of the distribution of
N in the interval.
  • Loading branch information
NelsonVides committed Dec 19, 2023
1 parent 9459926 commit 64eb304
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 20 deletions.
6 changes: 4 additions & 2 deletions guides/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ The coordinator reacts to new users showing up in a system, according to the *Co
The *Coordination Plan* consists of *Coordination Items*, and each of them is defined as one of the following: `{NumberOfUsers, CoordinationActions}`.
- When the `NumberOfUsers` is set to `all`, then only *Coordination Actions* with the arities `/1, /2` are handled.
The *Coordination Items* with `all` are triggered by the `timeout` event type.
- When the `NumberOfUsers` is set to a positive integer, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.
- When the `NumberOfUsers` is set to a positive integer or a range, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.

Note that `NumberOfUsers` can be a range, in which case a new integer within the range will be randomly selected every time the coordinator fills a batch, to ensure a non-equal but uniform distribution of coordination.

The timeout timer is reset by calling the `add` function.
A new batch size is set in the `NumberOfUsers`. Each user in the batch calls the `add` function registering to the coordinator and triggering the *Coordination Plan*.
If more then one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
If more than one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
For example if the *Coordination Plan* is `[{2, Act1}, {3, Act2}]` then on the 6th user calling `add`, `Act1` will be called with 2 users passed and `Act2` will be called with 3 users passed.

*Coordination Actions* may be one of the following:
Expand Down
28 changes: 20 additions & 8 deletions src/amoc_coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@

-type coordination_actions() :: [coordination_action()] | coordination_action().

-type coordination_item() :: {NoOfUsers :: pos_integer() | all,
coordination_actions()}.
-type range() :: {pos_integer(), pos_integer()}.
-type coordination_number_of_users() :: all | pos_integer() | range().
-type coordination_item() :: {coordination_number_of_users(), coordination_actions()}.

-type normalized_coordination_item() :: {NoOfUsers :: pos_integer() | all,
[coordination_action()]}.
-type normalized_coordination_item() :: {range() | pos_integer() | all, [coordination_action()]}.

-type coordination_plan() :: [coordination_item()] | coordination_item().

%% timeout in seconds
-type coordination_timeout_in_sec() :: pos_integer() | infinity.

-export_type([coordination_event_type/0,
-export_type([coordination_number_of_users/0,
coordination_event_type/0,
coordination_event/0,
coordination_action/0,
coordination_data/0,
Expand Down Expand Up @@ -88,8 +89,10 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
%% must be added last, so gen_event triggers it first.
AllItemsHandlers = lists:reverse([Item || {all, _} = Item <- Plan]),
[gen_event:add_handler(Name, ?MODULE, Item) || Item <- AllItemsHandlers],
[gen_event:add_handler(Name, ?MODULE, Item) || {N, _} = Item <- Plan,
is_integer(N)],
% [gen_event:add_handler(Name, ?MODULE, Item) || Item <- Plan, all =/= Item],
[gen_event:add_handler(Name, ?MODULE, Item) || {N, _} = Item <- Plan, is_integer(N)],
[gen_event:add_handler(Name, ?MODULE, Item)
|| {{N, _}, _} = Item <- Plan, is_integer(N)],
gen_event:add_handler(Name, ?MODULE, {timeout, Name, Timeout}),
ok;
{error, _} -> error
Expand Down Expand Up @@ -220,7 +223,16 @@ normalize_coordination_item({NoOfUsers, Action}) when is_function(Action) ->
normalize_coordination_item({NoOfUsers, Actions}) when ?IS_N_OF_USERS(NoOfUsers),
is_list(Actions) ->
[assert_action(NoOfUsers, A) || A <- Actions],
{NoOfUsers, Actions}.
{NoOfUsers, Actions};
normalize_coordination_item({{Min, Max}, Actions}) when ?IS_POS_INT(Min),
?IS_POS_INT(Max),
Min < Max,
is_list(Actions) ->
[assert_action(Min, A) || A <- Actions],
%% rand:uniform(N) -> 1 =< X =< N, so if for example N=2, then X can only be 1 or 2
%% As we will add X to Min, we substract 1 from Min to allow Min to be the result
%% And we make the diff one step bigger to allow for Max to be the result
{{Min - 1, Max - Min + 1}, Actions}.

assert_action(all, Action) when is_function(Action, 1);
is_function(Action, 2) ->
Expand Down
28 changes: 19 additions & 9 deletions src/amoc_coordinator/amoc_coordinator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
-type action() :: amoc_coordinator:coordination_action().
-type data() :: amoc_coordinator:coordination_data().

-record(state, {required_n = all :: pos_integer() | all,
-record(state, {configured = all :: {pos_integer(), pos_integer()} | pos_integer() | all,
required_n = all :: pos_integer() | all,
n = 0 :: non_neg_integer(),
actions = [] :: [action()],
collect_data = true :: boolean(),
accumulator = [] :: [data()]}).
acc = [] :: [data()]}).

-type state() :: #state{}.

Expand Down Expand Up @@ -53,9 +54,11 @@ add(Pid, Data) -> gen_server:cast(Pid, {add, Data}).

-spec init(amoc_coordinator:normalized_coordination_item()) -> {ok, state()}.
init({NoOfUsers, Actions}) ->
State = #state{required_n = NoOfUsers, actions = Actions},
{ok, State#state{collect_data = is_acc_required(Actions)}}.

State = #state{configured = NoOfUsers,
required_n = calculate_n(NoOfUsers),
actions = Actions,
collect_data = is_acc_required(Actions)},
{ok, State}.

-spec handle_call({reset, reset | timeout | stop}, term(), state()) ->
{reply, ok, state()} | {stop, normal, ok, state()}.
Expand Down Expand Up @@ -83,12 +86,12 @@ is_acc_required(Actions) ->
end, Actions).

-spec add_data(data(), state()) -> state().
add_data(Data, #state{n = N, accumulator = Acc} = State) ->
add_data(Data, #state{n = N, acc = Acc} = State) ->
NewState = case State#state.collect_data of
false ->
State#state{n = N + 1};
true ->
State#state{n = N + 1, accumulator = [Data | Acc]}
State#state{n = N + 1, acc = [Data | Acc]}
end,
maybe_reset_state(NewState).

Expand All @@ -99,9 +102,10 @@ maybe_reset_state(State) ->
State.

-spec reset_state(event_type(), state()) -> state().
reset_state(Event, #state{actions = Actions, accumulator = Acc, n = N} = State) ->
reset_state(Event, #state{configured = NoOfUsers, actions = Actions, acc = Acc, n = N} = State) ->
[execute_action(Action, {Event, N}, Acc) || Action <- Actions],
State#state{accumulator = [], n = 0}.
NewN = calculate_n(NoOfUsers),
State#state{required_n = NewN, n = 0, acc = []}.

-spec execute_action(action(), event(), [data()]) -> any().
execute_action(Action, Event, _) when is_function(Action, 1) ->
Expand All @@ -120,6 +124,12 @@ safe_executions(Fun, Args) ->
_:_ -> ok
end.

-spec calculate_n(amoc_coordinator:coordination_number_of_users()) -> all | pos_integer().
calculate_n({Min, Diff}) ->
Min + rand:uniform(Diff);
calculate_n(Value) ->
Value.

-spec distinct_pairs(fun((data(), data()) -> any()), [data()]) -> any().
distinct_pairs(Fun, []) ->
Fun(undefined, undefined);
Expand Down
22 changes: 21 additions & 1 deletion test/amoc_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@


all() ->
[execute_plan_without_timeout,
[execute_with_range_without_timeout,
execute_plan_without_timeout,
reset_plan_without_timeout,
execute_plan_with_timeout].

Expand Down Expand Up @@ -42,6 +43,25 @@ init_per_testcase(_, Config) ->
end_per_testcase(_Config) ->
ok.

execute_with_range_without_timeout(_Config) ->
N = 20, Name = ?FUNCTION_NAME,

Plan = [ Range = {{5, 10}, mocked_action(range, 1)},
All = {all, mocked_action(all, 1)}],

?assertEqual(ok, amoc_coordinator:start(Name, Plan, infinity)),
[amoc_coordinator:add(Name, User) || User <- lists:seq(1, N)],

amoc_coordinator:stop(Name),
meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000),

History = meck:history(?MOCK_MOD),
NumOfEvents = length(History),
?assert(3 =< NumOfEvents andalso NumOfEvents =< 6),

nothing_after_tags(History, [all]),
assert_telemetry_events(Name, [start, {N, add}, stop]).

execute_plan_without_timeout(_Config) ->
N = 4, Name = ?FUNCTION_NAME,

Expand Down

0 comments on commit 64eb304

Please sign in to comment.