From 6de43a82c0e4830312a6019e1862d5703059b9c3 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Sat, 22 Feb 2025 16:55:28 +1000 Subject: [PATCH] cherrypick changes to file and chunk from i174-multichunk --- internal/chunk/chunk.go | 15 ++++- internal/chunk/chunk_test.go | 39 +++++++++++++ internal/chunk/file.go | 67 +++++++++-------------- internal/chunk/file_test.go | 103 ++++++++++++++--------------------- 4 files changed, 120 insertions(+), 104 deletions(-) diff --git a/internal/chunk/chunk.go b/internal/chunk/chunk.go index bcbb766e..39db9f44 100644 --- a/internal/chunk/chunk.go +++ b/internal/chunk/chunk.go @@ -27,7 +27,7 @@ const ( CSearchMessages CSearchFiles - CAny ChunkType = 255 // special value for any type + CAny ChunkType = 255 // Special type, meaning any type of chunk. ) var ErrUnsupChunkType = fmt.Errorf("unsupported chunk type") @@ -186,6 +186,19 @@ func (id GroupID) IsThread() bool { return strings.HasPrefix(string(id), threadPrefix) } +// ExtractChannelID attempts to extract the channel ID from the GroupID if it +// is a channel or a thread ID. Otherwise, ok will be false. +func (id GroupID) ExtractChannelID() (channelID string, ok bool) { + if id.IsThread() { + channelID, _, ok = id.AsThreadID() + return + } + if id.IsChannel() { + return id.AsChannelID() + } + return "", false +} + // asThreadID returns the channelID and threadTS from the GroupID. If the // GroupID is not a thread ID, it returns false. func (id GroupID) AsThreadID() (channelID, threadTS string, ok bool) { diff --git a/internal/chunk/chunk_test.go b/internal/chunk/chunk_test.go index aa0ffa8b..a848547b 100644 --- a/internal/chunk/chunk_test.go +++ b/internal/chunk/chunk_test.go @@ -315,3 +315,42 @@ func TestGroupID_AsThreadID(t *testing.T) { }) } } + +func TestGroupID_ExtractChannelID(t *testing.T) { + tests := []struct { + name string + id GroupID + wantChannelID string + wantOk bool + }{ + { + name: "channel", + id: "C123", + wantChannelID: "C123", + wantOk: true, + }, + { + name: "thread", + id: "tC123:1234", + wantChannelID: "C123", + wantOk: true, + }, + { + name: "invalid", + id: "invalid", + wantChannelID: "", + wantOk: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotChannelID, gotOk := tt.id.ExtractChannelID() + if gotChannelID != tt.wantChannelID { + t.Errorf("GroupID.ExtractChannelID() gotChannelID = %v, want %v", gotChannelID, tt.wantChannelID) + } + if gotOk != tt.wantOk { + t.Errorf("GroupID.ExtractChannelID() gotOk = %v, want %v", gotOk, tt.wantOk) + } + }) + } +} diff --git a/internal/chunk/file.go b/internal/chunk/file.go index cd37f0cb..b01afacc 100644 --- a/internal/chunk/file.go +++ b/internal/chunk/file.go @@ -25,6 +25,7 @@ import ( var ( ErrNotFound = errors.New("not found") + ErrNoData = errors.New("no data") ErrDataMisaligned = errors.New("internal error: index and file data misaligned") ) @@ -126,7 +127,7 @@ func indexChunks(dec decoder) (index, error) { idx[id] = append(idx[id], offset) } - slog.Default().Debug("indexing chunks", "len(idx)", len(idx), "caller", osext.Caller(2), "took", time.Since(start).String(), "per_sec", float64(len(idx))/time.Since(start).Seconds()) + slog.Debug("indexing chunks", "len(idx)", len(idx), "caller", osext.Caller(2), "took", time.Since(start).String(), "per_sec", float64(len(idx))/time.Since(start).Seconds()) return idx, nil } @@ -226,22 +227,12 @@ func (f *File) State() (*state.State, error) { // AllMessages returns all the messages for the given channel posted to it (no // thread). The messages are in the order as they appear in the file. -func (f *File) AllMessages(ctx context.Context, channelID string) ([]slack.Message, error) { - var mm []slack.Message - err := f.Sorted(ctx, false, func(ts time.Time, m *slack.Message) error { - if m.Channel != channelID { - return nil - } - if m.ThreadTimestamp != "" && m.ThreadTimestamp != m.Timestamp { - return nil - } - mm = append(mm, *m) - return nil - }) +func (f *File) AllMessages(_ context.Context, channelID string) ([]slack.Message, error) { + m, err := f.allMessagesForID(GroupID(channelID)) if err != nil { return nil, fmt.Errorf("failed getting messages for %s: %w", channelID, err) } - return mm, nil + return m, nil } // AllThreadMessages returns all the messages for the given thread. It does @@ -305,22 +296,6 @@ func (f *File) AllChannelInfos() ([]slack.Channel, error) { return chans, nil } -// AllChannelInfoWithMembers returns all channels with Members populated. -func (f *File) AllChannelInfoWithMembers() ([]slack.Channel, error) { - c, err := f.AllChannelInfos() - if err != nil { - return nil, err - } - for i := range c { - members, err := f.ChannelUsers(c[i].ID) - if err != nil { - return nil, err - } - c[i].Members = members - } - return c, nil -} - // int64s implements sort.Interface for []int64. type int64s []int64 @@ -430,6 +405,7 @@ type offts map[int64]offsetInfo type offsetInfo struct { ID GroupID Type ChunkType + TS int64 // timestamp of the chunk Timestamps []int64 } @@ -471,6 +447,7 @@ func (f *File) offsetTimestamps(ctx context.Context) (offts, error) { ret[offset] = offsetInfo{ ID: chunk.ID(), Type: chunk.Type, + TS: chunk.Timestamp, Timestamps: ts, } } @@ -488,9 +465,13 @@ type Addr struct { // the message with this timestamp within the message slice. It converts the // string timestamp to an int64 timestamp using structures.TS2int, but the // original string timestamp returned in the TimeOffset struct. -func timeOffsets(ots offts) map[int64]Addr { +func timeOffsets(ots offts, chanID string) map[int64]Addr { ret := make(map[int64]Addr, len(ots)) for offset, info := range ots { + channelID, ok := info.ID.ExtractChannelID() + if !ok || channelID != chanID { + continue + } for i, ts := range info.Timestamps { ret[ts] = Addr{ Offset: offset, @@ -502,8 +483,9 @@ func timeOffsets(ots offts) map[int64]Addr { } // Sorted iterates over all the messages in the chunkfile in chronological -// order. If desc is true, the slice will be iterated in reverse order. -func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *slack.Message) error) error { +// order for the requested chanID. If desc is true, the slice will be +// iterated in reverse order. +func (f *File) Sorted(ctx context.Context, chanID string, desc bool, fn func(ts time.Time, m *slack.Message) error) error { ctx, task := trace.NewTask(ctx, "file.Sorted") defer task.End() @@ -515,12 +497,15 @@ func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *s } rgnTos := trace.StartRegion(ctx, "timeOffsets") - tos := timeOffsets(ots) + tos := timeOffsets(ots, chanID) rgnTos.End() tsList := make([]int64, 0, len(tos)) for ts := range tos { tsList = append(tsList, ts) } + if len(tsList) == 0 { + return ErrNoData + } trace.WithRegion(ctx, "sorted.sort", func() { if desc { @@ -537,8 +522,8 @@ func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *s for _, ts := range tsList { tmOff := tos[ts] // read the new chunk from the file only in case that the offset has - // changed. - if tmOff.Offset != prevOffset { + // changed or the chunk is nil (initial value). + if tmOff.Offset != prevOffset || chunk == nil { var err error chunk, err = f.chunkAt(tmOff.Offset) if err != nil { @@ -547,7 +532,7 @@ func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *s prevOffset = tmOff.Offset } - if err := fn(fasttime.Int2Time(ts).UTC(), &chunk.Messages[tmOff.Index]); err != nil { + if err := fn(fasttime.Int2Time(ts), &chunk.Messages[tmOff.Index]); err != nil { return err } } @@ -639,13 +624,15 @@ func (f *File) Latest(ctx context.Context) (map[GroupID]time.Time, error) { if len(info.Timestamps) == 0 { continue } + sort.Sort(sort.Reverse(int64s(info.Timestamps))) + latest := fasttime.Int2Time(info.Timestamps[0]) curr, ok := ret[info.ID] if !ok { - ret[info.ID] = fasttime.Int2Time(info.Timestamps[0]) + ret[info.ID] = latest continue } - if fasttime.Int2Time(info.Timestamps[0]).After(curr) { - ret[info.ID] = fasttime.Int2Time(info.Timestamps[0]) + if latest.After(curr) { + ret[info.ID] = latest } } return ret, nil diff --git a/internal/chunk/file_test.go b/internal/chunk/file_test.go index 5c971639..9470be2b 100644 --- a/internal/chunk/file_test.go +++ b/internal/chunk/file_test.go @@ -129,20 +129,20 @@ var archivedChannel = []Chunk{ } var testChunks = []Chunk{ - {Type: CChannelInfo, ChannelID: TestChannelID, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID, NumMembers: 2}}}}, - {Type: CChannelUsers, ChannelID: TestChannelID, ChannelUsers: []string{"user1", "user2"}}, - {Type: CMessages, ChannelID: TestChannelID, Messages: []slack.Message{ + {Type: CChannelInfo, Timestamp: 123456, ChannelID: TestChannelID, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID, NumMembers: 2}}}}, + {Type: CChannelUsers, Timestamp: 123456, ChannelID: TestChannelID, ChannelUsers: []string{"user1", "user2"}}, + {Type: CMessages, Timestamp: 123456, ChannelID: TestChannelID, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.100000", Text: "message1"}}, {Msg: slack.Msg{Timestamp: "1234567890.200000", Text: "message2"}}, {Msg: slack.Msg{Timestamp: "1234567890.300000", Text: "message3"}}, {Msg: slack.Msg{Timestamp: "1234567890.400000", Text: "message4"}}, {Msg: slack.Msg{Timestamp: "1234567890.500000", Text: "message5"}}, }}, - {Type: CMessages, ChannelID: TestChannelID, Messages: []slack.Message{ + {Type: CMessages, Timestamp: 123456, ChannelID: TestChannelID, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.600000", Text: "Hello, again!"}}, {Msg: slack.Msg{Timestamp: "1234567890.700000", Text: "And again!"}}, }}, - {Type: CMessages, ChannelID: TestChannelID, Messages: []slack.Message{ + {Type: CMessages, Timestamp: 123456, ChannelID: TestChannelID, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.800000", Text: "And again!"}}, { Msg: slack.Msg{ @@ -187,20 +187,20 @@ var testChunks = []Chunk{ }, }, // chunks from another channel - {Type: CChannelInfo, ChannelID: TestChannelID2, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID2, NumMembers: 2}}}}, - {Type: CChannelUsers, ChannelID: TestChannelID2, ChannelUsers: []string{"user3", "user4"}}, - {Type: CMessages, ChannelID: TestChannelID2, Messages: []slack.Message{ + {Type: CChannelInfo, Timestamp: 123456, ChannelID: TestChannelID2, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID2, NumMembers: 2}}}}, + {Type: CChannelUsers, Timestamp: 123456, ChannelID: TestChannelID2, ChannelUsers: []string{"user3", "user4"}}, + {Type: CMessages, Timestamp: 123456, ChannelID: TestChannelID2, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.100000", Text: "message1"}}, {Msg: slack.Msg{Timestamp: "1234567890.200000", Text: "message2"}}, {Msg: slack.Msg{Timestamp: "1234567890.300000", Text: "message3"}}, {Msg: slack.Msg{Timestamp: "1234567890.400000", Text: "message4"}}, {Msg: slack.Msg{Timestamp: "1234567890.500000", Text: "message5"}}, }}, - {Type: CMessages, ChannelID: TestChannelID2, Messages: []slack.Message{ + {Type: CMessages, Timestamp: 123456, ChannelID: TestChannelID2, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.600000", Text: "Hello, again!"}}, {Msg: slack.Msg{Timestamp: "1234567890.700000", Text: "And again!"}}, }}, - {Type: CMessages, ChannelID: TestChannelID2, Messages: []slack.Message{ + {Type: CMessages, Timestamp: 123456, ChannelID: TestChannelID2, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.800000", Text: "And again!"}}, { Msg: slack.Msg{ @@ -458,14 +458,14 @@ func TestFile_offsetTimestamps(t *testing.T) { rs: marshalChunks(testChunks...), }, want: offts{ - 661: offsetInfo{ID: TestChannelID, Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}}, - 1491: offsetInfo{ID: TestChannelID, Timestamps: []int64{1234567890600000, 1234567890700000}}, - 1854: offsetInfo{ID: TestChannelID, Timestamps: []int64{1234567890800000, 1234567890800000}}, - 2305: offsetInfo{ID: "tC1234567890:1234567890.800000", Type: CThreadMessages, Timestamps: []int64{1234567890900000, 1234567891100000}}, - 3798: offsetInfo{ID: TestChannelID2, Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}}, - 4627: offsetInfo{ID: TestChannelID2, Timestamps: []int64{1234567890600000, 1234567890700000}}, - 4989: offsetInfo{ID: TestChannelID2, Timestamps: []int64{1234567890800000, 1234567890800000}}, - 5439: offsetInfo{ID: "tC987654321:1234567890.800000", Type: CThreadMessages, Timestamps: []int64{1234567890900000, 1234567891100000}}, + 671: offsetInfo{ID: TestChannelID, TS: 123456, Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}}, + 1506: offsetInfo{ID: TestChannelID, TS: 123456, Timestamps: []int64{1234567890600000, 1234567890700000}}, + 1874: offsetInfo{ID: TestChannelID, TS: 123456, Timestamps: []int64{1234567890800000, 1234567890800000}}, + 2330: offsetInfo{ID: "tC1234567890:1234567890.800000", Type: CThreadMessages, TS: 1234567890, Timestamps: []int64{1234567890900000, 1234567891100000}}, + 3833: offsetInfo{ID: TestChannelID2, TS: 123456, Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}}, + 4667: offsetInfo{ID: TestChannelID2, TS: 123456, Timestamps: []int64{1234567890600000, 1234567890700000}}, + 5034: offsetInfo{ID: TestChannelID2, TS: 123456, Timestamps: []int64{1234567890800000, 1234567890800000}}, + 5489: offsetInfo{ID: "tC987654321:1234567890.800000", Type: CThreadMessages, TS: 1234567890, Timestamps: []int64{1234567890900000, 1234567891100000}}, }, }, } @@ -487,7 +487,8 @@ func TestFile_offsetTimestamps(t *testing.T) { func Test_timeOffsets(t *testing.T) { type args struct { - ots offts + ots offts + chanID string } tests := []struct { name string @@ -500,6 +501,7 @@ func Test_timeOffsets(t *testing.T) { ots: offts{ 596: offsetInfo{ID: TestChannelID, Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}}, }, + chanID: TestChannelID, }, want: map[int64]Addr{ 1234567890100000: { @@ -527,7 +529,7 @@ func Test_timeOffsets(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, timeOffsets(tt.args.ots)) + assert.Equal(t, tt.want, timeOffsets(tt.args.ots, tt.args.chanID)) }) } } @@ -557,7 +559,8 @@ func TestFile_Sorted(t *testing.T) { rs io.ReadSeeker } type args struct { - fn func(ts time.Time, m *slack.Message) error + channelID string + fn func(ts time.Time, m *slack.Message) error } tests := []struct { @@ -573,6 +576,7 @@ func TestFile_Sorted(t *testing.T) { rs: marshalChunks(testChunks...), }, args: args{ + channelID: TestChannelID, fn: func(ts time.Time, m *slack.Message) error { return nil }, @@ -591,6 +595,20 @@ func TestFile_Sorted(t *testing.T) { }, wantErr: false, }, + { + name: "different channel", + fields: fields{ + rs: marshalChunks(testChunks...), + }, + args: args{ + channelID: "different", + fn: func(ts time.Time, m *slack.Message) error { + return nil + }, + }, + wantFnCalls: []sortedArgs{}, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -605,7 +623,7 @@ func TestFile_Sorted(t *testing.T) { return nil } - if err := p.Sorted(context.Background(), false, recorder); (err != nil) != tt.wantErr { + if err := p.Sorted(context.Background(), tt.args.channelID, false, recorder); (err != nil) != tt.wantErr { t.Errorf("File.Sorted() error = %v, wantErr %v", err, tt.wantErr) } assert.Equal(t, tt.wantFnCalls.String(), rec.String()) @@ -780,47 +798,6 @@ func TestFile_AllChannelInfos(t *testing.T) { } } -func TestFile_AllChannelInfoWithMembers(t *testing.T) { - memchans := []slack.Channel{ - *testChunks[0].Channel, - *testChunks[6].Channel, - } - memchans[0].Members = testChunks[1].ChannelUsers - memchans[1].Members = testChunks[7].ChannelUsers - type fields struct { - rs io.ReadSeeker - } - tests := []struct { - name string - fields fields - want []slack.Channel - wantErr bool - }{ - { - "finds all users and channels", - fields{ - marshalChunks(testChunks...), - }, - memchans, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - f := &File{ - rs: tt.fields.rs, - idx: mkindex(tt.fields.rs), - } - got, err := f.AllChannelInfoWithMembers() - if (err != nil) != tt.wantErr { - t.Errorf("File.AllChannelInfoWithMembers() error = %v, wantErr %v", err, tt.wantErr) - return - } - assert.Equal(t, got, tt.want) - }) - } -} - func TestFile_ChannelInfo(t *testing.T) { chanWithUsers := *testChunks[0].Channel chanWithUsers.Members = testChunks[1].ChannelUsers