Remove now-unused functions used to wait for Deployment / Pod
feloy committed Jul 13, 2022
1 parent 7bc8ae8 commit 5102b4e
Showing 7 changed files with 0 additions and 514 deletions.
125 changes: 0 additions & 125 deletions pkg/kclient/deployments.go
@@ -3,9 +3,7 @@ package kclient
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -27,10 +25,6 @@ func boolPtr(b bool) *bool {
const (
DeploymentKind = "Deployment"
DeploymentAPIVersion = "apps/v1"

// timedOutReason is added in a deployment when its newest replica set fails to show any progress
// within the given deadline (progressDeadlineSeconds).
timedOutReason = "ProgressDeadlineExceeded"
)

// GetDeploymentByName gets a deployment by querying by name
@@ -89,125 +83,6 @@ func (c *Client) GetDeploymentFromSelector(selector string) ([]appsv1.Deployment
return deploymentList.Items, nil
}

// getDeploymentCondition returns the condition with the provided type
// from https://github.com/kubernetes/kubectl/blob/8bc20f428d7d5aed031de5fa160081de7b5af2b0/pkg/util/deployment/deployment.go#L58
func getDeploymentCondition(status appsv1.DeploymentStatus, condType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition {
for i := range status.Conditions {
c := status.Conditions[i]
if c.Type == condType {
return &c
}
}
return nil
}
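
As a short illustration, this helper works for any condition type; a minimal sketch (the isAvailable wrapper is hypothetical, not part of this codebase):

// Sketch: check whether a Deployment reports the Available condition as True.
func isAvailable(status appsv1.DeploymentStatus) bool {
cond := getDeploymentCondition(status, appsv1.DeploymentAvailable)
return cond != nil && cond.Status == corev1.ConditionTrue
}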

// WaitForPodDeletion waits for the given pod to be deleted
func (c *Client) WaitForPodDeletion(name string) error {
watcher, err := c.KubeClient.CoreV1().Pods(c.Namespace).Watch(context.TODO(), metav1.ListOptions{FieldSelector: "metadata.name=" + name})
if err != nil {
return err
}
defer watcher.Stop()

if _, err = c.KubeClient.CoreV1().Pods(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); kerrors.IsNotFound(err) {
return nil
}

for {
select {
case <-time.After(time.Minute):
return fmt.Errorf("timeout while waiting for %q pod to be deleted", name)

case val, ok := <-watcher.ResultChan():
if !ok {
return errors.New("error getting value from resultchan")
}
if val.Type == watch.Deleted {
return nil
}
}
}
}
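
For context, a caller of this removed helper would typically delete a pod and then block until the watch reported it gone; a minimal sketch (cleanupPod is hypothetical):

// Sketch: delete a pod, then wait for the deletion to complete.
func cleanupPod(c *Client, podName string) error {
err := c.KubeClient.CoreV1().Pods(c.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
if err != nil && !kerrors.IsNotFound(err) {
return err
}
return c.WaitForPodDeletion(podName)
}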

// WaitForDeploymentRollout waits for deployment to finish rollout. Returns the state of the deployment after rollout.
func (c *Client) WaitForDeploymentRollout(deploymentName string) (*appsv1.Deployment, error) {
klog.V(3).Infof("Waiting for %s deployment rollout", deploymentName)

w, err := c.KubeClient.AppsV1().Deployments(c.Namespace).Watch(context.TODO(), metav1.ListOptions{FieldSelector: "metadata.name=" + deploymentName})
if err != nil {
return nil, fmt.Errorf("unable to watch deployment: %w", err)
}
defer w.Stop()

success := make(chan *appsv1.Deployment)
failure := make(chan error)

// Collect all the events in a separate go routine
failedEvents := make(map[string]corev1.Event)
quit := make(chan int)
go c.CollectEvents("", failedEvents, quit)

go func() {
defer close(success)
defer close(failure)

for {
val, ok := <-w.ResultChan()
if !ok {
failure <- errors.New("watch channel was closed")
return
}
// based on https://github.com/kubernetes/kubectl/blob/9a3954bf653c874c8af6f855f2c754a8e1a44b9e/pkg/polymorphichelpers/rollout_status.go#L66-L91
if deployment, ok := val.Object.(*appsv1.Deployment); ok {
for _, cond := range deployment.Status.Conditions {
// used only for a debug message, so the error is ignored on purpose
jsonCond, _ := json.Marshal(cond)
klog.V(3).Infof("Deployment Condition: %s", string(jsonCond))
}
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := getDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == timedOutReason {
failure <- fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name)
} else if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
klog.V(3).Infof("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas)
} else if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
klog.V(3).Infof("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas)
} else if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
klog.V(3).Infof("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas)
} else {
klog.V(3).Infof("Deployment %q successfully rolled out\n", deployment.Name)
success <- deployment
}
}
klog.V(3).Infof("Waiting for deployment spec update to be observed...\n")

} else {
failure <- errors.New("unable to convert event object to Pod")
}
}
}()

select {
case val := <-success:
return val, nil
case err := <-failure:
return nil, err
case <-time.After(5 * time.Minute):
errorMessage := fmt.Sprintf("timeout while waiting for %s deployment roll out", deploymentName)
if len(failedEvents) != 0 {
tableString := getErrorMessageFromEvents(failedEvents)

errorMessage = errorMessage + fmt.Sprintf(`
For more information to help determine the cause of the error, re-run with '-v'.
See below for a list of failed events that occurred more than %d times during deployment:
%s`, failedEventCount, tableString.String())

return nil, errors.New(errorMessage)
}

return nil, fmt.Errorf("timeout while waiting for %s deployment roll out", deploymentName)
}
}
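
For context, a typical caller would apply a Deployment and then block on the rollout; a minimal sketch (applyAndWait is hypothetical; ApplyDeployment is part of this package's client interface):

// Sketch: apply a Deployment and wait for its rollout to complete.
func applyAndWait(c *Client, deploy appsv1.Deployment) (*appsv1.Deployment, error) {
if _, err := c.ApplyDeployment(deploy); err != nil {
return nil, err
}
return c.WaitForDeploymentRollout(deploy.Name)
}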

func resourceAsJson(resource interface{}) string {
data, _ := json.MarshalIndent(resource, " ", " ")
return string(data)
59 changes: 0 additions & 59 deletions pkg/kclient/events.go
@@ -2,70 +2,11 @@ package kclient

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog"

"github.com/redhat-developer/odo/pkg/log"
)

// We use a mutex here in order to make 100% sure that functions such as CollectEvents
// do not run into race conditions when writing to the shared events map
var mu sync.Mutex

const (
failedEventCount = 5
)

// CollectEvents watches Kubernetes events and collects Warning events into the given map.
// We don't care about the error (it's usually run in a goroutine), so erroring out is not needed.
func (c *Client) CollectEvents(selector string, events map[string]corev1.Event, quit <-chan int) {

// Watch for events in the namespace and record repeated warnings in the events map.
eventWatcher, err := c.KubeClient.CoreV1().Events(c.Namespace).Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Warningf("Unable to watch for events: %s", err)
return
}
defer eventWatcher.Stop()

// Create an endless loop for collecting
for {
select {
case <-quit:
klog.V(3).Info("Quitting collect events")
return
case val, ok := <-eventWatcher.ResultChan():
if !ok {
log.Warning("Watch channel was closed")
return
}
if e, ok := val.Object.(*corev1.Event); ok {

// If the same warning event happens many times during deployment, record and log it.
if e.Type == "Warning" && e.Count >= failedEventCount {
mu.Lock()
events[e.Name] = *e
mu.Unlock()
klog.V(3).Infof("Warning Event: Count: %d, Reason: %s, Message: %s", e.Count, e.Reason, e.Message)
}

} else {
log.Warning("Unable to convert object to event")
return
}
}
}
}
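
For context, the collector was meant to run in the background for the duration of an operation and be stopped via the quit channel; a minimal sketch (withEventCollection is hypothetical):

// Sketch: collect Warning events in the background while an operation runs.
func withEventCollection(c *Client, run func() error) (map[string]corev1.Event, error) {
failedEvents := make(map[string]corev1.Event)
quit := make(chan int)
go c.CollectEvents("", failedEvents, quit)
// Signal the collector to stop once the operation finishes.
defer close(quit)

err := run()
return failedEvents, err
}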

func (c *Client) PodWarningEventWatcher(ctx context.Context) (watch.Interface, error) {
selector := "involvedObject.kind=Pod,involvedObject.apiVersion=v1,type=Warning"
ns := c.GetCurrentNamespace()
86 changes: 0 additions & 86 deletions pkg/kclient/events_test.go

This file was deleted.

4 changes: 0 additions & 4 deletions pkg/kclient/interface.go
@@ -49,8 +49,6 @@ type ClientInterface interface {
GetOneDeployment(componentName, appName string) (*appsv1.Deployment, error)
GetOneDeploymentFromSelector(selector string) (*appsv1.Deployment, error)
GetDeploymentFromSelector(selector string) ([]appsv1.Deployment, error)
WaitForPodDeletion(name string) error
WaitForDeploymentRollout(deploymentName string) (*appsv1.Deployment, error)
CreateDeployment(deploy appsv1.Deployment) (*appsv1.Deployment, error)
UpdateDeployment(deploy appsv1.Deployment) (*appsv1.Deployment, error)
ApplyDeployment(deploy appsv1.Deployment) (*appsv1.Deployment, error)
@@ -66,7 +64,6 @@ type ClientInterface interface {
DeleteDynamicResource(name string, gvr schema.GroupVersionResource, wait bool) error

// events.go
CollectEvents(selector string, events map[string]corev1.Event, quit <-chan int)
PodWarningEventWatcher(ctx context.Context) (watch.Interface, error)

// kclient.go
@@ -109,7 +106,6 @@ type ClientInterface interface {
TryWithBlockOwnerDeletion(ownerReference metav1.OwnerReference, exec func(ownerReference metav1.OwnerReference) error) error

// pods.go
WaitAndGetPodWithEvents(selector string, desiredPhase corev1.PodPhase, pushTimeout time.Duration) (*corev1.Pod, error)
ExecCMDInContainer(containerName, podName string, cmd []string, stdout io.Writer, stderr io.Writer, stdin io.Reader, tty bool) error
ExtractProjectToComponent(containerName, podName string, targetPath string, stdin io.Reader) error
GetPodUsingComponentName(componentName string) (*corev1.Pod, error)
