diff --git a/go/api/base_client.go b/go/api/base_client.go index cb289acbdd..9dd061a997 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -7108,6 +7108,43 @@ func (client *baseClient) XInfoStream(key string) (map[string]interface{}, error return handleStringToAnyMapResponse(result) } +// Returns the list of all consumers and their attributes for the given consumer group of the +// stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// +// Return value: +// +// An array of [api.XInfoConsumerInfo], where each element contains the attributes +// of a consumer for the given consumer group of the stream at `key`. +// +// Example: +// +// info, err := client.XInfoConsumers(key, group) +// info: +// // []XInfoConsumerInfo { +// // XInfoConsumerInfo { +// // Name: "Alice", +// // Pending: 2, +// // Idle: 9104628, +// // Inactive: {18104698 false}, // Added in version 7.2.0 +// // }, +// // } +// +// [valkey.io]: https://valkey.io/commands/xinfo-consumers/ +func (client *baseClient) XInfoConsumers(key string, group string) ([]XInfoConsumerInfo, error) { + response, err := client.executeCommand(C.XInfoConsumers, []string{key, group}) + if err != nil { + return nil, err + } + return handleXInfoConsumersResponse(response) +} + // Returns detailed information about the stream stored at `key`. // // See [valkey.io] for details. diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index e1030e0b7d..f3c77d880e 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -1185,6 +1185,49 @@ func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendin return pendingDetails, nil } +func handleXInfoConsumersResponse(response *C.struct_CommandResponse) ([]XInfoConsumerInfo, error) { + defer C.free_command_response(response) + + typeErr := checkResponseType(response, C.Array, false) + if typeErr != nil { + return nil, typeErr + } + arrData, err := parseArray(response) + if err != nil { + return nil, err + } + converted, err := arrayConverter[map[string]interface{}]{ + nil, + false, + }.convert(arrData) + if err != nil { + return nil, err + } + arr, ok := converted.([]map[string]interface{}) + if !ok { + return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type: %T", converted)} + } + + result := make([]XInfoConsumerInfo, 0, len(arr)) + + for _, group := range arr { + info := XInfoConsumerInfo{ + Name: group["name"].(string), + Pending: group["pending"].(int64), + Idle: group["idle"].(int64), + } + switch inactive := group["inactive"].(type) { + case int64: + info.Inactive = CreateInt64Result(inactive) + default: + info.Inactive = CreateNilInt64Result() + } + result = append(result, info) + } + + return result, nil +} + func handleStringToAnyMapResponse(response *C.struct_CommandResponse) (map[string]interface{}, error) { defer C.free_command_response(response) diff --git a/go/api/response_types.go b/go/api/response_types.go index 9d08ba6c53..9651fcd067 100644 --- a/go/api/response_types.go +++ b/go/api/response_types.go @@ -243,3 +243,18 @@ type XPendingDetail struct { func CreateNilXPendingSummary() XPendingSummary { return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)} } + +// XInfoConsumerInfo represents a group information returned by `XInfoConsumers` command. +type XInfoConsumerInfo struct { + // The consumer's name. + Name string + // The number of entries in the PEL: pending messages for the consumer, which are messages that were delivered but are yet + // to be acknowledged. + Pending int64 + // The number of milliseconds that have passed since the consumer's last attempted interaction (Examples: XREADGROUP, + // XCLAIM, XAUTOCLAIM). + Idle int64 + // The number of milliseconds that have passed since the consumer's last successful interaction (Examples: XREADGROUP that + // actually read some entries into the PEL, XCLAIM/XAUTOCLAIM that actually claimed some entries). + Inactive Result[int64] +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 6a312a0186..d2ae4829b4 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -113,6 +113,8 @@ type StreamCommands interface { XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error) + XInfoConsumers(key string, group string) ([]XInfoConsumerInfo, error) + XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error) XRangeWithOptions( diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 1c837961ab..59e9f8c845 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -7098,6 +7098,106 @@ func (suite *GlideTestSuite) TestXInfoStream() { }) } +func (suite *GlideTestSuite) TestXInfoConsumers() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + group := uuid.NewString() + consumer1 := uuid.NewString() + consumer2 := uuid.NewString() + + xadd, err := client.XAddWithOptions( + key, + [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, + options.NewXAddOptions().SetId("0-1"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-1", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, + options.NewXAddOptions().SetId("0-2"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-2", xadd.Value()) + xadd, err = client.XAddWithOptions(key, [][]string{{"e3_f1", "e3_v1"}}, options.NewXAddOptions().SetId("0-3")) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-3", xadd.Value()) + + suite.verifyOK(client.XGroupCreate(key, group, "0-0")) + + xReadGroup, err := client.XReadGroupWithOptions( + group, + consumer1, + map[string]string{key: ">"}, + options.NewXReadGroupOptions().SetCount(1), + ) + assert.NoError(suite.T(), err) + expectedResult := map[string]map[string][][]string{ + key: { + "0-1": {{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, + }, + } + assert.Equal(suite.T(), expectedResult, xReadGroup) + + // Sleep to ensure the idle time value and inactive time value returned by xinfo_consumers is > 0 + time.Sleep(2000 * time.Millisecond) + info, err := client.XInfoConsumers(key, group) + assert.NoError(suite.T(), err) + assert.Len(suite.T(), info, 1) + assert.Equal(suite.T(), consumer1, info[0].Name) + assert.Equal(suite.T(), int64(1), info[0].Pending) + assert.Greater(suite.T(), info[0].Idle, int64(0)) + if suite.serverVersion > "7.2.0" { + assert.False(suite.T(), info[0].Inactive.IsNil()) + assert.Greater(suite.T(), info[0].Inactive.Value(), int64(0)) + } else { + assert.True(suite.T(), info[0].Inactive.IsNil()) + } + + respBool, err := client.XGroupCreateConsumer(key, group, consumer2) + assert.NoError(suite.T(), err) + assert.True(suite.T(), respBool) + + xReadGroup, err = client.XReadGroup(group, consumer2, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + expectedResult = map[string]map[string][][]string{ + key: { + "0-2": {{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, + "0-3": {{"e3_f1", "e3_v1"}}, + }, + } + assert.Equal(suite.T(), expectedResult, xReadGroup) + + // Verify that xinfo_consumers contains info for 2 consumers now + info, err = client.XInfoConsumers(key, group) + assert.NoError(suite.T(), err) + assert.Len(suite.T(), info, 2) + + // Passing a non-existing key raises an error + key = uuid.NewString() + _, err = client.XInfoConsumers(key, "_") + assert.IsType(suite.T(), &errors.RequestError{}, err) + + // key exists, but it is not a stream + suite.verifyOK(client.Set(key, key)) + _, err = client.XInfoConsumers(key, "_") + assert.IsType(suite.T(), &errors.RequestError{}, err) + + // Passing a non-existing group raises an error + key = uuid.NewString() + _, err = client.XAdd(key, [][]string{{"a", "b"}}) + assert.NoError(suite.T(), err) + _, err = client.XInfoConsumers(key, "_") + assert.IsType(suite.T(), &errors.RequestError{}, err) + + // no consumers yet + suite.verifyOK(client.XGroupCreate(key, group, "0-0")) + info, err = client.XInfoConsumers(key, group) + assert.NoError(suite.T(), err) + assert.Empty(suite.T(), info) + }) +} + func (suite *GlideTestSuite) TestSetBit_SetSingleBit() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String()