Skip to content

Commit

Permalink
fix export conversion and viewing
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 22, 2025
1 parent 124fc3d commit ba7a064
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 28 deletions.
15 changes: 8 additions & 7 deletions internal/chunk/dbproc/repository/dbmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/fasttime"
"github.com/rusq/slackdump/v3/internal/structures"
)
Expand Down Expand Up @@ -117,7 +118,7 @@ type MessageRepository interface {
AllForThread(ctx context.Context, conn sqlx.QueryerContext, channelID, threadID string) (iter.Seq2[DBMessage, error], error)
// All returns all thread and channel messages in ascending or descending
// time order.
All(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error)
Sorted(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error)
}

type messageRepository struct {
Expand All @@ -129,11 +130,11 @@ func NewMessageRepository() MessageRepository {
}

func (r messageRepository) Count(ctx context.Context, conn sqlx.QueryerContext, channelID string) (int64, error) {
return r.countTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}})
return r.countTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}}, chunk.CMessages, chunk.CThreadMessages)
}

func (r messageRepository) AllForID(ctx context.Context, conn sqlx.QueryerContext, channelID string) (iter.Seq2[DBMessage, error], error) {
return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, UserKeyOrder: true})
return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, UserKeyOrder: true}, chunk.CMessages, chunk.CThreadMessages)
}

// threadCond returns a condition for selecting messages that are part of a
Expand All @@ -152,17 +153,17 @@ func (r messageRepository) CountThread(ctx context.Context, conn sqlx.QueryerCon
if err != nil {
return 0, fmt.Errorf("countThread fasttime: %w", err)
}
return r.countTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}})
return r.countTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}}, chunk.CMessages, chunk.CThreadMessages)
}

func (r messageRepository) AllForThread(ctx context.Context, conn sqlx.QueryerContext, channelID, threadID string) (iter.Seq2[DBMessage, error], error) {
parentID, err := fasttime.TS2int(threadID)
if err != nil {
return nil, fmt.Errorf("allForThread fasttime: %w", err)
}
return r.allOfTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}, UserKeyOrder: true})
return r.allOfTypeWhere(ctx, conn, queryParams{Where: r.threadCond(), Binds: []any{channelID, parentID}, UserKeyOrder: true}, chunk.CMessages, chunk.CThreadMessages)
}

func (r messageRepository) All(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error) {
return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, OrderBy: []string{"ID " + order.String()}})
func (r messageRepository) Sorted(ctx context.Context, conn sqlx.QueryerContext, channelID string, order Order) (iter.Seq2[DBMessage, error], error) {
return r.allOfTypeWhere(ctx, conn, queryParams{Where: "CHANNEL_ID = ?", Binds: []any{channelID}, OrderBy: []string{"T.ID " + order.String()}}, chunk.CMessages, chunk.CThreadMessages)
}
29 changes: 29 additions & 0 deletions internal/chunk/dbproc/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"database/sql"
"iter"
"time"

"github.com/jmoiron/sqlx"

"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/dbproc/repository"
"github.com/rusq/slackdump/v3/internal/fasttime"
)

const preallocSz = 100
Expand Down Expand Up @@ -135,6 +137,33 @@ func (s *Source) AllThreadMessages(ctx context.Context, channelID, threadID stri
return valueIter(it), nil
}

func (s *Source) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error {
tx, err := s.conn.BeginTxx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return err
}
defer tx.Rollback()

mr := repository.NewMessageRepository()
it, err := mr.Sorted(ctx, s.conn, channelID, repository.Asc)
if err != nil {
return err
}
for c, err := range it {
if err != nil {
return err
}
v, err := c.Val()
if err != nil {
return err
}
if err := cb(fasttime.Int2Time(c.ID), &v); err != nil {
return err
}
}
return nil
}

func (s *Source) ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) {
cr := repository.NewChannelRepository()

Expand Down
10 changes: 10 additions & 0 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,13 @@ func (d *Directory) Latest(ctx context.Context) (map[GroupID]time.Time, error) {
}
return latest, nil
}

