diff --git a/Dockerfile b/Dockerfile index 6959c56c..2d9990a2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,20 +1,13 @@ ARG otp_vsn=25.3 -FROM erlang:${otp_vsn}-slim AS builder +FROM erlang:${otp_vsn} MAINTAINER Erlang Solutions -WORKDIR /amoc_build +WORKDIR /amoc +COPY ./ ./ -COPY ./rebar.config ./rebar.lock ./ -RUN rebar3 deps && rebar3 compile -d +RUN make clean +RUN make rel -COPY ./integration_test integration_test -COPY ./scenarios scenarios -COPY ./priv priv -COPY ./rel rel -COPY ./src src - -RUN rebar3 as demo release - -ENV PATH "/amoc_build/_build/demo/rel/amoc/bin:${PATH}" +ENV PATH "/amoc/_build/default/rel/amoc/bin:${PATH}" CMD ["amoc", "foreground"] diff --git a/Makefile b/Makefile index 2607408b..66abc406 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ -.PHONY: default rel compile clean ct test integration_test dialyzer xref console lint +.PHONY: default rel deps compile clean ct lint dialyzer xref console +.PHONY: test integration_test rerun_integration_test REBAR = rebar3 @@ -6,24 +7,30 @@ ifdef SUITE SUITE_OPTS = --suite $$SUITE endif -## this target is triggered when amoc is used as an erlang.mk dependency -default: - $(REBAR) compile +default: compile rel: - $(REBAR) as demo tar + $(REBAR) release + +deps: + $(REBAR) deps + $(REBAR) compile --deps_only compile: - $(REBAR) as demo compile + $(REBAR) compile clean: rm -rf _build - rm -rfv priv/scenarios_ebin/*.beam ct: - rm -rfv priv/scenarios_ebin/*.beam - ## eunit and ct commands always add a test profile to the run - $(REBAR) ct --verbose $(SUITE_OPTS) + ## in order to run some specific test suite(s) you can override + ## the SUITE variable from the command line or as env variable: + ## make ct SUITE=some_test_SUITE + ## make ct SUITE=some_test_SUITE,another_test_SUITE + ## SUITE=some_test_SUITE make ct + ## SUITE=some_test_SUITE,another_test_SUITE make ct + @ echo $(REBAR) ct --verbose $(SUITE_OPTS) + @ $(REBAR) ct --verbose $(SUITE_OPTS) lint: $(REBAR) as elvis lint @@ -31,27 +38,27 @@ 
lint: test: compile xref dialyzer ct lint integration_test: - ./integration_test/stop_demo_cluster.sh + ./integration_test/stop_test_cluster.sh ./integration_test/build_docker_image.sh - ./integration_test/start_demo_cluster.sh + ./integration_test/start_test_cluster.sh ./integration_test/test_amoc_cluster.sh ./integration_test/test_distribute_scenario.sh ./integration_test/test_run_scenario.sh ./integration_test/test_add_new_node.sh rerun_integration_test: - ./integration_test/stop_demo_cluster.sh - ./integration_test/start_demo_cluster.sh + ./integration_test/stop_test_cluster.sh + ./integration_test/start_test_cluster.sh ./integration_test/test_amoc_cluster.sh ./integration_test/test_distribute_scenario.sh ./integration_test/test_run_scenario.sh ./integration_test/test_add_new_node.sh dialyzer: - $(REBAR) as demo dialyzer + $(REBAR) dialyzer xref: - $(REBAR) as demo xref + $(REBAR) xref console: @echo "tests can be executed manually using ct:run/1 function:\n" \ diff --git a/elvis.config b/elvis.config index 1bf592c2..c3a56f9a 100644 --- a/elvis.config +++ b/elvis.config @@ -4,7 +4,8 @@ filter => "*.erl", ruleset => erl_files, rules => [ - {elvis_style, invalid_dynamic_call, #{ignore => [amoc_user]}}, + {elvis_style, invalid_dynamic_call, + #{ignore => [amoc_user, {amoc_code_server, get_md5, 1}]}}, {elvis_style, export_used_types, disable}, {elvis_style, no_throw, #{ignore => [{amoc_config, get, 2}] }}, {elvis_text_style, line_length, #{skip_comments => whole_line }}, @@ -14,10 +15,14 @@ filter => "*.erl", ruleset => erl_files, rules => [ - {elvis_style, function_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*$"}}, - {elvis_style, atom_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$"}}, + {elvis_style, function_naming_convention, + #{regex => "^[a-z]([a-z0-9]*_?)*$"}}, + {elvis_style, atom_naming_convention, + #{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$"}}, + {elvis_style, invalid_dynamic_call, #{ignore => [amoc_code_server_SUITE]}}, 
{elvis_style, dont_repeat_yourself, #{min_complexity => 50}}, {elvis_style, no_debug_call, disable}, + {elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE]}}, {elvis_style, no_throw, disable}, {elvis_style, no_import, disable} ]}, diff --git a/guides/coordinator.md b/guides/coordinator.md index c1a7d74d..ad24c2d3 100644 --- a/guides/coordinator.md +++ b/guides/coordinator.md @@ -37,7 +37,7 @@ It’s guaranteed that all the *Coordination Actions* with `all` are executed af ## Example -This scenario will demonstrate how do the `users` interact with `amoc_coordinator`: +This scenario shows how the `users` interact with `amoc_coordinator`: ```erlang -module(example). @@ -114,7 +114,7 @@ User = 3 Three new users showed up User = 4 -% We have 4 and 4 rem 2 == 0 therefore users added to amoc_coordinator so all of the {3, _} actions are triggered: +% We have 4 and 4 rem 2 == 0 therefore users added to amoc_coordinator so all of the {2, _} actions are triggered: Two new users showed up: Event = {coordinate,2}; User1 = {<0.1144.0>,4}; User2 = {<0.1143.0>,3} Two new users showed up: Event = {coordinate,2}; ListOfUsers = [{<0.1144.0>,4},{<0.1143.0>,3}] Two new users showed up: Event = {coordinate,2} diff --git a/guides/local-run.md b/guides/local-run.md index 4fcd277e..80273c75 100644 --- a/guides/local-run.md +++ b/guides/local-run.md @@ -2,7 +2,7 @@ Everything you need to do is to create the release. To achieve that run: `make rel`. Now you are ready to test our scenario locally with one Amoc node; -to start the node run `_build/demo/rel/amoc/bin/amoc console`. +to start the node run `_build/default/rel/amoc/bin/amoc console`. Start `my_scenario` spawning 10 amoc users with IDs from range (1,10) inclusive. 
```erlang diff --git a/guides/scenario.md b/guides/scenario.md index 183305b4..19db3adf 100644 --- a/guides/scenario.md +++ b/guides/scenario.md @@ -6,11 +6,13 @@ It has to export two callback functions: - `init/0` - called only once per test run on every node, at the very beginning. It can be used for setting up initial (global) state: metrics, database connections, etc. It can return an `ok` or a tuple `{ok, State}` from which the `State` may be passed to every user. -- `start/1` or `start/2` - describes the actual scenario and is executed for +- `start/1` or `start/2` - implements the actual scenario and is executed for each user, in the context of that user's process. The first argument is the given user's unique integer id. The second, which is optional, is the state, as returned from the `init/0` function. -When the `start` function returns, it is executed again after some delay (60 seconds by default). + +In addition to that, the scenario module can implement a `terminate/0,1` callback; +it has one optional argument – the scenario state, as returned from the `init/0` function. A typical scenario file will look like this: diff --git a/guides/telemetry.md b/guides/telemetry.md index 108ff78a..1029d132 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -20,9 +20,20 @@ metadata: #{type => add | remove} ## Throttle +### Init + +Raised when a throttle mechanism is initialised. + +```erlang +event_name: [amoc, throttle, init] +measurements: #{count => 1} +metadata: #{name => atom()} +``` + ### Rate Raised when a throttle mechanism is initialised or its configured rate is changed. +This event is raised only on the master node. 
```erlang event_name: [amoc, throttle, rate] @@ -50,13 +61,13 @@ measurements: #{count => 1} metadata: #{name => atom()} ``` -## Coordinate +## Coordinator -Indicates when a coordinating event was raised, like a callback index being reached or a timeout being triggered +Indicates when a coordinating event was raised, like a process being added for coordination or a timeout being triggered ### Event ```erlang -event_name: [amoc, coordinator, event] +event_name: [amoc, coordinator, start | stop | add | reset | timeout] measurements: #{count => 1} -metadata: #{type => atom()} +metadata: #{name => atom()} ``` diff --git a/integration_test/README.md b/integration_test/README.md index 981b0e88..4f2abac0 100644 --- a/integration_test/README.md +++ b/integration_test/README.md @@ -9,35 +9,33 @@ In the Amoc repo root directory run: `./integration_test/build_docker_image.sh` -This command builds docker image `amoc:latest`. +This command builds the `amoc:latest` docker image. -### 2. Start amoc demo cluster +### 2. Start amoc test cluster -`./integration_test/start_demo_cluster.sh` +`./integration_test/start_test_cluster.sh` -More information about the demo cluster can be found further in this document. +This command requires the `amoc:latest` docker image to exist. More information about the test cluster can be found further in this document. -### 3. Check the amoc clustering is done properly +### 3. Check that clustering is done properly `./integration_test/test_amoc_cluster.sh` -This command verifies that clustering is done properly and metrics are reported -from all the nodes. +This command verifies that clustering is done properly. -### 4. Test installation of user scenario in amoc cluster +### 4. Test distribution of a custom scenario in amoc cluster `./integration_test/test_distribute_scenario.sh` - -This command uploads a sample `dummy_scenario.erl` on the `amoc-master` node -using `curl` and after uploading verifies that scenario is propagated to the -worker nodes. 
-### 5. Run the uploaded scenario. +This command checks distribution of the sample `dummy_scenario.erl` from the `amoc-master` node + to the worker nodes. + +### 5. Run the distributed scenario. `./integration_test/test_run_scenario.sh` -This command starts execution of `dummy_scenario.erl` scenario (must be uploaded -before this action) +This command starts execution of `dummy_scenario.erl` scenario (it must be distributed +prior to this action) ### 6. Add additional node to the cluster @@ -49,32 +47,31 @@ when the new amoc node joins. ### 7. Cleanup -To stop Amoc demo cluster run: +To stop Amoc test cluster run: -`./integration_test/stop_demo_cluster.sh` +`./integration_test/stop_test_cluster.sh` -## Demo cluster +## Test cluster -To start the demo cluster you can run these commands: +* To start the test cluster you can run these commands: ``` ./integration_test/build_docker_image.sh -./integration_test/start_demo_cluster.sh +./integration_test/start_test_cluster.sh ``` -When the demo cluster is up and running you can access its -different components using the following addresses: - * Amoc Swagger UI: - * [amoc-master](http://localhost:4000/api-docs/) - * [amoc-worker-1](http://localhost:4001/api-docs/) - * [amoc-worker-2](http://localhost:4002/api-docs/) - * [graphite](http://localhost:8080/) web interface - * [grafana](http://localhost:3000/) - default username and password is `admin`/`admin` +* To get the list of nodes in the amoc test cluster use the following command: + +`docker compose -p "amoc-test-cluster" ps` + +* To check the most recent `amoc-master` logs you can run this command: + +`docker compose -p "amoc-test-cluster" logs --tail=100 amoc-master` -To check the most recent `amoc-master` logs you can run this command: +* In order to attach to the `amoc-master` erlang node run the following command: -`docker-compose -p "amoc-demo-cluster" logs --tail=100 amoc-master` +`docker compose -p "amoc-test-cluster" exec amoc-master amoc remote_console` -To 
attach to `amoc-master` node use the following command: +* To open a shell inside the `amoc-master` container use this command: -`docker-compose -p "amoc-demo-cluster" exec amoc-master /home/amoc/amoc/bin/amoc remote_console` +`docker compose -p "amoc-test-cluster" exec amoc-master bash` diff --git a/integration_test/build_docker_image.sh b/integration_test/build_docker_image.sh index b44b9917..3905026a 100755 --- a/integration_test/build_docker_image.sh +++ b/integration_test/build_docker_image.sh @@ -7,4 +7,8 @@ cd "$git_root" otp_vsn="${OTP_RELEASE:-25.3}" echo "ERLANG/OTP '${otp_vsn}'" -docker_compose build --build-arg otp_vsn=${otp_vsn} +docker build \ + -f Dockerfile \ + -t amoc:latest \ + --build-arg "otp_vsn=${otp_vsn}" \ + . diff --git a/integration_test/docker-compose.yml b/integration_test/docker-compose.yml index 354570c3..23e9fa11 100644 --- a/integration_test/docker-compose.yml +++ b/integration_test/docker-compose.yml @@ -1,7 +1,5 @@ x-amoc-defaults: &amoc-defaults - build: - context: ../ - dockerfile: Dockerfile + image: "amoc:latest" networks: - amoc-test-network volumes: @@ -10,7 +8,6 @@ x-amoc-defaults: &amoc-defaults target: /extra_code_paths environment: AMOC_NODES: "['amoc@amoc-master']" - AMOC_EXTRA_CODE_PATHS: '["/extra_code_paths/test1", "/extra_code_paths/test2"]' healthcheck: test: "amoc status" @@ -18,6 +15,8 @@ services: amoc-master: <<: *amoc-defaults hostname: "amoc-master" + environment: + AMOC_EXTRA_CODE_PATHS: '["/extra_code_paths/path1", "/extra_code_paths/path2"]' amoc-worker-1: &amoc-worker <<: *amoc-defaults hostname: "amoc-worker-1" diff --git a/integration_test/dummy_helper.erl b/integration_test/extra_code_paths/path1/dummy_helper.erl similarity index 100% rename from integration_test/dummy_helper.erl rename to integration_test/extra_code_paths/path1/dummy_helper.erl diff --git a/integration_test/dummy_scenario.erl b/integration_test/extra_code_paths/path2/dummy_scenario.erl similarity index 100% rename from 
integration_test/dummy_scenario.erl rename to integration_test/extra_code_paths/path2/dummy_scenario.erl diff --git a/integration_test/extra_code_paths/test1/.gitignore b/integration_test/extra_code_paths/test1/.gitignore deleted file mode 100644 index 212ec3df..00000000 --- a/integration_test/extra_code_paths/test1/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -## this directory is a placeholder for testing -## AMOC_EXTRA_CODE_PATHS configuration option - -## the purpose of this .gitignore file is to ensure -## that such data is not commiteded accidentaly to git - -# Ignore all files in this dir ... -* -# ... except for this one. -!.gitignore \ No newline at end of file diff --git a/integration_test/extra_code_paths/test2/.gitignore b/integration_test/extra_code_paths/test2/.gitignore deleted file mode 100644 index 212ec3df..00000000 --- a/integration_test/extra_code_paths/test2/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -## this directory is a placeholder for testing -## AMOC_EXTRA_CODE_PATHS configuration option - -## the purpose of this .gitignore file is to ensure -## that such data is not commiteded accidentaly to git - -# Ignore all files in this dir ... -* -# ... except for this one. 
-!.gitignore \ No newline at end of file diff --git a/integration_test/helper.sh b/integration_test/helper.sh index 7186ae32..22f41a5e 100644 --- a/integration_test/helper.sh +++ b/integration_test/helper.sh @@ -10,16 +10,13 @@ function enable_strict_mode() { IFS=$'\n\t' } -function create_code_path() { - dir="${git_root}/integration_test/extra_code_paths/${1}" - [ -d "$dir" ] || return 1 - erl_file="${dir}/${1}.erl" - dummy_scenario="${git_root}/integration_test/dummy_scenario.erl" - sed "s/-module(.*)./-module(${1})./" "$dummy_scenario" > "$erl_file" - erlc -o "$dir" "$erl_file" +function compile_file() { + local erl_file="$(realpath "$1")" + local output_dir="$(dirname "$erl_file")" + erlc -o "$output_dir" "$erl_file" } -function contain() { +function contains() { local pipeline='cat -' for pattern in "$@"; do pipeline+=" | tee >(grep -q -e \"$pattern\"; echo \"+\$?\")" @@ -29,6 +26,16 @@ function contain() { test "$(($output))" -eq 0 } +function doesnt_contain() { + local pipeline='cat -' + for pattern in "$@"; do + pipeline+=" | tee >(grep -q -v -e \"$pattern\"; echo \"+\$?\")" + done + pipeline+=' >/dev/null' + local output="$(eval "$pipeline")" + test "$(($output))" -eq 0 +} + function wait_for_cmd() { local timeout="${1:-0}" local cmd="${2:-true}" @@ -53,7 +60,7 @@ function wait_for_cmd() { ###################### docker_compose() { local compose_file="${git_root}/integration_test/docker-compose.yml" - docker compose -p "amoc-demo-cluster" -f "$compose_file" "$@" + docker compose -p "amoc-test-cluster" -f "$compose_file" "$@" } function amoc_eval() { @@ -64,7 +71,7 @@ function amoc_eval() { } function container_is_healthy() { - docker_compose ps $1 | contain "healthy" + docker_compose ps $1 | contains "healthy" } function wait_for_healthcheck() { @@ -72,16 +79,6 @@ function wait_for_healthcheck() { wait_for_cmd 60 container_is_healthy "$container" } -function metrics_reported() { - local 
graphite_query="target=summarize(*.amoc.users.size,'1hour','max')&from=-1h&format=json" - local result="$(curl -s "http://localhost:8080/render/?${graphite_query}")" - echo "$result" | contain "$@" -} - -function wait_for_metrics() { - wait_for_cmd 60 metrics_reported "$@" -} - ###################### ## common variables ## ###################### diff --git a/integration_test/start_demo_cluster.sh b/integration_test/start_test_cluster.sh similarity index 61% rename from integration_test/start_demo_cluster.sh rename to integration_test/start_test_cluster.sh index f750a294..344ad9b4 100755 --- a/integration_test/start_demo_cluster.sh +++ b/integration_test/start_test_cluster.sh @@ -3,8 +3,8 @@ source "$(dirname "$0")/helper.sh" enable_strict_mode -create_code_path test1 -create_code_path test2 +compile_file integration_test/extra_code_paths/path1/dummy_helper.erl +compile_file integration_test/extra_code_paths/path2/dummy_scenario.erl docker_compose up -d amoc-{master,worker-1,worker-2} diff --git a/integration_test/stop_demo_cluster.sh b/integration_test/stop_test_cluster.sh similarity index 100% rename from integration_test/stop_demo_cluster.sh rename to integration_test/stop_test_cluster.sh diff --git a/integration_test/test_add_new_node.sh b/integration_test/test_add_new_node.sh index f5a8ade1..3cde287c 100755 --- a/integration_test/test_add_new_node.sh +++ b/integration_test/test_add_new_node.sh @@ -6,7 +6,7 @@ enable_strict_mode docker_compose up -d amoc-worker-3 wait_for_healthcheck amoc-worker-3 -amoc_eval amoc-worker-3 "amoc_controller:get_status()." | contain dummy_scenario running -amoc_eval amoc-worker-3 "binary_to_list(amoc_config:get(test))." | contain "test_value" -amoc_eval amoc-worker-3 "dummy_helper:test_amoc_dist()." | contain 'amoc_dist_works_as_expected' +amoc_eval amoc-worker-3 "amoc_controller:get_status()." | contains dummy_scenario running +amoc_eval amoc-worker-3 "binary_to_list(amoc_config:get(test))." 
| contains "test_value" +amoc_eval amoc-worker-3 "dummy_helper:test_amoc_dist()." | contains 'amoc_dist_works_as_expected' echo "amoc_dist_works_as_expected" diff --git a/integration_test/test_amoc_cluster.sh b/integration_test/test_amoc_cluster.sh index 9914dbb1..99bc5da7 100755 --- a/integration_test/test_amoc_cluster.sh +++ b/integration_test/test_amoc_cluster.sh @@ -4,10 +4,14 @@ source "$(dirname "$0")/helper.sh" enable_strict_mode echo "checking that clustering is done properly" -amoc_eval amoc-master "nodes()." | contain amoc-worker-1 amoc-worker-2 -amoc_eval amoc-worker-1 "nodes()." | contain amoc-master amoc-worker-2 -amoc_eval amoc-worker-2 "nodes()." | contain amoc-master amoc-worker-1 +amoc_eval amoc-master "nodes()." | contains amoc-worker-1 amoc-worker-2 +amoc_eval amoc-worker-1 "nodes()." | contains amoc-master amoc-worker-2 +amoc_eval amoc-worker-2 "nodes()." | contains amoc-master amoc-worker-1 echo "checking that AMOC_EXTRA_CODE_PATHS setting works as expected" -amoc_eval amoc-master "amoc_scenario:does_scenario_exist(test1)." | contain true -amoc_eval amoc-master "amoc_scenario:does_scenario_exist(test2)." | contain true +amoc_eval amoc-master "amoc_code_server:list_scenario_modules()." | contains dummy_scenario +amoc_eval amoc-master "amoc_code_server:list_configurable_modules()." | contains dummy_helper +amoc_eval amoc-worker-1 "amoc_code_server:list_scenario_modules()." | doesnt_contain dummy_scenario +amoc_eval amoc-worker-1 "amoc_code_server:list_configurable_modules()." | doesnt_contain dummy_helper +amoc_eval amoc-worker-2 "amoc_code_server:list_scenario_modules()." | doesnt_contain dummy_scenario +amoc_eval amoc-worker-2 "amoc_code_server:list_configurable_modules()." 
| doesnt_contain dummy_helper diff --git a/integration_test/test_distribute_scenario.sh b/integration_test/test_distribute_scenario.sh index cac3485c..57ebcf1f 100755 --- a/integration_test/test_distribute_scenario.sh +++ b/integration_test/test_distribute_scenario.sh @@ -4,44 +4,73 @@ source "$(dirname "$0")/helper.sh" enable_strict_mode cd "${git_root}/integration_test" -scenario_name="dummy_scenario" +modules=( "dummy_scenario" "dummy_helper" ) -############################# -## amoc REST API functions ## -############################# function get_scenarios() { - amoc_eval "$1" "amoc_scenario:list_scenario_modules()." + amoc_eval "$1" "amoc_code_server:list_scenario_modules()." } -function list_scenarios_by_port() { - local result="$(get_scenarios "$1")" - echo "Scenarios on the ${1} node: ${result}" +function get_helpers() { + amoc_eval "$1" "amoc_code_server:list_configurable_modules()." } -function ensure_scenarios_installed() { - local result="$(get_scenarios "$1")" - echo "Scenarios on the ${1} node: ${result}" +function list_scenarios_and_helpers() { + local scenarios="$(get_scenarios "$1")" + local helpers="$(get_helpers "$1")" + echo "Scenarios on the ${1} node: ${scenarios}" + echo "Configurable helpers on the ${1} node: ${helpers}" +} + +function erlang_list() { + local ret=( "[" ) + local element + if [ "$#" -gt 0 ]; then + ret+=( "$1" ) + shift 1 + for element in "$@"; do + ret+=( "," "$element" ) + done + fi + ret+=( "]" ) + echo "${ret[@]}" +} + +function ensure_modules_loaded() { + local node="$1" shift 1 - echo "$result" | contain "$@" + local modules="$(erlang_list "$@")" + amoc_eval "$node" "[code:ensure_loaded(M) || M <- ${modules}]." } -function upload_module() { - local filename="$2" - docker_compose cp "$2" "${1}:/tmp/erlang_module" - eval_cmd=( "{ok, FileContent} = file:read_file(\"/tmp/erlang_module\")," - "amoc_scenario:install_module(${filename%.erl}, FileContent)." 
) - amoc_eval "${1}" "${eval_cmd[*]}" +function add_module() { + local node="${1}" + local module + shift 1 + for module in "$@"; do + echo "adding module '${module}' for distribution from the node '${node}'" + amoc_eval "${node}" "amoc_code_server:add_module(${module})." + done } -list_scenarios_by_port amoc-master -list_scenarios_by_port amoc-worker-1 -list_scenarios_by_port amoc-worker-2 +function distribute_modules() { + amoc_eval "${1}" "amoc_code_server:distribute_modules('amoc@${2}')." +} + +ensure_modules_loaded amoc-master "${modules[@]}" | contains "${modules[@]}" +ensure_modules_loaded amoc-worker-1 "${modules[@]}" | doesnt_contain "${modules[@]}" +ensure_modules_loaded amoc-worker-2 "${modules[@]}" | doesnt_contain "${modules[@]}" + +list_scenarios_and_helpers amoc-worker-2 | doesnt_contain "${modules[@]}" +list_scenarios_and_helpers amoc-worker-1 | doesnt_contain "${modules[@]}" + +echo "Distributing scenario and helper module from the amoc-master node" +## amoc_controller is added to the list as an example of module +## that already exists on all the slave amoc nodes +add_module amoc-master "${modules[@]}" amoc_controller +distribute_modules amoc-master amoc-worker-1 | contains "${modules[@]}" amoc_controller -echo "Installing scenario and helper module on the amoc-master node" -scenario_put="$(upload_module amoc-master "${scenario_name}.erl")" -echo "Response for '${scenario_name}.erl': ${scenario_put}" -helper_put="$(upload_module amoc-master "dummy_helper.erl")" -echo "Response for 'dummy_helper.erl': ${helper_put}" +ensure_modules_loaded amoc-worker-1 "${modules[@]}" | contains "${modules[@]}" +ensure_modules_loaded amoc-worker-2 "${modules[@]}" | doesnt_contain "${modules[@]}" -ensure_scenarios_installed amoc-worker-1 ${scenario_name} -ensure_scenarios_installed amoc-worker-2 ${scenario_name} +list_scenarios_and_helpers amoc-worker-1 | contains "${modules[@]}" +list_scenarios_and_helpers amoc-worker-2 | doesnt_contain "${modules[@]}" diff --git 
a/integration_test/test_run_scenario.sh b/integration_test/test_run_scenario.sh index 768305de..c5735dbb 100755 --- a/integration_test/test_run_scenario.sh +++ b/integration_test/test_run_scenario.sh @@ -14,7 +14,7 @@ result="$(run_scenario amoc-master dummy_scenario 10)" echo "$result" -if echo "$result" | contain "ok" "'amoc@amoc-worker-1'" "'amoc@amoc-worker-2'" ; then +if echo "$result" | contains "ok" "'amoc@amoc-worker-1'" "'amoc@amoc-worker-2'" ; then echo "Scenario executed" exit 0 else diff --git a/priv/scenarios/.gitignore b/priv/scenarios/.gitignore deleted file mode 100644 index 2a6629d4..00000000 --- a/priv/scenarios/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -## This directory should remain empty. -## Ignore everything except this file. -* -!.gitignore diff --git a/priv/scenarios_ebin/.gitignore b/priv/scenarios_ebin/.gitignore deleted file mode 100644 index 2a6629d4..00000000 --- a/priv/scenarios_ebin/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -## This directory should remain empty. -## Ignore everything except this file. -* -!.gitignore diff --git a/rebar.config b/rebar.config index 101139b9..96c863d1 100644 --- a/rebar.config +++ b/rebar.config @@ -1,39 +1,40 @@ -{ erl_opts, [ +{erl_opts, [ debug_info, - warn_missing_spec]}. + warn_missing_spec +]}. -{ deps, [ +{deps, [ {telemetry, "1.2.1"} ]}. -{ profiles, [ +{profiles, [ {test, [ {deps, [ {meck, "0.9.2"}, {proper, "1.4.0"}, - {fusco, "0.1.1"} + {bbmustache, "1.12.2"} ]} ]}, - {elvis, [{plugins, [{rebar3_lint, "3.0.1"}]}]}, - {demo, [ - {erl_opts, [debug_info, {src_dirs, ["src", "scenarios"]}]}, - {relx, [ - {release, {amoc, git}, [amoc, runtime_tools]}, - {debug_info, keep}, - {include_src, true}, - {include_erts, true}, - {dev_mode, false}, - {extended_start_script, true}, - {sys_config, "rel/app.config"} - ]}]} + {elvis, [{plugins, [{rebar3_lint, "3.0.1"}]}]} +]}. 
+ +{relx, [ + {release, {amoc, git}, [amoc, runtime_tools]}, + {debug_info, keep}, + {include_src, true}, + {include_erts, true}, + {dev_mode, false}, + {extended_start_script, true}, + {sys_config, "rel/app.config"} ]}. -{ xref_checks, [ +{xref_checks, [ undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, - deprecated_functions]}. + deprecated_functions +]}. {dialyzer, [ {warnings, [unknown]} @@ -41,16 +42,18 @@ {ex_doc, [ {source_url, <<"https://github.com/esl/amoc">>}, - {extras, [{'README.md', #{title => <<"A Murder of Crows">>}}, - {'guides/scenario.md', #{title => <<"Developing a scenario">>}}, - {'guides/local-run.md', #{title => <<"Running locally">>}}, - {'guides/configuration.md', #{title => <<"Configuration">>}}, - {'guides/distributed.md', #{title => <<"Setting up a distributed environment">>}}, - {'guides/distributed-run.md', #{title => <<"Running a load test">>}}, - {'guides/telemetry.md', #{title => <<"Telemetry events">>}}, - {'guides/throttle.md', #{title => <<"Amoc throttle">>}}, - {'guides/coordinator.md', #{title => <<"Amoc coordinator">>}}, - {'LICENSE', #{title => <<"License">>}}]}, + {extras, [ + {'README.md', #{title => <<"A Murder of Crows">>}}, + {'guides/scenario.md', #{title => <<"Developing a scenario">>}}, + {'guides/local-run.md', #{title => <<"Running locally">>}}, + {'guides/configuration.md', #{title => <<"Configuration">>}}, + {'guides/distributed.md', #{title => <<"Setting up a distributed environment">>}}, + {'guides/distributed-run.md', #{title => <<"Running a load test">>}}, + {'guides/telemetry.md', #{title => <<"Telemetry events">>}}, + {'guides/throttle.md', #{title => <<"Amoc throttle">>}}, + {'guides/coordinator.md', #{title => <<"Amoc coordinator">>}}, + {'LICENSE', #{title => <<"License">>}} + ]}, {assets, <<"guides/assets">>}, {main, <<"readme">>} ]}. 
diff --git a/scenarios/parallel_throttle_test.erl b/scenarios/parallel_throttle_test.erl deleted file mode 100644 index 22f77ac3..00000000 --- a/scenarios/parallel_throttle_test.erl +++ /dev/null @@ -1,96 +0,0 @@ -%%============================================================================== -%% Copyright 2019 Erlang Solutions Ltd. -%% Licensed under the Apache License, Version 2.0 (see LICENSE file) -%%============================================================================== -%% @doc -%% This scenario demonstrates amoc_throttle execution rate limiting -%% functionality. You might be interested in the next telemetry events: -%% - [amoc, controller, users], #{count := non_neg_integer()}, #{} -%% - [amoc, throttle, rate], #{rate := non_neg_integer()}, #{name := parallel_testing} -%% - [amoc, max_scheduled], #{value := non_neg_integer()}, #{} -%% - [amoc, current_scheduled], #{value := non_neg_integer()}, #{} -%% @end -%%============================================================================== --module(parallel_throttle_test). - -%% API --behaviour(amoc_scenario). --export([start/1, init/0]). - --define(PARALLEL_EXECUTION_TEST, parallel_testing). --define(METRICS_PROC_NAME, max_counter_value). --define(METRICS_PROC, {?METRICS_PROC_NAME, amoc_cluster:master_node()}). - --spec init() -> ok. -init() -> - %% not more than 20 simultaneous executions - amoc_throttle:start(?PARALLEL_EXECUTION_TEST, 20, 0, 3), - ok. - --spec start(amoc_scenario:user_id()) -> any(). -start(1) -> - spawn(fun change_throttle_rate/0), - start_metrics_on_master_node(), - parallel_execution_scenario(); -start(_Id) -> parallel_execution_scenario(). 
- -%%------------------------------------------------------------------- -%% Internal functions -%%------------------------------------------------------------------- - -%%%%%%%%%%%%%%%%%%%%%%%% -%% user scenario loop %% -%%%%%%%%%%%%%%%%%%%%%%%% -parallel_execution_scenario() -> - Pid = ?METRICS_PROC, - Self = self(), - amoc_throttle:run(?PARALLEL_EXECUTION_TEST, - fun() -> - Pid ! {delta, 1}, - timer:sleep(30000), - Pid ! {delta, -1}, - Self ! next - end), - receive next -> ok end, - parallel_execution_scenario(). - -%%%%%%%%%%%%%%%%%%%%%%%%%% -%% change throttle rate %% -%%%%%%%%%%%%%%%%%%%%%%%%%% -change_throttle_rate() -> - %% we start with 20 parallel executions, each execution takes 30 sec. - %% so it's 40 executions per minute - timer:sleep(200000), - %% 0 executions per minute - amoc_throttle:pause(?PARALLEL_EXECUTION_TEST), - timer:sleep(150000), - %% back to 20 parallel executions (40 executions per minute) - amoc_throttle:resume(?PARALLEL_EXECUTION_TEST), - timer:sleep(100000), - %% switch to 20 executions per minute rate - amoc_throttle:change_rate(?PARALLEL_EXECUTION_TEST, 20, 60000), - timer:sleep(150000), - %% 30 parallel executions (60 executions per minute) - amoc_throttle:change_rate(?PARALLEL_EXECUTION_TEST, 30, 0). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Manual metrics counting on master node %% -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -start_metrics_on_master_node() -> - MasterNode = amoc_cluster:master_node(), - spawn(MasterNode, fun max_counter_value/0). - -max_counter_value() -> - erlang:register(?METRICS_PROC_NAME, self()), - max_counter_value(0, 0). 
- -max_counter_value(Gauge, MaxGauge) when Gauge =:= 0 andalso MaxGauge =/= 0; - Gauge > MaxGauge -> - telemetry:execute([amoc, max_scheduled], #{value => Gauge}, #{}), - max_counter_value(Gauge, Gauge); -max_counter_value(Gauge, MaxGauge) -> - receive - {delta, N} -> - telemetry:execute([amoc, current_scheduled], #{value => Gauge + N}, #{}), - max_counter_value(Gauge + N, MaxGauge) - end. diff --git a/scenarios/rate_throttle_test.erl b/scenarios/rate_throttle_test.erl deleted file mode 100644 index ac489eac..00000000 --- a/scenarios/rate_throttle_test.erl +++ /dev/null @@ -1,93 +0,0 @@ -%%============================================================================== -%% Copyright 2019 Erlang Solutions Ltd. -%% Licensed under the Apache License, Version 2.0 (see LICENSE file) -%%============================================================================== -%% @doc -%% This scenario demonstrates amoc_throttle execution rate limiting -%% functionality. You might be interested in the next metrics: -%% - [amoc, controller, users], #{count := non_neg_integer()}, #{} -%% - [amoc, throttle, rate], #{rate := non_neg_integer()}, #{name := testing} -%% - [amoc, throttle, execute], #{count := non_neg_integer()}, #{name := testing} -%% - [amoc, scheduled_per_minute], #{value := non_neg_integer()}, #{} -%% @end -%%============================================================================== --module(rate_throttle_test). - -%% API --behaviour(amoc_scenario). --export([start/1, init/0]). - --define(RATE_CHANGE_TEST, testing). --define(METRICS_PROC_NAME, count_per_minute). --define(METRICS_PROC, {?METRICS_PROC_NAME, amoc_cluster:master_node()}). - --spec init() -> ok. -init() -> - amoc_throttle:start(?RATE_CHANGE_TEST, 20000, 120000, 20), %% 20k per 2 min - ok. - --spec start(amoc_scenario:user_id()) -> any(). -start(1) -> - spawn(fun change_throttle_rate/0), - start_metrics_on_master_node(), - rate_change_scenario(); -start(_Id) -> rate_change_scenario(). 
- -%%------------------------------------------------------------------- -%% Internal functions -%%------------------------------------------------------------------- - -%%%%%%%%%%%%%%%%%%%%%%%% -%% user scenario loop %% -%%%%%%%%%%%%%%%%%%%%%%%% -rate_change_scenario() -> - amoc_throttle:send_and_wait(?RATE_CHANGE_TEST, some_msg), - ?METRICS_PROC ! inc, - rate_change_scenario(). - -%%%%%%%%%%%%%%%%%%%%%%%%%% -%% change throttle rate %% -%%%%%%%%%%%%%%%%%%%%%%%%%% -change_throttle_rate() -> - timer:sleep(200000), - amoc_throttle:change_rate_gradually(?RATE_CHANGE_TEST, - 1750, 21500, 60000, 200000, 4), - timer:sleep(950000), - amoc_throttle:pause(?RATE_CHANGE_TEST), - timer:sleep(100000), - amoc_throttle:change_rate_gradually(?RATE_CHANGE_TEST, - 20000, 1750, 60000, 200000, 2), - timer:sleep(100000), - amoc_throttle:resume(?RATE_CHANGE_TEST), - timer:sleep(480000), % 8 mins - amoc_throttle:change_rate(?RATE_CHANGE_TEST, 20019, 60000). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Manual metrics counting on master node %% -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -start_metrics_on_master_node() -> - MasterNode = amoc_cluster:master_node(), - spawn(MasterNode, fun count_per_minute/0). - -count_per_minute() -> - %% !!! this code runs only on the master amoc node !!! - %% sometimes exometer shows execution rate higher than - %% expected, however that is not true. it's caused by - %% the way how exometer internally calculates spiral - %% metrics. below is the simple algorithm for manual - %% spiral metric calculation, use 'scheduled_per_minute' - %% metric to ensure that amoc_throttle is not crossing - %% the upper execution rate boundary - erlang:register(?METRICS_PROC_NAME, self()), - erlang:send_after(60000, self(), one_minute), - count_per_minute(0). 
- -count_per_minute(N) -> - receive - inc -> - count_per_minute(N + 1); - one_minute -> - erlang:send_after(60000, self(), one_minute), - telemetry:execute([amoc, scheduled_per_minute], #{value => N}, #{}), - count_per_minute(0) - end. diff --git a/src/amoc_code_server.erl b/src/amoc_code_server.erl new file mode 100644 index 00000000..6819c107 --- /dev/null +++ b/src/amoc_code_server.erl @@ -0,0 +1,279 @@ +%%============================================================================== +%% Copyright 2023 Erlang Solutions Ltd. +%% Licensed under the Apache License, Version 2.0 (see LICENSE file) +%%============================================================================== +-module(amoc_code_server). +%% API +-export([start_link/0, + add_module/1, + distribute_modules/1, + does_scenario_exist/1, + list_scenario_modules/0, + list_configurable_modules/0]). + +-ifdef(TEST). + +-export([upload_module/2, + uploaded_module_to_map/1, + map_to_uploaded_module/1]). + +-define(RECORD2MAP(RecordName), + (fun(#RecordName{} = Record) -> + RecordFields = lists:zip(lists:seq(2, record_info(size, RecordName)), + record_info(fields, RecordName)), + BasicMap = #{'$RECORD_NAME' => RecordName}, + lists:foldl(fun({FieldPos, FieldName}, Map) -> + Map#{FieldName => erlang:element(FieldPos, Record)} + end, + BasicMap, RecordFields) + end)). + +-define(MAP2RECORD(RecordName), + (fun(#{'$RECORD_NAME' := RecordName} = Map) -> + BasicRecord = #RecordName{}, + RecordFields = + lists:zip3(lists:seq(2, tuple_size(BasicRecord)), + record_info(fields, RecordName), + tl(tuple_to_list(BasicRecord))), + lists:foldl(fun({FieldPos, FieldName, DefaultValue}, Record) -> + Value = maps:get(FieldName, Map, DefaultValue), + setelement(FieldPos, Record, Value) + end, + BasicRecord, RecordFields) + end)). + +-endif. + +-behaviour(gen_server). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2]). 
+ +%% renaming the fields of the uploaded_module record will +%% break most test cases in amoc_code_server_SUITE because +%% the module info map used in the suite must have key names +%% matched to the field names of this record. +-record(uploaded_module, {module :: module(), + beam_filename :: file:filename(), + binary :: binary(), + md5 :: binary()}). + +-type uploaded_module() :: #uploaded_module{}. + +-type state() :: map(). + +%%------------------------------------------------------------------------- +%% API +%%------------------------------------------------------------------------- +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec add_module(module()) -> ok | {error, term()}. +add_module(Module) -> + gen_server:call(?MODULE, {add_module, Module}). + +-spec distribute_modules(node()) -> [{module(), ok | {error, term()}}]. +distribute_modules(Node) -> + UploadedModules = ets:tab2list(uploaded_modules), + [{Module, upload_module(Node, UM)} || + #uploaded_module{module = Module} = UM <- UploadedModules]. + +-spec upload_module(node(), uploaded_module()) -> ok | {error, term()}. +upload_module(Node, UploadedModule) -> + %% format of the call request is critical for the tests, it must be + %% {upload_module, #uploaded_module{}} + gen_server:call({?MODULE, Node}, {upload_module, UploadedModule}). + +-ifdef(TEST). +uploaded_module_to_map(Record) -> + ?RECORD2MAP(uploaded_module)(Record). + +map_to_uploaded_module(Map) -> + ?MAP2RECORD(uploaded_module)(Map). +-endif. + +-spec does_scenario_exist(module()) -> boolean(). +does_scenario_exist(Scenario) -> + [{Scenario, scenario}] =:= ets:lookup(configurable_modules, Scenario). + +-spec list_scenario_modules() -> [module()]. +list_scenario_modules() -> + [S || [S] <- ets:match(configurable_modules, {'$1', scenario})]. + +-spec list_configurable_modules() -> [module()]. 
+list_configurable_modules() -> + [S || [S] <- ets:match(configurable_modules, {'$1', configurable})]. + +%%------------------------------------------------------------------------- +%% gen_server callbacks +%%------------------------------------------------------------------------- +-spec init([]) -> {ok, state()}. +init([]) -> + start_scenarios_ets(), + ok = add_code_paths(), + find_all_configurable_and_scenario_modules(), + {ok, #{}}. + +-spec handle_call(any(), any(), state()) -> {reply, any(), state()}. +handle_call({add_module, Module}, _, State) -> + Reply = add_module_internal(Module), + {reply, Reply, State}; +handle_call({upload_module, #uploaded_module{} = UM}, _, State) -> + Reply = upload_module_internal(UM), + {reply, Reply, State}; +handle_call(_, _, State) -> + {reply, {error, not_implemented}, State}. + +-spec handle_cast(any(), state()) -> {noreply, state()}. +handle_cast(_, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------- +%% local functions +%%------------------------------------------------------------------------- +-spec start_scenarios_ets() -> term(). +start_scenarios_ets() -> + EtsOptions = [named_table, protected, {read_concurrency, true}], + ets:new(configurable_modules, EtsOptions), + ets:new(uploaded_modules, [{keypos, #uploaded_module.module} | EtsOptions]). + +-spec add_code_paths() -> ok | {error, {bad_directories, [file:filename()]}}. +add_code_paths() -> + AdditionalCodePaths = amoc_config_env:get(extra_code_paths, []), + Res = [{code:add_pathz(Path), Path} || Path <- AdditionalCodePaths], + case [Dir || {{error, bad_directory}, Dir} <- Res] of + [] -> ok; + BadDirectories -> {error, {bad_directories, BadDirectories}} + end. + +-spec find_all_configurable_and_scenario_modules() -> [module()]. 
+find_all_configurable_and_scenario_modules() -> + ErtsPath = code:lib_dir(), + AllPaths = [Path || Path <- code:get_path(), not lists:prefix(ErtsPath, Path)], + AllBeamFiles = [File || Path <- AllPaths, File <- filelib:wildcard("*.beam", Path)], + AllModules = [list_to_atom(filename:rootname(BeamFile)) || BeamFile <- AllBeamFiles], + ok = code:ensure_modules_loaded(AllModules), + [maybe_store_configurable_module(Module) || {Module, _} <- code:all_loaded()]. + +-spec maybe_store_configurable_module(module()) -> any(). +maybe_store_configurable_module(Module) -> + case get_module_type(Module) of + scenario -> + ets:insert_new(configurable_modules, {Module, scenario}); + configurable -> + ets:insert_new(configurable_modules, {Module, configurable}); + ordinary -> + ok + end. + +-spec get_module_type(module()) -> scenario | configurable | ordinary. +get_module_type(Module) -> + case erlang:function_exported(Module, module_info, 1) of + false -> + %% This can happen with the mocked (meck:new/2) and + %% later unloaded (meck:unload/1) module. So this + %% clause is required to pass the tests. + ordinary; + true -> + ModuleAttributes = apply(Module, module_info, [attributes]), + lists:foldl(fun({behaviour, [amoc_scenario]}, _) -> scenario; + ({behavior, [amoc_scenario]}, _) -> scenario; + ({required_variable, _}, ordinary) -> configurable; + (_, Ret) -> Ret + end, ordinary, ModuleAttributes) + end. + +-spec add_module_internal(module()) -> + ok | {error, module_version_has_changed | no_beam_file_for_module | + module_is_not_loaded | code_path_collision}. +add_module_internal(Module) -> + case maybe_add_module(Module) of + ok -> ok; + {error, no_beam_file_for_module} -> + %% might happen if directory with beam file is not added to the code path + case maybe_add_code_path(Module) of + true -> maybe_add_module(Module); + false -> {error, no_beam_file_for_module} + end; + Error -> Error + end. 
+ +maybe_add_module(Module) -> + case {code:is_loaded(Module), code:get_object_code(Module)} of + {false, _} -> + {error, module_is_not_loaded}; + {{file, _BeamFile}, error} -> + {error, no_beam_file_for_module}; + {{file, BeamFile}, {Module, _Binary, Filename}} when BeamFile =/= Filename -> + {error, code_path_collision}; + {{file, BeamFile}, {Module, Binary, BeamFile}} -> + maybe_store_uploaded_module(Module, Binary, BeamFile) + end. + +maybe_store_uploaded_module(Module, Binary, Filename) -> + MD5 = get_md5(Module), + UploadedModule = #uploaded_module{module = Module, binary = Binary, + beam_filename = Filename, md5 = MD5}, + case ets:insert_new(uploaded_modules, UploadedModule) of + true -> + maybe_store_configurable_module(Module), + ok; + false -> + check_uploaded_module_version(UploadedModule) + end. + +-spec check_uploaded_module_version(uploaded_module()) -> + ok | {error, module_version_has_changed | module_is_not_uploaded}. +check_uploaded_module_version(#uploaded_module{module = Module, md5 = MD5}) -> + case {ets:lookup(uploaded_modules, Module), get_md5(Module)} of + {[#uploaded_module{md5 = MD5}], MD5} -> + %% md5 is the same, we have the same version of module loaded & stored in ETS + ok; + {[], MD5} -> + %% this can happen for upload_module_internal/1 calls + %% and should never happen for add_module_internal/1 + {error, module_is_not_uploaded}; + _ -> + {error, module_version_has_changed} + end. + +-spec get_md5(module()) -> binary(). +get_md5(Module) -> + Module:module_info(md5). + +maybe_add_code_path(Module) -> + try + {file, BeamFile} = code:is_loaded(Module), + true = is_list(BeamFile), + true = filelib:is_regular(BeamFile), + BeamDir = filename:dirname(filename:absname(BeamFile)), + true = filelib:is_dir(BeamDir), + CodePath = code:get_path(), + code:add_pathz(BeamDir), + NewCodePath = code:get_path(), + NewCodePath =/= CodePath + catch + _C:_E -> false + end. 
+ +upload_module_internal(#uploaded_module{module = Module, binary = Binary, + beam_filename = Filename} = UM) -> + case code:is_loaded(Module) of + false -> + case code:load_binary(Module, Filename, Binary) of + {error, _Error} -> {error, module_loading_error}; + {module, Module} -> maybe_store_uploaded_module(Module, Binary, Filename) + end; + {file, _} -> + case check_uploaded_module_version(UM) of + ok -> ok; + {error, module_is_not_uploaded} -> + %% the same version of module is loaded, but not yet stored in ETS + %% so let's try to store it for consistency. if module adding fails + %% (e.g. if there's no beam file), it's not a big problem and can + %% be ignored. + add_module_internal(Module), + ok; + Error -> Error + end + end. diff --git a/src/amoc_config/amoc_config_scenario.erl b/src/amoc_config/amoc_config_scenario.erl index 72cce80a..d82b9b27 100644 --- a/src/amoc_config/amoc_config_scenario.erl +++ b/src/amoc_config/amoc_config_scenario.erl @@ -67,7 +67,7 @@ get_current_configuration() -> -spec get_configuration(module()) -> {ok, module_configuration()} | error(). get_configuration(Module) -> - ConfigurableModules = amoc_scenario:list_configurable_modules(), + ConfigurableModules = amoc_code_server:list_configurable_modules(), AllConfigurableModules = [Module | ConfigurableModules], PipelineActions = [ {fun compose_configuration/1, [AllConfigurableModules]}, diff --git a/src/amoc_controller.erl b/src/amoc_controller.erl index c88bf78f..1547834a 100644 --- a/src/amoc_controller.erl +++ b/src/amoc_controller.erl @@ -19,7 +19,7 @@ last_user_id = 0 :: last_user_id(), status = idle :: idle | running | terminating | finished | {error, any()} | disabled, - scenario_state :: any(), + scenario_state :: any(), %% state returned from Scenario:init/0 create_users = [] :: [amoc_scenario:user_id()], tref :: timer:tref() | undefined}). @@ -78,7 +78,7 @@ start_link() -> -spec start_scenario(amoc:scenario(), amoc_config:settings()) -> ok | {error, term()}. 
start_scenario(Scenario, Settings) -> - case amoc_scenario:does_scenario_exist(Scenario) of + case amoc_code_server:does_scenario_exist(Scenario) of true -> gen_server:call(?SERVER, {start_scenario, Scenario, Settings}); false -> @@ -292,7 +292,7 @@ start_tables() -> %% ETS creation {ok | error, any()}. init_scenario(Scenario, Settings) -> case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of - ok -> apply_safely(Scenario, init, []); + ok -> amoc_scenario:init(Scenario); {error, Type, Reason} -> {error, {Type, Reason}} end. @@ -332,9 +332,9 @@ terminate_all_users({Objects, Continuation}) -> terminate_all_users('$end_of_table') -> ok. -spec dec_no_of_users(state()) -> state(). -dec_no_of_users(#state{scenario = Scenario, scenario_state = ScenarioState, +dec_no_of_users(#state{scenario = Scenario, scenario_state = ScenarioState, no_of_users = 1, status = terminating} = State) -> - apply_safely(Scenario, terminate, [ScenarioState]), + amoc_scenario:terminate(Scenario, ScenarioState), State#state{no_of_users = 0, status = finished}; dec_no_of_users(#state{no_of_users = N} = State) -> State#state{no_of_users = N - 1}. @@ -343,17 +343,6 @@ dec_no_of_users(#state{no_of_users = N} = State) -> interarrival() -> amoc_config:get(interarrival). --spec apply_safely(atom(), atom(), [term()]) -> {ok | error, term()}. -apply_safely(M, F, A) -> - try erlang:apply(M, F, A) of - {ok, RetVal} -> {ok, RetVal}; - {error, Error} -> {error, Error}; - Result -> {ok, Result} - catch - Class:Exception:Stacktrace -> - {error, {Class, Exception, Stacktrace}} - end. - -spec maybe_update_interarrival_timer(state()) -> state(). 
maybe_update_interarrival_timer(#state{tref = undefined} = State) -> State; diff --git a/src/amoc_coordinator/amoc_coordinator.erl b/src/amoc_coordinator/amoc_coordinator.erl index 145dcb04..374c9379 100644 --- a/src/amoc_coordinator/amoc_coordinator.erl +++ b/src/amoc_coordinator/amoc_coordinator.erl @@ -24,7 +24,7 @@ -define(IS_TIMEOUT(Timeout), (?IS_POS_INT(Timeout) orelse Timeout =:= infinity)). -type name() :: atom(). --type state() :: {worker, name(), pid()} | {timeout, name(), pid()}. +-type state() :: {worker, pid()} | {timeout, name(), pid()}. -type coordination_data() :: {pid(), Data :: any()}. @@ -74,6 +74,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) -> Plan = normalize_coordination_plan(CoordinationPlan), case gen_event:start({local, Name}) of {ok, _} -> + telemetry:execute([amoc, coordinator, start], #{count => 1}, #{name => Name}), %% according to gen_event documentation: %% %% When the event is received, the event manager calls @@ -83,11 +84,12 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) -> %% in reality the order is reversed, the last added handler %% is executed at first. so to ensure that all the items in %% the plan with NoOfUsers =:= all are executed in the very - %% end, we need to add them first. + %% end, we need to add them first. Also, the timeout handler + %% must be added last, so gen_event triggers it first. 
AllItemsHandlers = lists:reverse([Item || {all, _} = Item <- Plan]), - [gen_event:add_handler(Name, ?MODULE, {Name, Item}) || Item <- AllItemsHandlers], - [gen_event:add_handler(Name, ?MODULE, {Name, Item}) || {N, _} = Item <- Plan, - is_integer(N)], + [gen_event:add_handler(Name, ?MODULE, Item) || Item <- AllItemsHandlers], + [gen_event:add_handler(Name, ?MODULE, Item) || {N, _} = Item <- Plan, + is_integer(N)], gen_event:add_handler(Name, ?MODULE, {timeout, Name, Timeout}), ok; {error, _} -> error @@ -96,7 +98,8 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) -> %% @doc Stops a coordinator. -spec stop(name()) -> ok. stop(Name) -> - gen_event:stop(Name). + gen_event:stop(Name), + telemetry:execute([amoc, coordinator, stop], #{count => 1}, #{name => Name}). %% @see add/3 -spec add(name(), any()) -> ok. @@ -137,9 +140,9 @@ init({timeout, Name, Timeout}) -> end end), {ok, {timeout, Name, Pid}}; -init({Name, CoordinationItem}) -> +init(CoordinationItem) -> {ok, Pid} = amoc_coordinator_worker:start_link(CoordinationItem), - {ok, {worker, Name, Pid}}. + {ok, {worker, Pid}}. %%-------------------------------------------------------------------- %% @private @@ -152,19 +155,28 @@ init({Name, CoordinationItem}) -> %%-------------------------------------------------------------------- -spec handle_event(Event :: term(), state()) -> {ok, state()}. handle_event(Event, {timeout, Name, Pid}) -> + %% there's only one "timeout" event handler for coordinator, + %% so calling telemetry:execute/3 here to ensure that it's + %% triggered just once per event. 
+ TelemetryEvent = case Event of + {coordinate, _} -> add; + reset_coordinator -> reset; + coordinator_timeout -> timeout + end, + telemetry:execute([amoc, coordinator, TelemetryEvent], + #{count => 1}, #{name => Name}), erlang:send(Pid, Event), {ok, {timeout, Name, Pid}}; -handle_event(Event, {worker, Name, Pid}) -> - telemetry:execute([amoc, coordinator, event], #{count => 1}, #{name => Name, type => Event}), +handle_event(Event, {worker, WorkerPid}) -> case Event of coordinator_timeout -> %% synchronous notification - amoc_coordinator_worker:timeout(Pid); + amoc_coordinator_worker:timeout(WorkerPid); reset_coordinator -> %% synchronous notification - amoc_coordinator_worker:reset(Pid); - {coordinate, Data} -> %% asnyc notification - amoc_coordinator_worker:add(Pid, Data) + amoc_coordinator_worker:reset(WorkerPid); + {coordinate, {Pid, Data}} when is_pid(Pid) -> %% async notification + amoc_coordinator_worker:add(WorkerPid, {Pid, Data}) end, - {ok, {worker, Name, Pid}}. + {ok, {worker, WorkerPid}}. %%-------------------------------------------------------------------- %% @private @@ -191,7 +203,7 @@ handle_call(_Request, State) -> -spec terminate(any(), state()) -> ok. terminate(_, {timeout, _Name, Pid}) -> erlang:send(Pid, terminate), ok; -terminate(_, {worker, _Name, Pid}) -> +terminate(_, {worker, Pid}) -> %% synchronous notification amoc_coordinator_worker:stop(Pid), ok. diff --git a/src/amoc_distribution/amoc_dist.erl b/src/amoc_distribution/amoc_dist.erl index 7ada6a8c..a57d6084 100644 --- a/src/amoc_distribution/amoc_dist.erl +++ b/src/amoc_distribution/amoc_dist.erl @@ -158,14 +158,9 @@ setup_slave_node(Node) -> -spec propagate_uploaded_modules(node()) -> {ok, any()} | {error, any()}. 
propagate_uploaded_modules(Node) -> - UploadedModules = amoc_scenario:list_uploaded_modules(), - Result = [{Module, propagate_module(Node, Module, SourceCode)} - || {Module, SourceCode} <- UploadedModules], + Result = amoc_code_server:distribute_modules(Node), maybe_error(Result). -propagate_module(Node, Module, SourceCode) -> - rpc:call(Node, amoc_scenario, install_module, [Module, SourceCode]). - -spec add_users(pos_integer(), [node()]) -> {ok, any()} | {error, any()}. add_users(Count, Nodes) -> {ok, LastId} = get_param(last_id), diff --git a/src/amoc_scenario.erl b/src/amoc_scenario.erl index 2903232a..27c4502c 100644 --- a/src/amoc_scenario.erl +++ b/src/amoc_scenario.erl @@ -3,22 +3,8 @@ %% Licensed under the Apache License, Version 2.0 (see LICENSE file) %%============================================================================== -module(amoc_scenario). -%% API --export([start_link/0, - install_module/2, - remove_module/1, - does_scenario_exist/1, - list_scenario_modules/0, - list_uploaded_modules/0, - list_configurable_modules/0]). - --behaviour(gen_server). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2]). --define(TABLE, amoc_scenarios). --define(PRIV_DIR, code:priv_dir(amoc)). --define(EBIN_DIR, filename:join(code:priv_dir(amoc), "scenarios_ebin")). +-export([init/1, terminate/2, start/3]). %%------------------------------------------------------------------------- %% behaviour definition @@ -27,192 +13,66 @@ -type user_id() :: pos_integer(). -type state() :: any(). --type sourcecode() :: binary(). -callback init() -> {ok, state()} | ok | {error, Reason :: term()}. -callback start(user_id(), state()) -> any(). -callback start(user_id()) -> any(). -callback terminate(state()) -> any(). +-callback terminate() -> any(). -%% either start/1 or start/2 must be exported from the behaviour module +%% either start/1 or start/2 must be exported from the behavior module. +%% if scenario module exports both functions, start/2 is used. 
-optional_callbacks([start/1, start/2]). --optional_callbacks([terminate/1]). - -%%------------------------------------------------------------------------- -%% API -%%------------------------------------------------------------------------- --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec install_module(module(), sourcecode()) -> - ok | {error, [Errors :: string()], [Warnings :: string()]}. -install_module(Module, ModuleSource) -> - gen_server:call(?MODULE, {add_module, Module, ModuleSource}). - --spec remove_module(module()) -> - ok | {error, [Errors :: string()], [Warnings :: string()]}. -remove_module(Module) -> - gen_server:call(?MODULE, {remove_module, Module}). --spec does_scenario_exist(module()) -> boolean(). -does_scenario_exist(Scenario) -> - [{Scenario, scenario}] =:= ets:lookup(?TABLE, Scenario). +%% terminate/0,1 callbacks are optional. +%% if scenario module exports both functions, terminate/1 is used. +-optional_callbacks([terminate/0, terminate/1]). --spec list_scenario_modules() -> [module()]. -list_scenario_modules() -> - [S || [S] <- ets:match(?TABLE, {'$1', scenario})]. - --spec list_uploaded_modules() -> [{module(), sourcecode()}]. -list_uploaded_modules() -> - ets:tab2list(uploaded_modules). - --spec list_configurable_modules() -> [module()]. -list_configurable_modules() -> - [S || [S] <- ets:match(?TABLE, {'$1', configurable})]. - -%%------------------------------------------------------------------------- -%% gen_server callbacks %%------------------------------------------------------------------------- --spec init([]) -> {ok, state()}. -init([]) -> - start_scenarios_ets(), - ok = add_code_paths(), - find_scenario_modules(), - {ok, ok}. - --spec handle_call(any(), any(), state()) -> {reply, any(), state()}. 
-handle_call({add_module, Module, ModuleSource}, _, State) -> - Reply = add_module(Module, ModuleSource), - {reply, Reply, State}; -handle_call({remove_module, Module}, _, State) -> - Reply = do_remove_module(Module), - {reply, Reply, State}; -handle_call(_, _, State) -> - {reply, {error, not_implemented}, State}. - --spec handle_cast(any(), state()) -> {noreply, state()}. -handle_cast(_, State) -> - {noreply, State}. - -%%------------------------------------------------------------------------- -%% local functions +%% API %%------------------------------------------------------------------------- --spec start_scenarios_ets() -> term(). -start_scenarios_ets() -> - EtsOptions = [named_table, protected, {read_concurrency, true}], - ets:new(?TABLE, EtsOptions), - ets:new(uploaded_modules, EtsOptions). - --spec add_code_paths() -> ok | {error, {bad_directories, [file:filename()]}}. -add_code_paths() -> - true = code:add_pathz(?EBIN_DIR), - AdditionalCodePaths = amoc_config_env:get(extra_code_paths, []), - Res = [{code:add_pathz(Path), Path} || Path <- [?EBIN_DIR | AdditionalCodePaths]], - case [Dir || {{error, bad_directory}, Dir} <- Res] of - [] -> ok; - BadDirectories -> {error, {bad_directories, BadDirectories}} - end. - --spec find_scenario_modules() -> [module()]. -find_scenario_modules() -> - ErtsPath = code:lib_dir(), - AllPaths = [Path || Path <- code:get_path(), not lists:prefix(ErtsPath, Path)], - AllBeamFiles = [File || Path <- AllPaths, File <- filelib:wildcard("*.beam", Path)], - AllModules = [list_to_atom(filename:rootname(BeamFile)) || BeamFile <- AllBeamFiles], - ok = code:ensure_modules_loaded(AllModules), - [maybe_store_module(Module) || Module <- erlang:loaded()]. - --spec maybe_store_module(module()) -> any(). 
-maybe_store_module(Module) -> - case get_module_type(Module) of - scenario -> - ets:insert(?TABLE, {Module, scenario}); - configurable -> - ets:insert(?TABLE, {Module, configurable}); - ordinary -> +-spec init(amoc:scenario()) -> {ok, state()} | {error, Reason :: term()}. +init(Scenario) -> + apply_safely(Scenario, init, []). + +-spec terminate(amoc:scenario(), state()) -> {ok, any()} | {error, Reason :: term()}. +terminate(Scenario, State) -> + case {erlang:function_exported(Scenario, terminate, 1), + erlang:function_exported(Scenario, terminate, 0)} of + {true, _} -> + %% since we ignore Scenario:terminate/1 return value + %% we can use apply_safely/3 function + apply_safely(Scenario, terminate, [State]); + {_, true} -> + %% since we ignore Scenario:terminate/0 return value + %% we can use apply_safely/3 function + apply_safely(Scenario, terminate, []); + _ -> ok end. --spec get_module_type(module()) -> scenario | configurable | ordinary. -get_module_type(Module) -> - case erlang:function_exported(Module, module_info, 1) of - false -> - %% This can happen with the mocked (meck:new/2) and - %% later unloaded (meck:unload/1) module. So this - %% clause is required to pass the tests. - ordinary; - true -> - ModuleAttributes = apply(Module, module_info, [attributes]), - lists:foldl(fun({behaviour, [?MODULE]}, _) -> scenario; - ({behavior, [?MODULE]}, _) -> scenario; - ({required_variable, _}, ordinary) -> configurable; - (_, Ret) -> Ret - end, ordinary, ModuleAttributes) - end. - --spec add_module(module(), sourcecode()) -> - ok | {error, [Errors :: string()], [Warnings :: string()]}. 
-add_module(Module, ModuleSource) -> - case erlang:module_loaded(Module) of - true -> - case ets:lookup(uploaded_modules, Module) of - [{Module, ModuleSource}] -> ok; - _ -> {error, ["module with such name already exists"], []} - end; - false -> - ScenarioPath = scenario_path_name(Module), - write_scenario_to_file(ModuleSource, ScenarioPath), - case compile_and_load_scenario(ScenarioPath) of - {ok, Module} -> - maybe_store_module(Module), - propagate_module(Module, ModuleSource), - ets:insert(uploaded_modules, {Module, ModuleSource}), - ok; - Error -> Error - end - end. - --spec do_remove_module(module()) -> - ok | {error, [Errors :: string()], [Warnings :: string()]}. -do_remove_module(Module) -> - case erlang:module_loaded(Module) andalso - ets:member(?TABLE, Module) of - true -> - ets:delete(?TABLE, Module), - propagate_remove_module(Module), - ets:delete(uploaded_modules, Module), - ok; - false -> - {error, ["module with such name does not exist"], []} +-spec start(amoc:scenario(), user_id(), state()) -> any(). +start(Scenario, Id, State) -> + case {erlang:function_exported(Scenario, start, 2), + erlang:function_exported(Scenario, start, 1)} of + {true, _} -> + Scenario:start(Id, State); + {_, true} -> + Scenario:start(Id); + {false, false} -> + error("the scenario module must export either start/2 or start/1 function") end. --spec propagate_module(module(), sourcecode()) -> any(). -propagate_module(Module, ModuleSource) -> - Nodes = amoc_cluster:all_nodes() -- [node()], - rpc:multicall(Nodes, amoc_scenario, install_module, [Module, ModuleSource]). - --spec propagate_remove_module(module()) -> any(). -propagate_remove_module(Module) -> - Nodes = amoc_cluster:all_nodes() -- [node()], - rpc:multicall(Nodes, amoc_scenario, remove_module, [Module]). - --spec scenario_path_name(module()) -> file:filename(). -scenario_path_name(Module) -> %% w/o ".erl" extension - filename:join([?PRIV_DIR, "scenarios", atom_to_list(Module)]). 
- --spec write_scenario_to_file(sourcecode(), file:filename()) -> ok. -write_scenario_to_file(ModuleSource, ScenarioPath) -> - ok = file:write_file(ScenarioPath ++ ".erl", ModuleSource, [write]). - --spec compile_and_load_scenario(string()) -> {ok, module()} | {error, [string()], [string()]}. -compile_and_load_scenario(ScenarioPath) -> - CompilationFlags = [{outdir, ?EBIN_DIR}, return_errors, report_errors, verbose], - case compile:file(ScenarioPath, CompilationFlags) of - {ok, Module} -> - {module, Module} = code:load_file(Module), - {ok, Module}; - {error, Errors, Warnings} -> - file:delete(ScenarioPath ++ ".erl"), - {error, Errors, Warnings} +%% ------------------------------------------------------------------ +%% internal functions +%% ------------------------------------------------------------------ +-spec apply_safely(atom(), atom(), [term()]) -> {ok | error, term()}. +apply_safely(M, F, A) -> + try erlang:apply(M, F, A) of + {ok, RetVal} -> {ok, RetVal}; + {error, Error} -> {error, Error}; + Result -> {ok, Result} + catch + Class:Exception:Stacktrace -> + {error, {Class, Exception, Stacktrace}} end. diff --git a/src/amoc_sup.erl b/src/amoc_sup.erl index d3148d1a..1a912e2a 100644 --- a/src/amoc_sup.erl +++ b/src/amoc_sup.erl @@ -37,6 +37,6 @@ init([]) -> ?CHILD(amoc_users_sup, supervisor), ?CHILD(amoc_controller, worker), ?CHILD(amoc_cluster, worker), - ?CHILD(amoc_scenario, worker), + ?CHILD(amoc_code_server, worker), ?CHILD(amoc_throttle_controller, worker) ]}}. diff --git a/src/amoc_throttle/amoc_throttle_controller.erl b/src/amoc_throttle/amoc_throttle_controller.erl index dbf56800..0a1f7f33 100644 --- a/src/amoc_throttle/amoc_throttle_controller.erl +++ b/src/amoc_throttle/amoc_throttle_controller.erl @@ -10,7 +10,7 @@ ensure_throttle_processes_started/4, pause/1, resume/1, stop/1, change_rate/3, change_rate_gradually/6, - run/2]). + run/2, telemetry_event/2]). 
%% gen_server callbacks -export([init/1, @@ -53,16 +53,17 @@ start_link() -> {ok, throttle_processes_already_started} | {error, any()}). ensure_throttle_processes_started(Name, Interval, Rate, NoOfProcesses) -> + maybe_raise_event(Name, init), gen_server:call(?MASTER_SERVER, {start_processes, Name, Interval, Rate, NoOfProcesses}). -spec run(name(), fun(() -> any())) -> ok | {error, any()}. run(Name, Fn) -> case get_throttle_process(Name) of {ok, Pid} -> - maybe_raise_event([amoc, throttle, request], #{count => 1}, #{name => Name}), + maybe_raise_event(Name, request), Fun = fun() -> - maybe_raise_event([amoc, throttle, execute], #{count => 1}, #{name => Name}), + maybe_raise_event(Name, execute), Fn() end, amoc_throttle_process:run(Pid, Fun), @@ -93,6 +94,10 @@ change_rate_gradually(Name, LowRate, HighRate, RateInterval, StepInterval, NoOfS stop(Name) -> gen_server:call(?MASTER_SERVER, {stop, Name}). +-spec telemetry_event(name(), request | execute) -> ok. +telemetry_event(Name, Event) when Event =:= request; Event =:= execute -> + raise_event(Name, Event). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -190,12 +195,15 @@ handle_info({change_plan, Name}, State) -> %%% Internal functions %%%=================================================================== -maybe_raise_event(Name, Measurements, Metadata) -> +maybe_raise_event(Name, Event) -> case amoc_cluster:master_node() =:= node() of true -> ok; - _ -> telemetry:execute(Name, Measurements, Metadata) + _ -> raise_event(Name, Event) end. +raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init -> + telemetry:execute([amoc, throttle, Event], #{count => 1}, #{name => Name}). + -spec change_rate_and_stop_plan(name(), state()) -> state(). 
change_rate_and_stop_plan(Name, State) -> Info = maps:get(Name, State), @@ -237,7 +245,7 @@ rate_per_minute(Rate, Interval) -> -spec start_processes(name(), pos_integer(), non_neg_integer(), pos_integer()) -> pos_integer(). start_processes(Name, Rate, Interval, NoOfProcesses) -> - % Master metrics + raise_event(Name, init), RatePerMinute = rate_per_minute(Rate, Interval), report_rate(Name, RatePerMinute), RealNoOfProcesses = min(Rate, NoOfProcesses), diff --git a/src/amoc_throttle/amoc_throttle_process.erl b/src/amoc_throttle/amoc_throttle_process.erl index e716e35a..41a23666 100644 --- a/src/amoc_throttle/amoc_throttle_process.erl +++ b/src/amoc_throttle/amoc_throttle_process.erl @@ -97,7 +97,7 @@ handle_cast(pause_process, State) -> handle_cast(resume_process, State) -> {noreply, State#state{pause = false}, {continue, maybe_run_fn}}; handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) -> - telemetry:execute([amoc, throttle, request], #{count => 1}, #{name => Name}), + amoc_throttle_controller:telemetry_event(Name, request), {noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}}; handle_cast({update, Interval, Rate}, State) -> NewState = merge_state(initial_state(Interval, Rate), State), @@ -188,7 +188,7 @@ maybe_run_fn(State) -> run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) -> erlang:monitor(process, RunnerPid), RunnerPid ! scheduled, - telemetry:execute([amoc, throttle, execute], #{count => 1}, #{name => Name}), + amoc_throttle_controller:telemetry_event(Name, execute), State#state{schedule = T, n = N - 1}. 
async_runner(Fun) -> diff --git a/src/amoc_user.erl b/src/amoc_user.erl index 6c289155..a23e6492 100644 --- a/src/amoc_user.erl +++ b/src/amoc_user.erl @@ -29,15 +29,5 @@ stop(Pid, Force) when is_pid(Pid) -> init(Parent, Scenario, Id, State) -> proc_lib:init_ack(Parent, {ok, self()}), process_flag(trap_exit, true), - ScenarioFun = fun() -> perform_scenario(Scenario, Id, State) end, + ScenarioFun = fun() -> {amoc_scenario:start(Scenario, Id, State), #{}} end, telemetry:span([amoc, scenario, user], #{}, ScenarioFun). - --spec perform_scenario(amoc:scenario(), amoc_scenario:user_id(), state()) -> {term(), map()}. -perform_scenario(Scenario, Id, State) -> - Ret = case erlang:function_exported(Scenario, start, 2) of - true -> - Scenario:start(Id, State); - false -> - Scenario:start(Id) - end, - {Ret, #{}}. diff --git a/test/amoc_code_server_SUITE.erl b/test/amoc_code_server_SUITE.erl new file mode 100644 index 00000000..9f9968bd --- /dev/null +++ b/test/amoc_code_server_SUITE.erl @@ -0,0 +1,710 @@ +-module(amoc_code_server_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-export([all/0, groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2]). + +-export([verify_module_info_compatibility/1, + module_md5_and_binary_test/1, + add_regular_module_test/1, + add_configurable_modules_test/1, + modules_distribution_test/1, + basic_modules_uploading_test/1, + uploading_module_with_different_md5_test/1, + reuploading_modules_after_amoc_code_server_restart_test/1, + uploading_release_modules_test/1]). + +-import(amoc_code_server, [map_to_uploaded_module/1, + uploaded_module_to_map/1]). + +-define(assertEqualLists(L1, L2), + ?assertEqual(lists:sort(L1), lists:sort(L2))). + +-define(DUMMY_CODE_PATH, "/invalid/code/path"). + +all() -> + [{group, all_tests}]. + +groups() -> + [{all_tests, [sequence], all_tests()}]. 
+ +all_tests() -> + %% if the first test fails, then it makes no + %% sense to run any of the following tests + [verify_module_info_compatibility, + % failing_testcase, + module_md5_and_binary_test, + add_regular_module_test, + add_configurable_modules_test, + modules_distribution_test, + basic_modules_uploading_test, + uploading_module_with_different_md5_test, + reuploading_modules_after_amoc_code_server_restart_test, + uploading_release_modules_test]. + +init_per_suite(Config) -> + % ct:pal("~nConfig = ~p~n", [Config]), + + OriginalCodePath = code:get_path(), + WorkingDir = recreate_working_dir(Config), + NewConfig = [{original_code_path, OriginalCodePath}, + {working_dir, WorkingDir} | Config], + %% ModuleA and ModuleB are essentially the same, however there are few differences. + %% while module_info(md5) call for both modules returns one and the same checksum, + %% code:get_object_code/1 returns different binaries. These modules can be used for + %% amoc_code_server testing. ModuleC has different MD5 and different binary. + %% see module_md5_and_binary_test test case for more details. + ModuleInfoA = generate_module(NewConfig, #{}), %% "tag" => [$\n, $\n, $\n] + ModuleInfoB = generate_module(NewConfig, #{"tag" => ""}), + ModuleInfoC = generate_module(NewConfig, #{"tag" => "-compile(export_all)."}), + + ScenarioModuleInfo = generate_module(NewConfig, + #{"tag" => "-behavior(amoc_scenario).", + "module_name" => "some_scenario"}), + HelperModuleInfo = generate_module(NewConfig, + #{"tag" => "-required_variable(some_var).", + "module_name" => "some_helper"}), + + [{module_a, ModuleInfoA}, {module_b, ModuleInfoB}, {module_c, ModuleInfoC}, + {scenario_module, ScenarioModuleInfo}, {helper_module, HelperModuleInfo} + | NewConfig]. + +end_per_suite(Config) -> + Config. + +init_per_testcase(_TestCase, Config) -> + unload_modules_and_restore_code_path(Config), + Config. + +end_per_testcase(_TestCase, Config) -> + unload_modules_and_restore_code_path(Config), + Config. 
+
+%%---------------------------------------------------------
+%% test cases
+%%---------------------------------------------------------
+
+verify_module_info_compatibility(Config) ->
+    %% this is not a test case, but rather a verification of
+    %% compatibility between internal uploaded_module record
+    %% defined in amoc_code_server and module info map used
+    %% in this suite. if this test fails, then most likely all
+    %% other test cases would fail as well.
+
+    DummyUploadedModuleRec =
+        map_to_uploaded_module(#{'$RECORD_NAME' => uploaded_module}),
+    DummyUploadedModuleMap = uploaded_module_to_map(DummyUploadedModuleRec),
+    %% while a record tuple has an extra additional element with a record name,
+    %% its map counterpart contains an additional '$RECORD_NAME' key, so the sizes
+    %% of map and tuple must be the same.
+    ?assertEqual(map_size(DummyUploadedModuleMap), tuple_size(DummyUploadedModuleRec)),
+    UploadedModuleKeys = maps:keys(DummyUploadedModuleMap),
+    ModuleInfoA = proplists:get_value(module_a, Config),
+    [?assert(maps:is_key(Key, ModuleInfoA)) || Key <- UploadedModuleKeys],
+    UploadedModuleRec = map_to_uploaded_module(ModuleInfoA),
+    UploadedModuleMap = uploaded_module_to_map(UploadedModuleRec),
+    ?assertEqual(UploadedModuleMap, maps:with(UploadedModuleKeys, UploadedModuleMap)),
+    ?assertEqual(UploadedModuleRec, map_to_uploaded_module(UploadedModuleMap)).
+
+module_md5_and_binary_test(Config) ->
+    %% this is not a test case, but rather a rationale for why MD5
+    %% should be used instead of binary for modules comparison.
+
+    [ModuleInfoA, ModuleInfoB, ModuleInfoC] =
+        [proplists:get_value(Key, Config) || Key <- [module_a, module_b, module_c]],
+
+    %% note that compare_module_info/3 not only verifies that the
+    %% values for keys supplied in the EqualKeys list are the same
+    %% in both ModuleInfo maps, but in addition to that it also
+    %% verifies that all other keys have different values associated
+    %% with them in ModuleInfo maps.
+ EqualKeys = [module, working_dir, erl_file], + + %% reason #1, empty lines at the beginning or in the middle + %% of the document affect binary module representation, however + %% module's MD5 remains the same. this situation is possible + %% if we manually copy-past and upload code on 2 nodes of the + %% cluster and accidentally copy code with an extra empty line + %% while uploading on one node and without that extra line when + %% uploading on another node. + compare_module_info(ModuleInfoA, ModuleInfoC, EqualKeys), + compare_module_info(ModuleInfoA, ModuleInfoB, [md5 | EqualKeys]), + SouceCodeA = maps:get(source_code, ModuleInfoA), + SouceCodeB = maps:get(source_code, ModuleInfoB), + ct:pal("Module w/o extra empty lines:~n~n----~n~s~n----~n", + [re:replace(SouceCodeA, "\n(\s*\n)+", "\n", [global])]), + ?assertEqual(re:replace(SouceCodeA, "\n(\s*\n)+", "\n", [global]), + re:replace(SouceCodeB, "\n(\s*\n)+", "\n", [global])), + + %% reason #2, when compiling one and the same module but + %% with different path to source code, it changes the binary + %% representation of the module, while MD5 remains unchanged. + WorkingDir = maps:get(working_dir, ModuleInfoA), + NewWorkingDir = filename:join(WorkingDir, "tmp"), + file:make_dir(NewWorkingDir), + ?assertEqual(true, filelib:is_dir(NewWorkingDir)), + ModuleInfoA2 = compile_and_load_module(ModuleInfoA#{working_dir := NewWorkingDir}), + compare_module_info(ModuleInfoA, ModuleInfoA2, [module, md5, source_code]), + + %% reason #3, if module is loaded using code:load_binary/3 + %% interface, then there might be no beam file. so there's + %% no way to fetch an actual binary for the module. more + %% over, code:get_object_code/3 doesn't use the beam file + %% of the currently loaded module, but searches for the + %% first suitable beam file in the code path. if it's not + %% the same beam file as the loaded one, then the fetched + %% binary is incorrect. 
+ #{beam_filename := BeamFileA, out_dir := OutDirA, + binary := BinaryA, module := Module} = ModuleInfoA, + #{binary := BinaryC, out_dir := OutDirC, + beam_filename := BeamFileC} = ModuleInfoC, + compare_module_info(ModuleInfoA, ModuleInfoC, EqualKeys), + load_module(ModuleInfoA), + ?assertEqual(error, code:get_object_code(Module)), + assert_module_loaded(ModuleInfoA), + ?assertEqual(true, code:add_patha(OutDirC)), + ?assertEqual({Module, BinaryC, BeamFileC}, code:get_object_code(Module)), + assert_module_loaded(ModuleInfoA), + ?assertEqual(true, code:add_patha(OutDirA)), + ?assertEqual({Module, BinaryA, BeamFileA}, code:get_object_code(Module)), + assert_module_loaded(ModuleInfoA), + ok. + +add_regular_module_test(Config) -> + [ModuleInfoA, ModuleInfoB, ModuleInfoC, OriginalCodePath] = + [proplists:get_value(Key, Config) || + Key <- [module_a, module_b, module_c, original_code_path]], + #{module := Module, out_dir := OutDirA} = ModuleInfoA, + {ok, Pid} = amoc_code_server:start_link(), + + %% note that test modules are loaded only after the start of the amoc_code_server, + %% so they are not added into configurable_modules ETS during the initial modules + %% analysis + ConfigurableModules = amoc_code_server:list_configurable_modules(), + ScenarioModules = amoc_code_server:list_scenario_modules(), + + %% initially uploaded modules ETS is empty + assert_uploaded_modules_ets([]), + + %% if module is not loaded, amoc_code_server:add_module/1 returns error and + %% nothing is added to the uploaded modules ETS + ?assertEqual({error, module_is_not_loaded}, amoc_code_server:add_module(Module)), + assert_uploaded_modules_ets([]), + + %% if beam file for the module is missing, amoc_code_server:add_module/1 returns + %% error and nothing is added to the uploaded modules ETS + load_module(invalidate_filename(ModuleInfoA)), + assert_invalid_filename(Module), + ?assertEqual({error, no_beam_file_for_module}, amoc_code_server:add_module(Module)), + assert_uploaded_modules_ets([]), 
+
+    %% if beam file is located properly, amoc_code_server:add_module/1 adds module
+    %% to the uploaded modules ETS
+    load_module(ModuleInfoA),
+    ?assertEqual(ok, amoc_code_server:add_module(Module)),
+    %% if required, amoc_code_server:add_module/1 call adds
+    %% missing code path automatically
+    ?assertEqualLists([OutDirA | OriginalCodePath], code:get_path()),
+    assert_uploaded_modules_ets([ModuleInfoA]),
+
+    %% after loading ModuleInfoB we run into collision, because code path
+    %% for ModuleInfoA is already added, but it's not added for ModuleInfoB.
+    %% so code:get_object_code/1 returns a binary for ModuleInfoA. we must
+    %% ensure that amoc_code_server detects it and returns an error.
+    %% uploaded modules ETS must remain unchanged.
+    load_module(ModuleInfoB),
+    ?assertEqual({error, code_path_collision}, amoc_code_server:add_module(Module)),
+    assert_uploaded_modules_ets([ModuleInfoA]),
+
+    %% if there is no code path collision, MD5 of the loaded module is compared
+    %% with the one already stored in ETS. if it's the same, amoc_code_server
+    %% returns ok. uploaded modules ETS must remain unchanged.
+    restore_code_path(OriginalCodePath),
+    ?assertEqual(ok, amoc_code_server:add_module(Module)),
+    assert_uploaded_modules_ets([ModuleInfoA]),
+
+    %% if the MD5 of the loaded module differs from the one already stored in ETS,
+    %% amoc_code_server must return an error. uploaded modules ETS must remain
+    %% unchanged.
+    load_module(ModuleInfoC),
+    restore_code_path(OriginalCodePath),
+    ?assertEqual({error, module_version_has_changed},
+                 amoc_code_server:add_module(Module)),
+    assert_uploaded_modules_ets([ModuleInfoA]),
+
+    %% check that the previously added module is not erroneously identified as a
+    %% scenario or a configurable module
+    ?assertEqualLists(ConfigurableModules,
+                      amoc_code_server:list_configurable_modules()),
+    ?assertEqualLists(ScenarioModules,
+                      amoc_code_server:list_scenario_modules()),
+
+    gen_server:stop(Pid),
+    ok.
+ +add_configurable_modules_test(Config) -> + [ScenarioModuleInfo, HelperModuleInfo] = + [proplists:get_value(Key, Config) || Key <- [scenario_module, helper_module]], + {ok, Pid1} = amoc_code_server:start_link(), + %% note that modules are loaded only after the start of the amoc_code_server, + %% so they are not added into configurable_modules ETS table during the initial + %% modules' analysis on amoc_code_server startup. + ConfigurableModules1 = amoc_code_server:list_configurable_modules(), + ScenarioModules1 = amoc_code_server:list_scenario_modules(), + + %% initially uploaded modules ETS is empty + assert_uploaded_modules_ets([]), + + %% load scenario & configurable helper modules + load_module(ScenarioModuleInfo), + load_module(HelperModuleInfo), + + %% when module is added, it must be analyzed and if it's a scenario + %% or configurable module, amoc_code_server must be added into the + %% configurable_modules ETS table + ScenarioModule = maps:get(module, ScenarioModuleInfo), + ?assertEqualLists(ScenarioModules1, amoc_code_server:list_scenario_modules()), + ?assertNot(amoc_code_server:does_scenario_exist(ScenarioModule)), + ?assertEqual(ok, amoc_code_server:add_module(ScenarioModule)), + assert_uploaded_modules_ets([ScenarioModuleInfo]), + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + ScenarioModules2 = amoc_code_server:list_scenario_modules(), + ?assertEqualLists([ScenarioModule | ScenarioModules1], ScenarioModules2), + ?assertEqualLists(ConfigurableModules1, amoc_code_server:list_configurable_modules()), + + HelperModule = maps:get(module, HelperModuleInfo), + ?assertEqual(ok, amoc_code_server:add_module(HelperModule)), + assert_uploaded_modules_ets([ScenarioModuleInfo, HelperModuleInfo]), + ?assertEqualLists(ScenarioModules2, amoc_code_server:list_scenario_modules()), + ConfigurableModules2 = amoc_code_server:list_configurable_modules(), + ?assertEqualLists([HelperModule | ConfigurableModules1], ConfigurableModules2), + + %% if 
amoc_code_server gets restarted, then all of the previously added modules + %% are analyzed properly during the startup phase, however ETS with uploaded + %% module cannot be repopulated properly. + ?assertNotEqual(undefined, ets:whereis(configurable_modules)), + ?assertNotEqual(undefined, ets:whereis(uploaded_modules)), + + gen_server:stop(Pid1), + + ?assertEqual(undefined, ets:whereis(configurable_modules)), + ?assertEqual(undefined, ets:whereis(uploaded_modules)), + + {ok, Pid2} = amoc_code_server:start_link(), + + ?assertNotEqual(undefined, ets:whereis(configurable_modules)), + ?assertNotEqual(undefined, ets:whereis(uploaded_modules)), + ?assertEqualLists(ScenarioModules2, amoc_code_server:list_scenario_modules()), + ?assertEqualLists(ConfigurableModules2, amoc_code_server:list_configurable_modules()), + assert_uploaded_modules_ets([]), + + gen_server:stop(Pid2), + ok. + +modules_distribution_test(Config) -> + [OriginalCodePath | TestModules ] = + [proplists:get_value(Key, Config) || + Key <- [original_code_path, module_a, scenario_module, helper_module]], + + {ok, Pid1} = amoc_code_server:start_link(), + + [begin + load_module(ModuleInfo), + ?assertEqual(ok, amoc_code_server:add_module(M)) + end || #{module := M} = ModuleInfo <- TestModules], + + start_system_events_logging(Pid1), + DistributionRet = amoc_code_server:distribute_modules(node()), + LoggedEvents1 = stop_system_events_logging(Pid1), + + ExpectedDistributionRet = [{M, ok} || #{module := M} <- TestModules], + ?assertEqualLists(DistributionRet, ExpectedDistributionRet), + + %% note that extract_gen_server_call_request function crashes if supplied + %% message is not a gen_server call, this works as an extra verification + %% that only gen_server calls were made during distribution. 
+ DistributionGenServerCalls1 = + [extract_gen_server_call_request(Msg) || {in, Msg} <- LoggedEvents1], + ct:pal("distribution gen_server calls = ~p", [DistributionGenServerCalls1]), + %% and the number of gen_server calls corresponds to the number of + %% the added modules. + ?assertEqual(length(TestModules), length(DistributionGenServerCalls1)), + verify_upload_module_requests(TestModules, DistributionGenServerCalls1), + + %% now let's ensure that manual modules uploading does exactly the same. + %% modules' distribution is idempotent action, so it must be fine to call + %% it multiple times. + start_system_events_logging(Pid1), + ?assertEqualLists(ExpectedDistributionRet, upload_modules(TestModules)), + LoggedEvents2 = stop_system_events_logging(Pid1), + + DistributionGenServerCalls2 = + [extract_gen_server_call_request(Msg) || {in, Msg} <- LoggedEvents2], + ?assertEqualLists(DistributionGenServerCalls1, DistributionGenServerCalls2), + + assert_uploaded_modules_ets(TestModules), + gen_server:stop(Pid1), + + %% restore the initial state of the system and check that modules uploading + %% is working correctly. + restore_code_path(OriginalCodePath), + [unload_module(M) || M <- TestModules], + + {ok, Pid2} = amoc_code_server:start_link(), + + start_system_events_logging(Pid2), + ?assertEqualLists(ExpectedDistributionRet, upload_modules(TestModules)), + LoggedEvents3 = stop_system_events_logging(Pid2), + + DistributionGenServerCalls3 = + [extract_gen_server_call_request(Msg) || {in, Msg} <- LoggedEvents3], + ?assertEqualLists(DistributionGenServerCalls1, DistributionGenServerCalls3), + assert_uploaded_modules_ets(TestModules), + [assert_module_loaded(MI) || MI <- TestModules], + + gen_server:stop(Pid2), + ok. 
+ +basic_modules_uploading_test(Config) -> + [ScenarioModuleInfo, HelperModuleInfo, _] = OriginalTestModules = + [proplists:get_value(Key, Config) || + Key <- [scenario_module, helper_module, module_a]], + + %% normally all the uploaded modules have invalid beam path + %% associated with them. + TestModules = [invalidate_filename(M) || M <- OriginalTestModules], + + ScenarioModule = maps:get(module, ScenarioModuleInfo), + HelperModule = maps:get(module, HelperModuleInfo), + + {ok, Pid} = amoc_code_server:start_link(), + + [?assertNot(code:is_loaded(M)) || #{module := M} <- TestModules], + assert_uploaded_modules_ets([]), + ?assertNot(amoc_code_server:does_scenario_exist(ScenarioModule)), + ConfigurableModules = amoc_code_server:list_configurable_modules(), + ScenarioModules = amoc_code_server:list_scenario_modules(), + + ExpectedUploadingRet = [{M, ok} || #{module := M} <- TestModules], + + ?assertEqualLists(ExpectedUploadingRet, upload_modules(TestModules)), + assert_uploaded_modules_ets(TestModules), + [assert_invalid_filename(MI) || MI <- TestModules], + [assert_module_loaded(MI) || MI <- TestModules], + + %% scenario and helper modules are added properly to the 'configurable_modules' ETS + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + ?assertEqualLists([HelperModule | ConfigurableModules], + amoc_code_server:list_configurable_modules()), + ?assertEqualLists([ScenarioModule | ScenarioModules], + amoc_code_server:list_scenario_modules()), + + %% uploading modules with the same MD5 just returns 'ok' + %% without actual modules reloading + ?assertEqualLists(ExpectedUploadingRet, upload_modules(OriginalTestModules)), + assert_uploaded_modules_ets(TestModules), + [?assertError(_, assert_module_loaded(MI)) || MI <- OriginalTestModules], + [assert_module_loaded(MI) || MI <- TestModules], + + %% scenario and helper modules are added properly to the 'configurable_modules' ETS + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + 
?assertEqualLists([HelperModule | ConfigurableModules], + amoc_code_server:list_configurable_modules()), + ?assertEqualLists([ScenarioModule | ScenarioModules], + amoc_code_server:list_scenario_modules()), + + gen_server:stop(Pid). + +reuploading_modules_after_amoc_code_server_restart_test(Config) -> + [ScenarioModuleInfo, HelperModuleInfo, _] = OriginalTestModules = + [proplists:get_value(Key, Config) || + Key <- [scenario_module, helper_module, module_a]], + + %% normally all the uploaded modules have invalid beam path + %% associated with them. (that is the case when *.erl file is + %% compiled on another node and module is uploaded as a binary) + TestModules = [invalidate_filename(M) || M <- OriginalTestModules], + + ScenarioModule = maps:get(module, ScenarioModuleInfo), + HelperModule = maps:get(module, HelperModuleInfo), + + %% note that modules are loaded with invalid beam path before we start + %% amoc_code_sever. this precisely simulates amoc_code_sever restart + %% after initial modules uploading. + [load_module(MI) || MI <- TestModules], + + {ok, Pid} = amoc_code_server:start_link(), + + %% after restarting amoc_code_server 'uploaded_modules' ETS is empty. + %% however, both ScenarioModule and HelperModule must be added into + %% 'configurable_modules' ETS during the initial modules' analysis + assert_uploaded_modules_ets([]), + ConfigurableModules = amoc_code_server:list_configurable_modules(), + ?assert(lists:member(HelperModule, ConfigurableModules)), + ScenarioModules = amoc_code_server:list_scenario_modules(), + ?assert(lists:member(ScenarioModule, ScenarioModules)), + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + + %% reuploading must succeed, since MD5 of the loaded and reuploaded modules + %% are the same. however, none of the reuploaded modules can be added into + %% 'uploaded_modules' ETS. 
it happens because beam file names associated with + %% such modules are typically invalid and we cannot get modules' binary using + %% code:get_object_code/3 interface. + ExpectedUploadingRet = [{M, ok} || #{module := M} <- TestModules], + ?assertEqualLists(ExpectedUploadingRet, upload_modules(TestModules)), + assert_uploaded_modules_ets([]), + + %% also, 'configurable_modules' ETS must remain unchanged + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + ?assertEqualLists(ScenarioModules, amoc_code_server:list_scenario_modules()), + ?assertEqualLists(ConfigurableModules, amoc_code_server:list_configurable_modules()), + + gen_server:stop(Pid), + ok. + +uploading_release_modules_test(Config) -> + [ScenarioModuleInfo, HelperModuleInfo | _] = OriginalTestModules = + [proplists:get_value(Key, Config) || + Key <- [scenario_module, helper_module, module_a]], + + %% module_b has the same MD5 as module_a, but different binary representation. + %% also, for the sake of testing, let's invalidate beam file name for uploaded + %% scenario and helper modules. + TestModules = [proplists:get_value(module_b, Config) | + [invalidate_filename(M) || + M <- [ScenarioModuleInfo, HelperModuleInfo]]], + + ScenarioModule = maps:get(module, ScenarioModuleInfo), + HelperModule = maps:get(module, HelperModuleInfo), + + %% loading original test modules with correct beam file names before + %% amoc_code_server initialization. therefore, these modules are + %% no different from the regular modules included in the release. + [load_module(MI) || MI <- OriginalTestModules], + + {ok, Pid} = amoc_code_server:start_link(), + + %% after starting amoc_code_server 'uploaded_modules' ETS is empty. 
+ %% however, both ScenarioModule and HelperModule must be added into + %% 'configurable_modules' ETS during the initial modules' analysis + assert_uploaded_modules_ets([]), + ConfigurableModules = amoc_code_server:list_configurable_modules(), + ?assert(lists:member(HelperModule, ConfigurableModules)), + ScenarioModules = amoc_code_server:list_scenario_modules(), + ?assert(lists:member(ScenarioModule, ScenarioModules)), + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + + %% uploading of the modules must succeed, since MD5 of the loaded and + %% uploaded modules are the same. however, no modules reloading should + %% take place and original versions of the modules must be added into + %% 'uploaded_modules' ETS. + ExpectedUploadingRet = [{M, ok} || #{module := M} <- TestModules], + ?assertEqualLists(ExpectedUploadingRet, upload_modules(TestModules)), + ?assertError(_, assert_uploaded_modules_ets(TestModules)), + assert_uploaded_modules_ets(OriginalTestModules), + [?assertError(_, assert_module_loaded(MI)) || MI <- TestModules], + [assert_module_loaded(MI) || MI <- OriginalTestModules], + + %% also, 'configurable_modules' ETS must remain unchanged + ?assert(amoc_code_server:does_scenario_exist(ScenarioModule)), + ?assertEqualLists(ScenarioModules, amoc_code_server:list_scenario_modules()), + ?assertEqualLists(ConfigurableModules, amoc_code_server:list_configurable_modules()), + + gen_server:stop(Pid), + ok. + +uploading_module_with_different_md5_test(Config) -> + [#{module := ModuleName} = ModuleInfoA, ModuleInfoB, ModuleInfoC] = + [proplists:get_value(Key, Config) || + Key <- [module_a, module_b, module_c]], + + {ok, Pid} = amoc_code_server:start_link(), + + %% note that test module is loaded only after the start of the amoc_code_server, + %% so they are not added into 'configurable_modules' ETS table during the initial + %% modules' analysis on amoc_code_server startup. 
+ ConfigurableModules = amoc_code_server:list_configurable_modules(), + ScenarioModules = amoc_code_server:list_scenario_modules(), + + ?assertNot(code:is_loaded(ModuleName)), + assert_uploaded_modules_ets([]), + + ?assertEqual([{ModuleName, ok}], upload_modules([ModuleInfoA])), + assert_uploaded_modules_ets([ModuleInfoA]), + assert_module_loaded(ModuleInfoA), + + %% uploading module with the same MD5 just returns 'ok' + %% without actual modules reloading + ?assertEqual([{ModuleName, ok}], upload_modules([ModuleInfoB])), + ?assertError(_, assert_module_loaded(ModuleInfoB)), + assert_module_loaded(ModuleInfoA), + + %% uploading module with another MD5 returns error + %% without actual modules reloading + ?assertEqual([{ModuleName, {error, module_version_has_changed}}], + upload_modules([ModuleInfoC])), + ?assertError(_, assert_module_loaded(ModuleInfoC)), + assert_module_loaded(ModuleInfoA), + + %% none of the actions above lead to the changes + %% in the 'configurable_modules' ETS + ?assertEqualLists(ConfigurableModules, + amoc_code_server:list_configurable_modules()), + ?assertEqualLists(ScenarioModules, + amoc_code_server:list_scenario_modules()), + + gen_server:stop(Pid), + ok. + +%%--------------------------------------------------------- +%% local functions +%%--------------------------------------------------------- +extract_gen_server_call_request({'$gen_call', _From, CallData}) -> + %% this function crashes if supplied message is not gen_server call + CallData. + +start_system_events_logging(Pid) -> + ?assertEqual(ok, sys:log(Pid, true)), + ?assertEqual({ok, []}, sys:log(Pid, get)). + +stop_system_events_logging(Pid) -> + {ok, LoggedEvents} = sys:log(Pid, get), + ?assertEqual(ok, sys:log(Pid, false)), + LoggedEvents. 
+ +recreate_working_dir(Config) -> + DataDir = proplists:get_value(data_dir, Config), + WorkingDir = filename:join(DataDir, "working_dir"), + case filelib:is_dir(WorkingDir) of + true -> ?assertEqual(ok, file:del_dir_r(WorkingDir)); + false -> ok + end, + ?assertEqual(ok, filelib:ensure_dir(WorkingDir)), + ?assertEqual(ok, file:make_dir(WorkingDir)), + WorkingDir. + +unload_modules_and_restore_code_path(Config) -> + [unload_module(ModuleInfo) || {_, #{module := _} = ModuleInfo} <- Config], + restore_code_path(proplists:get_value(original_code_path, Config)). + +generate_module(Config, TemplateVars) -> + IncompleteModuleInfo = generate_source_code(Config, TemplateVars), + compile_and_load_module(IncompleteModuleInfo). + +generate_source_code(Config, TemplateVars0) -> + DefaultTemplateVars = #{"module_name"=>"some_module", + "tag" => [$\n, $\n, $\n]}, + TemplateVars = maps:merge(DefaultTemplateVars, TemplateVars0), + DataDir = proplists:get_value(data_dir, Config), + TemplateFile = filename:join(DataDir, "module.mustache"), + Template = bbmustache:parse_file(TemplateFile), + SourceCode = bbmustache:compile(Template, TemplateVars), + % ct:pal("Module:~n~n----~n~s~n----~n", [SourceCode]), + ModuleName = maps:get("module_name", TemplateVars), + Module = list_to_atom(ModuleName), + WorkingDir = proplists:get_value(working_dir, Config), + ?assertEqual(true, filelib:is_dir(WorkingDir)), + #{module => Module, source_code => SourceCode, working_dir => WorkingDir}. + +restore_code_path(OriginalCodePath) -> + CurrentCodePath = code:get_path(), + PathsToRemove = CurrentCodePath -- OriginalCodePath, + ct:pal("PathsToRemove = ~p", [PathsToRemove]), + [?assertEqual(true, code:del_path(P)) || P<- PathsToRemove]. 
+
+%% module info helper functions
+compile_and_load_module(#{working_dir := WorkingDir, module := Module,
+                          source_code := SourceCode} = ModuleInfo) ->
+    ModuleName = atom_to_list(Module),
+    OutDirTemplate = filename:join(WorkingDir, ModuleName ++ ".XXXXX"),
+    OutDir = string:trim(os:cmd("mktemp -d '" ++ OutDirTemplate ++ "'")),
+    % ct:pal("OutDirTemplate = '~s'", [OutDirTemplate]),
+    % ct:pal("OutDir = '~s'", [OutDir]),
+    ErlFile = filename:join(WorkingDir, ModuleName ++ ".erl"),
+    BeamFile = filename:join(OutDir, ModuleName ++ ".beam"),
+    file:write_file(ErlFile, SourceCode),
+    {ok, Module} = compile:file(filename:rootname(ErlFile),
+                                [{outdir, OutDir}]),
+    code:purge(Module), %% purge old version of module, if any
+    {module, Module} = code:load_abs(filename:rootname(BeamFile)),
+    code:purge(Module), %% purge old version of module, if any
+    code:add_pathz(OutDir), %% required for code:get_object_code/1
+    {Module, Binary, BeamFile} = code:get_object_code(Module),
+    true = code:del_path(OutDir),
+    MD5 = Module:module_info(md5),
+    % ct:pal("~p module MD5: ~p", [Module, MD5]),
+    % ct:pal("~p module bin: ~p", [Module, erlang:md5(Binary)]),
+    ModuleInfo#{beam_filename => BeamFile, erl_file => ErlFile,
+                out_dir => OutDir, binary => Binary, md5 => MD5,
+                %% '$RECORD_NAME' key is required for consistency with internal
+                %% uploaded_module record defined at amoc_code_server
+                '$RECORD_NAME' => uploaded_module}.
+
+compare_module_info(ModuleInfoA, ModuleInfoB, EqualKeys0)
+  when map_size(ModuleInfoA) =:= map_size(ModuleInfoB) ->
+    ModuleInfoKeys = maps:keys(ModuleInfoA),
+    ?assertEqual(ModuleInfoKeys, maps:keys(ModuleInfoB)),
+    EqualKeys = ['$RECORD_NAME' | EqualKeys0],
+    DifferentKeys = ModuleInfoKeys -- EqualKeys,
+    compare_maps(ModuleInfoA, ModuleInfoB, EqualKeys, DifferentKeys).
+ +compare_maps(MapA, MapB, EqualKeys, DifferentKeys) -> + [?assertEqual(maps:get(Key, MapA), maps:get(Key, MapB)) || + Key <- EqualKeys], + [?assertNotEqual(maps:get(Key, MapA), maps:get(Key, MapB)) || + Key <- DifferentKeys]. + +verify_upload_module_requests(ModuleInfoList, GenServerCalls) -> + List1 = [map_to_uploaded_module(MI) || MI <- ModuleInfoList], + List2 = [ M || {upload_module, M} <- GenServerCalls], + ?assertEqualLists(List1, List2). + +unload_module(#{module := Module}) -> + unload_module(Module); +unload_module(Module) when is_atom(Module) -> + case erlang:module_loaded(Module) of + true -> + code:purge(Module), + ?assertEqual(true, code:delete(Module)), + code:purge(Module), + ?assertEqual(false, erlang:module_loaded(Module)), + ?assertEqual(false, erlang:check_old_code(Module)); + false -> ok + end. + +load_module(#{module := Module, binary := Binary, + beam_filename := BeamFile} = ModuleInfo) -> + unload_module(ModuleInfo), + code:load_binary(Module, BeamFile, Binary). + +invalidate_filename(#{module := Module} = ModuleInfo) -> + ModuleName = atom_to_list(Module), + %% this DummyBeamFilename path is intentionally invalid, so + %% amoc_code_server couldn't add a directory into code path + DummyBeamFilename = ?DUMMY_CODE_PATH ++ ModuleName ++ ".beam", + ModuleInfo#{beam_filename := DummyBeamFilename}. + +assert_invalid_filename(#{beam_filename := ?DUMMY_CODE_PATH ++ _ = Path, + module := Module}) -> + ?assertMatch({file, Path}, code:is_loaded(Module)); +assert_invalid_filename(Module) when is_atom(Module) -> + ?assertMatch({file, ?DUMMY_CODE_PATH ++ _}, code:is_loaded(Module)). + +assert_module_loaded(#{beam_filename := BeamFileA, module := Module, md5 := MD5}) -> + ?assertEqual(MD5, Module:module_info(md5)), + ?assertEqual({file, BeamFileA}, code:is_loaded(Module)), + ?assertEqual(BeamFileA, code:which(Module)). 
+ +assert_uploaded_modules_ets(Modules) -> + ExpectedModules = [map_to_uploaded_module(M) || M <- Modules], + UploadedModules = ets:tab2list(uploaded_modules), + ?assertEqualLists(ExpectedModules, UploadedModules). + +upload_modules(Modules) -> + UploadedModuleRecords = + [{ModName, map_to_uploaded_module(M)} || #{module := ModName} = M <- Modules], + [{ModName, amoc_code_server:upload_module(node(), UploadedModule)} || + {ModName, UploadedModule} <- UploadedModuleRecords]. diff --git a/test/amoc_code_server_SUITE_data/module.mustache b/test/amoc_code_server_SUITE_data/module.mustache new file mode 100644 index 00000000..049caa20 --- /dev/null +++ b/test/amoc_code_server_SUITE_data/module.mustache @@ -0,0 +1,14 @@ + +%% this narrowed-down template allows generation of the module +%% with one and the same MD5 but a different binary returned +%% by the code:get_object_code/1 interface. + +-module({{module_name}}). + +{{tag}} + +-export([some_function/0]). + +some_function() -> ok. + +another_function() -> ok. 
diff --git a/test/amoc_config_scenario_SUITE.erl b/test/amoc_config_scenario_SUITE.erl index 23466c04..850a6a26 100644 --- a/test/amoc_config_scenario_SUITE.erl +++ b/test/amoc_config_scenario_SUITE.erl @@ -91,7 +91,7 @@ end_per_testcase(_, Config) -> parse_scenario_settings(_) -> mock_ets_tables(), - ets:insert(amoc_scenarios, {amoc_controller, configurable}), + ets:insert(configurable_modules, {amoc_controller, configurable}), ScenarioSettings = [{interarrival, 500}, {var1, def1}], Ret = amoc_config_scenario:parse_scenario_settings(?MODULE, ScenarioSettings), @@ -244,7 +244,7 @@ update_settings_undef_param(_) -> implicit_variable_redefinition(_) -> mock_ets_tables(), - ets:insert(amoc_scenarios, {?MODULE, configurable}), + ets:insert(configurable_modules, {?MODULE, configurable}), Ret = amoc_config_scenario:parse_scenario_settings(?MODULE, []), ?assertEqual({error, parameter_overriding, {var0, ?MODULE, ?MODULE}}, Ret), assert_no_update_calls(), @@ -296,7 +296,7 @@ invalid_module_attributes(_) -> mock_ets_tables() -> EtsOptions = [named_table, protected, {read_concurrency, true}], - amoc_scenarios = ets:new(amoc_scenarios, EtsOptions), + configurable_modules = ets:new(configurable_modules, EtsOptions), amoc_config_utils:create_amoc_config_ets(). assert_no_update_calls() -> diff --git a/test/amoc_coordinator_SUITE.erl b/test/amoc_coordinator_SUITE.erl index 2f69d3e3..88dc3f57 100644 --- a/test/amoc_coordinator_SUITE.erl +++ b/test/amoc_coordinator_SUITE.erl @@ -6,6 +6,9 @@ -compile(export_all). -define(MOCK_MOD, mock_mod). +-define(TELEMETRY_HANDLER, telemetry_handler). +-define(TELEMETRY_HANDLER_CONFIG, #{dummy_config => true}). 
+ all() -> [execute_plan_without_timeout, @@ -17,14 +20,27 @@ init_per_suite(Config) -> meck:expect(?MOCK_MOD, f_1, ['_', '_'], ok), meck:expect(?MOCK_MOD, f_2, ['_', '_', '_'], ok), meck:expect(?MOCK_MOD, f_3, ['_', '_', '_', '_'], ok), + meck:new(?TELEMETRY_HANDLER, [non_strict, no_link]), + meck:expect(?TELEMETRY_HANDLER, handler, ['_', '_', '_', '_'], ok), + application:start(telemetry), + TelemetryEvents = [[amoc, coordinator, Event] || + Event <- [start, stop, timeout, reset, add]], + TelemetryHandler = fun ?TELEMETRY_HANDLER:handler/4, + telemetry:attach_many(?TELEMETRY_HANDLER, TelemetryEvents, + TelemetryHandler, ?TELEMETRY_HANDLER_CONFIG), Config. +end_per_suite(_Config) -> + application:stop(telemetry), + meck:unload(). + init_per_testcase(_, Config) -> meck:reset(?MOCK_MOD), + meck:reset(?TELEMETRY_HANDLER), Config. -end_per_suite(_Config) -> - meck:unload(). +end_per_testcase(_Config) -> + ok. execute_plan_without_timeout(_Config) -> N = 4, Name = ?FUNCTION_NAME, @@ -44,9 +60,10 @@ execute_plan_without_timeout(_Config) -> History = meck:history(?MOCK_MOD), [?assertEqual(stop, check_item_calls(History, Item, Tag, N)) || {Item, Tag} <- [{Item1, item1}, {Item2, item2}, {Item3, item3}, - {All1, all1}, {All2}]], + {All1, all1}, {All2, all2}]], - nothing_after_tags(History, [all1, all2]). + nothing_after_tags(History, [all1, all2]), + assert_telemetry_events(Name, [start, {N, add}, stop]). 
reset_plan_without_timeout(_Config) -> N1 = 5, N2 = 6, Name = ?FUNCTION_NAME, @@ -57,16 +74,19 @@ reset_plan_without_timeout(_Config) -> All2 = {all, mocked_action(all2, 1)}, Item3 = {3, mocked_action(item3, 1)}], - ?assertEqual(ok, amoc_coordinator:start(Name, Plan, infinity)), + ?assertEqual(ok, amoc_coordinator:start(Name, Plan, 1)), %% timeout is 1 second [amoc_coordinator:add(Name, User) || User <- lists:seq(1, N1)], amoc_coordinator:reset(Name), meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {reset, '_'}], 1000), + %% ensure that timeout doesn't occur after reset + ?assertError(timeout, meck:wait(?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000)), + History1 = meck:history(?MOCK_MOD), [?assertEqual(reset, check_item_calls(History1, Item, Tag, N1)) || {Item, Tag} <- [{Item1, item1}, {Item2, item2}, {Item3, item3}, - {All1, all1}, {All2}]], + {All1, all1}, {All2, all2}]], nothing_after_tags(History1, [all1, all2]), @@ -76,16 +96,47 @@ reset_plan_without_timeout(_Config) -> amoc_coordinator:reset(Name), meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {reset, '_'}], 1000), + %% ensure that timeout doesn't occur after reset + ?assertError(timeout, meck:wait(?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000)), + History2 = meck:history(?MOCK_MOD), [?assertEqual(reset, check_item_calls(History2, Item, Tag, N2)) || {Item, Tag} <- [{Item1, item1}, {Item2, item2}, {Item3, item3}, - {All1, all1}, {All2}]], + {All1, all1}, {All2, all2}]], nothing_after_tags(History2, [all1, all2]), + meck:reset(?MOCK_MOD), + + %% reset can be triggered twice in a row, and all the handlers + %% are triggered twice in this case + amoc_coordinator:reset(Name), + meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {reset, '_'}], 1000), + + %% ensure that timeout doesn't occur after reset + ?assertError(timeout, meck:wait(?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000)), + + History3 = meck:history(?MOCK_MOD), + [?assertEqual(reset, check_item_calls(History3, Item, Tag, 0)) || + {Item, Tag} <- [{Item1, item1}, 
{Item2, item2}, {Item3, item3}, + {All1, all1}, {All2, all2}]], + + nothing_after_tags(History3, [all1, all2]), + + meck:reset(?MOCK_MOD), amoc_coordinator:stop(Name), - meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000). + meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000), + + History4 = meck:history(?MOCK_MOD), + [?assertEqual(stop, check_item_calls(History4, Item, Tag, 0)) || + {Item, Tag} <- [{Item1, item1}, {Item2, item2}, {Item3, item3}, + {All1, all1}, {All2, all2}]], + + nothing_after_tags(History4, [all1, all2]), + + assert_telemetry_events(Name, [start, {N1, add}, reset, + {N2, add}, {2, reset}, stop]). execute_plan_with_timeout(_Config) -> @@ -97,11 +148,16 @@ execute_plan_with_timeout(_Config) -> All2 = {all, mocked_action(all2, 1)}, Item3 = {3, mocked_action(item3, 1)}], - ?assertEqual(ok, amoc_coordinator:start(Name, Plan, 1)), + ?assertEqual(ok, amoc_coordinator:start(Name, Plan, 1)), %% timeout is 1 second + + %% ensure there's no timeout happens if no users are added yet. + ?assertError(timeout, meck:wait(?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000)), + [amoc_coordinator:add(Name, User) || User <- lists:seq(1, N1)], meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000), + %% ensure that timeout occurs just once if no new users added. ?assertError(timeout, meck:wait(length(Plan) + 1, ?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000)), History1 = meck:history(?MOCK_MOD), @@ -116,6 +172,7 @@ execute_plan_with_timeout(_Config) -> meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {timeout, '_'}], 2000), + %% ensure that timeout occurs just once if no new users added. 
 ?assertError(timeout, meck:wait(length(Plan) + 1, ?MOCK_MOD, f_1,
                                 ['_', {timeout, '_'}], 2000)),
 History2 = meck:history(?MOCK_MOD),
@@ -125,8 +182,20 @@
 
     nothing_after_tags(History2, [all1, all2]),
 
+    meck:reset(?MOCK_MOD),
+
     amoc_coordinator:stop(Name),
-    meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000).
+    meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000),
+
+    History3 = meck:history(?MOCK_MOD),
+    [?assertEqual(stop, check_item_calls(History3, Item, Tag, 0)) ||
+        {Item, Tag} <- [{Item1, item1}, {Item2, item2}, {Item3, item3},
+                        {All1, all1}, {All2, all2}]],
+
+    nothing_after_tags(History3, [all1, all2]),
+
+    assert_telemetry_events(Name, [start, {N1, add}, timeout,
+                                   {N2, add}, timeout, stop]).
 
 %% Helpers
 
@@ -228,3 +297,39 @@ distinct_pairs(Acc, [Element1 | Tail]) ->
     %% Tail has at least 2 elements
     NewAcc = [{Element1, Element2} || Element2 <- Tail] ++ Acc,
     distinct_pairs(NewAcc, Tail).
+
+assert_telemetry_events(Name, EventList) ->
+    History = meck:history(?TELEMETRY_HANDLER),
+    ct:pal("meck history = ~p", [History]),
+    UnfoldedEventList = unfold_event_list(EventList),
+    assert_telemetry_events(Name, History, UnfoldedEventList),
+    ok.
+
+unfold_event_list(EventList) ->
+    lists:flatten(
+      [case E of
+           {N, Event} when is_integer(N) andalso N > 0 ->
+               [Event || _ <- lists:seq(1, N)];
+           Event -> Event
+       end || E <- EventList]).
+ +assert_telemetry_events(_Name, [], []) -> ok; +assert_telemetry_events(_Name, History, EventList) + when length(History) > length(EventList) -> + ct:fail("unexpected telemetry events:~n ~p~n ~p", [History, EventList]); +assert_telemetry_events(_Name, History, EventList) + when length(History) < length(EventList) -> + ct:fail("missing telemetry events:~n ~p~n ~p", [History, EventList]); +assert_telemetry_events(Name, [{_Pid, Call, _Ret} | History], [Event | EventList]) -> + assert_telemetry_handler_call(Name, Call, Event), + assert_telemetry_events(Name, History, EventList). + +assert_telemetry_handler_call(Name, Call, Event) -> + EventName = [amoc, coordinator, Event], + Measurements = #{count => 1}, + EventMetadata = #{name => Name}, + HandlerConfig = ?TELEMETRY_HANDLER_CONFIG, + ExpectedHandlerCall = {?TELEMETRY_HANDLER, handler, + [EventName, Measurements, + EventMetadata, HandlerConfig]}, + ?assertEqual(ExpectedHandlerCall, Call). diff --git a/test/scenario_template.hrl b/test/scenario_template.hrl deleted file mode 100644 index 70cefe1a..00000000 --- a/test/scenario_template.hrl +++ /dev/null @@ -1,22 +0,0 @@ --define(DUMMY_SCENARIO_MODULE(Name), <<" -%%============================================================================== -%% @doc -%% some edoc -%% @end -%%============================================================================== --module(",(atom_to_binary(Name,utf8))/binary,"). --behaviour(amoc_scenario). - --required_variable(#{name => some_parameter, description => \"some parameter\"}). - --export([start/1]). --export([init/0]). - --spec init() -> ok. -init() -> - ok. - --spec start(amoc_scenario:user_id()) -> any(). -start(Id) -> - amoc_user:stop(). -">>). \ No newline at end of file