From 1b97ea20809d9969775bd7f874fec092fb66fb1b Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Wed, 26 Feb 2025 09:14:21 +0100 Subject: [PATCH] Revert "Balance sessions across worker queues instead of explicit locks (#5113)" This reverts commit ccdf50b7349b1cc1da3f8804b583f32ab3563eb9. --- config/runtime.exs | 2 + lib/plausible/application.ex | 1 - lib/plausible/ingestion/event.ex | 2 +- lib/plausible/session/balancer.ex | 39 ------------------ lib/plausible/session/balancer_supervisor.ex | 33 --------------- lib/plausible/session/cache_store.ex | 42 ++++++-------------- test/plausible/ingestion/event_test.exs | 13 +++--- test/plausible/session/cache_store_test.exs | 42 ++++++++++---------- test/support/test_utils.ex | 5 +-- 9 files changed, 42 insertions(+), 137 deletions(-) delete mode 100644 lib/plausible/session/balancer.ex delete mode 100644 lib/plausible/session/balancer_supervisor.ex diff --git a/config/runtime.exs b/config/runtime.exs index 16b2befd1a7c..47b702e5ad1a 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -977,4 +977,6 @@ unless s3_disabled? do imports_bucket: s3_env_value.("S3_IMPORTS_BUCKET") end +config :plausible, Plausible.Cache.Adapter, sessions: [partitions: 4] + config :phoenix_storybook, enabled: env !== "prod" diff --git a/lib/plausible/application.ex b/lib/plausible/application.ex index 391047e74df2..2ff596170789 100644 --- a/lib/plausible/application.ex +++ b/lib/plausible/application.ex @@ -15,7 +15,6 @@ defmodule Plausible.Application do children = [ - Plausible.Session.BalancerSupervisor, Plausible.Cache.Stats, Plausible.PromEx, {Plausible.Auth.TOTP.Vault, key: totp_vault_key()}, diff --git a/lib/plausible/ingestion/event.ex b/lib/plausible/ingestion/event.ex index d08e8d4c26a5..9c7c3561500f 100644 --- a/lib/plausible/ingestion/event.ex +++ b/lib/plausible/ingestion/event.ex @@ -393,7 +393,7 @@ defmodule Plausible.Ingestion.Event do event.clickhouse_event, event.clickhouse_session_attrs, previous_user_id, - buffer_insert: write_buffer_insert + write_buffer_insert ) case session_result do diff --git a/lib/plausible/session/balancer.ex b/lib/plausible/session/balancer.ex deleted file mode 100644 index 11235adcd033..000000000000 --- a/lib/plausible/session/balancer.ex +++ /dev/null @@ -1,39 +0,0 @@ -defmodule Plausible.Session.Balancer do - @moduledoc "Serialize session processing to avoid explicit locks" - use GenServer - - def start_link(id) do - GenServer.start_link(__MODULE__, id, name: via(id)) - end - - @impl true - def init(id) do - {:ok, %{id: id}} - end - - def dispatch(user_id, fun, opts) do - timeout = Keyword.fetch!(opts, :timeout) - local? = Keyword.get(opts, :local?, false) - - if local? do - fun.() - else - worker = :erlang.phash2(user_id, Plausible.Session.BalancerSupervisor.size()) + 1 - [{pid, _}] = Registry.lookup(Plausible.Session.Balancer.Registry, worker) - GenServer.call(pid, {:process, fun}, timeout) - end - end - - @impl true - def handle_call({:process, fun}, _from, state) do - try do - response = fun.() - {:reply, response, state} - rescue - e -> - {:reply, {:error, e}, state} - end - end - - defp via(id), do: {:via, Registry, {Plausible.Session.Balancer.Registry, id}} -end diff --git a/lib/plausible/session/balancer_supervisor.ex b/lib/plausible/session/balancer_supervisor.ex deleted file mode 100644 index e35969841d80..000000000000 --- a/lib/plausible/session/balancer_supervisor.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Plausible.Session.BalancerSupervisor do - @moduledoc "Serialize session processing to avoid explicit locks" - use Supervisor - - if Mix.env() in [:test, :ce_test] do - def size(), - do: 10 - else - def size(), do: 100 - end - - def start_link(_) do - Supervisor.start_link(__MODULE__, size(), name: __MODULE__) - end - - def init(size) do - children = - for id <- 1..size do - %{ - id: id, - start: {Plausible.Session.Balancer, :start_link, [id]}, - restart: :permanent - } - end - - Supervisor.init( - [ - {Registry, [keys: :unique, name: Plausible.Session.Balancer.Registry]} | children - ], - strategy: :one_for_one - ) - end -end diff --git a/lib/plausible/session/cache_store.ex b/lib/plausible/session/cache_store.ex index f115698f46d6..54a5bb488fb5 100644 --- a/lib/plausible/session/cache_store.ex +++ b/lib/plausible/session/cache_store.ex @@ -2,43 +2,27 @@ defmodule Plausible.Session.CacheStore do require Logger alias Plausible.Session.WriteBuffer - @lock_timeout 1000 + @lock_timeout 500 @lock_telemetry_event [:plausible, :sessions, :cache, :lock] def lock_telemetry_event, do: @lock_telemetry_event - def on_event(event, session_attributes, prev_user_id, opts \\ []) do - buffer_insert = Keyword.get(opts, :buffer_insert, &WriteBuffer.insert/1) - skip_balancer? = Keyword.get(opts, :skip_balancer?, false) + def on_event(event, session_attributes, prev_user_id, buffer_insert \\ &WriteBuffer.insert/1) do lock_requested_at = System.monotonic_time() - try do - response = - Plausible.Session.Balancer.dispatch( - event.user_id, - fn -> - lock_duration = System.monotonic_time() - lock_requested_at - :telemetry.execute(@lock_telemetry_event, %{duration: lock_duration}, %{}) - - found_session = - find_session(event, event.user_id) || find_session(event, prev_user_id) - - handle_event(event, found_session, session_attributes, buffer_insert) - end, - timeout: @lock_timeout, - local?: skip_balancer? - ) - - case response do - {:error, e} -> raise e - _ -> {:ok, response} + Plausible.Cache.Adapter.with_lock( + :sessions, + {event.site_id, event.user_id}, + @lock_timeout, + fn -> + lock_duration = System.monotonic_time() - lock_requested_at + :telemetry.execute(@lock_telemetry_event, %{duration: lock_duration}, %{}) + found_session = find_session(event, event.user_id) || find_session(event, prev_user_id) + + handle_event(event, found_session, session_attributes, buffer_insert) end - catch - :exit, {:timeout, _} -> - Sentry.capture_message("Timeout while handling session event") - {:error, :timeout} - end + ) end defp handle_event(%{name: "engagement"} = event, found_session, _, _) do diff --git a/test/plausible/ingestion/event_test.exs b/test/plausible/ingestion/event_test.exs index 38e185ebc619..0b64ff341f83 100644 --- a/test/plausible/ingestion/event_test.exs +++ b/test/plausible/ingestion/event_test.exs @@ -1,5 +1,5 @@ defmodule Plausible.Ingestion.EventTest do - use Plausible.DataCase, async: false + use Plausible.DataCase, async: true use Plausible.Teams.Test import Phoenix.ConnTest @@ -282,9 +282,10 @@ defmodule Plausible.Ingestion.EventTest do test = self() - very_slow_buffer = fn _sessions -> + very_slow_buffer = fn sessions -> send(test, :slow_buffer_insert_started) - Process.sleep(800) + Process.sleep(1000) + Plausible.Session.WriteBuffer.insert(sessions) end first_conn = @@ -314,11 +315,7 @@ defmodule Plausible.Ingestion.EventTest do receive do :slow_buffer_insert_started -> - assert {:ok, %{buffered: [], dropped: [dropped]}} = - Event.build_and_buffer(second_request, - session_write_buffer_insert: very_slow_buffer - ) - + assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(second_request) assert dropped.drop_reason == :lock_timeout end end diff --git a/test/plausible/session/cache_store_test.exs b/test/plausible/session/cache_store_test.exs index dd215b4442d6..1bf2a45fc0bc 100644 --- a/test/plausible/session/cache_store_test.exs +++ b/test/plausible/session/cache_store_test.exs @@ -43,13 +43,11 @@ defmodule Plausible.Session.CacheStoreTest do } do telemetry_event = CacheStore.lock_telemetry_event() - test_pid = self() - :telemetry.attach( "#{test}-telemetry-handler", telemetry_event, fn ^telemetry_event, %{duration: d}, _, _ when is_integer(d) -> - send(test_pid, {:telemetry_handled, d}) + send(self(), {:telemetry_handled, d}) end, %{} ) @@ -58,7 +56,7 @@ defmodule Plausible.Session.CacheStoreTest do event2 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id) event3 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id) - CacheStore.on_event(event1, @session_params, nil, buffer_insert: buffer) + CacheStore.on_event(event1, @session_params, nil, buffer) assert_receive({:buffer, :insert, [[session1]]}) assert_receive({:telemetry_handled, duration}) @@ -67,7 +65,7 @@ defmodule Plausible.Session.CacheStoreTest do [event2, event3] |> Enum.map(fn e -> Task.async(fn -> - CacheStore.on_event(e, @session_params, nil, buffer_insert: slow_buffer) + CacheStore.on_event(e, @session_params, nil, slow_buffer) end) end) |> Task.await_many() @@ -124,7 +122,7 @@ defmodule Plausible.Session.CacheStoreTest do async1 = Task.async(fn -> - CacheStore.on_event(event1, @session_params, nil, buffer_insert: very_slow_buffer) + CacheStore.on_event(event1, @session_params, nil, very_slow_buffer) end) # Ensure next events are executed after processing event1 starts @@ -132,12 +130,12 @@ defmodule Plausible.Session.CacheStoreTest do async2 = Task.async(fn -> - CacheStore.on_event(event2, @session_params, nil, buffer_insert: buffer) + CacheStore.on_event(event2, @session_params, nil, buffer) end) async3 = Task.async(fn -> - CacheStore.on_event(event3, @session_params, nil, buffer_insert: buffer) + CacheStore.on_event(event3, @session_params, nil, buffer) end) Task.await_many([async1, async2, async3]) @@ -162,7 +160,7 @@ defmodule Plausible.Session.CacheStoreTest do async1 = Task.async(fn -> - CacheStore.on_event(event1, @session_params, nil, buffer_insert: very_slow_buffer) + CacheStore.on_event(event1, @session_params, nil, very_slow_buffer) end) # Ensure next events are executed after processing event1 starts @@ -170,14 +168,14 @@ defmodule Plausible.Session.CacheStoreTest do async2 = Task.async(fn -> - CacheStore.on_event(event2, @session_params, nil, buffer_insert: buffer) + CacheStore.on_event(event2, @session_params, nil, buffer) end) Process.sleep(100) async3 = Task.async(fn -> - CacheStore.on_event(event3, @session_params, nil, buffer_insert: buffer) + CacheStore.on_event(event3, @session_params, nil, buffer) end) Task.await_many([async1, async2, async3]) @@ -200,7 +198,7 @@ defmodule Plausible.Session.CacheStoreTest do event = build(:event, name: "pageview") assert_raise RuntimeError, "boom", fn -> - CacheStore.on_event(event, @session_params, nil, buffer_insert: crashing_buffer) + CacheStore.on_event(event, @session_params, nil, crashing_buffer) end end @@ -212,7 +210,7 @@ defmodule Plausible.Session.CacheStoreTest do "meta.value": ["true", "false"] ) - CacheStore.on_event(event, @session_params, nil, buffer_insert: buffer) + CacheStore.on_event(event, @session_params, nil, buffer) assert_receive({:buffer, :insert, [sessions]}) assert [session] = sessions @@ -255,8 +253,8 @@ defmodule Plausible.Session.CacheStoreTest do | timestamp: timestamp } - CacheStore.on_event(event1, %{}, nil, buffer_insert: buffer) - CacheStore.on_event(event2, %{}, nil, buffer_insert: buffer) + CacheStore.on_event(event1, %{}, nil, buffer) + CacheStore.on_event(event2, %{}, nil, buffer) assert_receive({:buffer, :insert, [[_negative_record, session]]}) assert session.is_bounce == false assert session.duration == 10 @@ -269,8 +267,8 @@ defmodule Plausible.Session.CacheStoreTest do pageview = build(:pageview, timestamp: NaiveDateTime.shift(now, second: -10)) engagement = %{pageview | name: "engagement", timestamp: now} - CacheStore.on_event(pageview, %{}, nil, buffer_insert: buffer) - CacheStore.on_event(engagement, %{}, nil, buffer_insert: buffer) + CacheStore.on_event(pageview, %{}, nil, buffer) + CacheStore.on_event(engagement, %{}, nil, buffer) assert_receive({:buffer, :insert, [[session]]}) assert session.is_bounce == true @@ -284,7 +282,7 @@ defmodule Plausible.Session.CacheStoreTest do pageview1 = build(:event, name: "pageview", timestamp: start) - CacheStore.on_event(pageview1, %{}, nil, buffer_insert: buffer) + CacheStore.on_event(pageview1, %{}, nil, buffer) assert_receive({:buffer, :insert, [[start_session]]}) for delta <- [20, 40, 60] do @@ -294,11 +292,11 @@ defmodule Plausible.Session.CacheStoreTest do timestamp: start |> NaiveDateTime.shift(minute: delta) }) - CacheStore.on_event(engagement, %{}, nil, buffer_insert: buffer) + CacheStore.on_event(engagement, %{}, nil, buffer) end pageview2 = Map.put(pageview1, :timestamp, start |> NaiveDateTime.shift(minute: 80)) - CacheStore.on_event(pageview2, %{}, nil, buffer_insert: buffer) + CacheStore.on_event(pageview2, %{}, nil, buffer) assert_receive({:buffer, :insert, [[_negative_record, updated_session]]}) assert updated_session.session_id == start_session.session_id @@ -502,8 +500,8 @@ defmodule Plausible.Session.CacheStoreTest do event2 = %{event1 | timestamp: timestamp} - CacheStore.on_event(event1, %{}, nil, buffer_insert: buffer) - CacheStore.on_event(event2, %{}, nil, buffer_insert: buffer) + CacheStore.on_event(event1, %{}, nil, buffer) + CacheStore.on_event(event2, %{}, nil, buffer) assert_receive({:buffer, :insert, [[_negative_record, session]]}) assert session.duration == 10 diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index f404457aa73b..0707b72f1535 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -191,10 +191,7 @@ defmodule Plausible.TestUtils do defp populate_native_stats(events) do for event_params <- events do - {:ok, session} = - Plausible.Session.CacheStore.on_event(event_params, event_params, nil, - skip_balancer?: true - ) + {:ok, session} = Plausible.Session.CacheStore.on_event(event_params, event_params, nil) event_params |> Plausible.ClickhouseEventV2.merge_session(session)