Skip to content

Commit

Permalink
feat: support managing cluster resources in a namespaced mode (argopr…
Browse files Browse the repository at this point in the history
…oj#6581)

Signed-off-by: Alexander Matyushentsev <[email protected]>
  • Loading branch information
Alexander Matyushentsev authored Jul 19, 2021
1 parent ef744e3 commit 58ac345
Show file tree
Hide file tree
Showing 20 changed files with 869 additions and 434 deletions.
4 changes: 4 additions & 0 deletions assets/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -5106,6 +5106,10 @@
"type": "object",
"title": "Cluster is the definition of a cluster resource",
"properties": {
"clusterResources": {
"description": "Indicates if cluster level resources should be managed. This setting is used only if cluster is connected in a namespaced mode.",
"type": "boolean"
},
"config": {
"$ref": "#/definitions/v1alpha1ClusterConfig"
},
Expand Down
261 changes: 253 additions & 8 deletions cmd/argocd-util/commands/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"math"
"os"
"sort"
"strings"
"text/tabwriter"
"time"

Expand All @@ -22,12 +24,15 @@ import (
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/controller/sharding"
argoappv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/v2/util/argo"
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/clusterauth"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/errors"
"github.com/argoproj/argo-cd/v2/util/glob"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
"github.com/argoproj/argo-cd/v2/util/settings"
)
Expand All @@ -45,16 +50,23 @@ func NewClusterCommand(pathOpts *clientcmd.PathOptions) *cobra.Command {
command.AddCommand(NewGenClusterConfigCommand(pathOpts))
command.AddCommand(NewClusterStatsCommand())
command.AddCommand(NewClusterShardsCommand())
namespacesCommand := NewClusterNamespacesCommand()
namespacesCommand.AddCommand(NewClusterEnableNamespacedMode())
namespacesCommand.AddCommand(NewClusterDisableNamespacedMode())
command.AddCommand(namespacesCommand)

return command
}

type ClusterWithInfo struct {
argoappv1.Cluster
// Shard holds controller shard number that handles the cluster
Shard int
// Namespaces holds list of namespaces managed by Argo CD in the cluster
Namespaces []string
}

func loadClusters(kubeClient *kubernetes.Clientset, replicas int, namespace string, portForwardRedis bool, cacheSrc func() (*appstatecache.Cache, error), shard int) ([]ClusterWithInfo, error) {
func loadClusters(kubeClient *kubernetes.Clientset, appClient *versioned.Clientset, replicas int, namespace string, portForwardRedis bool, cacheSrc func() (*appstatecache.Cache, error), shard int) ([]ClusterWithInfo, error) {
settingsMgr := settings.NewSettingsManager(context.Background(), kubeClient, namespace)

argoDB := db.NewDB(namespace, settingsMgr, kubeClient)
Expand All @@ -78,6 +90,18 @@ func loadClusters(kubeClient *kubernetes.Clientset, replicas int, namespace stri
}
}

appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(context.Background(), v1.ListOptions{})
if err != nil {
return nil, err
}
apps := appItems.Items
for i, app := range apps {
err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, argoDB)
if err != nil {
return nil, err
}
apps[i] = app
}
clusters := make([]ClusterWithInfo, len(clustersList.Items))
batchSize := 10
batchesCount := int(math.Ceil(float64(len(clusters)) / float64(batchSize)))
Expand All @@ -98,9 +122,18 @@ func loadClusters(kubeClient *kubernetes.Clientset, replicas int, namespace stri
if shard != -1 && clusterShard != shard {
return nil
}

nsSet := map[string]bool{}
for _, app := range apps {
if app.Spec.Destination.Server == cluster.Server {
nsSet[app.Spec.Destination.Namespace] = true
}
}
var namespaces []string
for ns := range nsSet {
namespaces = append(namespaces, ns)
}
_ = cache.GetClusterInfo(cluster.Server, &cluster.Info)
clusters[batchStart+i] = ClusterWithInfo{cluster, clusterShard}
clusters[batchStart+i] = ClusterWithInfo{cluster, clusterShard, namespaces}
return nil
})
}
Expand Down Expand Up @@ -135,6 +168,7 @@ func NewClusterShardsCommand() *cobra.Command {
namespace, _, err := clientConfig.Namespace()
errors.CheckError(err)
kubeClient := kubernetes.NewForConfigOrDie(clientCfg)
appClient := versioned.NewForConfigOrDie(clientCfg)

if replicas == 0 {
replicas, err = getControllerReplicas(kubeClient, namespace)
Expand All @@ -144,7 +178,7 @@ func NewClusterShardsCommand() *cobra.Command {
return
}

clusters, err := loadClusters(kubeClient, replicas, namespace, portForwardRedis, cacheSrc, shard)
clusters, err := loadClusters(kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, shard)
errors.CheckError(err)
if len(clusters) == 0 {
return
Expand Down Expand Up @@ -180,6 +214,216 @@ func printStatsSummary(clusters []ClusterWithInfo) {
_ = w.Flush()
}

func runClusterNamespacesCommand(clientConfig clientcmd.ClientConfig, action func(appClient *versioned.Clientset, argoDB db.ArgoDB, clusters map[string][]string) error) error {
clientCfg, err := clientConfig.ClientConfig()
if err != nil {
return err
}
namespace, _, err := clientConfig.Namespace()
if err != nil {
return err
}

kubeClient := kubernetes.NewForConfigOrDie(clientCfg)
appClient := versioned.NewForConfigOrDie(clientCfg)

settingsMgr := settings.NewSettingsManager(context.Background(), kubeClient, namespace)
argoDB := db.NewDB(namespace, settingsMgr, kubeClient)
clustersList, err := argoDB.ListClusters(context.Background())
if err != nil {
return err
}
appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(context.Background(), v1.ListOptions{})
if err != nil {
return err
}
apps := appItems.Items
for i, app := range apps {
err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, argoDB)
if err != nil {
return err
}
apps[i] = app
}

clusters := map[string][]string{}
for _, cluster := range clustersList.Items {
nsSet := map[string]bool{}
for _, app := range apps {
if app.Spec.Destination.Server != cluster.Server {
continue
}
// Use namespaces of actually deployed resources, since some application use dummy target namespace
// If resources list is empty then use target namespace
if len(app.Status.Resources) != 0 {
for _, res := range app.Status.Resources {
if res.Namespace != "" {
nsSet[res.Namespace] = true
}
}
} else {
if app.Spec.Destination.Server == cluster.Server {
nsSet[app.Spec.Destination.Namespace] = true
}
}
}
var namespaces []string
for ns := range nsSet {
namespaces = append(namespaces, ns)
}
clusters[cluster.Server] = namespaces
}
return action(appClient, argoDB, clusters)
}

func NewClusterNamespacesCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
)
var command = cobra.Command{
Use: "namespaces",
Short: "Print information namespaces which Argo CD manages in each cluster.",
Run: func(cmd *cobra.Command, args []string) {
log.SetLevel(log.WarnLevel)

err := runClusterNamespacesCommand(clientConfig, func(appClient *versioned.Clientset, _ db.ArgoDB, clusters map[string][]string) error {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
_, _ = fmt.Fprintf(w, "CLUSTER\tNAMESPACES\n")

for cluster, namespaces := range clusters {
// print shortest namespace names first
sort.Slice(namespaces, func(i, j int) bool {
return len(namespaces[j]) > len(namespaces[i])
})
namespacesStr := ""
if len(namespaces) > 4 {
namespacesStr = fmt.Sprintf("%s (total %d)", strings.Join(namespaces[:4], ","), len(namespaces))
} else {
namespacesStr = strings.Join(namespaces, ",")
}

_, _ = fmt.Fprintf(w, "%s\t%s\n", cluster, namespacesStr)
}
_ = w.Flush()
return nil
})
errors.CheckError(err)
},
}
clientConfig = cli.AddKubectlFlagsToCmd(&command)
return &command
}