func (d *Directory) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error {
// TODO: this is oversimplification. The messages for the channel in
// canonical chunk directory may be stored in multiple files.
f, err := d.Open(FileID(channelID))
if err != nil {
return err
}
return f.Sorted(ctx, channelID, desc, cb)
}
6 changes: 4 additions & 2 deletions internal/chunk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,10 @@ func timeOffsets(ots offts, chanID string) map[int64]Addr {
}

// Sorted iterates over all the messages in the chunkfile in chronological
// order for the requested chanID. If desc is true, the slice will be
// iterated in reverse order.
// order for the requested chanID. If desc is true, the slice will be iterated
// in reverse order. It does not differentiate between the channel and thread
// messages. The function fn is called for each message in the slice. If the
// function returns an error, the iteration stops and the error is returned.
func (f *File) Sorted(ctx context.Context, chanID string, desc bool, fn func(ts time.Time, m *slack.Message) error) error {
ctx, task := trace.NewTask(ctx, "file.Sorted")
defer task.End()
Expand Down
32 changes: 15 additions & 17 deletions internal/chunk/transform/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime/trace"
"sort"
"sync/atomic"
"time"

"github.com/rusq/fsadapter"
"github.com/rusq/slack"
Expand Down Expand Up @@ -101,24 +102,13 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err
uidx := types.Users(e.getUsers()).IndexByID()
trgdir := ExportChanName(ci)

it, err := e.src.AllMessages(ctx, ci.ID)
if err != nil {
return fmt.Errorf("error getting messages for %q: %w", ci.ID, err)
}
// loop variables
var (
prevDt string
currDt string
mm []export.ExportMessage
)
for m, err := range it {
if err != nil {
return fmt.Errorf("error reading message: %w", err)
}
ts, err := structures.ParseSlackTS(m.Timestamp)
if err != nil {
return fmt.Errorf("error parsing timestamp: %w", err)
}
if err := e.src.Sorted(ctx, ci.ID, false, func(ts time.Time, m *slack.Message) error {
currDt = ts.Format("2006-01-02")
if currDt != prevDt || prevDt == "" {
if prevDt != "" {
Expand All @@ -131,10 +121,11 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err
prevDt = currDt
}

// the "thread" is only used to collect statistics. Thread messages
// are passed by Sorted and written as a normal course of action.
// NOTE: the "thread" is only used to collect statistics. Thread
// messages are passed by Sorted and written as a normal course of
// action.
var thread []slack.Message
if structures.IsThreadStart(&m) && m.LatestReply != structures.LatestReplyNoReplies {
if structures.IsThreadStart(m) && m.LatestReply != structures.LatestReplyNoReplies {
// get the thread for the initial thread message only.
var err error
itTm, err := e.src.AllThreadMessages(ctx, ci.ID, m.ThreadTimestamp)
Expand All @@ -158,12 +149,19 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err

// apply all message functions.
for _, fn := range e.msgFunc {
if err := fn(ci, &m); err != nil {
if err := fn(ci, m); err != nil {
return fmt.Errorf("error updating message: %w", err)
}
}

mm = append(mm, *toExportMessage(&m, thread, uidx[m.User]))
mm = append(mm, *toExportMessage(m, thread, uidx[m.User]))
return nil
}); err != nil {
if errors.Is(err, chunk.ErrNoData) {
lg.DebugContext(ctx, "no messages for the channel", "channel", ci.ID)
return nil
}
return fmt.Errorf("error reading messages: %w", err)
}

if len(mm) > 0 {
Expand Down
76 changes: 76 additions & 0 deletions internal/chunk/transform/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ package transform

import (
"context"
"os"
"path/filepath"
"reflect"
"sync/atomic"
"testing"

"github.com/rusq/fsadapter"
"github.com/rusq/slack"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/fixtures"
"github.com/rusq/slackdump/v3/internal/source"
"github.com/rusq/slackdump/v3/internal/source/mock_source"
"github.com/rusq/slackdump/v3/internal/testutil"
)

func Test_transform(t *testing.T) {
Expand Down Expand Up @@ -110,3 +115,74 @@ func TestExpConverter_getUsers(t *testing.T) {
})
}
}

func TestExpConverter_writeMessages(t *testing.T) {
type fields struct {
// src source.Sourcer
// fsa fsadapter.FS
users atomic.Value
msgFunc []msgUpdFunc
}
type args struct {
ctx context.Context
ci *slack.Channel
}
tests := []struct {
name string
fields fields
expectFn func(ms *mock_source.MockSourcer, mst *mock_source.MockStorage)
args args
wantFiles map[string]testutil.FileInfo
wantErr bool
}{
{
name: "threaded messages",
fields: fields{},
args: args{
ctx: context.Background(),
ci: fixtures.Load[[]*slack.Channel](fixtures.TestChannels)[0],
},
expectFn: func(ms *mock_source.MockSourcer, mst *mock_source.MockStorage) {
chanmsg := testutil.Slice2Seq2(
fixtures.Load[[]slack.Message](fixtures.ConvertPublic1AllMessagesJSON),
)
threadmsg := testutil.Slice2Seq2(
fixtures.Load[[]slack.Message](fixtures.ConvertPublic1AllThreadMessagesJSON),
)
ms.EXPECT().AllMessages(gomock.Any(), gomock.Any()).
Return(chanmsg, nil)
ms.EXPECT().AllThreadMessages(gomock.Any(), gomock.Any(), gomock.Any()).
Return(threadmsg, nil)
},
wantFiles: map[string]testutil.FileInfo{
"random/2025-01-10.json": {Name: "2025-01-10.json", Size: 1892},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
ms := mock_source.NewMockSourcer(ctrl)
mst := mock_source.NewMockStorage(ctrl)
if tt.expectFn != nil {
tt.expectFn(ms, mst)
}

dir := t.TempDir()
fsa := fsadapter.NewDirectory(dir)

e := &ExpConverter{
src: ms,
fsa: fsa,
users: tt.fields.users,
msgFunc: tt.fields.msgFunc,
}
if err := e.writeMessages(tt.args.ctx, tt.args.ci); (err != nil) != tt.wantErr {
t.Errorf("ExpConverter.writeMessages() error = %v, wantErr %v", err, tt.wantErr)
}
gotfiles := testutil.CollectFiles(t, os.DirFS(dir))
assert.Equal(t, tt.wantFiles, gotfiles)
})
}
}
61 changes: 61 additions & 0 deletions internal/fixtures/assets/converter/chan1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[
{
"client_msg_id": "54ee98bd-56ea-4210-b3e8-8351d759d3ab",
"type": "message",
"user": "UHSD97ZA5",
"text": "test",
"ts": "1736478514.897649",
"team": "THY5HTZ8U",
"blocks": [
{
"type": "rich_text",
"block_id": "gB9fq",
"elements": [
{
"type": "rich_text_section",
"elements": [
{
"type": "text",
"text": "test"
}
]
}
]
}
]
},
{
"client_msg_id": "324eed73-bf36-4f15-9927-3c4a63bf2341",
"type": "message",
"user": "UHSD97ZA5",
"text": "test2",
"ts": "1736478630.905399",
"thread_ts": "1736478630.905399",
"last_read": "1736554783.040199",
"subscribed": true,
"reply_count": 2,
"reply_users": [
"U0887G31L03",
"UHSD97ZA5"
],
"latest_reply": "1736554783.040199",
"team": "THY5HTZ8U",
"blocks": [
{
"type": "rich_text",
"block_id": "Qm0o+",
"elements": [
{
"type": "rich_text_section",
"elements": [
{
"type": "text",
"text": "test2"
}
]
}
]
}
]
}
]
Loading

0 comments on commit ba7a064

Please sign in to comment.