Skip to content

Commit

Permalink
Merge pull request #297 from emizio/lm/handle_gaps_in_stream_forward_…
Browse files Browse the repository at this point in the history
…or_back

Stream.stream_forward and Stream.stream_backward should handle gaps
  • Loading branch information
cdegroot authored Feb 26, 2025
2 parents 362f087 + aa24810 commit 8060344
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
4 changes: 2 additions & 2 deletions lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ defmodule EventStore.Streams.Stream do
fn next_version ->
case read_storage_forward(conn, stream, next_version, read_batch_size, opts) do
{:ok, []} -> {:halt, next_version}
{:ok, events} -> {events, next_version + length(events)}
{:ok, events} -> {events, List.last(events).event_number + 1}
end
end,
fn _next_version -> :ok end
Expand All @@ -282,7 +282,7 @@ defmodule EventStore.Streams.Stream do
next_version ->
case read_storage_backward(conn, stream, next_version, read_batch_size, opts) do
{:ok, []} -> {:halt, next_version}
{:ok, events} -> {events, next_version - length(events)}
{:ok, events} -> {events, List.last(events).event_number - 1}
end
end,
fn _next_version -> :ok end
Expand Down
57 changes: 55 additions & 2 deletions test/streams/all_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule EventStore.Streams.AllStreamTest do
end

describe "stream forward" do
setup [:append_events_to_streams]
setup [:append_events_to_streams, :enable_hard_deletes]

test "should stream events from all streams using single event batch size", %{
conn: conn,
Expand Down Expand Up @@ -94,10 +94,31 @@ defmodule EventStore.Streams.AllStreamTest do
assert Enum.map(read_events, & &1.event_number) == [1, 2, 3, 4, 5, 6]
assert Enum.map(read_events, & &1.stream_version) == [1, 2, 3, 1, 2, 3]
end

test "should handle gaps in all stream", %{
conn: conn,
schema: schema,
serializer: serializer,
stream1_uuid: stream1_uuid
} do

:ok = EventStore.delete_stream(stream1_uuid, :stream_exists, :hard)

read_events =
Stream.stream_forward(conn, @all_stream, 0,
read_batch_size: 1,
schema: schema,
serializer: serializer
)
|> Enum.to_list()

assert length(read_events) == 3
assert Enum.map(read_events, & &1.event_number) == [4, 5, 6]
end
end

describe "stream backward" do
setup [:append_events_to_streams]
setup [:append_events_to_streams, :enable_hard_deletes]

test "should stream events from all streams using single event batch size", %{
conn: conn,
Expand Down Expand Up @@ -181,6 +202,27 @@ defmodule EventStore.Streams.AllStreamTest do
assert Enum.map(read_events, & &1.event_number) == [3, 2, 1]
assert Enum.map(read_events, & &1.stream_version) == [3, 2, 1]
end

test "should handle gaps in all stream", %{
conn: conn,
schema: schema,
serializer: serializer,
stream2_uuid: stream2_uuid
} do

:ok = EventStore.delete_stream(stream2_uuid, :stream_exists, :hard)

read_events =
Stream.stream_backward(conn, @all_stream, -1,
read_batch_size: 1,
schema: schema,
serializer: serializer
)
|> Enum.to_list()

assert length(read_events) == 3
assert Enum.map(read_events, & &1.event_number) == [3, 2, 1]
end
end

describe "subscribe to all streams" do
Expand Down Expand Up @@ -284,4 +326,15 @@ defmodule EventStore.Streams.AllStreamTest do

{stream_uuid, events}
end

defp enable_hard_deletes(_context) do
restart_event_store_with_config(enable_hard_deletes: true)
end

defp restart_event_store_with_config(config) do
stop_supervised!(TestEventStore)
start_supervised!({TestEventStore, config})

:ok
end
end

0 comments on commit 8060344

Please sign in to comment.