func NewClusterEnableNamespacedMode() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
dryRun bool
clusterResources bool
namespacesCount int
)
var command = cobra.Command{
Use: "enable-namespaced-mode PATTERN",
Short: "Enable namespaced mode for clusters which name matches to the specified pattern.",
Run: func(cmd *cobra.Command, args []string) {
log.SetLevel(log.WarnLevel)

if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
pattern := args[0]

errors.CheckError(runClusterNamespacesCommand(clientConfig, func(_ *versioned.Clientset, argoDB db.ArgoDB, clusters map[string][]string) error {
for server, namespaces := range clusters {
if len(namespaces) == 0 || len(namespaces) > namespacesCount || !glob.Match(pattern, server) {
continue
}

cluster, err := argoDB.GetCluster(context.Background(), server)
if err != nil {
return err
}
cluster.Namespaces = namespaces
cluster.ClusterResources = clusterResources
fmt.Printf("Setting cluster %s namespaces to %v...", server, namespaces)
if !dryRun {
_, err = argoDB.UpdateCluster(context.Background(), cluster)
if err != nil {
return err
}
fmt.Println("done")
} else {
fmt.Println("done (dry run)")
}

}
return nil
}))
},
}
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().BoolVar(&dryRun, "dry-run", true, "Print what will be performed")
command.Flags().BoolVar(&clusterResources, "cluster-resources", false, "Indicates if cluster level resources should be managed.")
command.Flags().IntVar(&namespacesCount, "max-namespace-count", 0, "Max number of namespaces that cluster should managed managed namespaces is less or equal to specified count")

