Skip to content

Commit

Permalink
[Sync Release 1.3 commit] XRange and XRevRange (#3093)
Browse files Browse the repository at this point in the history
* Added fix for xrange xrevrange in Go

---------

Signed-off-by: Edward Liang <[email protected]>
  • Loading branch information
edlng authored Feb 6, 2025
1 parent 6a98816 commit c6c4c45
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 26 deletions.
32 changes: 16 additions & 16 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6812,7 +6812,7 @@ func (client *baseClient) CopyWithOptions(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
//
// Example:
Expand All @@ -6823,22 +6823,22 @@ func (client *baseClient) CopyWithOptions(
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
// fmt.Println(res) // [{streamId [["field1", "entry1"], ["field2", "entry2"]]}]
//
// // Retrieve exactly one stream entry by id
// res, err := client.XRange(
// "key",
// options.NewStreamBoundary(streamId, true),
// options.NewStreamBoundary(streamId, true),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"]]
// fmt.Println(res) // [{streamId [["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrange/
func (client *baseClient) XRange(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
return client.XRangeWithOptions(key, start, end, nil)
}

Expand All @@ -6859,7 +6859,7 @@ func (client *baseClient) XRange(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
//
// Example:
Expand All @@ -6871,7 +6871,7 @@ func (client *baseClient) XRange(
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewStreamRangeOptions().SetCount(10),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
// fmt.Println(res) // [{streamId [["field1", "entry1"], ["field2", "entry2"]]}]
//
// // Retrieve exactly one stream entry by id
// res, err := client.XRangeWithOptions(
Expand All @@ -6880,15 +6880,15 @@ func (client *baseClient) XRange(
// options.NewStreamBoundary(streamId, true),
// options.NewStreamRangeOptions().SetCount(1),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"]]
// fmt.Println(res) // [{streamId [["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrange/
func (client *baseClient) XRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
opts *options.StreamRangeOptions,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
args := []string{key, string(start), string(end)}
if opts != nil {
optionArgs, err := opts.ToArgs()
Expand All @@ -6901,7 +6901,7 @@ func (client *baseClient) XRangeWithOptions(
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayOrNilResponse(result)
return handleXRangeResponse(result)
}

// Returns stream entries matching a given range of IDs in reverse order.
Expand All @@ -6921,7 +6921,7 @@ func (client *baseClient) XRangeWithOptions(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`.
//
// Example:
Expand All @@ -6932,14 +6932,14 @@ func (client *baseClient) XRangeWithOptions(
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// )
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
// fmt.Println(res) // [{streamID ["field2", "entry2"], ["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrevrange/
func (client *baseClient) XRevRange(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
return client.XRevRangeWithOptions(key, start, end, nil)
}

Expand All @@ -6961,7 +6961,7 @@ func (client *baseClient) XRevRange(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`.
// Returns `nil` if `count` is non-positive.
//
Expand All @@ -6974,15 +6974,15 @@ func (client *baseClient) XRevRange(
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewStreamRangeOptions().SetCount(10),
// )
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
// fmt.Println(res) // [{streamID [["field2", "entry2"], ["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrevrange/
func (client *baseClient) XRevRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
opts *options.StreamRangeOptions,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
args := []string{key, string(start), string(end)}
if opts != nil {
optionArgs, err := opts.ToArgs()
Expand All @@ -6995,7 +6995,7 @@ func (client *baseClient) XRevRangeWithOptions(
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayOrNilResponse(result)
return handleXRevRangeResponse(result)
}

// Returns information about the stream stored at `key`.
Expand Down
87 changes: 85 additions & 2 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "C"
import (
"fmt"
"reflect"
"sort"
"strconv"
"unsafe"

Expand Down Expand Up @@ -833,12 +834,94 @@ func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) (
return claimedEntries, nil
}

func handleMapOfArrayOfStringArrayOrNilResponse(response *C.struct_CommandResponse) (map[string][][]string, error) {
func handleXRangeResponse(response *C.struct_CommandResponse) ([]XRangeResponse, error) {
defer C.free_command_response(response)

if response.response_type == uint32(C.Null) {
return nil, nil
}

typeErr := checkResponseType(response, C.Map, false)
if typeErr != nil {
return nil, typeErr
}
mapData, err := parseMap(response)
if err != nil {
return nil, err
}
converted, err := mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
false,
},
false,
}.convert(mapData)
if err != nil {
return nil, err
}
claimedEntries, ok := converted.(map[string][][]string)
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type of second element: %T", converted)}
}

xRangeResponseArray := make([]XRangeResponse, 0, len(claimedEntries))

for k, v := range claimedEntries {
xRangeResponseArray = append(xRangeResponseArray, XRangeResponse{k, v})
}

sort.Slice(xRangeResponseArray, func(i, j int) bool {
return xRangeResponseArray[i].StreamId < xRangeResponseArray[j].StreamId
})
return xRangeResponseArray, nil
}

func handleXRevRangeResponse(response *C.struct_CommandResponse) ([]XRangeResponse, error) {
defer C.free_command_response(response)

if response.response_type == uint32(C.Null) {
return nil, nil
}

return handleMapOfArrayOfStringArrayResponse(response)
typeErr := checkResponseType(response, C.Map, false)
if typeErr != nil {
return nil, typeErr
}
mapData, err := parseMap(response)
if err != nil {
return nil, err
}
converted, err := mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
false,
},
false,
}.convert(mapData)
if err != nil {
return nil, err
}
claimedEntries, ok := converted.(map[string][][]string)
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type of second element: %T", converted)}
}

xRangeResponseArray := make([]XRangeResponse, 0, len(claimedEntries))

for k, v := range claimedEntries {
xRangeResponseArray = append(xRangeResponseArray, XRangeResponse{k, v})
}

sort.Slice(xRangeResponseArray, func(i, j int) bool {
return xRangeResponseArray[i].StreamId > xRangeResponseArray[j].StreamId
})
return xRangeResponseArray, nil
}

func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) {
Expand Down
6 changes: 6 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type MemberAndScore struct {
Score float64
}

// Response type of [XRange] and [XRevRange] commands.
type XRangeResponse struct {
StreamId string
Entries [][]string
}

// Response type of [XAutoClaim] command.
type XAutoClaimResponse struct {
NextEntry string
Expand Down
8 changes: 4 additions & 4 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,21 @@ type StreamCommands interface {

XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error)

XRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)
XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
options *options.StreamRangeOptions,
) (map[string][][]string, error)
) ([]XRangeResponse, error)

XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)
XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRevRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
options *options.StreamRangeOptions,
) (map[string][][]string, error)
) ([]XRangeResponse, error)
}
18 changes: 14 additions & 4 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7506,7 +7506,10 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}},
[]api.XRangeResponse{
{StreamId: streamId1.Value(), Entries: [][]string{{"field1", "value1"}}},
{StreamId: streamId2.Value(), Entries: [][]string{{"field2", "value2"}}},
},
xrangeResult,
)

Expand All @@ -7519,7 +7522,10 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId2.Value(): {{"field2", "value2"}}, streamId1.Value(): {{"field1", "value1"}}},
[]api.XRangeResponse{
{StreamId: streamId2.Value(), Entries: [][]string{{"field2", "value2"}}},
{StreamId: streamId1.Value(), Entries: [][]string{{"field1", "value1"}}},
},
xrevrangeResult,
)

Expand Down Expand Up @@ -7558,7 +7564,9 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId3.Value(): {{"field3", "value3"}}},
[]api.XRangeResponse{
{StreamId: streamId3.Value(), Entries: [][]string{{"field3", "value3"}}},
},
xrangeResult,
)

Expand All @@ -7572,7 +7580,9 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId3.Value(): {{"field3", "value3"}}},
[]api.XRangeResponse{
{StreamId: streamId3.Value(), Entries: [][]string{{"field3", "value3"}}},
},
xrevrangeResult,
)

Expand Down

0 comments on commit c6c4c45

Please sign in to comment.