Skip to content

Commit

Permalink
Remove gaps in InMemoryIndex segment versions
Browse files Browse the repository at this point in the history
  • Loading branch information
lalinsky committed Oct 14, 2024
1 parent 939b4fa commit 109f078
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 40 deletions.
55 changes: 20 additions & 35 deletions src/InMemoryIndex.zig
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,13 @@ pub fn update(self: *Self, changes: []const Change) !void {

node.data.ensureSorted();

if (node.data.items.items.len > self.max_items_per_segment) {
node.data.frozen = true;
}

self.write_lock.lock();
self.segments.append(node);
if (node.prev) |prev| {
node.data.version = prev.data.version + 1;
} else {
node.data.version = 1;
}
self.checkSegments();
committed = true;
self.write_lock.unlock();

Expand Down Expand Up @@ -152,7 +147,7 @@ fn prepareMerge(self: *Self) !?Merge {
var num_segments: usize = 0;
var segments_iter = self.segments.first;
while (segments_iter) |node| : (segments_iter = node.next) {
if (node.data.frozen) {
if (node.data.frozen or node.data.items.items.len > self.max_items_per_segment) {
continue;
}
num_segments += 1;
Expand All @@ -164,17 +159,17 @@ fn prepareMerge(self: *Self) !?Merge {
}

var best_node: ?*Segments.Node = null;
var best_score: f32 = std.math.inf(f32);
var best_score: f64 = std.math.inf(f64);
segments_iter = self.segments.first;
var level_size = total_size / 2;
var level_size = @as(f64, @floatFromInt(total_size)) / 2;
while (segments_iter) |node| : (segments_iter = node.next) {
if (node.data.frozen) {
if (node.data.frozen or node.data.items.items.len > self.max_items_per_segment) {
continue;
}
if (node.next) |nextNode| {
const size = node.data.items.items.len + nextNode.data.items.items.len;
const raw_score = if (size > level_size) size - level_size else level_size - size;
const score = std.math.log2(@as(f32, @floatFromInt(raw_score)));
const merge_size = node.data.items.items.len + nextNode.data.items.items.len;
const score = @as(f64, @floatFromInt(merge_size)) - level_size;
// std.debug.print("segment {} {} level_size={}, merge_size={} score={}\n", .{ node.data.version, node.data.items.items.len, level_size, merge_size, score });
if (score < best_score) {
best_node = node;
best_score = score;
Expand Down Expand Up @@ -209,10 +204,7 @@ fn prepareMerge(self: *Self) !?Merge {
node.data.deinit();
}
}
node.data.version = segment2.version;
node.data.merged = segment1.merged + segment2.merged + 1;

log.debug("Merging in-memory segments {}:{} and {}:{}", .{ segment1.version, segment1.merged, segment2.version, segment2.merged });
node.data.version = segment1.version;

var total_docs: usize = 0;
var total_items: usize = 0;
Expand Down Expand Up @@ -254,22 +246,16 @@ fn prepareMerge(self: *Self) !?Merge {

node.data.ensureSorted();

if (node.data.items.items.len > self.max_items_per_segment) {
node.data.frozen = true;
}

committed = true;
return merge;
}

fn checkSegments(self: *Self) void {
if (std.debug.runtime_safety) {
var iter = self.segments.first;
while (iter) |node| : (iter = node.next) {
if (node.prev) |prev| {
assert(node.data.version == 1 + node.data.merged + prev.data.version);
} else {
assert(node.data.version == 1 + node.data.merged);
var iter = self.segments.first;
while (iter) |node| : (iter = node.next) {
if (node.prev) |prev| {
if (!node.data.frozen) {
node.data.version = prev.data.version + 1;
}
}
}
Expand All @@ -279,13 +265,11 @@ fn commitMerge(self: *Self, merge: Merge) void {
self.write_lock.lock();
defer self.write_lock.unlock();

log.debug("Adding in-memory segment {}:{}", .{ merge.replacement.data.version, merge.replacement.data.merged });
self.segments.insertAfter(merge.last, merge.replacement);

var iter: ?*Segments.Node = merge.first;
while (iter) |node| {
iter = node.next;
log.debug("Removing in-memory segment {}:{}", .{ node.data.version, node.data.merged });
self.segments.remove(node);
self.destroyNode(node);
if (node == merge.last) break;
Expand All @@ -304,7 +288,7 @@ fn mergeSegments(self: *Self) !void {
}
}

pub fn freezeFirstSegment(self: *Self, if_larger_than: usize) ?*Segment {
pub fn freezeFirstSegment(self: *Self) ?*Segment {
self.merge_lock.lock();
defer self.merge_lock.unlock();

Expand All @@ -318,8 +302,7 @@ pub fn freezeFirstSegment(self: *Self, if_larger_than: usize) ?*Segment {
if (segment.frozen) {
return segment;
}
log.debug("segment {} has {} items", .{ segment.version, segment.items.items.len });
if (segment.items.items.len >= if_larger_than) {
if (segment.items.items.len >= self.max_items_per_segment) {
segment.frozen = true;
return segment;
}
Expand Down Expand Up @@ -489,12 +472,14 @@ test "freeze segment" {
var index = Self.init(std.testing.allocator);
defer index.deinit();

index.max_items_per_segment = 100;

try index.update(&[_]Change{.{ .insert = .{
.id = 1,
.hashes = &[_]u32{ 1, 2, 3 },
} }});

const segment1 = index.freezeFirstSegment(0);
const segment1 = index.freezeFirstSegment();
try std.testing.expect(segment1 == null);

for (1..100) |i| {
Expand All @@ -504,10 +489,10 @@ test "freeze segment" {
} }});
}

const segment2 = index.freezeFirstSegment(0);
const segment2 = index.freezeFirstSegment();
try std.testing.expect(segment2 != null);
try std.testing.expect(segment2.?.frozen);

const segment3 = index.freezeFirstSegment(0);
const segment3 = index.freezeFirstSegment();
try std.testing.expect(segment3 == segment2);
}
1 change: 0 additions & 1 deletion src/InMemorySegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ version: u32,
docs: std.AutoHashMap(u32, bool),
items: std.ArrayList(Item),
frozen: bool = false,
merged: u32 = 0,

pub fn init(allocator: std.mem.Allocator) Self {
return .{
Expand Down
6 changes: 2 additions & 4 deletions src/Index.zig
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ last_cleanup_at: i64 = 0,
cleanup_interval: i64 = 1000,
run_cleanup: bool = true,

const min_segment_size = 1_000_000;

const Task = union(enum) {
cleanup: void,

Expand Down Expand Up @@ -76,9 +74,9 @@ fn cleanup(self: *Self) !void {

// try self.stage.cleanup();

const segment = self.stage.freezeFirstSegment(min_segment_size);
const segment = self.stage.freezeFirstSegment();
if (segment) |s| {
const name = try std.fmt.allocPrint(self.allocator, "segment_{}_{}.dat", .{ s.version - s.merged, s.version });
const name = try std.fmt.allocPrint(self.allocator, "segment_{}.dat", .{s.version});
defer self.allocator.free(name);
log.info("writing segment {s} to disk", .{name});
var file = try self.dir.createFile(name, .{});
Expand Down

0 comments on commit 109f078

Please sign in to comment.