return &command
}

func NewClusterDisableNamespacedMode() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
dryRun bool
)
var command = cobra.Command{
Use: "disable-namespaced-mode PATTERN",
Short: "Disable namespaced mode for clusters which name matches to the specified pattern.",
Run: func(cmd *cobra.Command, args []string) {
log.SetLevel(log.WarnLevel)

if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}

pattern := args[0]

errors.CheckError(runClusterNamespacesCommand(clientConfig, func(_ *versioned.Clientset, argoDB db.ArgoDB, clusters map[string][]string) error {
for server := range clusters {
if !glob.Match(pattern, server) {
continue
}

cluster, err := argoDB.GetCluster(context.Background(), server)
if err != nil {
return err
}

if len(cluster.Namespaces) == 0 {
continue
}

cluster.Namespaces = nil
fmt.Printf("Disabling namespaced mode for cluster %s...", server)
if !dryRun {
_, err = argoDB.UpdateCluster(context.Background(), cluster)
if err != nil {
return err
}
fmt.Println("done")
} else {
fmt.Println("done (dry run)")
}

}
return nil
}))
},
}
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().BoolVar(&dryRun, "dry-run", true, "Print what will be performed")
return &command
}

func NewClusterStatsCommand() *cobra.Command {
var (
shard int
Expand All @@ -200,17 +444,18 @@ func NewClusterStatsCommand() *cobra.Command {
errors.CheckError(err)

kubeClient := kubernetes.NewForConfigOrDie(clientCfg)
appClient := versioned.NewForConfigOrDie(clientCfg)
if replicas == 0 {
replicas, err = getControllerReplicas(kubeClient, namespace)
errors.CheckError(err)
}
clusters, err := loadClusters(kubeClient, replicas, namespace, portForwardRedis, cacheSrc, shard)
clusters, err := loadClusters(kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, shard)
errors.CheckError(err)

w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
_, _ = fmt.Fprintf(w, "SERVER\tSHARD\tCONNECTION\tAPPS COUNT\tRESOURCES COUNT\n")
_, _ = fmt.Fprintf(w, "SERVER\tSHARD\tCONNECTION\tNAMESPACES COUNT\tAPPS COUNT\tRESOURCES COUNT\n")
for _, cluster := range clusters {
_, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%d\t%d\n", cluster.Server, cluster.Shard, cluster.Info.ConnectionState.Status, cluster.Info.ApplicationsCount, cluster.Info.CacheInfo.ResourcesCount)
_, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%d\t%d\t%d\n", cluster.Server, cluster.Shard, cluster.Info.ConnectionState.Status, len(cluster.Namespaces), cluster.Info.ApplicationsCount, cluster.Info.CacheInfo.ResourcesCount)
}
_ = w.Flush()
},
Expand Down Expand Up @@ -315,7 +560,7 @@ func NewGenClusterConfigCommand(pathOpts *clientcmd.PathOptions) *cobra.Command
if clusterOpts.Name != "" {
contextName = clusterOpts.Name
}
clst := cmdutil.NewCluster(contextName, clusterOpts.Namespaces, conf, bearerToken, awsAuthConf, execProviderConf)
clst := cmdutil.NewCluster(contextName, clusterOpts.Namespaces, clusterOpts.ClusterResources, conf, bearerToken, awsAuthConf, execProviderConf)
if clusterOpts.InCluster {
clst.Server = argoappv1.KubernetesInternalAPIServerAddr
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/argocd/commands/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewClusterAddCommand(clientOpts *argocdclient.ClientOptions, pathOpts *clie
if clusterOpts.Name != "" {
contextName = clusterOpts.Name
}
clst := cmdutil.NewCluster(contextName, clusterOpts.Namespaces, conf, managerBearerToken, awsAuthConf, execProviderConf)
clst := cmdutil.NewCluster(contextName, clusterOpts.Namespaces, clusterOpts.ClusterResources, conf, managerBearerToken, awsAuthConf, execProviderConf)
if clusterOpts.InCluster {
clst.Server = argoappv1.KubernetesInternalAPIServerAddr
}
Expand Down
Loading

0 comments on commit 58ac345

Please sign in to comment.