Skip to content

Commit

Permalink
cherrypick changes to file and chunk from i174-multichunk
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 22, 2025
1 parent 09d6341 commit 2f1ed49
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 105 deletions.
15 changes: 14 additions & 1 deletion internal/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
CSearchMessages
CSearchFiles

CAny ChunkType = 255 // special value for any type
CAny ChunkType = 255 // Special type, meaning any type of chunk.
)

var ErrUnsupChunkType = fmt.Errorf("unsupported chunk type")
Expand Down Expand Up @@ -186,6 +186,19 @@ func (id GroupID) IsThread() bool {
return strings.HasPrefix(string(id), threadPrefix)
}

// ExtractChannelID attempts to extract the channel ID from the GroupID if it
// is a channel or a thread ID. Otherwise, ok will be false.
func (id GroupID) ExtractChannelID() (channelID string, ok bool) {
if id.IsThread() {
channelID, _, ok = id.AsThreadID()
return
}
if id.IsChannel() {
return id.AsChannelID()
}
return "", false
}

// asThreadID returns the channelID and threadTS from the GroupID. If the
// GroupID is not a thread ID, it returns false.
func (id GroupID) AsThreadID() (channelID, threadTS string, ok bool) {
Expand Down
39 changes: 39 additions & 0 deletions internal/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,42 @@ func TestGroupID_AsThreadID(t *testing.T) {
})
}
}

func TestGroupID_ExtractChannelID(t *testing.T) {
tests := []struct {
name string
id GroupID
wantChannelID string
wantOk bool
}{
{
name: "channel",
id: "C123",
wantChannelID: "C123",
wantOk: true,
},
{
name: "thread",
id: "tC123:1234",
wantChannelID: "C123",
wantOk: true,
},
{
name: "invalid",
id: "invalid",
wantChannelID: "",
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotChannelID, gotOk := tt.id.ExtractChannelID()
if gotChannelID != tt.wantChannelID {
t.Errorf("GroupID.ExtractChannelID() gotChannelID = %v, want %v", gotChannelID, tt.wantChannelID)
}
if gotOk != tt.wantOk {
t.Errorf("GroupID.ExtractChannelID() gotOk = %v, want %v", gotOk, tt.wantOk)
}
})
}
}
5 changes: 4 additions & 1 deletion internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ const (
FSearch FileID = "search"
)

const UploadsDir = "__uploads" // for serving files
const (
UploadsDir = "__uploads" // for serving files
AvatarsDir = "__avatars"
)

// Directory is an abstraction over the directory with chunk files. It
// provides a way to write chunk files and read channels, users and messages
Expand Down
67 changes: 27 additions & 40 deletions internal/chunk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

var (
ErrNotFound = errors.New("not found")
ErrNoData = errors.New("no data")
ErrDataMisaligned = errors.New("internal error: index and file data misaligned")
)

Expand Down Expand Up @@ -126,7 +127,7 @@ func indexChunks(dec decoder) (index, error) {
idx[id] = append(idx[id], offset)
}

slog.Default().Debug("indexing chunks", "len(idx)", len(idx), "caller", osext.Caller(2), "took", time.Since(start).String(), "per_sec", float64(len(idx))/time.Since(start).Seconds())
slog.Debug("indexing chunks", "len(idx)", len(idx), "caller", osext.Caller(2), "took", time.Since(start).String(), "per_sec", float64(len(idx))/time.Since(start).Seconds())
return idx, nil
}

