Skip to content

Commit

Permalink
Go: Add command XRange and XRevRange (#2989)
Browse files Browse the repository at this point in the history
* Go: Add command XRange and XRevRange

Signed-off-by: TJ Zhang <[email protected]>
  • Loading branch information
tjzhang-BQ authored and EdricCua committed Jan 28, 2025
1 parent a679656 commit 62ecfb7
Show file tree
Hide file tree
Showing 7 changed files with 480 additions and 8 deletions.
202 changes: 202 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6614,3 +6614,205 @@ func (client *baseClient) CopyWithOptions(
}
return handleBoolResponse(result)
}

// Returns stream entries matching a given range of IDs.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// start - The start position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
// end - The end position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
//
// Example:
//
// // Retrieve all stream entries
// res, err := client.XRange(
// "key",
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
//
// // Retrieve exactly one stream entry by id
// res, err := client.XRange(
// "key",
// options.NewStreamBoundary(streamId, true),
// options.NewStreamBoundary(streamId, true),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"]]
//
// [valkey.io]: https://valkey.io/commands/xrange/
func (client *baseClient) XRange(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
) (map[string][][]string, error) {
return client.XRangeWithOptions(key, start, end, nil)
}

// Returns stream entries matching a given range of IDs.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// start - The start position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
// end - The end position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
// opts - Stream range options.
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
//
// Example:
//
// // Retrieve all stream entries
// res, err := client.XRangeWithOptions(
// "key",
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewStreamRangeOptions().SetCount(10),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
//
// // Retrieve exactly one stream entry by id
// res, err := client.XRangeWithOptions(
// "key",
// options.NewStreamBoundary(streamId, true),
// options.NewStreamBoundary(streamId, true),
// options.NewStreamRangeOptions().SetCount(1),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"]]
//
// [valkey.io]: https://valkey.io/commands/xrange/
func (client *baseClient) XRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
opts *options.StreamRangeOptions,
) (map[string][][]string, error) {
args := []string{key, string(start), string(end)}
if opts != nil {
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
}
result, err := client.executeCommand(C.XRange, args)
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayOrNilResponse(result)
}

// Returns stream entries matching a given range of IDs in reverse order.
// Equivalent to `XRange` but returns entries in reverse order.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// start - The start position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
// end - The end position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`.
//
// Example:
//
// // Retrieve all stream entries
// res, err := client.XRevRange(
// "key",
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// )
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
//
// [valkey.io]: https://valkey.io/commands/xrevrange/
func (client *baseClient) XRevRange(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
) (map[string][][]string, error) {
return client.XRevRangeWithOptions(key, start, end, nil)
}

// Returns stream entries matching a given range of IDs in reverse order.
// Equivalent to `XRange` but returns entries in reverse order.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// start - The start position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
// end - The end position.
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
// opts - Stream range options.
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`.
// Returns `nil` if `count` is non-positive.
//
// Example:
//
// // Retrieve all stream entries
// res, err := client.XRevRangeWithOptions(
// "key",
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewStreamRangeOptions().SetCount(10),
// )
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
//
// [valkey.io]: https://valkey.io/commands/xrevrange/
func (client *baseClient) XRevRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
opts *options.StreamRangeOptions,
) (map[string][][]string, error) {
args := []string{key, string(start), string(end)}
if opts != nil {
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
}
result, err := client.executeCommand(C.XRevRange, args)
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayOrNilResponse(result)
}
9 changes: 9 additions & 0 deletions go/api/options/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@ const (
NoScores string = "NOSCORES" // Valkey API keyword for the no scores option for zscan command.
WithValues string = "WITHVALUES" // Valkey API keyword to query hash values along their names in `HRANDFIELD`.
)

type InfBoundary string

const (
// The highest bound in the sorted set
PositiveInfinity InfBoundary = "+"
// The lowest bound in the sorted set
NegativeInfinity InfBoundary = "-"
)
42 changes: 42 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,45 @@ func (sco *StreamClaimOptions) ToArgs() ([]string, error) {

return optionArgs, nil
}

type StreamBoundary string

// Create a new stream boundary.
func NewStreamBoundary(streamId string, isInclusive bool) StreamBoundary {
if !isInclusive {
return StreamBoundary("(" + streamId)
}
return StreamBoundary(streamId)
}

// Create a new stream boundary defined by an infinity.
func NewInfiniteStreamBoundary(bound InfBoundary) StreamBoundary {
return StreamBoundary(string(bound))
}

// Optional arguments for `XRange` and `XRevRange` in [StreamCommands]
type StreamRangeOptions struct {
count int64
countIsSet bool
}

func NewStreamRangeOptions() *StreamRangeOptions {
return &StreamRangeOptions{}
}

// Set the count.
func (sro *StreamRangeOptions) SetCount(count int64) *StreamRangeOptions {
sro.count = count
sro.countIsSet = true
return sro
}

func (sro *StreamRangeOptions) ToArgs() ([]string, error) {
var args []string

if sro.countIsSet {
args = append(args, "COUNT", utils.IntToString(sro.count))
}

return args, nil
}
8 changes: 0 additions & 8 deletions go/api/options/zrange_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,10 @@ type RangeByLex struct {
}

type (
InfBoundary string
scoreBoundary string
lexBoundary string
)

const (
// The highest bound in the sorted set
PositiveInfinity InfBoundary = "+"
// The lowest bound in the sorted set
NegativeInfinity InfBoundary = "-"
)

// Create a new inclusive score boundary.
func NewInclusiveScoreBoundary(bound float64) scoreBoundary {
return scoreBoundary(utils.FloatToString(bound))
Expand Down
8 changes: 8 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,14 @@ func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) (
return claimedEntries, nil
}

func handleMapOfArrayOfStringArrayOrNilResponse(response *C.struct_CommandResponse) (map[string][][]string, error) {
if response.response_type == uint32(C.Null) {
return nil, nil
}

return handleMapOfArrayOfStringArrayResponse(response)
}

func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) {
defer C.free_command_response(response)
var null XAutoClaimResponse // default response
Expand Down
18 changes: 18 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,22 @@ type StreamCommands interface {
ids []string,
options *options.StreamClaimOptions,
) ([]string, error)

XRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)

XRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
options *options.StreamRangeOptions,
) (map[string][][]string, error)

XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)

XRevRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
options *options.StreamRangeOptions,
) (map[string][][]string, error)
}
Loading

0 comments on commit 62ecfb7

Please sign in to comment.