Skip to content

Commit

Permalink
Merge pull request #422 from rusq/i174-resume-timestamp
Browse files Browse the repository at this point in the history
Time parsing fix + "inclusive" stream parameter
  • Loading branch information
rusq authored Jan 25, 2025
2 parents a481242 + 9a3e07e commit 54374f9
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 54 deletions.
18 changes: 11 additions & 7 deletions cmd/slackdump/internal/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,16 @@ func NewDirectory(name string) (*chunk.Directory, error) {

// ArchiveController returns the default archive controller initialised based
// on global configuration parameters.
func ArchiveController(ctx context.Context, cd *chunk.Directory, sess *slackdump.Session) (*control.Controller, error) {
func ArchiveController(ctx context.Context, cd *chunk.Directory, sess *slackdump.Session, opts ...stream.Option) (*control.Controller, error) {
lg := cfg.Log

sopts := []stream.Option{
stream.OptLatest(time.Time(cfg.Latest)),
stream.OptOldest(time.Time(cfg.Oldest)),
stream.OptResultFn(resultLogger(lg)),
}
sopts = append(sopts, opts...)

// start attachment downloader
dl := fileproc.NewDownloader(
ctx,
Expand All @@ -107,14 +115,10 @@ func ArchiveController(ctx context.Context, cd *chunk.Directory, sess *slackdump
fsadapter.NewDirectory(cd.Name()),
lg,
)
stream := sess.Stream(
stream.OptLatest(time.Time(cfg.Latest)),
stream.OptOldest(time.Time(cfg.Oldest)),
stream.OptResultFn(resultLogger(lg)),
)

ctrl := control.New(
cd,
stream,
sess.Stream(sopts...),
control.WithLogger(lg),
control.WithFlags(control.Flags{MemberOnly: cfg.MemberOnly, RecordFiles: cfg.RecordFiles}),
control.WithFiler(fileproc.New(dl)),
Expand Down
40 changes: 17 additions & 23 deletions cmd/slackdump/internal/resume/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"strings"
"text/tabwriter"
"time"

Expand All @@ -16,6 +16,7 @@ import (
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v3/internal/source"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/stream"
)

var CmdResume = &base.Command{
Expand Down Expand Up @@ -88,29 +89,28 @@ func Resume(ctx context.Context, sess *slackdump.Session, src source.Sourcer, fl

lg.Info("scanning messages")

l, err := src.Latest(ctx)
latest, err := src.Latest(ctx)
if err != nil {
return fmt.Errorf("error loading latest timestamps: %w", err)
}

// by this point we have all the channels and maybe threads along with their
// respective latest timestamps.
if slog.Default().Enabled(ctx, slog.LevelDebug) {
printLatest(l)
}
debugprint(strlatest(latest))
// remove all threads from the list if they are disabled
el := make([]structures.EntityItem, 0, len(l))
for sl, ts := range l {
el := make([]structures.EntityItem, 0, len(latest))
for sl, ts := range latest {
if sl.IsThread() && !p.IncludeThreads {
continue
}
item := structures.EntityItem{
Id: sl.String(),
Oldest: tmin(time.Time(cfg.Oldest), ts),
Oldest: ts,
Latest: time.Time(cfg.Latest),
Include: true,
}
el = append(el, item)
debugprint(fmt.Sprintf("%s: %d->%d", item.Id, ts.UTC().UnixMicro(), item.Oldest.UnixMicro()))
}
list := structures.NewEntityListFromItems(el...)

Expand All @@ -120,7 +120,7 @@ func Resume(ctx context.Context, sess *slackdump.Session, src source.Sourcer, fl
}
defer cd.Close()

ctrl, err := archive.ArchiveController(ctx, cd, sess)
ctrl, err := archive.ArchiveController(ctx, cd, sess, stream.OptInclusive(false))
if err != nil {
return fmt.Errorf("error creating archive controller: %w", err)
}
Expand All @@ -132,25 +132,19 @@ func Resume(ctx context.Context, sess *slackdump.Session, src source.Sourcer, fl
return nil
}

func tmin(a, b time.Time) time.Time {
if a.Before(b) {
return a
}
return b
}

func tmax(a, b time.Time) time.Time {
if a.After(b) {
return a
func debugprint(a ...any) {
if slog.Default().Enabled(context.Background(), slog.LevelDebug) {
fmt.Println(a...)
}
return b
}

func printLatest(l map[structures.SlackLink]time.Time) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
func strlatest(l map[structures.SlackLink]time.Time) string {
var buf strings.Builder
tw := tabwriter.NewWriter(&buf, 0, 0, 1, ' ', 0)
fmt.Fprintln(tw, "Group ID\tLatest")
for gid, ts := range l {
fmt.Fprintf(tw, "%s\t%s\n", gid, ts.Format("2006-01-02 15:04:05"))
fmt.Fprintf(tw, "%s\t%s\n", gid, ts.Format("2006-01-02 15:04:05 MST"))
}
tw.Flush()
return buf.String()
}
28 changes: 14 additions & 14 deletions internal/structures/slacktime.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package structures
// in this file: slack timestamp parsing functions

import (
"errors"
"fmt"
"strconv"
"strings"
"time"

"errors"
)

// ParseThreadID parses the thread id (ie. p1577694990000400) and returns
Expand All @@ -29,27 +28,28 @@ func ParseSlackTS(timestamp string) (time.Time, error) {
base = 10
bit = 64
)
sHi, sLo, found := strings.Cut(timestamp, ".")

var hi, lo int64
hi, err := strconv.ParseInt(sHi, base, bit)
sSec, sMicro, found := strings.Cut(timestamp, ".")
if sSec == "" {
return time.Time{}, errors.New("empty timestamp")
}
var t int64
var err error
if !found {
t, err = strconv.ParseInt(sSec+"000000", base, bit)
} else {
t, err = strconv.ParseInt(sSec+sMicro, base, bit)
}
if err != nil {
return time.Time{}, err
}
if found {
lo, err = strconv.ParseInt(sLo, base, bit)
if err != nil {
return time.Time{}, err
}
}
return time.Unix(hi, lo).UTC(), nil
return time.UnixMicro(t).UTC(), nil
}

func FormatSlackTS(ts time.Time) string {
if ts.IsZero() || ts.Before(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)) {
return ""
}
hi := ts.Unix()
lo := ts.UnixNano() % 1_000_000
lo := ts.UnixMicro() % 1_000_000
return fmt.Sprintf("%d.%06d", hi, lo)
}
18 changes: 10 additions & 8 deletions internal/structures/slacktime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ func Test_parseSlackTS(t *testing.T) {
want time.Time
wantErr bool
}{
{"valid time", args{"1534552745.065949"}, time.Date(2018, 8, 18, 0, 39, 05, 65949, time.UTC), false},
{"another valid time", args{"1638494510.037400"}, time.Date(2021, 12, 3, 1, 21, 50, 37400, time.UTC), false},
{"the time when I slipped", args{"1645551829.244659"}, time.Date(2022, 2, 22, 17, 43, 49, 244659, time.UTC), false},
{"time without millis", args{"0"}, time.Date(1970, 1, 1, 0, 00, 00, 0, time.UTC), false},
{"valid time", args{"1534552745.065949"}, time.UnixMicro(1534552745065949).UTC(), false},
{"another valid time", args{"1638494510.037400"}, time.Date(2021, 12, 3, 1, 21, 50, 37400000, time.UTC), false},
{"the time when I slipped", args{"1645551829.244659"}, time.Date(2022, 2, 22, 17, 43, 49, 244659000, time.UTC), false},
{"time without millis", args{"0"}, time.Date(1970, 1, 1, 0, 0o0, 0o0, 0, time.UTC), false},
{"invalid time", args{"x"}, time.Time{}, true},
{"invalid time", args{"x.x"}, time.Time{}, true},
{"invalid time", args{"4.x"}, time.Time{}, true},
{"invalid time", args{"x.4"}, time.Time{}, true},
{"invalid time", args{".4"}, time.Time{}, true},
{"polly time", args{"1737160363.583369"}, time.UnixMicro(1737160363583369).UTC(), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -40,7 +41,7 @@ func Test_parseSlackTS(t *testing.T) {
}
}

func Test_formatSlackTS(t *testing.T) {
func Test_FormatSlackTS(t *testing.T) {
type args struct {
ts time.Time
}
Expand All @@ -49,11 +50,12 @@ func Test_formatSlackTS(t *testing.T) {
args args
want string
}{
{"ok", args{time.Date(2018, 8, 18, 0, 39, 05, 65949, time.UTC)}, "1534552745.065949"},
{"another valid time", args{time.Date(2021, 12, 3, 1, 21, 50, 37400, time.UTC)}, "1638494510.037400"},
{"ok", args{time.Date(2018, 8, 18, 0, 39, 0o5, 65949000, time.UTC)}, "1534552745.065949"},
{"another valid time", args{time.Date(2021, 12, 3, 1, 21, 50, 37400000, time.UTC)}, "1638494510.037400"},
{"empty", args{time.Time{}}, ""},
{"Happy new 1970 year", args{time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)}, "0.000000"},
{"prepare for the future", args{time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(-1 * time.Nanosecond)}, ""},
{"polly message", args{time.UnixMicro(1737160363583369)}, "1737160363.583369"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -77,7 +79,7 @@ func Test_parseThreadID(t *testing.T) {
{
"valid threadID",
args{"p1577694990000400"},
time.Date(2019, 12, 30, 8, 36, 30, 400, time.UTC),
time.Date(2019, 12, 30, 8, 36, 30, 400000, time.UTC),
false,
},
{
Expand Down
4 changes: 2 additions & 2 deletions stream/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (cs *Stream) channel(ctx context.Context, req request, callback func(mm []s
Limit: cs.limits.tier.Request.Conversations,
Oldest: structures.FormatSlackTS(structures.NVLTime(req.Oldest, cs.oldest)),
Latest: structures.FormatSlackTS(structures.NVLTime(req.Latest, cs.latest)),
Inclusive: true,
Inclusive: cs.inclusive,
})
return apiErr
}); err != nil {
Expand Down Expand Up @@ -247,7 +247,7 @@ func (cs *Stream) thread(ctx context.Context, req request, callback func(mm []sl
Limit: cs.limits.tier.Request.Replies,
Oldest: structures.FormatSlackTS(structures.NVLTime(req.Oldest, cs.oldest)),
Latest: structures.FormatSlackTS(structures.NVLTime(req.Latest, cs.latest)),
Inclusive: true,
Inclusive: cs.inclusive,
})
return apiErr
}); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Stream struct {
limits rateLimits
chanCache *chanCache
fastSearch bool
inclusive bool
resultFn []func(sr Result) error
}

Expand Down Expand Up @@ -173,13 +174,20 @@ func OptFastSearch() Option {
}
}

func OptInclusive(b bool) Option {
return func(cs *Stream) {
cs.inclusive = b
}
}

// New creates a new Stream instance that allows to stream different
// slack entities.
func New(cl Slacker, l *network.Limits, opts ...Option) *Stream {
cs := &Stream{
client: cl,
limits: limits(l),
chanCache: new(chanCache),
inclusive: true,
}
for _, opt := range opts {
opt(cs)
Expand Down

0 comments on commit 54374f9

Please sign in to comment.