diff --git a/go/api/base_client.go b/go/api/base_client.go index 632625ae67..1bb44162c0 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -6812,7 +6812,7 @@ func (client *baseClient) CopyWithOptions( // // Return value: // -// A `map` of key to stream entry data, where entry data is an array of +// An `array` of stream entry data, where entry data is an array of // pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive. // // Example: @@ -6823,7 +6823,7 @@ func (client *baseClient) CopyWithOptions( // options.NewInfiniteStreamBoundary(options.NegativeInfinity), // options.NewInfiniteStreamBoundary(options.PositiveInfinity), // ) -// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]] +// fmt.Println(res) // [{streamId [["field1", "entry1"], ["field2", "entry2"]]}] // // // Retrieve exactly one stream entry by id // res, err := client.XRange( @@ -6831,14 +6831,14 @@ func (client *baseClient) CopyWithOptions( // options.NewStreamBoundary(streamId, true), // options.NewStreamBoundary(streamId, true), // ) -// fmt.Println(res) // map[key:[["field1", "entry1"]] +// fmt.Println(res) // [{streamId [["field1", "entry1"]]}] // // [valkey.io]: https://valkey.io/commands/xrange/ func (client *baseClient) XRange( key string, start options.StreamBoundary, end options.StreamBoundary, -) (map[string][][]string, error) { +) ([]XRangeResponse, error) { return client.XRangeWithOptions(key, start, end, nil) } @@ -6859,7 +6859,7 @@ func (client *baseClient) XRange( // // Return value: // -// A `map` of key to stream entry data, where entry data is an array of +// An `array` of stream entry data, where entry data is an array of // pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive. // // Example: @@ -6871,7 +6871,7 @@ func (client *baseClient) XRange( // options.NewInfiniteStreamBoundary(options.PositiveInfinity), // options.NewStreamRangeOptions().SetCount(10), // ) -// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]] +// fmt.Println(res) // [{streamId [["field1", "entry1"], ["field2", "entry2"]]}] // // // Retrieve exactly one stream entry by id // res, err := client.XRangeWithOptions( @@ -6880,7 +6880,7 @@ func (client *baseClient) XRange( // options.NewStreamBoundary(streamId, true), // options.NewStreamRangeOptions().SetCount(1), // ) -// fmt.Println(res) // map[key:[["field1", "entry1"]] +// fmt.Println(res) // [{streamId [["field1", "entry1"]]}] // // [valkey.io]: https://valkey.io/commands/xrange/ func (client *baseClient) XRangeWithOptions( @@ -6888,7 +6888,7 @@ func (client *baseClient) XRangeWithOptions( start options.StreamBoundary, end options.StreamBoundary, opts *options.StreamRangeOptions, -) (map[string][][]string, error) { +) ([]XRangeResponse, error) { args := []string{key, string(start), string(end)} if opts != nil { optionArgs, err := opts.ToArgs() @@ -6901,7 +6901,7 @@ func (client *baseClient) XRangeWithOptions( if err != nil { return nil, err } - return handleMapOfArrayOfStringArrayOrNilResponse(result) + return handleXRangeResponse(result) } // Returns stream entries matching a given range of IDs in reverse order. @@ -6921,7 +6921,7 @@ func (client *baseClient) XRangeWithOptions( // // Return value: // -// A `map` of key to stream entry data, where entry data is an array of +// An `array` of stream entry data, where entry data is an array of // pairings with format `[[field, entry], [field, entry], ...]`. // // Example: @@ -6932,14 +6932,14 @@ func (client *baseClient) XRangeWithOptions( // options.NewInfiniteStreamBoundary(options.PositiveInfinity), // options.NewInfiniteStreamBoundary(options.NegativeInfinity), // ) -// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]] +// fmt.Println(res) // [{streamID ["field2", "entry2"], ["field1", "entry1"]]}] // // [valkey.io]: https://valkey.io/commands/xrevrange/ func (client *baseClient) XRevRange( key string, start options.StreamBoundary, end options.StreamBoundary, -) (map[string][][]string, error) { +) ([]XRangeResponse, error) { return client.XRevRangeWithOptions(key, start, end, nil) } @@ -6961,7 +6961,7 @@ func (client *baseClient) XRevRange( // // Return value: // -// A `map` of key to stream entry data, where entry data is an array of +// An `array` of stream entry data, where entry data is an array of // pairings with format `[[field, entry], [field, entry], ...]`. // Returns `nil` if `count` is non-positive. // @@ -6974,7 +6974,7 @@ func (client *baseClient) XRevRange( // options.NewInfiniteStreamBoundary(options.NegativeInfinity), // options.NewStreamRangeOptions().SetCount(10), // ) -// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]] +// fmt.Println(res) // [{streamID [["field2", "entry2"], ["field1", "entry1"]]}] // // [valkey.io]: https://valkey.io/commands/xrevrange/ func (client *baseClient) XRevRangeWithOptions( @@ -6982,7 +6982,7 @@ func (client *baseClient) XRevRangeWithOptions( start options.StreamBoundary, end options.StreamBoundary, opts *options.StreamRangeOptions, -) (map[string][][]string, error) { +) ([]XRangeResponse, error) { args := []string{key, string(start), string(end)} if opts != nil { optionArgs, err := opts.ToArgs() @@ -6995,7 +6995,7 @@ func (client *baseClient) XRevRangeWithOptions( if err != nil { return nil, err } - return handleMapOfArrayOfStringArrayOrNilResponse(result) + return handleXRevRangeResponse(result) } // Returns information about the stream stored at `key`. diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index c72ce1659f..1475e15b2a 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -9,6 +9,7 @@ import "C" import ( "fmt" "reflect" + "sort" "strconv" "unsafe" @@ -833,12 +834,94 @@ func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) ( return claimedEntries, nil } -func handleMapOfArrayOfStringArrayOrNilResponse(response *C.struct_CommandResponse) (map[string][][]string, error) { +func handleXRangeResponse(response *C.struct_CommandResponse) ([]XRangeResponse, error) { + defer C.free_command_response(response) + + if response.response_type == uint32(C.Null) { + return nil, nil + } + + typeErr := checkResponseType(response, C.Map, false) + if typeErr != nil { + return nil, typeErr + } + mapData, err := parseMap(response) + if err != nil { + return nil, err + } + converted, err := mapConverter[[][]string]{ + arrayConverter[[]string]{ + arrayConverter[string]{ + nil, + false, + }, + false, + }, + false, + }.convert(mapData) + if err != nil { + return nil, err + } + claimedEntries, ok := converted.(map[string][][]string) + if !ok { + return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type of second element: %T", converted)} + } + + xRangeResponseArray := make([]XRangeResponse, 0, len(claimedEntries)) + + for k, v := range claimedEntries { + xRangeResponseArray = append(xRangeResponseArray, XRangeResponse{k, v}) + } + + sort.Slice(xRangeResponseArray, func(i, j int) bool { + return xRangeResponseArray[i].StreamId < xRangeResponseArray[j].StreamId + }) + return xRangeResponseArray, nil +} + +func handleXRevRangeResponse(response *C.struct_CommandResponse) ([]XRangeResponse, error) { + defer C.free_command_response(response) + if response.response_type == uint32(C.Null) { return nil, nil } - return handleMapOfArrayOfStringArrayResponse(response) + typeErr := checkResponseType(response, C.Map, false) + if typeErr != nil { + return nil, typeErr + } + mapData, err := parseMap(response) + if err != nil { + return nil, err + } + converted, err := mapConverter[[][]string]{ + arrayConverter[[]string]{ + arrayConverter[string]{ + nil, + false, + }, + false, + }, + false, + }.convert(mapData) + if err != nil { + return nil, err + } + claimedEntries, ok := converted.(map[string][][]string) + if !ok { + return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type of second element: %T", converted)} + } + + xRangeResponseArray := make([]XRangeResponse, 0, len(claimedEntries)) + + for k, v := range claimedEntries { + xRangeResponseArray = append(xRangeResponseArray, XRangeResponse{k, v}) + } + + sort.Slice(xRangeResponseArray, func(i, j int) bool { + return xRangeResponseArray[i].StreamId > xRangeResponseArray[j].StreamId + }) + return xRangeResponseArray, nil } func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) { diff --git a/go/api/response_types.go b/go/api/response_types.go index db548c402f..9d08ba6c53 100644 --- a/go/api/response_types.go +++ b/go/api/response_types.go @@ -35,6 +35,12 @@ type MemberAndScore struct { Score float64 } +// Response type of [XRange] and [XRevRange] commands. +type XRangeResponse struct { + StreamId string + Entries [][]string +} + // Response type of [XAutoClaim] command. type XAutoClaimResponse struct { NextEntry string diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 1e74281ffb..fbf3ca47e9 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -113,21 +113,21 @@ type StreamCommands interface { XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error) - XRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error) + XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error) XRangeWithOptions( key string, start options.StreamBoundary, end options.StreamBoundary, options *options.StreamRangeOptions, - ) (map[string][][]string, error) + ) ([]XRangeResponse, error) - XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error) + XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error) XRevRangeWithOptions( key string, start options.StreamBoundary, end options.StreamBoundary, options *options.StreamRangeOptions, - ) (map[string][][]string, error) + ) ([]XRangeResponse, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index bda0018aba..927ec7478a 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -7506,7 +7506,10 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - map[string][][]string{streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}}, + []api.XRangeResponse{ + {StreamId: streamId1.Value(), Entries: [][]string{{"field1", "value1"}}}, + {StreamId: streamId2.Value(), Entries: [][]string{{"field2", "value2"}}}, + }, xrangeResult, ) @@ -7519,7 +7522,10 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - map[string][][]string{streamId2.Value(): {{"field2", "value2"}}, streamId1.Value(): {{"field1", "value1"}}}, + []api.XRangeResponse{ + {StreamId: streamId2.Value(), Entries: [][]string{{"field2", "value2"}}}, + {StreamId: streamId1.Value(), Entries: [][]string{{"field1", "value1"}}}, + }, xrevrangeResult, ) @@ -7558,7 +7564,9 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - map[string][][]string{streamId3.Value(): {{"field3", "value3"}}}, + []api.XRangeResponse{ + {StreamId: streamId3.Value(), Entries: [][]string{{"field3", "value3"}}}, + }, xrangeResult, ) @@ -7572,7 +7580,9 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - map[string][][]string{streamId3.Value(): {{"field3", "value3"}}}, + []api.XRangeResponse{ + {StreamId: streamId3.Value(), Entries: [][]string{{"field3", "value3"}}}, + }, xrevrangeResult, )