diff --git a/internal/chunk/dbproc/repository/dbmessage.go b/internal/chunk/dbproc/repository/dbmessage.go index 630cc94a..c3c23422 100644 --- a/internal/chunk/dbproc/repository/dbmessage.go +++ b/internal/chunk/dbproc/repository/dbmessage.go @@ -9,6 +9,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/rusq/slack" + "github.com/rusq/slackdump/v3/internal/chunk" "github.com/rusq/slackdump/v3/internal/fasttime" "github.com/rusq/slackdump/v3/internal/structures" ) @@ -117,7 +118,7 @@ type MessageRepository interface { AllForThread(ctx context.Context, conn sqlx.QueryerContext, channelID, threadID string) (iter.Seq2[DBMessage, error], error) // All returns all thread and channel messages in ascending or descending // time order. - All(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error) + Sorted(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error) } type messageRepository struct { @@ -129,11 +130,11 @@ func NewMessageRepository() MessageRepository { } func (r messageRepository) Count(ctx context.Context, conn sqlx.QueryerContext, channelID string) (int64, error) { - return r.countTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}}) + return r.countTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}}, chunk.CMessages, chunk.CThreadMessages) } func (r messageRepository) AllForID(ctx context.Context, conn sqlx.QueryerContext, channelID string) (iter.Seq2[DBMessage, error], error) { - return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, UserKeyOrder: true}) + return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, UserKeyOrder: true}, chunk.CMessages, chunk.CThreadMessages) } // threadCond returns a condition for selecting messages that are part of a @@ -152,7 +153,7 @@ func (r messageRepository) CountThread(ctx context.Context, conn sqlx.QueryerCon if err != nil { return 0, fmt.Errorf("countThread fasttime: %w", err) } - return r.countTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}}) + return r.countTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}}, chunk.CMessages, chunk.CThreadMessages) } func (r messageRepository) AllForThread(ctx context.Context, conn sqlx.QueryerContext, channelID, threadID string) (iter.Seq2[DBMessage, error], error) { @@ -160,9 +161,9 @@ func (r messageRepository) AllForThread(ctx context.Context, conn sqlx.QueryerCo if err != nil { return nil, fmt.Errorf("allForThread fasttime: %w", err) } - return r.allOfTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}, UserKeyOrder: true}) + return r.allOfTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}, UserKeyOrder: true}, chunk.CMessages, chunk.CThreadMessages) } -func (r messageRepository) All(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error) { - return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, OrderBy: []string{"ID " + order.String()}}) +func (r messageRepository) Sorted(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error) { + return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, OrderBy: []string{"T.ID " + order.String()}}, chunk.CMessages, chunk.CThreadMessages) } diff --git a/internal/chunk/dbproc/source.go b/internal/chunk/dbproc/source.go index 9cf7c28b..b496beaf 100644 --- a/internal/chunk/dbproc/source.go +++ b/internal/chunk/dbproc/source.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "iter" + "time" "github.com/jmoiron/sqlx" @@ -11,6 +12,7 @@ import ( "github.com/rusq/slackdump/v3/internal/chunk" "github.com/rusq/slackdump/v3/internal/chunk/dbproc/repository" + "github.com/rusq/slackdump/v3/internal/fasttime" ) const preallocSz = 100 @@ -135,6 +137,33 @@ func (s *Source) AllThreadMessages(ctx context.Context, channelID, threadID stri return valueIter(it), nil } +func (s *Source) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error { + tx, err := s.conn.BeginTxx(ctx, &sql.TxOptions{ReadOnly: true}) + if err != nil { + return err + } + defer tx.Rollback() + + mr := repository.NewMessageRepository() + it, err := mr.Sorted(ctx, s.conn, channelID, repository.Asc) + if err != nil { + return err + } + for c, err := range it { + if err != nil { + return err + } + v, err := c.Val() + if err != nil { + return err + } + if err := cb(fasttime.Int2Time(c.ID), &v); err != nil { + return err + } + } + return nil +} + func (s *Source) ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) { cr := repository.NewChannelRepository() diff --git a/internal/chunk/directory.go b/internal/chunk/directory.go index 62e43a20..12bb9ebf 100644 --- a/internal/chunk/directory.go +++ b/internal/chunk/directory.go @@ -559,3 +559,13 @@ func (d *Directory) Latest(ctx context.Context) (map[GroupID]time.Time, error) { } return latest, nil } + +func (d *Directory) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error { + // TODO: this is oversimplification. The messages for the channel in + // canonical chunk directory may be stored in multiple files. + f, err := d.Open(FileID(channelID)) + if err != nil { + return err + } + return f.Sorted(ctx, channelID, desc, cb) +} diff --git a/internal/chunk/file.go b/internal/chunk/file.go index b01afacc..5645b35a 100644 --- a/internal/chunk/file.go +++ b/internal/chunk/file.go @@ -483,8 +483,10 @@ func timeOffsets(ots offts, chanID string) map[int64]Addr { } // Sorted iterates over all the messages in the chunkfile in chronological -// order for the requested chanID. If desc is true, the slice will be -// iterated in reverse order. +// order for the requested chanID. If desc is true, the slice will be iterated +// in reverse order. It does not differentiate between the channel and thread +// messages. The function fn is called for each message in the slice. If the +// function returns an error, the iteration stops and the error is returned. func (f *File) Sorted(ctx context.Context, chanID string, desc bool, fn func(ts time.Time, m *slack.Message) error) error { ctx, task := trace.NewTask(ctx, "file.Sorted") defer task.End() diff --git a/internal/chunk/transform/export.go b/internal/chunk/transform/export.go index 2b0574ba..f0929bfd 100644 --- a/internal/chunk/transform/export.go +++ b/internal/chunk/transform/export.go @@ -10,6 +10,7 @@ import ( "runtime/trace" "sort" "sync/atomic" + "time" "github.com/rusq/fsadapter" "github.com/rusq/slack" @@ -101,24 +102,13 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err uidx := types.Users(e.getUsers()).IndexByID() trgdir := ExportChanName(ci) - it, err := e.src.AllMessages(ctx, ci.ID) - if err != nil { - return fmt.Errorf("error getting messages for %q: %w", ci.ID, err) - } // loop variables var ( prevDt string currDt string mm []export.ExportMessage ) - for m, err := range it { - if err != nil { - return fmt.Errorf("error reading message: %w", err) - } - ts, err := structures.ParseSlackTS(m.Timestamp) - if err != nil { - return fmt.Errorf("error parsing timestamp: %w", err) - } + if err := e.src.Sorted(ctx, ci.ID, false, func(ts time.Time, m *slack.Message) error { currDt = ts.Format("2006-01-02") if currDt != prevDt || prevDt == "" { if prevDt != "" { @@ -131,10 +121,11 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err prevDt = currDt } - // the "thread" is only used to collect statistics. Thread messages - // are passed by Sorted and written as a normal course of action. + // NOTE: the "thread" is only used to collect statistics. Thread + // messages are passed by Sorted and written as a normal course of + // action. var thread []slack.Message - if structures.IsThreadStart(&m) && m.LatestReply != structures.LatestReplyNoReplies { + if structures.IsThreadStart(m) && m.LatestReply != structures.LatestReplyNoReplies { // get the thread for the initial thread message only. var err error itTm, err := e.src.AllThreadMessages(ctx, ci.ID, m.ThreadTimestamp) @@ -158,12 +149,19 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err // apply all message functions. for _, fn := range e.msgFunc { - if err := fn(ci, &m); err != nil { + if err := fn(ci, m); err != nil { return fmt.Errorf("error updating message: %w", err) } } - mm = append(mm, *toExportMessage(&m, thread, uidx[m.User])) + mm = append(mm, *toExportMessage(m, thread, uidx[m.User])) + return nil + }); err != nil { + if errors.Is(err, chunk.ErrNoData) { + lg.DebugContext(ctx, "no messages for the channel", "channel", ci.ID) + return nil + } + return fmt.Errorf("error reading messages: %w", err) } if len(mm) > 0 { diff --git a/internal/chunk/transform/export_test.go b/internal/chunk/transform/export_test.go index 486f2340..6ca446db 100644 --- a/internal/chunk/transform/export_test.go +++ b/internal/chunk/transform/export_test.go @@ -2,6 +2,7 @@ package transform import ( "context" + "os" "path/filepath" "reflect" "sync/atomic" @@ -9,10 +10,14 @@ import ( "github.com/rusq/fsadapter" "github.com/rusq/slack" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" "github.com/rusq/slackdump/v3/internal/chunk" "github.com/rusq/slackdump/v3/internal/fixtures" "github.com/rusq/slackdump/v3/internal/source" + "github.com/rusq/slackdump/v3/internal/source/mock_source" + "github.com/rusq/slackdump/v3/internal/testutil" ) func Test_transform(t *testing.T) { @@ -110,3 +115,74 @@ func TestExpConverter_getUsers(t *testing.T) { }) } } + +func TestExpConverter_writeMessages(t *testing.T) { + type fields struct { + // src source.Sourcer + // fsa fsadapter.FS + users atomic.Value + msgFunc []msgUpdFunc + } + type args struct { + ctx context.Context + ci *slack.Channel + } + tests := []struct { + name string + fields fields + expectFn func(ms *mock_source.MockSourcer, mst *mock_source.MockStorage) + args args + wantFiles map[string]testutil.FileInfo + wantErr bool + }{ + { + name: "threaded messages", + fields: fields{}, + args: args{ + ctx: context.Background(), + ci: fixtures.Load[[]*slack.Channel](fixtures.TestChannels)[0], + }, + expectFn: func(ms *mock_source.MockSourcer, mst *mock_source.MockStorage) { + chanmsg := testutil.Slice2Seq2( + fixtures.Load[[]slack.Message](fixtures.ConvertPublic1AllMessagesJSON), + ) + threadmsg := testutil.Slice2Seq2( + fixtures.Load[[]slack.Message](fixtures.ConvertPublic1AllThreadMessagesJSON), + ) + ms.EXPECT().AllMessages(gomock.Any(), gomock.Any()). + Return(chanmsg, nil) + ms.EXPECT().AllThreadMessages(gomock.Any(), gomock.Any(), gomock.Any()). + Return(threadmsg, nil) + }, + wantFiles: map[string]testutil.FileInfo{ + "random/2025-01-10.json": {Name: "2025-01-10.json", Size: 1892}, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ms := mock_source.NewMockSourcer(ctrl) + mst := mock_source.NewMockStorage(ctrl) + if tt.expectFn != nil { + tt.expectFn(ms, mst) + } + + dir := t.TempDir() + fsa := fsadapter.NewDirectory(dir) + + e := &ExpConverter{ + src: ms, + fsa: fsa, + users: tt.fields.users, + msgFunc: tt.fields.msgFunc, + } + if err := e.writeMessages(tt.args.ctx, tt.args.ci); (err != nil) != tt.wantErr { + t.Errorf("ExpConverter.writeMessages() error = %v, wantErr %v", err, tt.wantErr) + } + gotfiles := testutil.CollectFiles(t, os.DirFS(dir)) + assert.Equal(t, tt.wantFiles, gotfiles) + }) + } +} diff --git a/internal/fixtures/assets/converter/chan1.json b/internal/fixtures/assets/converter/chan1.json new file mode 100644 index 00000000..ef7e39f4 --- /dev/null +++ b/internal/fixtures/assets/converter/chan1.json @@ -0,0 +1,61 @@ +[ +{ + "client_msg_id": "54ee98bd-56ea-4210-b3e8-8351d759d3ab", + "type": "message", + "user": "UHSD97ZA5", + "text": "test", + "ts": "1736478514.897649", + "team": "THY5HTZ8U", + "blocks": [ + { + "type": "rich_text", + "block_id": "gB9fq", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "text", + "text": "test" + } + ] + } + ] + } + ] +}, +{ + "client_msg_id": "324eed73-bf36-4f15-9927-3c4a63bf2341", + "type": "message", + "user": "UHSD97ZA5", + "text": "test2", + "ts": "1736478630.905399", + "thread_ts": "1736478630.905399", + "last_read": "1736554783.040199", + "subscribed": true, + "reply_count": 2, + "reply_users": [ + "U0887G31L03", + "UHSD97ZA5" + ], + "latest_reply": "1736554783.040199", + "team": "THY5HTZ8U", + "blocks": [ + { + "type": "rich_text", + "block_id": "Qm0o+", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "text", + "text": "test2" + } + ] + } + ] + } + ] +} +] \ No newline at end of file diff --git a/internal/fixtures/assets/converter/chan1_thread_1736478630.905399.json b/internal/fixtures/assets/converter/chan1_thread_1736478630.905399.json new file mode 100644 index 00000000..56845ad8 --- /dev/null +++ b/internal/fixtures/assets/converter/chan1_thread_1736478630.905399.json @@ -0,0 +1,68 @@ +[ +{ + "client_msg_id": "f6770037-fcf1-41aa-8d11-d2ee103e05d6", + "type": "message", + "user": "U0887G31L03", + "text": "A message from a deactivated account", + "ts": "1736554746.482349", + "thread_ts": "1736478630.905399", + "parent_user_id": "UHSD97ZA5", + "team": "THY5HTZ8U", + "replace_original": false, + "delete_original": false, + "metadata": { + "event_type": "", + "event_payload": null + }, + "blocks": [ + { + "type": "rich_text", + "block_id": "gvs1d", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "text", + "text": "A message from a deactivated account" + } + ] + } + ] + } + ] +}, +{ + "client_msg_id": "c8c88073-b090-4216-b87c-5f64a8f97bd3", + "type": "message", + "user": "UHSD97ZA5", + "text": "Reply to the same thread", + "ts": "1736554783.040199", + "thread_ts": "1736478630.905399", + "parent_user_id": "UHSD97ZA5", + "team": "THY5HTZ8U", + "replace_original": false, + "delete_original": false, + "metadata": { + "event_type": "", + "event_payload": null + }, + "blocks": [ + { + "type": "rich_text", + "block_id": "tiJat", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "text", + "text": "Reply to the same thread" + } + ] + } + ] + } + ] +} +] \ No newline at end of file diff --git a/internal/fixtures/converter.go b/internal/fixtures/converter.go new file mode 100644 index 00000000..18605302 --- /dev/null +++ b/internal/fixtures/converter.go @@ -0,0 +1,10 @@ +package fixtures + +import _ "embed" + +var ( + //go:embed assets/converter/chan1.json + ConvertPublic1AllMessagesJSON string + //go:embed assets/converter/chan1_thread_1736478630.905399.json + ConvertPublic1AllThreadMessagesJSON string +) diff --git a/internal/source/chunkdir.go b/internal/source/chunkdir.go index ea6bf8ee..885e9e75 100644 --- a/internal/source/chunkdir.go +++ b/internal/source/chunkdir.go @@ -147,3 +147,7 @@ func (c *ChunkDir) Files() Storage { func (c *ChunkDir) Avatars() Storage { return c.avatars } + +func (c *ChunkDir) Sorted(ctx context.Context, id string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error { + return c.d.Sorted(ctx, id, desc, cb) +} diff --git a/internal/source/database.go b/internal/source/database.go index fb545d1d..6587297b 100644 --- a/internal/source/database.go +++ b/internal/source/database.go @@ -107,3 +107,7 @@ func (d *Database) Files() Storage { func (d *Database) Avatars() Storage { return d.avatars } + +func (d *Database) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error { + return d.s.Sorted(ctx, channelID, desc, cb) +} diff --git a/internal/source/export.go b/internal/source/export.go index 124077c6..df108aef 100644 --- a/internal/source/export.go +++ b/internal/source/export.go @@ -96,7 +96,26 @@ func (e *Export) Type() string { // AllMessages returns all channel messages without thread messages. func (e *Export) AllMessages(_ context.Context, channelID string) (iter.Seq2[slack.Message, error], error) { - return e.walkChannelMessages(channelID) + it, err := e.walkChannelMessages(channelID) + if err != nil { + return nil, err + } + return func(yield func(slack.Message, error) bool) { + for m, err := range it { + if err != nil { + yield(slack.Message{}, err) + return + } + + if m.ThreadTimestamp != "" && !structures.IsThreadStart(&m) { + // skip thread messages + continue + } + if !yield(m, nil) { + return + } + } + }, nil } func (e *Export) walkChannelMessages(channelID string) (iter.Seq2[slack.Message, error], error) { @@ -126,7 +145,8 @@ func (e *Export) walkChannelMessages(channelID string) (iter.Seq2[slack.Message, slog.Default().Debug("skipping an empty message", "pth", pth, "index", i) continue } - if !yield(slack.Message{Msg: *m.Msg}, nil) { + sm := slack.Message{Msg: *m.Msg} + if !yield(sm, nil) { return fs.SkipAll } } @@ -194,3 +214,8 @@ func (e *Export) Files() Storage { func (e *Export) Avatars() Storage { return e.avatars } + +func (e *Export) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error { + // TODO + return errors.New("not supported yet") +} diff --git a/internal/source/mock_source/mock_source.go b/internal/source/mock_source/mock_source.go new file mode 100644 index 00000000..7d2f22a0 --- /dev/null +++ b/internal/source/mock_source/mock_source.go @@ -0,0 +1,289 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/rusq/slackdump/v3/internal/source (interfaces: Sourcer,Storage) +// +// Generated by this command: +// +// mockgen -destination=mock_source/mock_source.go . Sourcer,Storage +// + +// Package mock_source is a generated GoMock package. +package mock_source + +import ( + context "context" + fs "io/fs" + iter "iter" + reflect "reflect" + time "time" + + slack "github.com/rusq/slack" + source "github.com/rusq/slackdump/v3/internal/source" + structures "github.com/rusq/slackdump/v3/internal/structures" + gomock "go.uber.org/mock/gomock" +) + +// MockSourcer is a mock of Sourcer interface. +type MockSourcer struct { + ctrl *gomock.Controller + recorder *MockSourcerMockRecorder + isgomock struct{} +} + +// MockSourcerMockRecorder is the mock recorder for MockSourcer. +type MockSourcerMockRecorder struct { + mock *MockSourcer +} + +// NewMockSourcer creates a new mock instance. +func NewMockSourcer(ctrl *gomock.Controller) *MockSourcer { + mock := &MockSourcer{ctrl: ctrl} + mock.recorder = &MockSourcerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSourcer) EXPECT() *MockSourcerMockRecorder { + return m.recorder +} + +// AllMessages mocks base method. +func (m *MockSourcer) AllMessages(ctx context.Context, channelID string) (iter.Seq2[slack.Message, error], error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllMessages", ctx, channelID) + ret0, _ := ret[0].(iter.Seq2[slack.Message, error]) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllMessages indicates an expected call of AllMessages. +func (mr *MockSourcerMockRecorder) AllMessages(ctx, channelID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllMessages", reflect.TypeOf((*MockSourcer)(nil).AllMessages), ctx, channelID) +} + +// AllThreadMessages mocks base method. +func (m *MockSourcer) AllThreadMessages(ctx context.Context, channelID, threadID string) (iter.Seq2[slack.Message, error], error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllThreadMessages", ctx, channelID, threadID) + ret0, _ := ret[0].(iter.Seq2[slack.Message, error]) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllThreadMessages indicates an expected call of AllThreadMessages. +func (mr *MockSourcerMockRecorder) AllThreadMessages(ctx, channelID, threadID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllThreadMessages", reflect.TypeOf((*MockSourcer)(nil).AllThreadMessages), ctx, channelID, threadID) +} + +// Avatars mocks base method. +func (m *MockSourcer) Avatars() source.Storage { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Avatars") + ret0, _ := ret[0].(source.Storage) + return ret0 +} + +// Avatars indicates an expected call of Avatars. +func (mr *MockSourcerMockRecorder) Avatars() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Avatars", reflect.TypeOf((*MockSourcer)(nil).Avatars)) +} + +// ChannelInfo mocks base method. +func (m *MockSourcer) ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChannelInfo", ctx, channelID) + ret0, _ := ret[0].(*slack.Channel) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChannelInfo indicates an expected call of ChannelInfo. +func (mr *MockSourcerMockRecorder) ChannelInfo(ctx, channelID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChannelInfo", reflect.TypeOf((*MockSourcer)(nil).ChannelInfo), ctx, channelID) +} + +// Channels mocks base method. +func (m *MockSourcer) Channels(ctx context.Context) ([]slack.Channel, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Channels", ctx) + ret0, _ := ret[0].([]slack.Channel) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Channels indicates an expected call of Channels. +func (mr *MockSourcerMockRecorder) Channels(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Channels", reflect.TypeOf((*MockSourcer)(nil).Channels), ctx) +} + +// Close mocks base method. +func (m *MockSourcer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSourcerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSourcer)(nil).Close)) +} + +// Files mocks base method. +func (m *MockSourcer) Files() source.Storage { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Files") + ret0, _ := ret[0].(source.Storage) + return ret0 +} + +// Files indicates an expected call of Files. +func (mr *MockSourcerMockRecorder) Files() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Files", reflect.TypeOf((*MockSourcer)(nil).Files)) +} + +// Latest mocks base method. +func (m *MockSourcer) Latest(ctx context.Context) (map[structures.SlackLink]time.Time, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Latest", ctx) + ret0, _ := ret[0].(map[structures.SlackLink]time.Time) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Latest indicates an expected call of Latest. +func (mr *MockSourcerMockRecorder) Latest(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Latest", reflect.TypeOf((*MockSourcer)(nil).Latest), ctx) +} + +// Name mocks base method. +func (m *MockSourcer) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockSourcerMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockSourcer)(nil).Name)) +} + +// Sorted mocks base method. +func (m *MockSourcer) Sorted(ctx context.Context, channelID string, desc bool, cb func(time.Time, *slack.Message) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Sorted", ctx, channelID, desc, cb) + ret0, _ := ret[0].(error) + return ret0 +} + +// Sorted indicates an expected call of Sorted. +func (mr *MockSourcerMockRecorder) Sorted(ctx, channelID, desc, cb any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sorted", reflect.TypeOf((*MockSourcer)(nil).Sorted), ctx, channelID, desc, cb) +} + +// Type mocks base method. +func (m *MockSourcer) Type() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Type") + ret0, _ := ret[0].(string) + return ret0 +} + +// Type indicates an expected call of Type. +func (mr *MockSourcerMockRecorder) Type() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Type", reflect.TypeOf((*MockSourcer)(nil).Type)) +} + +// Users mocks base method. +func (m *MockSourcer) Users(ctx context.Context) ([]slack.User, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Users", ctx) + ret0, _ := ret[0].([]slack.User) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Users indicates an expected call of Users. +func (mr *MockSourcerMockRecorder) Users(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Users", reflect.TypeOf((*MockSourcer)(nil).Users), ctx) +} + +// WorkspaceInfo mocks base method. +func (m *MockSourcer) WorkspaceInfo(ctx context.Context) (*slack.AuthTestResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkspaceInfo", ctx) + ret0, _ := ret[0].(*slack.AuthTestResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WorkspaceInfo indicates an expected call of WorkspaceInfo. +func (mr *MockSourcerMockRecorder) WorkspaceInfo(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkspaceInfo", reflect.TypeOf((*MockSourcer)(nil).WorkspaceInfo), ctx) +} + +// MockStorage is a mock of Storage interface. +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder + isgomock struct{} +} + +// MockStorageMockRecorder is the mock recorder for MockStorage. +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance. +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// FS mocks base method. +func (m *MockStorage) FS() fs.FS { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FS") + ret0, _ := ret[0].(fs.FS) + return ret0 +} + +// FS indicates an expected call of FS. +func (mr *MockStorageMockRecorder) FS() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FS", reflect.TypeOf((*MockStorage)(nil).FS)) +} + +// File mocks base method. +func (m *MockStorage) File(id, name string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "File", id, name) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// File indicates an expected call of File. +func (mr *MockStorageMockRecorder) File(id, name any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "File", reflect.TypeOf((*MockStorage)(nil).File), id, name) +} diff --git a/internal/source/source.go b/internal/source/source.go index 49c08cd8..49fbee10 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -29,6 +29,8 @@ import ( ) // Sourcer is an interface for retrieving data from different sources. +// +//go:generate mockgen -destination=mock_source/mock_source.go . Sourcer,Storage type Sourcer interface { // Name should return the name of the retriever underlying media, i.e. // directory or archive. @@ -44,6 +46,12 @@ type Sourcer interface { // AllThreadMessages should return all messages for the given tuple // (channelID, threadID). AllThreadMessages(ctx context.Context, channelID, threadID string) (iter.Seq2[slack.Message, error], error) + // Sorted should iterate over all (both channel and thread) messages for + // the requested channel id. If desc is true, it must return messages in + // descending order (by timestamp), otherwise in ascending order. The + // callback function cb should be called for each message. If cb returns an + // error, the iteration should be stopped and the error should be returned. + Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error // ChannelInfo should return the channel information for the given channel // id. ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) @@ -239,3 +247,8 @@ func unmarshal[T ~[]S, S any](fsys fs.FS, name string) (T, error) { } return v, nil } + +func (d *Dump) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error { + // TODO: implement + return errors.New("not implemented") +} diff --git a/internal/testutil/fs.go b/internal/testutil/fs.go new file mode 100644 index 00000000..634a5722 --- /dev/null +++ b/internal/testutil/fs.go @@ -0,0 +1,34 @@ +package testutil + +import ( + "io/fs" + "testing" +) + +type FileInfo struct { + Name string + Size int64 +} + +// CollectFiles returns a map of file paths to file info. +func CollectFiles(t *testing.T, fsys fs.FS) (ret map[string]FileInfo) { + t.Helper() + ret = make(map[string]FileInfo) + if err := fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + fi, err := d.Info() + if err != nil { + return err + } + ret[path] = FileInfo{Name: d.Name(), Size: fi.Size()} + return nil + }); err != nil { + t.Fatal(err) + } + return +} diff --git a/internal/testutil/iter.go b/internal/testutil/iter.go index 144fef90..55f33453 100644 --- a/internal/testutil/iter.go +++ b/internal/testutil/iter.go @@ -13,3 +13,23 @@ func Seq2Collect[T, U any](it iter.Seq2[T, U]) (ret []IterVal[T, U]) { } return } + +func IterVal2Iter[T, U any](s []IterVal[T, U]) iter.Seq2[T, U] { + return func(yield func(T, U) bool) { + for _, v := range s { + if !yield(v.T, v.U) { + return + } + } + } +} + +func Slice2Seq2[T any](s []T) iter.Seq2[T, error] { + return func(yield func(T, error) bool) { + for _, v := range s { + if !yield(v, nil) { + return + } + } + } +}