Expand Down Expand Up @@ -226,22 +227,12 @@ func (f *File) State() (*state.State, error) {

// AllMessages returns all the messages for the given channel posted to it (no
// thread). The messages are in the order as they appear in the file.
func (f *File) AllMessages(ctx context.Context, channelID string) ([]slack.Message, error) {
var mm []slack.Message
err := f.Sorted(ctx, false, func(ts time.Time, m *slack.Message) error {
if m.Channel != channelID {
return nil
}
if m.ThreadTimestamp != "" && m.ThreadTimestamp != m.Timestamp {
return nil
}
mm = append(mm, *m)
return nil
})
func (f *File) AllMessages(_ context.Context, channelID string) ([]slack.Message, error) {
m, err := f.allMessagesForID(GroupID(channelID))
if err != nil {
return nil, fmt.Errorf("failed getting messages for %s: %w", channelID, err)
}
return mm, nil
return m, nil
}

// AllThreadMessages returns all the messages for the given thread. It does
Expand Down Expand Up @@ -305,22 +296,6 @@ func (f *File) AllChannelInfos() ([]slack.Channel, error) {
return chans, nil
}

// AllChannelInfoWithMembers returns all channels with Members populated.
func (f *File) AllChannelInfoWithMembers() ([]slack.Channel, error) {
c, err := f.AllChannelInfos()
if err != nil {
return nil, err
}
for i := range c {
members, err := f.ChannelUsers(c[i].ID)
if err != nil {
return nil, err
}
c[i].Members = members
}
return c, nil
}

// int64s implements sort.Interface for []int64.
type int64s []int64

Expand Down Expand Up @@ -430,6 +405,7 @@ type offts map[int64]offsetInfo
type offsetInfo struct {
ID GroupID
Type ChunkType
TS int64 // timestamp of the chunk
Timestamps []int64
}

Expand Down Expand Up @@ -471,6 +447,7 @@ func (f *File) offsetTimestamps(ctx context.Context) (offts, error) {
ret[offset] = offsetInfo{
ID: chunk.ID(),
Type: chunk.Type,
TS: chunk.Timestamp,
Timestamps: ts,
}
}
Expand All @@ -488,9 +465,13 @@ type Addr struct {
// the message with this timestamp within the message slice. It converts the
// string timestamp to an int64 timestamp using structures.TS2int, but the
// original string timestamp returned in the TimeOffset struct.
func timeOffsets(ots offts) map[int64]Addr {
func timeOffsets(ots offts, chanID string) map[int64]Addr {
ret := make(map[int64]Addr, len(ots))
for offset, info := range ots {
channelID, ok := info.ID.ExtractChannelID()
if !ok || channelID != chanID {
continue
}
for i, ts := range info.Timestamps {
ret[ts] = Addr{
Offset: offset,
Expand All @@ -502,8 +483,9 @@ func timeOffsets(ots offts) map[int64]Addr {
}

// Sorted iterates over all the messages in the chunkfile in chronological
// order. If desc is true, the slice will be iterated in reverse order.
func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *slack.Message) error) error {
// order for the requested chanID. If desc is true, the slice will be
// iterated in reverse order.
func (f *File) Sorted(ctx context.Context, chanID string, desc bool, fn func(ts time.Time, m *slack.Message) error) error {
ctx, task := trace.NewTask(ctx, "file.Sorted")
defer task.End()

Expand All @@ -515,12 +497,15 @@ func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *s
}

rgnTos := trace.StartRegion(ctx, "timeOffsets")
tos := timeOffsets(ots)
tos := timeOffsets(ots, chanID)
rgnTos.End()
tsList := make([]int64, 0, len(tos))
for ts := range tos {
tsList = append(tsList, ts)
}
if len(tsList) == 0 {
return ErrNoData
}

trace.WithRegion(ctx, "sorted.sort", func() {
if desc {
Expand All @@ -537,8 +522,8 @@ func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *s
for _, ts := range tsList {
tmOff := tos[ts]
// read the new chunk from the file only in case that the offset has
// changed.
if tmOff.Offset != prevOffset {
// changed or the chunk is nil (initial value).
if tmOff.Offset != prevOffset || chunk == nil {
var err error
chunk, err = f.chunkAt(tmOff.Offset)
if err != nil {
Expand All @@ -547,7 +532,7 @@ func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *s
prevOffset = tmOff.Offset
}

if err := fn(fasttime.Int2Time(ts).UTC(), &chunk.Messages[tmOff.Index]); err != nil {
if err := fn(fasttime.Int2Time(ts), &chunk.Messages[tmOff.Index]); err != nil {
return err
}
}
Expand Down Expand Up @@ -639,13 +624,15 @@ func (f *File) Latest(ctx context.Context) (map[GroupID]time.Time, error) {
if len(info.Timestamps) == 0 {
continue
}
sort.Sort(sort.Reverse(int64s(info.Timestamps)))
latest := fasttime.Int2Time(info.Timestamps[0])
curr, ok := ret[info.ID]
if !ok {
ret[info.ID] = fasttime.Int2Time(info.Timestamps[0])
ret[info.ID] = latest
continue
}
if fasttime.Int2Time(info.Timestamps[0]).After(curr) {
ret[info.ID] = fasttime.Int2Time(info.Timestamps[0])
if latest.After(curr) {
ret[info.ID] = latest
}
}
return ret, nil
Expand Down
Loading

0 comments on commit 2f1ed49

Please sign in to comment.