diff --git a/pkg/kclient/deployments.go b/pkg/kclient/deployments.go
index 889f0119a42..7c9297b5e2c 100644
--- a/pkg/kclient/deployments.go
+++ b/pkg/kclient/deployments.go
@@ -3,9 +3,7 @@ package kclient
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
-	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -27,10 +25,6 @@ func boolPtr(b bool) *bool {
 const (
 	DeploymentKind       = "Deployment"
 	DeploymentAPIVersion = "apps/v1"
-
-	// TimedOutReason is added in a deployment when its newest replica set fails to show any progress
-	// within the given deadline (progressDeadlineSeconds).
-	timedOutReason = "ProgressDeadlineExceeded"
 )
 
 // GetDeploymentByName gets a deployment by querying by name
@@ -89,125 +83,6 @@ func (c *Client) GetDeploymentFromSelector(selector string) ([]appsv1.Deployment
 	return deploymentList.Items, nil
 }
 
-// getDeploymentCondition returns the condition with the provided type
-// from https://github.com/kubernetes/kubectl/blob/8bc20f428d7d5aed031de5fa160081de7b5af2b0/pkg/util/deployment/deployment.go#L58
-func getDeploymentCondition(status appsv1.DeploymentStatus, condType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition {
-	for i := range status.Conditions {
-		c := status.Conditions[i]
-		if c.Type == condType {
-			return &c
-		}
-	}
-	return nil
-}
-
-// WaitForPodDeletion waits for the given pod to be deleted
-func (c *Client) WaitForPodDeletion(name string) error {
-	watcher, err := c.KubeClient.CoreV1().Pods(c.Namespace).Watch(context.TODO(), metav1.ListOptions{FieldSelector: "metadata.name=" + name})
-	if err != nil {
-		return err
-	}
-	defer watcher.Stop()
-
-	if _, err = c.KubeClient.CoreV1().Pods(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); kerrors.IsNotFound(err) {
-		return nil
-	}
-
-	for {
-		select {
-		case <-time.After(time.Minute):
-			return fmt.Errorf("timeout while waiting for %q pod to be deleted", name)
-
-		case val, ok := <-watcher.ResultChan():
-			if !ok {
-				return errors.New("error getting value from resultchan")
-			}
-			if val.Type == watch.Deleted {
-				return nil
-			}
-		}
-	}
-}
-
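Note: with WaitForPodDeletion gone, nothing in kclient blocks on pod deletion anymore. If an equivalent wait is ever needed again, a poll-based helper along these lines (a hypothetical sketch using apimachinery's wait package, not part of this diff) avoids the hand-rolled watch/select loop:

```go
package kclient

import (
	"context"
	"time"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForPodDeletion polls until the named pod is gone or the timeout expires.
func waitForPodDeletion(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	return wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
		_, err := client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
		if kerrors.IsNotFound(err) {
			return true, nil // pod is gone: done
		}
		// err == nil means the pod still exists, so keep polling.
		// Any other error aborts the poll and is returned to the caller.
		return false, err
	})
}
```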
-// WaitForDeploymentRollout waits for deployment to finish rollout. Returns the state of the deployment after rollout.
-func (c *Client) WaitForDeploymentRollout(deploymentName string) (*appsv1.Deployment, error) {
-	klog.V(3).Infof("Waiting for %s deployment rollout", deploymentName)
-
-	w, err := c.KubeClient.AppsV1().Deployments(c.Namespace).Watch(context.TODO(), metav1.ListOptions{FieldSelector: "metadata.name=" + deploymentName})
-	if err != nil {
-		return nil, fmt.Errorf("unable to watch deployment: %w", err)
-	}
-	defer w.Stop()
-
-	success := make(chan *appsv1.Deployment)
-	failure := make(chan error)
-
-	// Collect all the events in a separate go routine
-	failedEvents := make(map[string]corev1.Event)
-	quit := make(chan int)
-	go c.CollectEvents("", failedEvents, quit)
-
-	go func() {
-		defer close(success)
-		defer close(failure)
-
-		for {
-			val, ok := <-w.ResultChan()
-			if !ok {
-				failure <- errors.New("watch channel was closed")
-				return
-			}
-			// based on https://github.com/kubernetes/kubectl/blob/9a3954bf653c874c8af6f855f2c754a8e1a44b9e/pkg/polymorphichelpers/rollout_status.go#L66-L91
-			if deployment, ok := val.Object.(*appsv1.Deployment); ok {
-				for _, cond := range deployment.Status.Conditions {
-					// using this just for debugging message, so ignoring error on purpose
-					jsonCond, _ := json.Marshal(cond)
-					klog.V(3).Infof("Deployment Condition: %s", string(jsonCond))
-				}
-				if deployment.Generation <= deployment.Status.ObservedGeneration {
-					cond := getDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
-					if cond != nil && cond.Reason == timedOutReason {
-						failure <- fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name)
-					} else if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
-						klog.V(3).Infof("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas)
-					} else if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
-						klog.V(3).Infof("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas)
-					} else if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
-						klog.V(3).Infof("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas)
-					} else {
-						klog.V(3).Infof("Deployment %q successfully rolled out\n", deployment.Name)
-						success <- deployment
-					}
-				}
-				klog.V(3).Infof("Waiting for deployment spec update to be observed...\n")
-
-			} else {
-				failure <- errors.New("unable to convert event object to Pod")
-			}
-		}
-	}()
-
-	select {
-	case val := <-success:
-		return val, nil
-	case err := <-failure:
-		return nil, err
-	case <-time.After(5 * time.Minute):
-		errorMessage := fmt.Sprintf("timeout while waiting for %s deployment roll out", deploymentName)
-		if len(failedEvents) != 0 {
-			tableString := getErrorMessageFromEvents(failedEvents)
-
-			errorMessage = errorMessage + fmt.Sprintf(`\nFor more information to help determine the cause of the error, re-run with '-v'.
-See below for a list of failed events that occurred more than %d times during deployment:
-%s`, failedEventCount, tableString.String())
-
-			return nil, fmt.Errorf(errorMessage)
-		}
-
-		return nil, fmt.Errorf("timeout while waiting for %s deployment roll out", deploymentName)
-	}
-}
-
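For reference, the rollout checks the deleted function performed (taken from the kubectl rollout_status.go logic it linked) reduce to a small predicate. A sketch of that predicate, should a future wait need it; deploymentRolledOut is a hypothetical name, and the "ProgressDeadlineExceeded" string is inlined since the timedOutReason constant is also removed above:

```go
package kclient

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

// deploymentRolledOut reports whether d has finished rolling out, mirroring
// the kubectl rollout-status conditions the removed code was based on.
func deploymentRolledOut(d *appsv1.Deployment) (bool, error) {
	if d.Generation > d.Status.ObservedGeneration {
		return false, nil // latest spec change not yet observed by the controller
	}
	for _, c := range d.Status.Conditions {
		if c.Type == appsv1.DeploymentProgressing && c.Reason == "ProgressDeadlineExceeded" {
			return false, fmt.Errorf("deployment %q exceeded its progress deadline", d.Name)
		}
	}
	if d.Spec.Replicas != nil && d.Status.UpdatedReplicas < *d.Spec.Replicas {
		return false, nil // new replicas still being rolled out
	}
	if d.Status.Replicas > d.Status.UpdatedReplicas {
		return false, nil // old replicas still pending termination
	}
	if d.Status.AvailableReplicas < d.Status.UpdatedReplicas {
		return false, nil // updated replicas not all available yet
	}
	return true, nil
}
```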
 func resourceAsJson(resource interface{}) string {
 	data, _ := json.MarshalIndent(resource, " ", " ")
 	return string(data)
diff --git a/pkg/kclient/events.go b/pkg/kclient/events.go
index e1788e7528a..9f42173a4c3 100644
--- a/pkg/kclient/events.go
+++ b/pkg/kclient/events.go
@@ -2,70 +2,11 @@ package kclient
 
 import (
 	"context"
-	"sync"
-
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/klog"
-
-	"github.com/redhat-developer/odo/pkg/log"
-)
-
-// We use a mutex here in order to make 100% sure that functions such as CollectEvents
-// are free of race conditions
-var mu sync.Mutex
-
-const (
-	failedEventCount = 5
 )
 
-// CollectEvents collects events in a Goroutine by manipulating a spinner.
-// We don't care about the error (it's usually run in a goroutine), so erroring out is not needed.
-func (c *Client) CollectEvents(selector string, events map[string]corev1.Event, quit <-chan int) {
-
-	// Start a goroutine for watching for events related to the pod and update our pod status accordingly.
-	eventWatcher, err := c.KubeClient.CoreV1().Events(c.Namespace).Watch(context.TODO(), metav1.ListOptions{})
-	if err != nil {
-		log.Warningf("Unable to watch for events: %s", err)
-		return
-	}
-	defer eventWatcher.Stop()
-
-	// Create an endless loop for collecting
-	for {
-		select {
-		case <-quit:
-			klog.V(3).Info("Quitting collect events")
-			return
-		case val, ok := <-eventWatcher.ResultChan():
-			mu.Lock()
-			if !ok {
-				log.Warning("Watch channel was closed")
-				return
-			}
-			if e, ok := val.Object.(*corev1.Event); ok {
-
-				// If there are many warning events happening during deployment, let's log them.
-				if e.Type == "Warning" {
-
-					if e.Count >= failedEventCount {
-						newEvent := e
-						(events)[e.Name] = *newEvent
-						klog.V(3).Infof("Warning Event: Count: %d, Reason: %s, Message: %s", e.Count, e.Reason, e.Message)
-					}
-
-				}
-
-			} else {
-				log.Warning("Unable to convert object to event")
-				return
-			}
-			mu.Unlock()
-		}
-	}
-}
-
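CollectEvents leaves without a direct replacement; the retained PodWarningEventWatcher already filters warning events server-side with a field selector. If a one-shot collection is ever wanted, a hypothetical helper like this (not part of the diff) would do it without the package-level mutex and the long-lived goroutine:

```go
package kclient

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listWarningEvents fetches warning events once instead of accumulating them
// in a shared map from a background goroutine.
func listWarningEvents(ctx context.Context, client kubernetes.Interface, namespace string) ([]corev1.Event, error) {
	list, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{
		// Filtered server-side, the same idea as PodWarningEventWatcher's selector.
		FieldSelector: "type=Warning",
	})
	if err != nil {
		return nil, err
	}
	return list.Items, nil
}
```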
 func (c *Client) PodWarningEventWatcher(ctx context.Context) (watch.Interface, error) {
 	selector := "involvedObject.kind=Pod,involvedObject.apiVersion=v1,type=Warning"
 	ns := c.GetCurrentNamespace()
diff --git a/pkg/kclient/events_test.go b/pkg/kclient/events_test.go
deleted file mode 100644
index 4c8f338620b..00000000000
--- a/pkg/kclient/events_test.go
+++ /dev/null
@@ -1,86 +0,0 @@
-package kclient
-
-import (
-	"fmt"
-	"strings"
-	"testing"
-	"time"
-
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/watch"
-	ktesting "k8s.io/client-go/testing"
-)
-
-func fakeEventStatus(podName string, eventWarningMessage string, count int32) *corev1.Event {
-	return &corev1.Event{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: podName,
-		},
-		Type:    "Warning",
-		Count:   count,
-		Reason:  eventWarningMessage,
-		Message: "Foobar",
-	}
-}
-
-func TestCollectEvents(t *testing.T) {
-	tests := []struct {
-		name                string
-		podName             string
-		eventWarningMessage string
-	}{
-		{
-			name:                "Case 1: Collect an arbitrary amount of events",
-			podName:             "ruby",
-			eventWarningMessage: "Fake event warning message",
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-
-			// Create a fake client
-			fakeClient, fakeClientSet := FakeNew()
-			fakeEventWatch := watch.NewRaceFreeFake()
-			podSelector := fmt.Sprintf("deploymentconfig=%s", tt.podName)
-
-			// Create a fake event status / watch reactor for faking the events we are collecting
-			fakeEvent := fakeEventStatus(tt.podName, tt.eventWarningMessage, 10)
-			go func(event *corev1.Event) {
-				fakeEventWatch.Add(event)
-			}(fakeEvent)
-
-			fakeClientSet.Kubernetes.PrependWatchReactor("events", func(action ktesting.Action) (handled bool, ret watch.Interface, err error) {
-				return true, fakeEventWatch, nil
-			})
-
-			events := make(map[string]corev1.Event)
-			quit := make(chan int)
-			go fakeClient.CollectEvents(podSelector, events, quit)
-
-			// Sleep in order to make sure we actually collect some events
-			time.Sleep(2 * time.Second)
-			close(quit)
-
-			// We make sure to lock in order to prevent race conditions when retrieving the events (since we pass in the map by reference)
-			mu.Lock()
-			if len(events) == 0 {
-				t.Errorf("Expected events, got none")
-			}
-			mu.Unlock()
-
-			// Collect the first event in the map
-			var firstEvent corev1.Event
-			for _, val := range events {
-				firstEvent = val
-			}
-
-			if !strings.Contains(firstEvent.Reason, tt.eventWarningMessage) {
-				t.Errorf("expected warning message: '%s' in event message: '%+v'", tt.eventWarningMessage, firstEvent.Reason)
-			}
-
-		})
-	}
-}
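The deleted TestCollectEvents also removes the package's only example of the fake-watch plumbing. That plumbing still applies to the retained PodWarningEventWatcher; a hypothetical test in the same style (reusing the package-local FakeNew helper and the same imports as the deleted file) could look like:

```go
package kclient

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/watch"
	ktesting "k8s.io/client-go/testing"
)

func TestPodWarningEventWatcher(t *testing.T) {
	fakeClient, fakeClientSet := FakeNew()
	fakeEventWatch := watch.NewRaceFreeFake()

	// Serve the fake watch for any watch request on "events".
	fakeClientSet.Kubernetes.PrependWatchReactor("events", func(action ktesting.Action) (handled bool, ret watch.Interface, err error) {
		return true, fakeEventWatch, nil
	})

	w, err := fakeClient.PodWarningEventWatcher(context.TODO())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	defer w.Stop()

	// Push one warning event through the fake watch and read it back.
	fakeEventWatch.Add(&corev1.Event{Type: "Warning", Reason: "FailedScheduling"})
	ev := <-w.ResultChan()
	if e, ok := ev.Object.(*corev1.Event); !ok || e.Type != "Warning" {
		t.Errorf("expected a warning event, got %v", ev.Object)
	}
}
```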
diff --git a/pkg/kclient/interface.go b/pkg/kclient/interface.go
index 814961d92ba..dc9699442e7 100644
--- a/pkg/kclient/interface.go
+++ b/pkg/kclient/interface.go
@@ -49,8 +49,6 @@ type ClientInterface interface {
 	GetOneDeployment(componentName, appName string) (*appsv1.Deployment, error)
 	GetOneDeploymentFromSelector(selector string) (*appsv1.Deployment, error)
 	GetDeploymentFromSelector(selector string) ([]appsv1.Deployment, error)
-	WaitForPodDeletion(name string) error
-	WaitForDeploymentRollout(deploymentName string) (*appsv1.Deployment, error)
 	CreateDeployment(deploy appsv1.Deployment) (*appsv1.Deployment, error)
 	UpdateDeployment(deploy appsv1.Deployment) (*appsv1.Deployment, error)
 	ApplyDeployment(deploy appsv1.Deployment) (*appsv1.Deployment, error)
@@ -66,7 +64,6 @@ type ClientInterface interface {
 	DeleteDynamicResource(name string, gvr schema.GroupVersionResource, wait bool) error
 
 	// events.go
-	CollectEvents(selector string, events map[string]corev1.Event, quit <-chan int)
 	PodWarningEventWatcher(ctx context.Context) (watch.Interface, error)
 
 	// kclient.go
@@ -109,7 +106,6 @@ type ClientInterface interface {
 	TryWithBlockOwnerDeletion(ownerReference metav1.OwnerReference, exec func(ownerReference metav1.OwnerReference) error) error
 
 	// pods.go
-	WaitAndGetPodWithEvents(selector string, desiredPhase corev1.PodPhase, pushTimeout time.Duration) (*corev1.Pod, error)
 	ExecCMDInContainer(containerName, podName string, cmd []string, stdout io.Writer, stderr io.Writer, stdin io.Reader, tty bool) error
 	ExtractProjectToComponent(containerName, podName string, targetPath string, stdin io.Reader) error
 	GetPodUsingComponentName(componentName string) (*corev1.Pod, error)
diff --git a/pkg/kclient/mock_Client.go b/pkg/kclient/mock_Client.go
index 53c7a96e59e..727ba01770f 100644
--- a/pkg/kclient/mock_Client.go
+++ b/pkg/kclient/mock_Client.go
@@ -98,18 +98,6 @@ func (mr *MockClientInterfaceMockRecorder) ApplyDeployment(deploy interface{}) *
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ApplyDeployment", reflect.TypeOf((*MockClientInterface)(nil).ApplyDeployment), deploy)
 }
 
-// CollectEvents mocks base method.
-func (m *MockClientInterface) CollectEvents(selector string, events map[string]v11.Event, quit <-chan int) {
-	m.ctrl.T.Helper()
-	m.ctrl.Call(m, "CollectEvents", selector, events, quit)
-}
-
-// CollectEvents indicates an expected call of CollectEvents.
-func (mr *MockClientInterfaceMockRecorder) CollectEvents(selector, events, quit interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CollectEvents", reflect.TypeOf((*MockClientInterface)(nil).CollectEvents), selector, events, quit)
-}
-
 // CreateDeployment mocks base method.
 func (m *MockClientInterface) CreateDeployment(deploy v10.Deployment) (*v10.Deployment, error) {
 	m.ctrl.T.Helper()
@@ -1404,21 +1392,6 @@ func (mr *MockClientInterfaceMockRecorder) UpdateStorageOwnerReference(pvc inter
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateStorageOwnerReference", reflect.TypeOf((*MockClientInterface)(nil).UpdateStorageOwnerReference), varargs...)
 }
 
-// WaitAndGetPodWithEvents mocks base method.
-func (m *MockClientInterface) WaitAndGetPodWithEvents(selector string, desiredPhase v11.PodPhase, pushTimeout time.Duration) (*v11.Pod, error) {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "WaitAndGetPodWithEvents", selector, desiredPhase, pushTimeout)
-	ret0, _ := ret[0].(*v11.Pod)
-	ret1, _ := ret[1].(error)
-	return ret0, ret1
-}
-
-// WaitAndGetPodWithEvents indicates an expected call of WaitAndGetPodWithEvents.
-func (mr *MockClientInterfaceMockRecorder) WaitAndGetPodWithEvents(selector, desiredPhase, pushTimeout interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitAndGetPodWithEvents", reflect.TypeOf((*MockClientInterface)(nil).WaitAndGetPodWithEvents), selector, desiredPhase, pushTimeout)
-}
-
 // WaitAndGetSecret mocks base method.
 func (m *MockClientInterface) WaitAndGetSecret(name, namespace string) (*v11.Secret, error) {
 	m.ctrl.T.Helper()
@@ -1434,35 +1407,6 @@ func (mr *MockClientInterfaceMockRecorder) WaitAndGetSecret(name, namespace inte
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitAndGetSecret", reflect.TypeOf((*MockClientInterface)(nil).WaitAndGetSecret), name, namespace)
 }
 
-// WaitForDeploymentRollout mocks base method.
-func (m *MockClientInterface) WaitForDeploymentRollout(deploymentName string) (*v10.Deployment, error) {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "WaitForDeploymentRollout", deploymentName)
-	ret0, _ := ret[0].(*v10.Deployment)
-	ret1, _ := ret[1].(error)
-	return ret0, ret1
-}
-
-// WaitForDeploymentRollout indicates an expected call of WaitForDeploymentRollout.
-func (mr *MockClientInterfaceMockRecorder) WaitForDeploymentRollout(deploymentName interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForDeploymentRollout", reflect.TypeOf((*MockClientInterface)(nil).WaitForDeploymentRollout), deploymentName)
-}
-
-// WaitForPodDeletion mocks base method.
-func (m *MockClientInterface) WaitForPodDeletion(name string) error {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "WaitForPodDeletion", name)
-	ret0, _ := ret[0].(error)
-	return ret0
-}
-
-// WaitForPodDeletion indicates an expected call of WaitForPodDeletion.
-func (mr *MockClientInterfaceMockRecorder) WaitForPodDeletion(name interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForPodDeletion", reflect.TypeOf((*MockClientInterface)(nil).WaitForPodDeletion), name)
-}
-
 // WaitForServiceAccountInNamespace mocks base method.
 func (m *MockClientInterface) WaitForServiceAccountInNamespace(namespace, serviceAccountName string) error {
 	m.ctrl.T.Helper()
diff --git a/pkg/kclient/pods.go b/pkg/kclient/pods.go
index 8e4f3768b67..e3da5e39183 100644
--- a/pkg/kclient/pods.go
+++ b/pkg/kclient/pods.go
@@ -3,12 +3,9 @@ package kclient
 import (
 	"bytes"
 	"context"
-	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"strings"
-	"time"
 
 	"k8s.io/klog"
 
@@ -23,102 +20,6 @@ import (
 	"k8s.io/client-go/tools/remotecommand"
 )
 
-// WaitAndGetPodWithEvents blocks and waits until a pod matching the selector is in the Running state;
-// desiredPhase cannot be PodFailed or PodUnknown
-func (c *Client) WaitAndGetPodWithEvents(selector string, desiredPhase corev1.PodPhase, pushTimeout time.Duration) (*corev1.Pod, error) {
-
-	klog.V(3).Infof("Waiting for %s pod", selector)
-
-	var spinner *log.Status
-	defer func() {
-		if spinner != nil {
-			spinner.End(false)
-		}
-	}()
-
-	w, err := c.KubeClient.CoreV1().Pods(c.Namespace).Watch(context.TODO(), metav1.ListOptions{
-		LabelSelector: selector,
-	})
-	if err != nil {
-		return nil, fmt.Errorf("unable to watch pod: %w", err)
-	}
-	defer w.Stop()
-
-	// Here we are going to start a loop watching for the pod status
-	podChannel := make(chan *corev1.Pod)
-	watchErrorChannel := make(chan error)
-	failedEvents := make(map[string]corev1.Event)
-	go func() {
-	loop:
-		for {
-			val, ok := <-w.ResultChan()
-			if !ok {
-				watchErrorChannel <- errors.New("watch channel was closed")
-				break loop
-			}
-			if e, ok := val.Object.(*corev1.Pod); ok {
-				klog.V(3).Infof("Status of %s pod is %s", e.Name, e.Status.Phase)
-				for _, cond := range e.Status.Conditions {
-					// using this just for debugging message, so ignoring error on purpose
-					jsonCond, _ := json.Marshal(cond)
-					klog.V(3).Infof("Pod Conditions: %s", string(jsonCond))
-				}
-				for _, status := range e.Status.ContainerStatuses {
-					// using this just for debugging message, so ignoring error on purpose
-					jsonStatus, _ := json.Marshal(status)
-					klog.V(3).Infof("Container Status: %s", string(jsonStatus))
-				}
-				switch e.Status.Phase {
-				case desiredPhase:
-					klog.V(3).Infof("Pod %s is %v", e.Name, desiredPhase)
-					podChannel <- e
-					break loop
-				case corev1.PodFailed, corev1.PodUnknown:
-					watchErrorChannel <- fmt.Errorf("pod %s status %s", e.Name, e.Status.Phase)
-					break loop
-				default:
-					// we start in a phase different from the desired one, let's wait
-					// Collect all the events in a separate go routine
-					quit := make(chan int)
-					go c.CollectEvents(selector, failedEvents, quit)
-					defer close(quit)
-				}
-			} else {
-				watchErrorChannel <- errors.New("unable to convert event object to Pod")
-				break loop
-			}
-		}
-		close(podChannel)
-		close(watchErrorChannel)
-	}()
-
-	select {
-	case val := <-podChannel:
-		if spinner != nil {
-			spinner.End(true)
-		}
-		return val, nil
-	case err := <-watchErrorChannel:
-		return nil, err
-	case <-time.After(pushTimeout):
-
-		// Create a useful error if there are any failed events
-		errorMessage := fmt.Sprintf(`waited %s but couldn't find running pod matching selector: '%s'`, pushTimeout, selector)
-
-		if len(failedEvents) != 0 {
-
-			tableString := getErrorMessageFromEvents(failedEvents)
-
-			errorMessage = fmt.Sprintf(`waited %s but was unable to find a running pod matching selector: '%s'
-For more information to help determine the cause of the error, re-run with '-v'.
-See below for a list of failed events that occurred more than %d times during deployment:
-%s`, pushTimeout, selector, failedEventCount, tableString.String())
-		}
-
-		return nil, fmt.Errorf(errorMessage)
-	}
-}
-
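Callers of WaitAndGetPodWithEvents presumably moved to the newer watch-driven flow. If a blocking wait for a pod phase is ever needed again, a poll-based sketch (hypothetical, mirroring the removed fail-fast behaviour on PodFailed/PodUnknown) is substantially smaller than the removed channel-and-goroutine machinery:

```go
package kclient

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForPodPhase blocks until a pod matching selector reaches phase,
// failing fast on PodFailed/PodUnknown like the removed implementation did.
func waitForPodPhase(ctx context.Context, client kubernetes.Interface, namespace, selector string, phase corev1.PodPhase, timeout time.Duration) (*corev1.Pod, error) {
	var found *corev1.Pod
	err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
		if err != nil {
			return false, err
		}
		for i := range pods.Items {
			p := &pods.Items[i]
			switch p.Status.Phase {
			case phase:
				found = p
				return true, nil
			case corev1.PodFailed, corev1.PodUnknown:
				return false, fmt.Errorf("pod %s status %s", p.Name, p.Status.Phase)
			}
		}
		return false, nil // no matching pod in the desired phase yet
	})
	return found, err
}
```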
 // ExecCMDInContainer executes a command in the container of a pod; pass an empty string for containerName to execute in the first container of the pod
 func (c *Client) ExecCMDInContainer(containerName, podName string, cmd []string, stdout io.Writer, stderr io.Writer, stdin io.Reader, tty bool) error {
 	podExecOptions := corev1.PodExecOptions{
diff --git a/pkg/kclient/pods_test.go b/pkg/kclient/pods_test.go
index 626dbe6dd22..0d9924a0355 100644
--- a/pkg/kclient/pods_test.go
+++ b/pkg/kclient/pods_test.go
@@ -8,97 +8,12 @@ import (
 
 	"k8s.io/apimachinery/pkg/runtime"
 
-	"github.com/redhat-developer/odo/pkg/preference"
-
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/watch"
-	ktesting "k8s.io/client-go/testing"
 )
 
-func fakePodStatus(status corev1.PodPhase, podName string) *corev1.Pod {
-	return &corev1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: podName,
-		},
-		Status: corev1.PodStatus{
-			Phase: status,
-		},
-	}
-}
-
-// NOTE: We do *not* collect the number of actions taken in this function as there could be any number of fake
-// 'event' actions that are happening in the background.
-func TestWaitAndGetPodWithEvents(t *testing.T) {
-	tests := []struct {
-		name                string
-		podName             string
-		status              corev1.PodPhase
-		wantEventWarning    bool
-		wantErr             bool
-		eventWarningMessage string
-	}{
-		{
-			name:             "Case 1: Pod running",
-			podName:          "ruby",
-			status:           corev1.PodRunning,
-			wantEventWarning: false,
-			wantErr:          false,
-		},
-		{
-			name:             "Case 2: Pod failed",
-			podName:          "ruby",
-			status:           corev1.PodFailed,
-			wantEventWarning: false,
-			wantErr:          true,
-		},
-		{
-			name:             "Case 3: Pod unknown",
-			podName:          "ruby",
-			status:           corev1.PodUnknown,
-			wantEventWarning: false,
-			wantErr:          true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-
-			fakeClient, fakeClientSet := FakeNew()
-			fakePodWatch := watch.NewRaceFreeFake()
-
-			// Watch for Pods
-			fakePod := fakePodStatus(tt.status, tt.podName)
-			go func(pod *corev1.Pod) {
-				fakePodWatch.Modify(pod)
-			}(fakePod)
-
-			// Prepend watch reactor (beginning of the chain)
-			fakeClientSet.Kubernetes.PrependWatchReactor("pods", func(action ktesting.Action) (handled bool, ret watch.Interface, err error) {
-				return true, fakePodWatch, nil
-			})
-
-			podSelector := fmt.Sprintf("deploymentconfig=%s", tt.podName)
-
-			pod, err := fakeClient.WaitAndGetPodWithEvents(podSelector, corev1.PodRunning, preference.DefaultPushTimeout)
-
-			if !tt.wantErr == (err != nil) {
-				t.Errorf("client.WaitAndGetPod(string) unexpected error %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-
-			if err == nil {
-				if pod.Name != tt.podName {
-					t.Errorf("pod name is not matching to expected name, expected: %s, got %s", tt.podName, pod.Name)
-				}
-			}
-
-		})
-	}
-}
-
 func TestGetOnePodFromSelector(t *testing.T) {
 	fakePod := FakePodStatus(corev1.PodRunning, "nodejs")
 	fakePod.Labels["component"] = "nodejs"
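After this change, warning-event reporting funnels through the retained PodWarningEventWatcher. A minimal consumption sketch; only the method name and signature come from this diff, while the surrounding function and logging are illustrative:

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/klog"

	"github.com/redhat-developer/odo/pkg/kclient"
)

// logPodWarningEvents drains the watcher and logs each warning event until
// the watch is stopped or the server closes it.
func logPodWarningEvents(ctx context.Context, client kclient.ClientInterface) error {
	w, err := client.PodWarningEventWatcher(ctx)
	if err != nil {
		return err
	}
	defer w.Stop()

	for ev := range w.ResultChan() {
		if e, ok := ev.Object.(*corev1.Event); ok {
			klog.V(3).Infof("Warning Event: Reason: %s, Message: %s", e.Reason, e.Message)
		}
	}
	return nil
}
```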