Skip to content

Commit

Permalink
refactor waitForPodsRunning method and add new waitForPodsToDisappear…
Browse files Browse the repository at this point in the history
… method to remove duplicated method
  • Loading branch information
fanhaouu committed Aug 19, 2024
1 parent ad657c3 commit 1e23dbc
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 131 deletions.
3 changes: 2 additions & 1 deletion test/e2e/e2e_duplicatepods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ func TestRemoveDuplicates(t *testing.T) {
t.Log("Running removeduplicates plugin")
plugin.(frameworktypes.BalancePlugin).Balance(ctx, workerNodes)

waitForTerminatingPodsToDisappear(ctx, t, clientSet, testNamespace.Name)
waitForPodsToDisappear(ctx, t, clientSet, map[string]string{"app": "test-duplicate", "name": "test-duplicatePods"}, testNamespace.Name)

actualEvictedPodCount := podEvictor.TotalEvicted()
if actualEvictedPodCount != tc.expectedEvictedPodCount {
t.Errorf("Test error for description: %s. Unexpected number of pods have been evicted, got %v, expected %v", tc.description, actualEvictedPodCount, tc.expectedEvictedPodCount)
Expand Down
141 changes: 52 additions & 89 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,67 +194,6 @@ func printPodLogs(ctx context.Context, t *testing.T, kubeClient clientset.Interf
}
}

func waitForDeschedulerPodRunning(t *testing.T, ctx context.Context, kubeClient clientset.Interface, testName string) string {
deschedulerPodName := ""
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
podList, err := kubeClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{"app": "descheduler", "test": testName})).String(),
})
if err != nil {
t.Logf("Unable to list pods: %v", err)
if isClientRateLimiterError(err) {
return false, nil
}
return false, err
}

runningPods := []*v1.Pod{}
for _, item := range podList.Items {
if item.Status.Phase != v1.PodRunning {
continue
}
pod := item
runningPods = append(runningPods, &pod)
}

if len(runningPods) != 1 {
t.Logf("Expected a single running pod, got %v instead", len(runningPods))
return false, nil
}

deschedulerPodName = runningPods[0].Name
t.Logf("Found a descheduler pod running: %v", deschedulerPodName)
return true, nil
}); err != nil {
t.Fatalf("Error waiting for a running descheduler: %v", err)
}
return deschedulerPodName
}

func waitForDeschedulerPodAbsent(t *testing.T, ctx context.Context, kubeClient clientset.Interface, testName string) {
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
podList, err := kubeClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{"app": "descheduler", "test": testName})).String(),
})
if err != nil {
t.Logf("Unable to list pods: %v", err)
if isClientRateLimiterError(err) {
return false, nil
}
return false, err
}

if len(podList.Items) > 0 {
t.Logf("Found a descheduler pod. Waiting until it gets deleted")
return false, nil
}

return true, nil
}); err != nil {
t.Fatalf("Error waiting for a descheduler to disapear: %v", err)
}
}

func TestMain(m *testing.M) {
if os.Getenv("DESCHEDULER_IMAGE") == "" {
klog.Errorf("DESCHEDULER_IMAGE env is not set")
Expand Down Expand Up @@ -680,7 +619,7 @@ func TestLowNodeUtilization(t *testing.T) {
}
plugin.(frameworktypes.BalancePlugin).Balance(ctx, workerNodes)

waitForTerminatingPodsToDisappear(ctx, t, clientSet, rc.Namespace)
waitForPodsToDisappear(ctx, t, clientSet, rc.Spec.Template.Labels, rc.Namespace)

podFilter, err = podutil.NewOptions().WithFilter(handle.EvictorFilterImpl.Filter).BuildFilterFunc()
if err != nil {
Expand Down Expand Up @@ -1379,7 +1318,7 @@ func TestPodLifeTimeOldestEvicted(t *testing.T) {
t.Log("Finished PodLifetime plugin")

t.Logf("Wait for terminating pod to disappear")
waitForTerminatingPodsToDisappear(ctx, t, clientSet, rc.Namespace)
waitForPodsToDisappear(ctx, t, clientSet, rc.Spec.Template.Labels, rc.Namespace)

podList, err = clientSet.CoreV1().Pods(rc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(rc.Spec.Template.Labels).String()})
if err != nil {
Expand Down Expand Up @@ -1453,24 +1392,6 @@ func waitForRCPodsRunning(ctx context.Context, t *testing.T, clientSet clientset
}
}

func waitForTerminatingPodsToDisappear(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
podList, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}
for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
t.Logf("Pod %v still terminating", pod.Name)
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("Error waiting for terminating pods to disappear: %v", err)
}
}

func deleteDS(ctx context.Context, t *testing.T, clientSet clientset.Interface, ds *appsv1.DaemonSet) {
// adds nodeselector to avoid any nodes by setting an unused label
dsDeepCopy := ds.DeepCopy()
Expand Down Expand Up @@ -1775,6 +1696,10 @@ func waitForPodRunning(ctx context.Context, t *testing.T, clientSet clientset.In
if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
podItem, err := clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
t.Logf("Unable to list pods: %v", err)
if isClientRateLimiterError(err) {
return false, nil
}
return false, err
}

Expand All @@ -1789,27 +1714,65 @@ func waitForPodRunning(ctx context.Context, t *testing.T, clientSet clientset.In
}
}

func waitForPodsRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, labelMap map[string]string, desireRunningPodNum int, namespace string) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
func waitForPodsRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, labelMap map[string]string, desireRunningPodNum int, namespace string) string {
runningPodName := ""
if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
podList, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
})
if err != nil {
t.Logf("Unable to list pods: %v", err)
if isClientRateLimiterError(err) {
return false, nil
}
return false, err
}
if len(podList.Items) != desireRunningPodNum {
t.Logf("Waiting for %v pods to be running, got %v instead", desireRunningPodNum, len(podList.Items))

runningPods := []*v1.Pod{}
for _, item := range podList.Items {
if item.Status.Phase != v1.PodRunning {
continue
}
pod := item
runningPods = append(runningPods, &pod)
}

if len(runningPods) != desireRunningPodNum {
t.Logf("Waiting for %v pods to be running, got %v instead", desireRunningPodNum, len(runningPods))
return false, nil
}
for _, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
t.Logf("Pod %v not running yet, is %v instead", pod.Name, pod.Status.Phase)

if desireRunningPodNum == 1 {
runningPodName = runningPods[0].Name
}

return true, nil
}); err != nil {
t.Fatalf("Error waiting for pods running: %v", err)
}
return runningPodName
}

func waitForPodsToDisappear(ctx context.Context, t *testing.T, clientSet clientset.Interface, labelMap map[string]string, namespace string) {
if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
podList, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
})
if err != nil {
t.Logf("Unable to list pods: %v", err)
if isClientRateLimiterError(err) {
return false, nil
}
return false, err
}

if len(podList.Items) > 0 {
t.Logf("Found a existing pod. Waiting until it gets deleted")
return false, nil
}
return true, nil
}); err != nil {
t.Fatalf("Error waiting for pods running: %v", err)
t.Fatalf("Error waiting for pods to disappear: %v", err)
}
}

Expand Down
14 changes: 8 additions & 6 deletions test/e2e/e2e_toomanyrestarts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
componentbaseconfig "k8s.io/component-base/config"

"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
apiv1alpha2 "sigs.k8s.io/descheduler/pkg/api/v1alpha2"
Expand Down Expand Up @@ -101,7 +101,6 @@ func TestTooManyRestarts(t *testing.T) {
if _, err := clientSet.CoreV1().Namespaces().Create(ctx, testNamespace, metav1.CreateOptions{}); err != nil {
t.Fatalf("Unable to create ns %v", testNamespace.Name)
}
defer clientSet.CoreV1().Namespaces().Delete(ctx, testNamespace.Name, metav1.DeleteOptions{})

deploymentObj := buildTestDeployment("restart-pod", testNamespace.Name, deploymentReplicas, map[string]string{"test": "restart-pod", "name": "test-toomanyrestarts"}, func(deployment *appsv1.Deployment) {
deployment.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh"}
Expand All @@ -119,7 +118,11 @@ func TestTooManyRestarts(t *testing.T) {
}
return
}
defer clientSet.AppsV1().Deployments(deploymentObj.Namespace).Delete(ctx, deploymentObj.Name, metav1.DeleteOptions{})
defer func() {
clientSet.AppsV1().Deployments(deploymentObj.Namespace).Delete(ctx, deploymentObj.Name, metav1.DeleteOptions{})
waitForPodsToDisappear(ctx, t, clientSet, deploymentObj.Labels, deploymentObj.Namespace)
clientSet.CoreV1().Namespaces().Delete(ctx, testNamespace.Name, metav1.DeleteOptions{})
}()

// Wait for 3 restarts
waitPodRestartCount(ctx, clientSet, testNamespace.Name, t, 3)
Expand Down Expand Up @@ -187,11 +190,11 @@ func TestTooManyRestarts(t *testing.T) {
if err != nil {
t.Fatalf("Unable to delete %q deployment: %v", deschedulerDeploymentObj.Name, err)
}
waitForDeschedulerPodAbsent(t, ctx, clientSet, testNamespace.Name)
waitForPodsToDisappear(ctx, t, clientSet, deschedulerDeploymentObj.Labels, deschedulerDeploymentObj.Namespace)
}()

t.Logf("Waiting for the descheduler pod running")
deschedulerPodName = waitForDeschedulerPodRunning(t, ctx, clientSet, testNamespace.Name)
deschedulerPodName = waitForPodsRunning(ctx, t, clientSet, deschedulerDeploymentObj.Labels, 1, deschedulerDeploymentObj.Namespace)

// Run RemovePodsHavingTooManyRestarts strategy
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 20*time.Second, true, func(ctx context.Context) (bool, error) {
Expand All @@ -208,7 +211,6 @@ func TestTooManyRestarts(t *testing.T) {
}); err != nil {
t.Errorf("Error waiting for descheduler running: %v", err)
}
waitForTerminatingPodsToDisappear(ctx, t, clientSet, testNamespace.Name)
})
}
}
Expand Down
21 changes: 10 additions & 11 deletions test/e2e/e2e_topologyspreadconstraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint"
frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/test"
)

const zoneTopologyKey string = "topology.kubernetes.io/zone"
Expand Down Expand Up @@ -132,8 +131,11 @@ func TestTopologySpreadConstraint(t *testing.T) {
if _, err := clientSet.AppsV1().Deployments(deployment.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating Deployment %s %v", name, err)
}
defer test.DeleteDeployment(ctx, t, clientSet, deployment)
test.WaitForDeploymentPodsRunning(ctx, t, clientSet, deployment)
defer func() {
clientSet.AppsV1().Deployments(deployment.Namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
waitForPodsToDisappear(ctx, t, clientSet, deployment.Labels, deployment.Namespace)
}()
waitForPodsRunning(ctx, t, clientSet, deployment.Labels, tc.replicaCount, deployment.Namespace)

// Create a "Violator" Deployment that has the same label and is forced to be on the same node using a nodeSelector
violatorDeploymentName := name + "-violator"
Expand All @@ -144,8 +146,11 @@ func TestTopologySpreadConstraint(t *testing.T) {
if _, err := clientSet.AppsV1().Deployments(deployment.Namespace).Create(ctx, violatorDeployment, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating Deployment %s: %v", violatorDeploymentName, err)
}
defer test.DeleteDeployment(ctx, t, clientSet, violatorDeployment)
test.WaitForDeploymentPodsRunning(ctx, t, clientSet, violatorDeployment)
defer func() {
clientSet.AppsV1().Deployments(violatorDeployment.Namespace).Delete(ctx, violatorDeployment.Name, metav1.DeleteOptions{})
waitForPodsToDisappear(ctx, t, clientSet, violatorDeployment.Labels, violatorDeployment.Namespace)
}()
waitForPodsRunning(ctx, t, clientSet, violatorDeployment.Labels, int(violatorCount), violatorDeployment.Namespace)

evictionPolicyGroupVersion, err := eutils.SupportEviction(clientSet)
if err != nil || len(evictionPolicyGroupVersion) == 0 {
Expand Down Expand Up @@ -181,9 +186,6 @@ func TestTopologySpreadConstraint(t *testing.T) {
plugin.(frameworktypes.BalancePlugin).Balance(ctx, workerNodes)
t.Logf("Finished RemovePodsViolatingTopologySpreadConstraint strategy for %s", name)

t.Logf("Wait for terminating pods of %s to disappear", name)
waitForTerminatingPodsToDisappear(ctx, t, clientSet, deployment.Namespace)

if totalEvicted := podEvictor.TotalEvicted(); totalEvicted == tc.expectedEvictedCount {
t.Logf("Total of %d Pods were evicted for %s", totalEvicted, name)
} else {
Expand All @@ -194,9 +196,6 @@ func TestTopologySpreadConstraint(t *testing.T) {
return
}

// Ensure recently evicted Pod are rescheduled and running before asserting for a balanced topology spread
test.WaitForDeploymentPodsRunning(ctx, t, clientSet, deployment)

listOptions := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(tc.topologySpreadConstraint.LabelSelector.MatchLabels).String()}
pods, err := clientSet.CoreV1().Pods(testNamespace.Name).List(ctx, listOptions)
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions test/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,30 +241,6 @@ func DeleteDeployment(ctx context.Context, t *testing.T, clientSet clientset.Int
}
}

func WaitForDeploymentPodsRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, deployment *appsv1.Deployment) {
if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(c context.Context) (bool, error) {
podList, err := clientSet.CoreV1().Pods(deployment.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(deployment.Spec.Template.ObjectMeta.Labels).String(),
})
if err != nil {
return false, err
}
if len(podList.Items) != int(*deployment.Spec.Replicas) {
t.Logf("Waiting for %v pods to be created, got %v instead", *deployment.Spec.Replicas, len(podList.Items))
return false, nil
}
for _, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
t.Logf("Pod %v not running yet, is %v instead", pod.Name, pod.Status.Phase)
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("Error waiting for pods running: %v", err)
}
}

func SetPodAntiAffinity(inputPod *v1.Pod, labelKey, labelValue string) {
inputPod.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
Expand Down

0 comments on commit 1e23dbc

Please sign in to comment.