From f6d9b0b396befdc4f771d30fcb7fe4116a1e0c34 Mon Sep 17 00:00:00 2001 From: Ruichen Bao Date: Thu, 16 Jan 2025 20:30:01 +0800 Subject: [PATCH] enhance: add Compact, CreateResourceGroup, DropResourceGroup, DescribeResourceGroup, ListResourceGroups and UpdateResourceGroup Signed-off-by: Ruichen Bao --- src/impl/MilvusClientImplV2.cpp | 182 ++++++++++++++++++ src/impl/MilvusClientImplV2.h | 18 ++ src/impl/MilvusConnection.cpp | 25 +++ src/impl/MilvusConnection.h | 15 ++ src/impl/types/NodeInfo.cpp | 16 ++ src/impl/types/ResourceGroupConfig.cpp | 52 +++++ src/impl/types/ResourceGroupDesc.cpp | 47 +++++ src/include/milvus/MilvusClientV2.h | 20 ++ src/include/milvus/types/NodeInfo.h | 17 ++ .../milvus/types/ResourceGroupConfig.h | 40 ++++ src/include/milvus/types/ResourceGroupDesc.h | 41 ++++ 11 files changed, 473 insertions(+) create mode 100644 src/impl/types/NodeInfo.cpp create mode 100644 src/impl/types/ResourceGroupConfig.cpp create mode 100644 src/impl/types/ResourceGroupDesc.cpp create mode 100644 src/include/milvus/types/NodeInfo.h create mode 100644 src/include/milvus/types/ResourceGroupConfig.h create mode 100644 src/include/milvus/types/ResourceGroupDesc.h diff --git a/src/impl/MilvusClientImplV2.cpp b/src/impl/MilvusClientImplV2.cpp index 6ea1f4f..acb5626 100644 --- a/src/impl/MilvusClientImplV2.cpp +++ b/src/impl/MilvusClientImplV2.cpp @@ -1580,6 +1580,165 @@ Status MilvusClientImplV2::RevokePrivilegeV2(const std::string& role_name, const pre, &MilvusConnection::OperatePrivilegeV2, GrpcOpts{timeout}); } +Status +MilvusClientImplV2::CreateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) { + auto pre = [&resource_group, &config]() { + proto::milvus::CreateResourceGroupRequest rpc_request; + rpc_request.set_resource_group(resource_group); + + auto* rg_config = rpc_request.mutable_config(); + rg_config->mutable_requests()->set_node_num(config.GetRequestsNodeNum()); + rg_config->mutable_limits()->set_node_num(config.GetLimitsNodeNum()); + + for (const auto& transfer : config.GetTransferFrom()) { + auto* transfer_from = rg_config->add_transfer_from(); + transfer_from->set_resource_group(transfer); + } + + for (const auto& transfer : config.GetTransferTo()) { + auto* transfer_to = rg_config->add_transfer_to(); + transfer_to->set_resource_group(transfer); + } + + auto* node_filter = rg_config->mutable_node_filter(); + for (const auto& label : config.GetNodeLabels()) { + auto* kv_pair = node_filter->add_node_labels(); + kv_pair->set_key(label.first); + kv_pair->set_value(label.second); + } + + return rpc_request; + }; + + return apiHandler( + pre, &MilvusConnection::CreateResourceGroup, GrpcOpts{timeout}); +} + +Status +MilvusClientImplV2::DropResourceGroup(const std::string& resource_group, int timeout) { + auto pre = [&resource_group]() { + proto::milvus::DropResourceGroupRequest rpc_request; + rpc_request.set_resource_group(resource_group); + return rpc_request; + }; + + return apiHandler( + pre, &MilvusConnection::DropResourceGroup, GrpcOpts{timeout}); +} + +Status +MilvusClientImplV2::DescribeResourceGroup(const std::string& resource_group, ResourceGroupDesc& resource_group_desc, int timeout) { + auto pre = [&resource_group]() { + proto::milvus::DescribeResourceGroupRequest rpc_request; + rpc_request.set_resource_group(resource_group); + return rpc_request; + }; + + auto post = [&resource_group_desc](const proto::milvus::DescribeResourceGroupResponse& response) { + if (response.status().code() != 0) { + return; + } + const auto& rg = response.resource_group(); + + ResourceGroupConfig config; + config.SetRequestsNodeNum(rg.config().requests().node_num()); + config.SetLimitsNodeNum(rg.config().limits().node_num()); + + std::vector transfer_from; + for (const auto& transfer : rg.config().transfer_from()) { + transfer_from.push_back(transfer.resource_group()); + } + config.SetTransferFrom(transfer_from); + + std::vector transfer_to; + for (const auto& transfer : rg.config().transfer_to()) { + transfer_to.push_back(transfer.resource_group()); + } + config.SetTransferTo(transfer_to); + + std::vector> node_labels; + for (const auto& label : rg.config().node_filter().node_labels()) { + node_labels.emplace_back(label.key(), label.value()); + } + config.SetNodeLabels(node_labels); + + std::vector nodes; + for (const auto& node : rg.nodes()) { + nodes.emplace_back(node.node_id(), node.address(), node.hostname()); + } + + resource_group_desc = ResourceGroupDesc( + rg.name(), + rg.capacity(), + rg.num_available_node(), + std::map(rg.num_loaded_replica().begin(), rg.num_loaded_replica().end()), + std::map(rg.num_outgoing_node().begin(), rg.num_outgoing_node().end()), + std::map(rg.num_incoming_node().begin(), rg.num_incoming_node().end()), + config, + nodes + ); + }; + + return apiHandler( + pre, &MilvusConnection::DescribeResourceGroup, post, GrpcOpts{timeout}); +} + +Status +MilvusClientImplV2::ListResourceGroups(std::vector& resource_groups, int timeout) { + auto pre = []() { + proto::milvus::ListResourceGroupsRequest rpc_request; + return rpc_request; + }; + + auto post = [&resource_groups](const proto::milvus::ListResourceGroupsResponse& response) { + resource_groups.clear(); + if (response.status().code() != 0) { + return; + } + for (const auto& group : response.resource_groups()) { + resource_groups.push_back(group); + } + }; + + return apiHandler( + pre, &MilvusConnection::ListResourceGroups, post, GrpcOpts{timeout}); +} + +Status +MilvusClientImplV2::UpdateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) { + auto pre = [&resource_group, &config]() { + proto::milvus::UpdateResourceGroupsRequest rpc_request; + + auto& config_map = *rpc_request.mutable_resource_groups(); + auto* rg_config = &config_map[resource_group]; + + rg_config->mutable_requests()->set_node_num(config.GetRequestsNodeNum()); + rg_config->mutable_limits()->set_node_num(config.GetLimitsNodeNum()); + + for (const auto& transfer : config.GetTransferFrom()) { + auto* transfer_from = rg_config->add_transfer_from(); + transfer_from->set_resource_group(transfer); + } + + for (const auto& transfer : config.GetTransferTo()) { + auto* transfer_to = rg_config->add_transfer_to(); + transfer_to->set_resource_group(transfer); + } + + auto* node_filter = rg_config->mutable_node_filter(); + for (const auto& label : config.GetNodeLabels()) { + auto* kv_pair = node_filter->add_node_labels(); + kv_pair->set_key(label.first); + kv_pair->set_value(label.second); + } + + return rpc_request; + }; + + return apiHandler( + pre, &MilvusConnection::UpdateResourceGroups, GrpcOpts{timeout}); +} + Status MilvusClientImplV2::CalcDistance(const CalcDistanceArguments& arguments, DistanceArray& results) { auto validate = [&arguments]() { return arguments.Validate(); }; @@ -1838,6 +1997,29 @@ MilvusClientImplV2::LoadBalance(int64_t src_node, const std::vector& ds nullptr); } +Status MilvusClientImplV2::Compact(const std::string& collection_name, int64_t& compaction_id, bool is_clustering, int timeout) { + CollectionDesc collection_desc; + auto status = DescribeCollection(collection_name, collection_desc); + if (!status.IsOk()) { + return status; + } + + auto pre = [&collection_desc, &collection_name, is_clustering]() { + proto::milvus::ManualCompactionRequest rpc_request; + rpc_request.set_collectionid(collection_desc.ID()); + rpc_request.set_collection_name(collection_name); + rpc_request.set_majorcompaction(is_clustering); + return rpc_request; + }; + + auto post = [&compaction_id](const proto::milvus::ManualCompactionResponse& response) { + compaction_id = response.compactionid(); + }; + + return apiHandler( + pre, &MilvusConnection::ManualCompaction, post, GrpcOpts{timeout}); +} + Status MilvusClientImplV2::GetCompactionState(int64_t compaction_id, CompactionState& compaction_state) { auto pre = [&compaction_id]() { diff --git a/src/impl/MilvusClientImplV2.h b/src/impl/MilvusClientImplV2.h index 65890c2..6b26ad9 100644 --- a/src/impl/MilvusClientImplV2.h +++ b/src/impl/MilvusClientImplV2.h @@ -255,6 +255,21 @@ class MilvusClientImplV2 : public MilvusClientV2 { Status RevokePrivilegeV2(const std::string& role_name, const std::string& privilege, const std::string& collection_name, const std::string& db_name, int timeout) final; + Status + CreateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) final; + + Status + DropResourceGroup(const std::string& resource_group, int timeout) final; + + Status + DescribeResourceGroup(const std::string& resource_group, ResourceGroupDesc& resource_group_desc, int timeout = 0) final; + + Status + ListResourceGroups(std::vector& resource_groups, int timeout) final; + + Status + UpdateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) final; + Status CalcDistance(const CalcDistanceArguments& arguments, DistanceArray& results) final; @@ -276,6 +291,9 @@ class MilvusClientImplV2 : public MilvusClientV2 { Status LoadBalance(int64_t src_node, const std::vector& dst_nodes, const std::vector& segments) final; + Status + Compact(const std::string& collection_name, int64_t& compaction_id, bool is_clustering, int timeout) final; + Status GetCompactionState(int64_t compaction_id, CompactionState& compaction_state) final; diff --git a/src/impl/MilvusConnection.cpp b/src/impl/MilvusConnection.cpp index 0dc43a3..391a66d 100644 --- a/src/impl/MilvusConnection.cpp +++ b/src/impl/MilvusConnection.cpp @@ -517,6 +517,31 @@ MilvusConnection::OperatePrivilegeV2(const proto::milvus::OperatePrivilegeV2Requ return grpcCall("OperatePrivilegeV2", &Stub::OperatePrivilegeV2, request, response, options); } +Status +MilvusConnection::CreateResourceGroup(const proto::milvus::CreateResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options) { + return grpcCall("CreateResourceGroup", &Stub::CreateResourceGroup, request, response, options); +} + +Status +MilvusConnection::DropResourceGroup(const proto::milvus::DropResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options) { + return grpcCall("DropResourceGroup", &Stub::DropResourceGroup, request, response, options); +} + +Status +MilvusConnection::DescribeResourceGroup(const proto::milvus::DescribeResourceGroupRequest& request, proto::milvus::DescribeResourceGroupResponse& response, const GrpcContextOptions& options) { + return grpcCall("DescribeResourceGroup", &Stub::DescribeResourceGroup, request, response, options); +} + +Status +MilvusConnection::ListResourceGroups(const proto::milvus::ListResourceGroupsRequest& request, proto::milvus::ListResourceGroupsResponse& response, const GrpcContextOptions& options) { + return grpcCall("ListResourceGroups", &Stub::ListResourceGroups, request, response, options); +} + +Status +MilvusConnection::UpdateResourceGroups(const proto::milvus::UpdateResourceGroupsRequest& request, proto::common::Status& response, const GrpcContextOptions& options) { + return grpcCall("UpdateResourceGroups", &Stub::UpdateResourceGroups, request, response, options); +} + void MilvusConnection::SetHeader(const std::string& key, const std::string& value) { headers_[key] = value; diff --git a/src/impl/MilvusConnection.h b/src/impl/MilvusConnection.h index 44e7b6b..dd720f5 100644 --- a/src/impl/MilvusConnection.h +++ b/src/impl/MilvusConnection.h @@ -323,6 +323,21 @@ class MilvusConnection { Status OperatePrivilegeV2(const proto::milvus::OperatePrivilegeV2Request& request, proto::common::Status& response, const GrpcContextOptions& options); + Status + CreateResourceGroup(const proto::milvus::CreateResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options); + + Status + DropResourceGroup(const proto::milvus::DropResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options); + + Status + DescribeResourceGroup(const proto::milvus::DescribeResourceGroupRequest& request, proto::milvus::DescribeResourceGroupResponse& response,const GrpcContextOptions& options); + + Status + ListResourceGroups(const proto::milvus::ListResourceGroupsRequest& request, proto::milvus::ListResourceGroupsResponse& response, const GrpcContextOptions& options); + + Status + UpdateResourceGroups(const proto::milvus::UpdateResourceGroupsRequest& request, proto::common::Status& response, const GrpcContextOptions& options); + void SetHeader(const std::string& key, const std::string& value); diff --git a/src/impl/types/NodeInfo.cpp b/src/impl/types/NodeInfo.cpp new file mode 100644 index 0000000..5f281ba --- /dev/null +++ b/src/impl/types/NodeInfo.cpp @@ -0,0 +1,16 @@ +#include "milvus/types/NodeInfo.h" + +NodeInfo::NodeInfo(int64_t id, const std::string& addr, const std::string& host) + : node_id(id), address(addr), hostname(host) {} + +int64_t NodeInfo::GetNodeId() const { + return node_id; +} + +const std::string& NodeInfo::GetAddress() const { + return address; +} + +const std::string& NodeInfo::GetHostname() const { + return hostname; +} diff --git a/src/impl/types/ResourceGroupConfig.cpp b/src/impl/types/ResourceGroupConfig.cpp new file mode 100644 index 0000000..dae9b2b --- /dev/null +++ b/src/impl/types/ResourceGroupConfig.cpp @@ -0,0 +1,52 @@ +#include "milvus/types/ResourceGroupConfig.h" + +namespace milvus { + +ResourceGroupConfig::ResourceGroupConfig(int req_node_num, int lim_node_num, + const std::vector& from, + const std::vector& to, + const std::vector>& labels) + : requests_node_num(req_node_num), limits_node_num(lim_node_num), + transfer_from(from), transfer_to(to), node_labels(labels) {} + +int ResourceGroupConfig::GetRequestsNodeNum() const { + return requests_node_num; +} + +void ResourceGroupConfig::SetRequestsNodeNum(int num) { + requests_node_num = num; +} + +int ResourceGroupConfig::GetLimitsNodeNum() const { + return limits_node_num; +} + +void ResourceGroupConfig::SetLimitsNodeNum(int num) { + limits_node_num = num; +} + +const std::vector& ResourceGroupConfig::GetTransferFrom() const { + return transfer_from; +} + +void ResourceGroupConfig::SetTransferFrom(const std::vector& from) { + transfer_from = from; +} + +const std::vector& ResourceGroupConfig::GetTransferTo() const { + return transfer_to; +} + +void ResourceGroupConfig::SetTransferTo(const std::vector& to) { + transfer_to = to; +} + +const std::vector>& ResourceGroupConfig::GetNodeLabels() const { + return node_labels; +} + +void ResourceGroupConfig::SetNodeLabels(const std::vector>& labels) { + node_labels = labels; +} + +} // namespace milvus diff --git a/src/impl/types/ResourceGroupDesc.cpp b/src/impl/types/ResourceGroupDesc.cpp new file mode 100644 index 0000000..515c1cd --- /dev/null +++ b/src/impl/types/ResourceGroupDesc.cpp @@ -0,0 +1,47 @@ +#include "milvus/types/ResourceGroupDesc.h" + +namespace milvus { + +ResourceGroupDesc::ResourceGroupDesc(const std::string& name, int32_t capacity, int32_t available_nodes, + const std::map& loaded_replicas, + const std::map& outgoing_nodes, + const std::map& incoming_nodes, + const ResourceGroupConfig& config, + const std::vector& nodes) + : name(name), capacity(capacity), num_available_node(available_nodes), + num_loaded_replica(loaded_replicas), num_outgoing_node(outgoing_nodes), + num_incoming_node(incoming_nodes), config(config), nodes(nodes) {} + +const std::string& ResourceGroupDesc::GetName() const { + return name; +} + +int32_t ResourceGroupDesc::GetCapacity() const { + return capacity; +} + +int32_t ResourceGroupDesc::GetNumAvailableNode() const { + return num_available_node; +} + +const std::map& ResourceGroupDesc::GetNumLoadedReplica() const { + return num_loaded_replica; +} + +const std::map& ResourceGroupDesc::GetNumOutgoingNode() const { + return num_outgoing_node; +} + +const std::map& ResourceGroupDesc::GetNumIncomingNode() const { + return num_incoming_node; +} + +const ResourceGroupConfig& ResourceGroupDesc::GetConfig() const { + return config; +} + +const std::vector& ResourceGroupDesc::GetNodes() const { + return nodes; +} + +} // namespace milvus diff --git a/src/include/milvus/MilvusClientV2.h b/src/include/milvus/MilvusClientV2.h index f4fd505..1671abb 100644 --- a/src/include/milvus/MilvusClientV2.h +++ b/src/include/milvus/MilvusClientV2.h @@ -44,6 +44,8 @@ #include "types/ProgressMonitor.h" #include "types/QueryArguments.h" #include "types/QueryResults.h" +#include "types/ResourceGroupConfig.h" +#include "types/ResourceGroupDesc.h" #include "types/RoleDesc.h" #include "types/SearchArguments.h" #include "types/SearchResults.h" @@ -538,6 +540,21 @@ class MilvusClientV2 { virtual Status RevokePrivilegeV2(const std::string& role_name, const std::string& privilege, const std::string& collection_name, const std::string& db_name = "", int timeout = 0) = 0; + virtual Status + CreateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout = 0) = 0; + + virtual Status + DropResourceGroup(const std::string& resource_group, int timeout = 0) = 0; + + virtual Status + DescribeResourceGroup(const std::string& resource_group, ResourceGroupDesc& resource_group_desc, int timeout = 0) = 0; + + virtual Status + ListResourceGroups(std::vector& resource_groups, int timeout = 0) = 0; + + virtual Status + UpdateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout = 0) = 0; + /** * Calculate distance between two vector arrays. * @@ -619,6 +636,9 @@ class MilvusClientV2 { virtual Status LoadBalance(int64_t src_node, const std::vector& dst_nodes, const std::vector& segments) = 0; + virtual Status + Compact(const std::string& collection_name, int64_t& compaction_id, bool is_clustering = false, int timeout = 0) = 0; + /** * Get compaction action state. * diff --git a/src/include/milvus/types/NodeInfo.h b/src/include/milvus/types/NodeInfo.h new file mode 100644 index 0000000..5d47f9c --- /dev/null +++ b/src/include/milvus/types/NodeInfo.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +class NodeInfo { +public: + NodeInfo(int64_t id, const std::string& addr, const std::string& host); + + int64_t GetNodeId() const; + const std::string& GetAddress() const; + const std::string& GetHostname() const; + +private: + int64_t node_id; + std::string address; + std::string hostname; +}; diff --git a/src/include/milvus/types/ResourceGroupConfig.h b/src/include/milvus/types/ResourceGroupConfig.h new file mode 100644 index 0000000..446205f --- /dev/null +++ b/src/include/milvus/types/ResourceGroupConfig.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include + +namespace milvus { + +class ResourceGroupConfig { +public: + ResourceGroupConfig() = default; + ResourceGroupConfig(int req_node_num, int lim_node_num, + const std::vector& from, + const std::vector& to, + const std::vector>& labels); + + int GetRequestsNodeNum() const; + void SetRequestsNodeNum(int num); + + int GetLimitsNodeNum() const; + void SetLimitsNodeNum(int num); + + const std::vector& GetTransferFrom() const; + void SetTransferFrom(const std::vector& from); + + const std::vector& GetTransferTo() const; + void SetTransferTo(const std::vector& to); + + const std::vector>& GetNodeLabels() const; + void SetNodeLabels(const std::vector>& labels); + +private: + int requests_node_num; + int limits_node_num; + std::vector transfer_from; + std::vector transfer_to; + std::vector> node_labels; +}; + +} // namespace milvus diff --git a/src/include/milvus/types/ResourceGroupDesc.h b/src/include/milvus/types/ResourceGroupDesc.h new file mode 100644 index 0000000..f947358 --- /dev/null +++ b/src/include/milvus/types/ResourceGroupDesc.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include +#include "NodeInfo.h" +#include "ResourceGroupConfig.h" + +namespace milvus { + +class ResourceGroupDesc { +public: + ResourceGroupDesc() = default; + ResourceGroupDesc(const std::string& name, int32_t capacity, int32_t available_nodes, + const std::map& loaded_replicas, + const std::map& outgoing_nodes, + const std::map& incoming_nodes, + const ResourceGroupConfig& config, + const std::vector& nodes); + + const std::string& GetName() const; + int32_t GetCapacity() const; + int32_t GetNumAvailableNode() const; + const std::map& GetNumLoadedReplica() const; + const std::map& GetNumOutgoingNode() const; + const std::map& GetNumIncomingNode() const; + const ResourceGroupConfig& GetConfig() const; + const std::vector& GetNodes() const; + +private: + std::string name; + int32_t capacity; + int32_t num_available_node; + std::map num_loaded_replica; + std::map num_outgoing_node; + std::map num_incoming_node; + ResourceGroupConfig config; + std::vector nodes; +}; + +} // namespace milvus \ No newline at end of file