diff --git a/go/api/base_client.go b/go/api/base_client.go index e534f1aa19..8ff86c18a8 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -6614,3 +6614,205 @@ func (client *baseClient) CopyWithOptions( } return handleBoolResponse(result) } + +// Returns stream entries matching a given range of IDs. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// start - The start position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// end - The end position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// +// Return value: +// +// A `map` of key to stream entry data, where entry data is an array of +// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive. +// +// Example: +// +// // Retrieve all stream entries +// res, err := client.XRange( +// "key", +// options.NewInfiniteStreamBoundary(options.NegativeInfinity), +// options.NewInfiniteStreamBoundary(options.PositiveInfinity), +// ) +// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]] +// +// // Retrieve exactly one stream entry by id +// res, err := client.XRange( +// "key", +// options.NewStreamBoundary(streamId, true), +// options.NewStreamBoundary(streamId, true), +// ) +// fmt.Println(res) // map[key:[["field1", "entry1"]] +// +// [valkey.io]: https://valkey.io/commands/xrange/ +func (client *baseClient) XRange( + key string, + start options.StreamBoundary, + end options.StreamBoundary, +) (map[string][][]string, error) { + return client.XRangeWithOptions(key, start, end, nil) +} + +// Returns stream entries matching a given range of IDs. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// start - The start position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// end - The end position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// opts - Stream range options. +// +// Return value: +// +// A `map` of key to stream entry data, where entry data is an array of +// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive. +// +// Example: +// +// // Retrieve all stream entries +// res, err := client.XRangeWithOptions( +// "key", +// options.NewInfiniteStreamBoundary(options.NegativeInfinity), +// options.NewInfiniteStreamBoundary(options.PositiveInfinity), +// options.NewStreamRangeOptions().SetCount(10), +// ) +// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]] +// +// // Retrieve exactly one stream entry by id +// res, err := client.XRangeWithOptions( +// "key", +// options.NewStreamBoundary(streamId, true), +// options.NewStreamBoundary(streamId, true), +// options.NewStreamRangeOptions().SetCount(1), +// ) +// fmt.Println(res) // map[key:[["field1", "entry1"]] +// +// [valkey.io]: https://valkey.io/commands/xrange/ +func (client *baseClient) XRangeWithOptions( + key string, + start options.StreamBoundary, + end options.StreamBoundary, + opts *options.StreamRangeOptions, +) (map[string][][]string, error) { + args := []string{key, string(start), string(end)} + if opts != nil { + optionArgs, err := opts.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optionArgs...) + } + result, err := client.executeCommand(C.XRange, args) + if err != nil { + return nil, err + } + return handleMapOfArrayOfStringArrayOrNilResponse(result) +} + +// Returns stream entries matching a given range of IDs in reverse order. +// Equivalent to `XRange` but returns entries in reverse order. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// start - The start position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// end - The end position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// +// Return value: +// +// A `map` of key to stream entry data, where entry data is an array of +// pairings with format `[[field, entry], [field, entry], ...]`. +// +// Example: +// +// // Retrieve all stream entries +// res, err := client.XRevRange( +// "key", +// options.NewInfiniteStreamBoundary(options.PositiveInfinity), +// options.NewInfiniteStreamBoundary(options.NegativeInfinity), +// ) +// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]] +// +// [valkey.io]: https://valkey.io/commands/xrevrange/ +func (client *baseClient) XRevRange( + key string, + start options.StreamBoundary, + end options.StreamBoundary, +) (map[string][][]string, error) { + return client.XRevRangeWithOptions(key, start, end, nil) +} + +// Returns stream entries matching a given range of IDs in reverse order. +// Equivalent to `XRange` but returns entries in reverse order. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// start - The start position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// end - The end position. +// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status. +// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary. +// opts - Stream range options. +// +// Return value: +// +// A `map` of key to stream entry data, where entry data is an array of +// pairings with format `[[field, entry], [field, entry], ...]`. +// Returns `nil` if `count` is non-positive. +// +// Example: +// +// // Retrieve all stream entries +// res, err := client.XRevRangeWithOptions( +// "key", +// options.NewInfiniteStreamBoundary(options.PositiveInfinity), +// options.NewInfiniteStreamBoundary(options.NegativeInfinity), +// options.NewStreamRangeOptions().SetCount(10), +// ) +// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]] +// +// [valkey.io]: https://valkey.io/commands/xrevrange/ +func (client *baseClient) XRevRangeWithOptions( + key string, + start options.StreamBoundary, + end options.StreamBoundary, + opts *options.StreamRangeOptions, +) (map[string][][]string, error) { + args := []string{key, string(start), string(end)} + if opts != nil { + optionArgs, err := opts.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optionArgs...) + } + result, err := client.executeCommand(C.XRevRange, args) + if err != nil { + return nil, err + } + return handleMapOfArrayOfStringArrayOrNilResponse(result) +} diff --git a/go/api/options/constants.go b/go/api/options/constants.go index 1b0e33d540..45913c5b5c 100644 --- a/go/api/options/constants.go +++ b/go/api/options/constants.go @@ -10,3 +10,12 @@ const ( NoScores string = "NOSCORES" // Valkey API keyword for the no scores option for zscan command. WithValues string = "WITHVALUES" // Valkey API keyword to query hash values along their names in `HRANDFIELD`. ) + +type InfBoundary string + +const ( + // The highest bound in the sorted set + PositiveInfinity InfBoundary = "+" + // The lowest bound in the sorted set + NegativeInfinity InfBoundary = "-" +) diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index ef2876f6a6..62108e4d5c 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -398,3 +398,45 @@ func (sco *StreamClaimOptions) ToArgs() ([]string, error) { return optionArgs, nil } + +type StreamBoundary string + +// Create a new stream boundary. +func NewStreamBoundary(streamId string, isInclusive bool) StreamBoundary { + if !isInclusive { + return StreamBoundary("(" + streamId) + } + return StreamBoundary(streamId) +} + +// Create a new stream boundary defined by an infinity. +func NewInfiniteStreamBoundary(bound InfBoundary) StreamBoundary { + return StreamBoundary(string(bound)) +} + +// Optional arguments for `XRange` and `XRevRange` in [StreamCommands] +type StreamRangeOptions struct { + count int64 + countIsSet bool +} + +func NewStreamRangeOptions() *StreamRangeOptions { + return &StreamRangeOptions{} +} + +// Set the count. +func (sro *StreamRangeOptions) SetCount(count int64) *StreamRangeOptions { + sro.count = count + sro.countIsSet = true + return sro +} + +func (sro *StreamRangeOptions) ToArgs() ([]string, error) { + var args []string + + if sro.countIsSet { + args = append(args, "COUNT", utils.IntToString(sro.count)) + } + + return args, nil +} diff --git a/go/api/options/zrange_options.go b/go/api/options/zrange_options.go index d89e9124b8..77cda0c530 100644 --- a/go/api/options/zrange_options.go +++ b/go/api/options/zrange_options.go @@ -39,18 +39,10 @@ type RangeByLex struct { } type ( - InfBoundary string scoreBoundary string lexBoundary string ) -const ( - // The highest bound in the sorted set - PositiveInfinity InfBoundary = "+" - // The lowest bound in the sorted set - NegativeInfinity InfBoundary = "-" -) - // Create a new inclusive score boundary. func NewInclusiveScoreBoundary(bound float64) scoreBoundary { return scoreBoundary(utils.FloatToString(bound)) diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index e49c2a5db5..ad04c6adc5 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -711,6 +711,14 @@ func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) ( return claimedEntries, nil } +func handleMapOfArrayOfStringArrayOrNilResponse(response *C.struct_CommandResponse) (map[string][][]string, error) { + if response.response_type == uint32(C.Null) { + return nil, nil + } + + return handleMapOfArrayOfStringArrayResponse(response) +} + func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) { defer C.free_command_response(response) var null XAutoClaimResponse // default response diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index b6e1ccdb77..dd8a0888d3 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -108,4 +108,22 @@ type StreamCommands interface { ids []string, options *options.StreamClaimOptions, ) ([]string, error) + + XRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error) + + XRangeWithOptions( + key string, + start options.StreamBoundary, + end options.StreamBoundary, + options *options.StreamRangeOptions, + ) (map[string][][]string, error) + + XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error) + + XRevRangeWithOptions( + key string, + start options.StreamBoundary, + end options.StreamBoundary, + options *options.StreamRangeOptions, + ) (map[string][][]string, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 623cb58c6f..b302afe85c 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -7252,3 +7252,204 @@ func (suite *GlideTestSuite) TestCopyWithOptions() { assert.Equal(t, value, resultGet.Value()) }) } + +func (suite *GlideTestSuite) TestXRangeAndXRevRange() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.New().String() + key2 := uuid.New().String() + stringKey := uuid.New().String() + positiveInfinity := options.NewInfiniteStreamBoundary(options.PositiveInfinity) + negativeInfinity := options.NewInfiniteStreamBoundary(options.NegativeInfinity) + + // add stream entries + streamId1, err := client.XAdd( + key, + [][]string{{"field1", "value1"}}, + ) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), streamId1) + + streamId2, err := client.XAdd( + key, + [][]string{{"field2", "value2"}}, + ) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), streamId2) + + xlenResult, err := client.XLen(key) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(2), xlenResult) + + // get everything from the stream + xrangeResult, err := client.XRange( + key, + negativeInfinity, + positiveInfinity, + ) + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + map[string][][]string{streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}}, + xrangeResult, + ) + + // get everything from the stream in reverse + xrevrangeResult, err := client.XRevRange( + key, + positiveInfinity, + negativeInfinity, + ) + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + map[string][][]string{streamId2.Value(): {{"field2", "value2"}}, streamId1.Value(): {{"field1", "value1"}}}, + xrevrangeResult, + ) + + // returns empty map if + before - + xrangeResult, err = client.XRange( + key, + positiveInfinity, + negativeInfinity, + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrangeResult) + + // rev search returns empty if - before + + xrevrangeResult, err = client.XRevRange( + key, + negativeInfinity, + positiveInfinity, + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrevrangeResult) + + streamId3, err := client.XAdd( + key, + [][]string{{"field3", "value3"}}, + ) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), streamId3) + + // get the newest stream entry + xrangeResult, err = client.XRangeWithOptions( + key, + options.NewStreamBoundary(streamId2.Value(), false), + positiveInfinity, + options.NewStreamRangeOptions().SetCount(1), + ) + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + map[string][][]string{streamId3.Value(): {{"field3", "value3"}}}, + xrangeResult, + ) + + // doing the same with rev search + xrevrangeResult, err = client.XRevRangeWithOptions( + key, + positiveInfinity, + options.NewStreamBoundary(streamId2.Value(), false), + options.NewStreamRangeOptions().SetCount(1), + ) + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + map[string][][]string{streamId3.Value(): {{"field3", "value3"}}}, + xrevrangeResult, + ) + + // both xrange and xrevrange return nil with a zero/negative count + xrangeResult, err = client.XRangeWithOptions( + key, + negativeInfinity, + positiveInfinity, + options.NewStreamRangeOptions().SetCount(0), + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrangeResult) + + xrevrangeResult, err = client.XRevRangeWithOptions( + key, + positiveInfinity, + negativeInfinity, + options.NewStreamRangeOptions().SetCount(-1), + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrevrangeResult) + + // xrange and xrevrange against an empty stream + xdelResult, err := client.XDel(key, []string{streamId1.Value(), streamId2.Value(), streamId3.Value()}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(3), xdelResult) + + xrangeResult, err = client.XRange( + key, + negativeInfinity, + positiveInfinity, + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrangeResult) + + xrevrangeResult, err = client.XRevRange( + key, + positiveInfinity, + negativeInfinity, + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrevrangeResult) + + // xrange and xrevrange against a non-existent stream + xrangeResult, err = client.XRange( + key2, + negativeInfinity, + positiveInfinity, + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrangeResult) + + xrevrangeResult, err = client.XRevRange( + key2, + positiveInfinity, + negativeInfinity, + ) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), xrevrangeResult) + + // xrange and xrevrange against a non-stream key + _, err = client.Set(stringKey, "test") + assert.NoError(suite.T(), err) + _, err = client.XRange( + stringKey, + negativeInfinity, + positiveInfinity, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XRevRange( + stringKey, + positiveInfinity, + negativeInfinity, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + // xrange and xrevrange when range bound is not a valid id + _, err = client.XRange( + key, + options.NewStreamBoundary("invalid-id", false), + positiveInfinity, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XRevRange( + key, + options.NewStreamBoundary("invalid-id", false), + negativeInfinity, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +}