Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: XINFO CONSUMERS. #3120

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7108,6 +7108,43 @@ func (client *baseClient) XInfoStream(key string) (map[string]interface{}, error
return handleStringToAnyMapResponse(result)
}

// Returns the list of all consumers and their attributes for the given consumer group of the
// stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
//
// Return value:
//
// An array of [api.XInfoConsumerInfo], where each element contains the attributes
// of a consumer for the given consumer group of the stream at `key`.
//
// Example:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to move example stream_commands_test.go

//
// info, err := client.XInfoConsumers(key, group)
// info:
// // []XInfoConsumerInfo {
// // XInfoConsumerInfo {
// // Name: "Alice",
// // Pending: 2,
// // Idle: 9104628,
// // Inactive: {18104698 false}, // Added in version 7.2.0
// // },
// // }
//
// [valkey.io]: https://valkey.io/commands/xinfo-consumers/
func (client *baseClient) XInfoConsumers(key string, group string) ([]XInfoConsumerInfo, error) {
response, err := client.executeCommand(C.XInfoConsumers, []string{key, group})
if err != nil {
return nil, err
}
return handleXInfoConsumersResponse(response)
}

// Returns detailed information about the stream stored at `key`.
//
// See [valkey.io] for details.
Expand Down
43 changes: 43 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,49 @@ func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendin
return pendingDetails, nil
}

func handleXInfoConsumersResponse(response *C.struct_CommandResponse) ([]XInfoConsumerInfo, error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return nil, typeErr
}
arrData, err := parseArray(response)
if err != nil {
return nil, err
}
converted, err := arrayConverter[map[string]interface{}]{
nil,
false,
}.convert(arrData)
if err != nil {
return nil, err
}
arr, ok := converted.([]map[string]interface{})
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type: %T", converted)}
}

result := make([]XInfoConsumerInfo, 0, len(arr))

for _, group := range arr {
info := XInfoConsumerInfo{
Name: group["name"].(string),
Pending: group["pending"].(int64),
Idle: group["idle"].(int64),
}
switch inactive := group["inactive"].(type) {
case int64:
info.Inactive = CreateInt64Result(inactive)
default:
info.Inactive = CreateNilInt64Result()
}
result = append(result, info)
}

return result, nil
}

func handleStringToAnyMapResponse(response *C.struct_CommandResponse) (map[string]interface{}, error) {
defer C.free_command_response(response)

Expand Down
15 changes: 15 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,18 @@ type XPendingDetail struct {
func CreateNilXPendingSummary() XPendingSummary {
return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)}
}

// XInfoConsumerInfo represents a group information returned by `XInfoConsumers` command.
type XInfoConsumerInfo struct {
// The consumer's name.
Name string
// The number of entries in the PEL: pending messages for the consumer, which are messages that were delivered but are yet
// to be acknowledged.
Pending int64
// The number of milliseconds that have passed since the consumer's last attempted interaction (Examples: XREADGROUP,
// XCLAIM, XAUTOCLAIM).
Idle int64
// The number of milliseconds that have passed since the consumer's last successful interaction (Examples: XREADGROUP that
// actually read some entries into the PEL, XCLAIM/XAUTOCLAIM that actually claimed some entries).
Inactive Result[int64]
}
2 changes: 2 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type StreamCommands interface {

XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error)

XInfoConsumers(key string, group string) ([]XInfoConsumerInfo, error)

XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRangeWithOptions(
Expand Down
100 changes: 100 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7098,6 +7098,106 @@ func (suite *GlideTestSuite) TestXInfoStream() {
})
}

func (suite *GlideTestSuite) TestXInfoConsumers() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group := uuid.NewString()
consumer1 := uuid.NewString()
consumer2 := uuid.NewString()

xadd, err := client.XAddWithOptions(
key,
[][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
options.NewXAddOptions().SetId("0-1"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-1", xadd.Value())
xadd, err = client.XAddWithOptions(
key,
[][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
options.NewXAddOptions().SetId("0-2"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-2", xadd.Value())
xadd, err = client.XAddWithOptions(key, [][]string{{"e3_f1", "e3_v1"}}, options.NewXAddOptions().SetId("0-3"))
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-3", xadd.Value())

suite.verifyOK(client.XGroupCreate(key, group, "0-0"))

xReadGroup, err := client.XReadGroupWithOptions(
group,
consumer1,
map[string]string{key: ">"},
options.NewXReadGroupOptions().SetCount(1),
)
assert.NoError(suite.T(), err)
expectedResult := map[string]map[string][][]string{
key: {
"0-1": {{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
},
}
assert.Equal(suite.T(), expectedResult, xReadGroup)

// Sleep to ensure the idle time value and inactive time value returned by xinfo_consumers is > 0
time.Sleep(2000 * time.Millisecond)
info, err := client.XInfoConsumers(key, group)
assert.NoError(suite.T(), err)
assert.Len(suite.T(), info, 1)
assert.Equal(suite.T(), consumer1, info[0].Name)
assert.Equal(suite.T(), int64(1), info[0].Pending)
assert.Greater(suite.T(), info[0].Idle, int64(0))
if suite.serverVersion > "7.2.0" {
assert.False(suite.T(), info[0].Inactive.IsNil())
assert.Greater(suite.T(), info[0].Inactive.Value(), int64(0))
} else {
assert.True(suite.T(), info[0].Inactive.IsNil())
}

respBool, err := client.XGroupCreateConsumer(key, group, consumer2)
assert.NoError(suite.T(), err)
assert.True(suite.T(), respBool)

xReadGroup, err = client.XReadGroup(group, consumer2, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
expectedResult = map[string]map[string][][]string{
key: {
"0-2": {{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
"0-3": {{"e3_f1", "e3_v1"}},
},
}
assert.Equal(suite.T(), expectedResult, xReadGroup)

// Verify that xinfo_consumers contains info for 2 consumers now
info, err = client.XInfoConsumers(key, group)
assert.NoError(suite.T(), err)
assert.Len(suite.T(), info, 2)

// Passing a non-existing key raises an error
key = uuid.NewString()
_, err = client.XInfoConsumers(key, "_")
assert.IsType(suite.T(), &errors.RequestError{}, err)

// key exists, but it is not a stream
suite.verifyOK(client.Set(key, key))
_, err = client.XInfoConsumers(key, "_")
assert.IsType(suite.T(), &errors.RequestError{}, err)

// Passing a non-existing group raises an error
key = uuid.NewString()
_, err = client.XAdd(key, [][]string{{"a", "b"}})
assert.NoError(suite.T(), err)
_, err = client.XInfoConsumers(key, "_")
assert.IsType(suite.T(), &errors.RequestError{}, err)

// no consumers yet
suite.verifyOK(client.XGroupCreate(key, group, "0-0"))
info, err = client.XInfoConsumers(key, group)
assert.NoError(suite.T(), err)
assert.Empty(suite.T(), info)
})
}

func (suite *GlideTestSuite) TestSetBit_SetSingleBit() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down
Loading