From 0e0307de6f89232eab534818602a0e178bc612af Mon Sep 17 00:00:00 2001 From: Zack Zlotnik Date: Wed, 23 Mar 2022 11:32:35 -0400 Subject: [PATCH] refactored test suite --- go.mod | 5 +- go.sum | 4 + tests/layering/helpers_test.go | 611 ++++++++++++++++++++++++++++++++ tests/layering/layering_test.go | 291 ++++++--------- tests/layering/node_cmd_test.go | 436 +++++++++++++++++++++++ 5 files changed, 1156 insertions(+), 191 deletions(-) create mode 100644 tests/layering/helpers_test.go create mode 100644 tests/layering/node_cmd_test.go diff --git a/go.mod b/go.mod index b1ee73ee..5316a80b 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/stretchr/testify v1.7.0 k8s.io/api v0.23.4 k8s.io/apimachinery v0.23.4 + k8s.io/client-go v0.23.1 + k8s.io/kubectl v0.23.0 k8s.io/kubernetes v1.23.0 ) @@ -24,9 +26,11 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/onsi/ginkgo v1.16.5 // indirect @@ -49,7 +53,6 @@ require ( gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect k8s.io/apiextensions-apiserver v0.23.1 // indirect k8s.io/apiserver v0.23.0 // indirect - k8s.io/client-go v0.23.1 // indirect k8s.io/component-base v0.23.0 // indirect k8s.io/klog/v2 v2.40.1 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect diff --git a/go.sum b/go.sum index 0758764c..50e4d203 100644 --- a/go.sum +++ b/go.sum @@ -380,6 +380,7 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= +github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik= github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -590,6 +591,7 @@ github.com/google/uuid v0.0.0-20161128191214-064e2069ce9c/go.mod h1:TIyPZe4Mgqvf github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -809,6 +811,7 @@ github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx 
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/sys/mountinfo v0.4.0/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A= github.com/moby/sys/mountinfo v0.4.1/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A= @@ -1830,6 +1833,7 @@ k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKb k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk= k8s.io/kube-proxy v0.23.0/go.mod h1:AFPfNIiOeDhHVtfN7ZfE1Wd8aP5qYov3khPu4VFeBb4= k8s.io/kube-scheduler v0.23.0/go.mod h1:BXDjbJEXtr9PU5/XzLtWMNG6Mid4GYBSGVWzP72UxKk= +k8s.io/kubectl v0.23.0 h1:WABWfj+Z4tC3SfKBCtZr5sIVHsFtkU9Azii4DR9IT6Y= k8s.io/kubectl v0.23.0/go.mod h1:TfcGEs3u4dkmoC2eku1GYymdGaMtPMcaLLFrX/RB2kI= k8s.io/kubelet v0.23.0/go.mod h1:A4DxfIt5Ka+rz54HAFhs1bgiFjJT6lcaAYUcACZl1/k= k8s.io/kubernetes v1.23.0 h1:r2DrryCpnmFfBuelpUNSWXHtD6Zy7SdwaCcycV5DsJE= diff --git a/tests/layering/helpers_test.go b/tests/layering/helpers_test.go new file mode 100644 index 00000000..ab1092f4 --- /dev/null +++ b/tests/layering/helpers_test.go @@ -0,0 +1,611 @@ +package e2e_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "os" + "path" + "strings" + "sync" + "testing" + "time" + + buildv1 "github.com/openshift/api/build/v1" + imagev1 "github.com/openshift/api/image/v1" + machineClient "github.com/openshift/client-go/machine/clientset/versioned" + "github.com/openshift/machine-config-operator/test/framework" + "github.com/openshift/machine-config-operator/test/helpers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + aggregatedErr "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubernetes/pkg/credentialprovider" +) + +// Gets the RHCOS base image tag from the BRANCH environment variable or +// defaults to "latest"; e.g., registry.ci.openshift.org/rhcos-devel/rhel-coreos: +func getBaseImageTag() string { + branch, found := os.LookupEnv("BRANCH") + if !found || branch == "" { + return "latest" + } + + return strings.ReplaceAll(branch, "release-", "") +} + +// Builds a derived OS image and places it into an Imagesteam. Attempts to use +// a previously-built image, if one is available. +func maybeBuildDerivedOSImage(t *testing.T, cs *framework.ClientSet) error { + imageStreamConfig := &imagev1.ImageStream{ + ObjectMeta: metav1.ObjectMeta{ + Name: imageStreamName, + Namespace: mcoNamespace, + }, + } + + _, err := cs.ImageStreams(mcoNamespace).Create(context.TODO(), imageStreamConfig, metav1.CreateOptions{}) + if err != nil && !k8sErrors.IsAlreadyExists(err) { + // If we have an error and it is not an already exists error, something is wrong. 
+ return err + } + + if k8sErrors.IsAlreadyExists(err) { + // We've already got an imagestream matching this name + imagestream, err := cs.ImageStreams(mcoNamespace).Get(context.TODO(), imageStreamName, metav1.GetOptions{}) + if err != nil { + return err + } + + // Lets see if it has an image we can use + if len(imagestream.Status.Tags) != 0 { + t.Logf("image already built, reusing!") + return nil + } + } + + baseImageBuildArg := fmt.Sprintf(imagePullSpec, getBaseImageTag()) + + t.Logf("imagestream %s created", imageStreamName) + t.Logf("using %s as the base image", baseImageBuildArg) + + // Create a new build + buildConfig := &buildv1.Build{ + ObjectMeta: metav1.ObjectMeta{ + Name: buildName, + }, + Spec: buildv1.BuildSpec{ + CommonSpec: buildv1.CommonSpec{ + Source: buildv1.BuildSource{ + Type: "Git", + Git: &buildv1.GitBuildSource{ + URI: "https://github.com/coreos/fcos-derivation-example", + Ref: "rhcos", + }, + }, + Strategy: buildv1.BuildStrategy{ + DockerStrategy: &buildv1.DockerBuildStrategy{ + BuildArgs: []corev1.EnvVar{ + { + Name: "RHEL_COREOS_IMAGE", + Value: baseImageBuildArg, + }, + }, + }, + }, + Output: buildv1.BuildOutput{ + To: &v1.ObjectReference{ + Kind: "ImageStreamTag", + Name: imageStreamName + ":latest", + }, + }, + }, + }, + } + + // Start our new build + _, err = cs.BuildV1Interface.Builds(mcoNamespace).Create(context.TODO(), buildConfig, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create build: %w", err) + } + + t.Log("base image derivation build started") + + build, err := cs.BuildV1Interface.Builds(mcoNamespace).Get(context.TODO(), buildConfig.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get build %s: %w", build.Name, err) + } + + // Wait for the build to complete + return waitForBuildToRun(t, cs, build) +} + +// Puts the pull secret for the MCD service account on the underlying node so rpm-ostree can use it. +// get ImagePullSecret for the MCD service account and save to authfilePath on the node +// eventually we should use rest.InClusterConfig() instead of cs with kubeadmin +func putPullSecretOnNode(t *testing.T, cs *framework.ClientSet, node *corev1.Node) func() { + t.Logf("placing image pull secret on node") + + // Get the MCD service account + mcdServiceAccount, err := cs.ServiceAccounts(mcoNamespace).Get(context.TODO(), "machine-config-daemon", metav1.GetOptions{}) + require.Nil(t, err) + require.Equal(t, 1, len(mcdServiceAccount.ImagePullSecrets)) + + // Get the image pull secret attached to the service account + imagePullSecret, err := cs.Secrets(mcoNamespace).Get(context.TODO(), mcdServiceAccount.ImagePullSecrets[0].Name, metav1.GetOptions{}) + require.Nil(t, err) + + // Extract the dockerconfig from the pull secret + dockerConfigData := imagePullSecret.Data[corev1.DockerConfigKey] + var dockerConfig credentialprovider.DockerConfig + err = json.Unmarshal(dockerConfigData, &dockerConfig) + require.Nil(t, err) + dockerConfigJSON := credentialprovider.DockerConfigJSON{ + Auths: dockerConfig, + } + authfileData, err := json.Marshal(dockerConfigJSON) + require.Nil(t, err) + + require.Nil(t, writeFileToNode(t, cs, node, authfileData, authfilePath)) + + return func() { + // Clean up the pull secret from the node if the node has not rebooted and we do not want to delete it. 
+ t.Logf("cleaning up pull secret from node") + if _, err := ExecCmdOnNode(cs, node, []string{"rm", "-rf", path.Dir(authfilePath)}); err != nil { + t.Fatalf("could not remove pull secret from node: %s", err) + } + } +} + +// Writes a file to an arbitrary path on a node +func writeFileToNode(t *testing.T, cs *framework.ClientSet, node *corev1.Node, data []byte, dst string) error { + runner := NewNodeCmdRunner(cs, node, mcoNamespace) + + if _, err := runner.Run([]string{"mkdir", "-p", path.Dir(dst)}); err != nil { + return fmt.Errorf("could not create new directory (%s): %w", path.Dir(dst), err) + } + + runOpts := NodeCmdOpts{ + Command: []string{"tee", dst}, + Stdin: bytes.NewBuffer(data), + } + + if _, err := runner.RunWithOpts(runOpts); err != nil { + return fmt.Errorf("could not write to file (%s): %w", dst, err) + } + + return nil +} + +// Gets the status output of rpm-ostree. Note: At this time, we only retrieve a +// small subset of rpm-ostree's full status JSON +func getRPMOStreeStatus(t *testing.T, cs *framework.ClientSet, node *corev1.Node) (*Status, error) { + result, err := runRPMOstreeCmd(t, cs, node, []string{"rpm-ostree", "status", "--json"}) + if err != nil { + return nil, fmt.Errorf("could not get rpm-ostree status: %w", err) + } + + status := &Status{} + if err := json.Unmarshal(result.Stdout, &status); err != nil { + return nil, fmt.Errorf("could not parse rpm-ostree status: %w", err) + } + + return status, nil +} + +// Places the derived OS image onto the node using rpm-ostree. +func applyDerivedImage(t *testing.T, cs *framework.ClientSet, node *corev1.Node) error { + // rpm-ostree rebase --experimental ostree-unverified-image:docker://image-registry.openshift-image-registry.svc.cluster.local:5000/openshift-machine-config-operator/test-boot-in-cluster-image-build + + t.Log("placing the new OS image") + + rpmOSTreeCmd := []string{"rpm-ostree", "rebase", "--experimental", imageURL} + cmdResult, err := runRPMOstreeCmd(t, cs, node, rpmOSTreeCmd) + if err != nil { + return fmt.Errorf("could not place image on node: %w", err) + } + + t.Logf("new OS image placed in %v", cmdResult.Duration) + + return nil +} + +// Rolls back to the original OS image that was on the node. +func rollbackToOriginalImage(t *testing.T, cs *framework.ClientSet, node *corev1.Node) error { + // Apply the rollback to the previous image + t.Logf("rolling back to previous OS image") + + _, err := runRPMOstreeCmd(t, cs, node, []string{"rpm-ostree", "rollback"}) + if err != nil { + return fmt.Errorf("could not run rollback command: %w", err) + } + + return nil +} + +// Verifies that we're in the built image +func assertInDerivedImage(t *testing.T, cs *framework.ClientSet, node *corev1.Node) { + status, err := getRPMOStreeStatus(t, cs, node) + require.Nil(t, err) + checkUsingImage(t, cs, node, true, status) +} + +// Verifies that we are not in the built image +func assertNotInDerivedImage(t *testing.T, cs *framework.ClientSet, node *corev1.Node) { + status, err := getRPMOStreeStatus(t, cs, node) + require.Nil(t, err) + checkUsingImage(t, cs, node, false, status) +} + +// Performs the check that we're either in the derived image or not. +func checkUsingImage(t *testing.T, cs *framework.ClientSet, node *corev1.Node, usingImage bool, status *Status) { + // These files are placed on the node by the derived container build process. 
+ expectedFiles := []string{ + "/usr/lib64/libpixman-1.so.0", + "/etc/systemd/system/hello-world.service", + helloWorldPath, + } + + for _, deployment := range status.Deployments { + if deployment.Booted { + if usingImage { + t.Logf("we expect that we're using the newly derived image") + // Check that our container image is as expected + assert.Equal(t, imageURL, deployment.ContainerImageReference) + // Check that we have the expected files on the node + assertNodeHasFiles(t, cs, node, expectedFiles) + // Run the hello world program that we built and placed on the node + result, err := ExecCmdOnNode(cs, node, []string{helloWorldPath}) + t.Logf("Running hello world command: \n%s", result.String()) + assert.Nil(t, err) + } else { + t.Logf("we expect that we're using the non-derived image") + assert.NotEqual(t, imageURL, deployment.ContainerImageReference) + assertNodeNotHasFiles(t, cs, node, expectedFiles) + } + } + } +} + +// Asserts that a node has files in the expected places. +func assertNodeHasFiles(t *testing.T, cs *framework.ClientSet, node *corev1.Node, files []string) { + for _, file := range files { + out, err := ExecCmdOnNode(cs, node, []string{"stat", file}) + require.Nil(t, err, "expected to find %s on node %s: %s", file, node.Name, out.String()) + t.Logf("found %s on node %s (this was as expected)", file, node.Name) + } +} + +// Asserts that a node does not have files in the expected places. +func assertNodeNotHasFiles(t *testing.T, cs *framework.ClientSet, node *corev1.Node, files []string) { + for _, file := range files { + out, err := ExecCmdOnNode(cs, node, []string{"stat", file}) + require.NotNil(t, err, "expected not to find %s on node %s:\n%s", file, node.Name, out.String()) + t.Logf("did not find %s on node %s (this was as expected)", file, node.Name) + } +} + +// Determines if a node is ready. +func isNodeReady(node *corev1.Node) bool { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady && condition.Status == "True" { + return true + } + } + + return false +} + +// RebootAndWait reboots a node, waits until it's status is ready. +func rebootAndWait(t *testing.T, cs *framework.ClientSet, node *corev1.Node) error { + t.Logf("rebooting %s", node.Name) + + // Get an updated node object since this one may potentially be out-of-date + updatedNode, err := cs.Nodes().Get(context.TODO(), node.ObjectMeta.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get updated node: %w", err) + } + + prevBootID := updatedNode.Status.NodeInfo.BootID + + if _, err := ExecCmdOnNode(cs, node, []string{"systemctl", "reboot"}); err != nil { + return fmt.Errorf("could not reboot node: %w", err) + } + + startTime := time.Now() + err = wait.Poll(2*time.Second, 10*time.Minute, func() (bool, error) { + n, err := cs.Nodes().Get(context.TODO(), node.ObjectMeta.Name, metav1.GetOptions{}) + require.Nil(t, err) + + if n.Status.NodeInfo.BootID != prevBootID { + return isNodeReady(n), nil + } + return false, nil + }) + + if err != nil { + return fmt.Errorf("node %q never rebooted (waited %s)", node.ObjectMeta.Name, time.Since(startTime)) + } + + t.Logf("node %q has rebooted (waited %s)", node.ObjectMeta.Name, time.Since(startTime)) + + return nil +} + +// Ensures that rpm-ostree is running by first interrogating its status from +// systemd, then tries to start it if it is not running. It will try to start +// rpm-ostree up to five times. 
+func ensureRPMOstreeIsRunning(t *testing.T, cs *framework.ClientSet, node *corev1.Node) error { + activeState := "ActiveState=active" + startLimitHit := "Result=start-limit-hit" + + runner := NewNodeCmdRunner(cs, node, mcoNamespace) + + runOpts := NodeCmdOpts{ + Command: []string{"systemctl", "show", "--no-page", "rpm-ostreed"}, + Retries: 5, + RetryCheckFunc: func(attempt int, cr *CmdResult, err error) bool { + // We ran into an error trying to get the rpm-ostreed status, retry. + if err != nil { + return false + } + + combined := string(cr.Stdout) + string(cr.Stderr) + + if strings.Contains(combined, activeState) && !strings.Contains(combined, startLimitHit) { + // We're good + return true + } + + if strings.Contains(combined, startLimitHit) { + t.Logf("rpm-ostree start limit hit, waiting 60 seconds") + time.Sleep(60 * time.Second) + } + + _, err = runner.RunWithOpts(NodeCmdOpts{ + Command: []string{"systemctl", "start", "rpm-ostreed"}, + Stderr: os.Stderr, + }) + + if err != nil { + // Give rpm-ostree time to restart before we retry the status check + t.Logf("started rpm-ostree") + time.Sleep(time.Second) + return false + } + + return false + }, + Stderr: os.Stderr, + } + + if _, err := runner.RunWithOpts(runOpts); err != nil { + return fmt.Errorf("failed to start rpm-ostree:\n%w", err) + } + + return nil +} + +// Runs an arbitrary rpm-ostree command after first ensuring that rpm-ostree is +// running. It will retry the command up to five times. +func runRPMOstreeCmd(t *testing.T, cs *framework.ClientSet, node *corev1.Node, cmd []string) (*CmdResult, error) { + if err := ensureRPMOstreeIsRunning(t, cs, node); err != nil { + return nil, fmt.Errorf("could not ensure that rpm-ostreed is running: %w", err) + } + + failureMsg := "Active: failed (Result: start-limit-hit)" + + runner := NewNodeCmdRunner(cs, node, mcoNamespace) + + runOpts := NodeCmdOpts{ + Command: cmd, + Retries: 5, + RetryCheckFunc: func(attempt int, cr *CmdResult, err error) bool { + if err != nil { + return false + } + + combinedOut := string(cr.Stdout) + string(cr.Stderr) + if strings.Contains(combinedOut, failureMsg) { + t.Logf("encountered start-limit-hit error, will wait and restart rpm-ostree, then try again") + ensureRPMOstreeIsRunning(t, cs, node) + return false + } + + t.Logf("ran '$ %s' successfully, took %s", strings.Join(cmd, " "), cr.Duration) + return true + }, + Stderr: os.Stderr, + } + + t.Logf("running: '$ %s'", strings.Join(cmd, " ")) + result, err := runner.RunWithOpts(runOpts) + if err != nil { + return nil, fmt.Errorf("could not run rpm-ostree command: %w", err) + } + + return result, nil +} + +func streamBuildLogs(t *testing.T, cs *framework.ClientSet, build *buildv1.Build) error { + // Configure our output writers for later use with an io.MultiWriter + outWriters := []io.Writer{} + + if buildLogFile != nil && *buildLogFile != "" { + t.Logf("writing build log to: %s", *buildLogFile) + + buildLog, err := os.Create(*buildLogFile) + if err != nil { + return fmt.Errorf("could not create image_build.log: %w", err) + } + defer buildLog.Close() + outWriters = append(outWriters, buildLog) + } + + if streamBuild != nil && *streamBuild { + t.Logf("streaming build logs to stdout") + outWriters = append(outWriters, os.Stdout) + } + + // We're not configured to stream any logs, so stop here. + if len(outWriters) == 0 { + t.Logf("not capturing build logs") + return nil + } + + // Wait for the build to start so we can get the underlying pod. 
+ err := wait.Poll(2*time.Second, 5*time.Minute, func() (bool, error) { + b, err := cs.BuildV1Interface.Builds(build.Namespace).Get(context.TODO(), build.Name, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("could not get build: %w", err) + } + + return b.Status.Phase == buildv1.BuildPhaseRunning, nil + }) + + if err != nil { + return fmt.Errorf("build did not start within a reasonable amount of time") + } + + // Get updated build object + build, err = cs.BuildV1Interface.Builds(build.Namespace).Get(context.TODO(), build.Name, metav1.GetOptions{}) + buildPodName := build.Annotations[buildv1.BuildPodNameAnnotation] + + // Wait for build container to start + err = wait.Poll(2*time.Second, 1*time.Minute, func() (bool, error) { + // TODO: Find constant for this annotation name + buildPod, err := cs.Pods(build.Namespace).Get(context.TODO(), buildPodName, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("could not get build pod for build %s: %w", buildName, err) + } + + return isPodReady(buildPod), nil + }) + + if err != nil { + return fmt.Errorf("build container did not start within a reasonable amount of time") + } + + // Get the build pod so we can stream its logs. + buildPod, err := cs.Pods(build.Namespace).Get(context.TODO(), buildPodName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get build pod for build %s: %w", buildName, err) + } + + podLogOpts := &corev1.PodLogOptions{ + Follow: true, + Container: buildPod.Spec.Containers[0].Name, + } + + // Get the io.Reader we read for the build logs + req := cs.Pods(buildPod.Namespace).GetLogs(buildPod.Name, podLogOpts) + podLogs, err := req.Stream(context.TODO()) + if err != nil { + return fmt.Errorf("could not stream build pod logs: %w", err) + } + + defer podLogs.Close() + + // Copy the contents of the io.Reader to our writers by using an + // io.MultiWriter + if _, err := io.Copy(io.MultiWriter(outWriters...), podLogs); err != nil { + return fmt.Errorf("could not stream build logs to stdout: %w", err) + } + + return nil +} + +// Waits for the build to run while simultaneously streaming the build output +// if either of the flags are set to do so. +func waitForBuildToRun(t *testing.T, cs *framework.ClientSet, build *buildv1.Build) error { + var wg sync.WaitGroup + wg.Add(2) + + var streamErr error = nil + var waitErr error = nil + + go func(to *testing.T) { + defer wg.Done() + waitErr = waitForBuild(to, cs, build) + }(t) + + go func(to *testing.T) { + defer wg.Done() + streamErr = streamBuildLogs(to, cs, build) + }(t) + + wg.Wait() + + return aggregatedErr.NewAggregate([]error{ + streamErr, + waitErr, + }) +} + +// Waits for a build to complete. 
+func waitForBuild(t *testing.T, cs *framework.ClientSet, build *buildv1.Build) error { + startTime := time.Now() + + err := wait.Poll(2*time.Second, 20*time.Minute, func() (bool, error) { + b, err := cs.BuildV1Interface.Builds(build.Namespace).Get(context.TODO(), build.Name, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("could not get build: %w", err) + } + + if b.Status.Phase == buildv1.BuildPhaseComplete { + return true, nil + } + + require.NotContains(t, []buildv1.BuildPhase{buildv1.BuildPhaseFailed, buildv1.BuildPhaseError, buildv1.BuildPhaseCancelled}, b.Status.Phase) + return false, nil + }) + + if err != nil { + return fmt.Errorf("build %q did not complete (waited %s)", build.Name, time.Since(startTime)) + } + + t.Logf("build %q has completed (waited %s)", build.Name, time.Since(startTime)) + + return nil +} + +// Deletes the machine using the OpenShift Machine API so that we don't land on +// the same machine if we're doing development on the test +func deleteMachineAndNode(t *testing.T, cs *framework.ClientSet, node *corev1.Node) { + machineID := node.Annotations["machine.openshift.io/machine"] + machineID = strings.ReplaceAll(machineID, "openshift-machine-api/", "") + + t.Logf("Deleting machine %s and node %s", machineID, node.Name) + kubeconfig, err := cs.GetKubeconfig() + require.Nil(t, err) + + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + require.Nil(t, err) + + client := machineClient.NewForConfigOrDie(config) + + require.Nil(t, client.MachineV1beta1().Machines("openshift-machine-api").Delete(context.TODO(), machineID, metav1.DeleteOptions{})) + require.Nil(t, cs.Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{})) +} + +// Gets a random node to use as a target for this test +func getRandomNode(cs *framework.ClientSet, role string) (*corev1.Node, error) { + nodes, err := helpers.GetNodesByRole(cs, role) + if err != nil { + return nil, err + } + + // #nosec + rand.Seed(time.Now().UnixNano()) + node := nodes[rand.Intn(len(nodes))] + + return &node, nil +} diff --git a/tests/layering/layering_test.go b/tests/layering/layering_test.go index c114dbe0..62f62d31 100644 --- a/tests/layering/layering_test.go +++ b/tests/layering/layering_test.go @@ -1,29 +1,20 @@ package e2e_test import ( - "bytes" "context" - "encoding/json" - "fmt" - "os" - "os/exec" - "path" - "strings" + "flag" "testing" - "time" - buildv1 "github.com/openshift/api/build/v1" - imagev1 "github.com/openshift/api/image/v1" "github.com/openshift/machine-config-operator/test/framework" - "github.com/openshift/machine-config-operator/test/helpers" "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/pkg/credentialprovider" ) +var deleteBuild = flag.Bool("delete-build", false, "Delete the derived build at the end of the test.") +var deleteMachine = flag.Bool("delete-machine", false, "Delete the target machine after test run.") +var streamBuild = flag.Bool("stream-build", false, "Stream the derived image build to stdout") +var buildLogFile = flag.String("build-log", "", "Filename to write the derived image build log to") + const ( imageStreamName = "test-boot-in-cluster-image" buildName = imageStreamName @@ -34,204 +25,124 @@ const ( ostreeUnverifiedRegistry = "ostree-unverified-registry" imageRegistry = "image-registry.openshift-image-registry.svc:5000" // If this moves from /run, make sure files get cleaned up - 
authfilePath = "/run/ostree/auth.json" - mcoNamespace = "openshift-machine-config-operator" - imagePullSpec = "registry.ci.openshift.org/rhcos-devel/rhel-coreos:%s" + authfilePath = "/run/ostree/auth.json" + mcoNamespace = "openshift-machine-config-operator" + imagePullSpec = "registry.ci.openshift.org/rhcos-devel/rhel-coreos:%s" + helloWorldPath = "/usr/bin/hello-world" + imageURL = ostreeUnverifiedRegistry + ":" + imageRegistry + "/" + mcoNamespace + "/" + imageStreamName ) type Deployments struct { - booted bool + Booted bool `json:"booted"` ContainerImageReference string `json:"container-image-reference"` } -type Status struct { - deployments []Deployments -} -func getImageTag() string { - branch, found := os.LookupEnv("BRANCH") - if !found || branch == "" { - return "latest" - } - - return strings.ReplaceAll(branch, "release-", "") +type Status struct { + Deployments []Deployments `json:"deployments"` } func TestBootInClusterImage(t *testing.T) { cs := framework.NewClientSet("") - - // create a new image stream - ctx := context.TODO() - imageStreamConfig := &imagev1.ImageStream{ - ObjectMeta: metav1.ObjectMeta{ - Name: imageStreamName, - Namespace: mcoNamespace, - }, - } - _, err := cs.ImageStreams(mcoNamespace).Create(ctx, imageStreamConfig, metav1.CreateOptions{}) + // Get a random node to run on. Note: We don't need to set the infra role + // because this test does not involve the MCO. + targetNode, err := getRandomNode(cs, "worker") require.Nil(t, err) - defer cs.ImageStreams(mcoNamespace).Delete(ctx, imageStreamName, metav1.DeleteOptions{}) - baseImageBuildArg := fmt.Sprintf(imagePullSpec, getImageTag()) + t.Logf("targeting node %s", targetNode.Name) - t.Logf("Imagestream %s created", imageStreamName) + // If the delete-build flag is used, delete the Build and ImageStream afterward. 
+ if deleteBuild != nil && *deleteBuild == true { + defer func() { + t.Logf("Deleting the ImageStream") + require.Nil(t, cs.ImageStreams(mcoNamespace).Delete(context.TODO(), imageStreamName, metav1.DeleteOptions{})) + t.Logf("Deleting the Image Build") + require.Nil(t, cs.BuildV1Interface.Builds(mcoNamespace).Delete(context.TODO(), buildName, metav1.DeleteOptions{})) + }() + } - // push a build to the image stream - buildConfig := &buildv1.Build{ - ObjectMeta: metav1.ObjectMeta{ - Name: buildName, + canDeleteMachine := false + defer func() { + // Only if the image is applied should we delete + if canDeleteMachine { + if deleteMachine != nil && *deleteMachine == true { + deleteMachineAndNode(t, cs, targetNode) + } else { + t.Logf("leaving node %s behind, you need to clean it up manually", targetNode.Name) + } + } else { + t.Logf("node %s will not be deleted since it was not touched", targetNode.Name) + } + }() + + testCases := []struct { + name string + testFunc func(*testing.T) + }{ + { + name: "Derived Image Is Built", + testFunc: func(t *testing.T) { + if err := maybeBuildDerivedOSImage(t, cs); err != nil { + t.Fatal(err) + } + }, }, - Spec: buildv1.BuildSpec{ - CommonSpec: buildv1.CommonSpec{ - Source: buildv1.BuildSource{ - Type: "Git", - Git: &buildv1.GitBuildSource{ - URI: "https://github.com/coreos/fcos-derivation-example", - Ref: "rhcos", - }, - }, - Strategy: buildv1.BuildStrategy{ - DockerStrategy: &buildv1.DockerBuildStrategy{ - BuildArgs: []corev1.EnvVar{ - { - Name: "RHEL_COREOS_IMAGE", - Value: baseImageBuildArg, - }, - }, - }, - }, - Output: buildv1.BuildOutput{ - To: &v1.ObjectReference{ - Kind: "ImageStreamTag", - Name: imageStreamName + ":latest", - }, - }, + { + name: "Not In Derived Image", + testFunc: func(t *testing.T) { + assertNotInDerivedImage(t, cs, targetNode) }, }, - } - - t.Logf("Using %s as the base image", baseImageBuildArg) - - _, err = cs.BuildV1Interface.Builds(mcoNamespace).Create(ctx, buildConfig, metav1.CreateOptions{}) - require.Nil(t, err) - defer cs.BuildV1Interface.Builds(mcoNamespace).Delete(ctx, buildName, metav1.DeleteOptions{}) - - t.Logf("Build %s started", buildName) - waitForBuild(t, cs, buildConfig.ObjectMeta.Name) - t.Logf("Build completed!") - - // pick a random worker node - unlabelFunc := helpers.LabelRandomNodeFromPool(t, cs, "worker", "node-role.kubernetes.io/infra") - defer unlabelFunc() - infraNode := helpers.GetSingleNodeByRole(t, cs, "infra") - - t.Logf("Labeled node %s with infra", infraNode.Name) - - // get ImagePullSecret for the MCD service account and save to authfilePath on the node - // eventually we should use rest.InClusterConfig() instead of cs with kubeadmin - mcdServiceAccount, err := cs.ServiceAccounts(mcoNamespace).Get(ctx, "machine-config-daemon", metav1.GetOptions{}) - require.Nil(t, err) - require.Equal(t, 1, len(mcdServiceAccount.ImagePullSecrets)) - imagePullSecret, err := cs.Secrets(mcoNamespace).Get(ctx, mcdServiceAccount.ImagePullSecrets[0].Name, metav1.GetOptions{}) - dockerConfigData := imagePullSecret.Data[corev1.DockerConfigKey] - var dockerConfig credentialprovider.DockerConfig - err = json.Unmarshal(dockerConfigData, &dockerConfig) - require.Nil(t, err) - dockerConfigJSON := credentialprovider.DockerConfigJSON{ - Auths: dockerConfig, - } - authfileData, err := json.Marshal(dockerConfigJSON) - require.Nil(t, err) - helpers.ExecCmdOnNode(t, cs, infraNode, "mkdir", "-p", path.Dir(path.Join("/rootfs", authfilePath))) - // will get cleaned up on reboot since file is in /run - writeToMCDContainer(t, cs, 
infraNode, path.Join("/rootfs", authfilePath), authfileData) - - // rpm-ostree rebase --experimental ostree-unverified-image:docker://image-registry.openshift-image-registry.svc.cluster.local:5000/openshift-machine-config-operator/test-boot-in-cluster-image-build - imageURL := fmt.Sprintf("%s:%s/%s/%s", ostreeUnverifiedRegistry, imageRegistry, mcoNamespace, imageStreamName) - helpers.ExecCmdOnNode(t, cs, infraNode, "chroot", "/rootfs", "rpm-ostree", "rebase", "--experimental", imageURL) - // reboot - rebootAndWait(t, cs, infraNode) - // check that new image is used - checkUsingImage := func(usingImage bool) { - status := helpers.ExecCmdOnNode(t, cs, infraNode, "chroot", "/rootfs", "rpm-ostree", "status", "--json") - var statusJSON Status - err = json.Unmarshal([]byte(status), &statusJSON) - require.Nil(t, err) - for _, deployment := range statusJSON.deployments { - if deployment.booted { - if usingImage { - require.Equal(t, imageURL, deployment.ContainerImageReference) - } else { - require.NotEqual(t, imageURL, deployment.ContainerImageReference) + { + name: "Boots Into Derived Image", + testFunc: func(t *testing.T) { + nodeRebooted := false + deletePullSecret := putPullSecretOnNode(t, cs, targetNode) + defer func() { + // The pull secret should be cleared from the node upon reboot since + // it is placed into the /run directory. However, if the node does + // not reboot and we're not deleting it, we should clean it up. + if !nodeRebooted && (deleteMachine == nil || *deleteMachine == false) { + deletePullSecret() + } + }() + + // From this point on, we want to delete the underlying machine if the + // delete-machine flag is used. This is because: + // - If we failed to apply the OS update, the node could be in an + // inconsistent state. + // - If we were successful in applying the OS update, we still want to + // delete the node afterweard so that if the test is re-run, it will + // not target the same node. + canDeleteMachine = true + if err := applyDerivedImage(t, cs, targetNode); err != nil { + t.Fatal(err) } - } - } - } - checkUsingImage(true) - // rollback - helpers.ExecCmdOnNode(t, cs, infraNode, "chroot", "/rootfs", "rpm-ostree", "rollback") - rebootAndWait(t, cs, infraNode) - checkUsingImage(false) -} - -// WriteToNode finds a node's mcd and writes a file over oc rsh's stdin -// filename should include /rootfs to write to node filesystem -func writeToMCDContainer(t *testing.T, cs *framework.ClientSet, node corev1.Node, filename string, data []byte) { - mcd, err := helpers.MCDForNode(cs, &node) - require.Nil(t, err) - mcdName := mcd.ObjectMeta.Name - - entryPoint := "oc" - args := []string{"rsh", - "-n", "openshift-machine-config-operator", - "-c", "machine-config-daemon", - mcdName, - "tee", filename, - } - cmd := exec.Command(entryPoint, args...) 
- cmd.Stderr = os.Stderr - cmd.Stdin = bytes.NewReader(data) + if err := rebootAndWait(t, cs, targetNode); err != nil { + t.Fatal(err) + } - out, err := cmd.Output() - require.Nil(t, err, "failed to write data to file %q on node %s: %s", filename, node.Name, string(out)) -} + nodeRebooted = true + assertInDerivedImage(t, cs, targetNode) + }, + }, + { + name: "Rolls Back To Original Image", + testFunc: func(t *testing.T) { + if err := rollbackToOriginalImage(t, cs, targetNode); err != nil { + t.Fatal(err) + } -// RebootAndWait reboots a node and then waits until the node has rebooted and its status is again Ready -func rebootAndWait(t *testing.T, cs *framework.ClientSet, node corev1.Node) { - updatedNode, err := cs.Nodes().Get(context.TODO(), node.ObjectMeta.Name, metav1.GetOptions{}) - require.Nil(t, err) - prevBootID := updatedNode.Status.NodeInfo.BootID - helpers.ExecCmdOnNode(t, cs, node, "chroot", "/rootfs", "systemctl", "reboot") - startTime := time.Now() - if err := wait.Poll(2*time.Second, 20*time.Minute, func() (bool, error) { - node, err := cs.Nodes().Get(context.TODO(), node.ObjectMeta.Name, metav1.GetOptions{}) - require.Nil(t, err) - if node.Status.NodeInfo.BootID != prevBootID { - for _, condition := range node.Status.Conditions { - if condition.Type == corev1.NodeReady && condition.Status == "True" { - return true, nil + if err := rebootAndWait(t, cs, targetNode); err != nil { + t.Fatal(err) } - } - } - return false, nil - }); err != nil { - require.Nil(t, err, "node %q never rebooted (waited %s)", node.ObjectMeta.Name, time.Since(startTime)) + + assertNotInDerivedImage(t, cs, targetNode) + }, + }, } - t.Logf("node %q has rebooted (waited %s)", node.ObjectMeta.Name, time.Since(startTime)) -} -func waitForBuild(t *testing.T, cs *framework.ClientSet, build string) { - startTime := time.Now() - if err := wait.Poll(2*time.Second, 20*time.Minute, func() (bool, error) { - build, err := cs.BuildV1Interface.Builds("openshift-machine-config-operator").Get(context.TODO(), build, metav1.GetOptions{}) - require.Nil(t, err) - if build.Status.Phase == "Complete" { - return true, nil - } - require.NotContains(t, []string{"Failed", "Error", "Cancelled"}, build.Status.Phase) - return false, nil - }); err != nil { - require.Nil(t, err, "build %q did not complete (waited %s)", build, time.Since(startTime)) + for i := range testCases { + t.Run(testCases[i].name, testCases[i].testFunc) } - t.Logf("build %q has completed (waited %s)", build, time.Since(startTime)) } diff --git a/tests/layering/node_cmd_test.go b/tests/layering/node_cmd_test.go new file mode 100644 index 00000000..afdbe416 --- /dev/null +++ b/tests/layering/node_cmd_test.go @@ -0,0 +1,436 @@ +package e2e_test + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/openshift/machine-config-operator/test/framework" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" +) + +// An error that may occur when executing a command +type NodeCmdError struct { + CmdResult + execErr error +} + +func (n *NodeCmdError) Error() string { + return fmt.Sprintf("error: %s\n%s", n.execErr, n.String()) +} + +func (n *NodeCmdError) Unwrap() error { + return n.execErr +} + +// Contains the output of a command +type CmdResult struct { + Command []string + Stdin []byte + Stdout []byte + Stderr []byte + Duration time.Duration +} + +func (c *CmdResult) String() 
string { + sb := &strings.Builder{} + + if len(c.Command) != 0 { + fmt.Fprintln(sb, "command:", strings.Join(c.Command, " ")) + } + + if len(c.Stdin) != 0 { + fmt.Fprintln(sb, "stdin:") + if _, err := sb.Write(c.Stdin); err != nil { + panic(err) + } + } else { + fmt.Fprintln(sb, "stdin: ") + } + + if len(c.Stdout) != 0 { + fmt.Fprintln(sb, "stdout:") + if _, err := sb.Write(c.Stdout); err != nil { + panic(err) + } + } else { + fmt.Fprintln(sb, "stdout: ") + } + + if len(c.Stderr) != 0 { + fmt.Fprintln(sb, "stderr:") + if _, err := sb.Write(c.Stderr); err != nil { + panic(err) + } + } else { + fmt.Fprintln(sb, "stderr: ") + } + + fmt.Fprintf(sb, "took %v\n", c.Duration) + + return sb.String() +} + +// Options to use while executing a command on a node +type NodeCmdOpts struct { + // The actual command itself + Command []string + // How many retries, if any + Retries int + // A function that determines if a retry is successful + RetryCheckFunc func(int, *CmdResult, error) bool + // The stdin to give to the command (optional) + Stdin io.Reader + // The stdout emitted from the command (optional) + // Useful for capturing stdout someplace else + Stdout io.Writer + // The stderr emitted from the command (optional) + // Useful for capturing stderr someplace else + Stderr io.Writer +} + +// Holds the command runner implementation +type NodeCmdRunner struct { + node *corev1.Node + clientSet *framework.ClientSet + namespace string +} + +// One-shot that execs a command on an arbitrary node in the default namespace +func ExecCmdOnNode(cs *framework.ClientSet, node *corev1.Node, cmd []string) (*CmdResult, error) { + return NewNodeCmdRunner(cs, node, mcoNamespace).Run(cmd) +} + +// Creates a reusable command runner object +func NewNodeCmdRunner(cs *framework.ClientSet, node *corev1.Node, namespace string) *NodeCmdRunner { + return &NodeCmdRunner{ + node: node, + clientSet: cs, + namespace: namespace, + } +} + +// Runs the command without additional options +func (n *NodeCmdRunner) Run(cmd []string) (*CmdResult, error) { + return n.RunWithOpts(NodeCmdOpts{Command: cmd}) +} + +// Runs the command with the additional options including retrying +func (n *NodeCmdRunner) RunWithOpts(runOpts NodeCmdOpts) (*CmdResult, error) { + return n.runAndMaybeRetry(runOpts) +} + +// Creates a pod on the target node in the given namespace to run the command in. 
+func (n *NodeCmdRunner) createCmdPod() (*corev1.Pod, error) { + containerName := "cmd-container" + + var user int64 = 0 + privileged := true + hostPathDirectoryType := corev1.HostPathDirectory + + // This PodSpec was largely cribbed from the output of + // $ oc debug node/ -o yaml + cmdPodSpec := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "cmd-pod-", + Namespace: n.namespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: containerName, + Command: []string{ + "/bin/sh", + }, + Image: "image-registry.openshift-image-registry.svc:5000/openshift/tools:latest", + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + RunAsUser: &user, + }, + Stdin: true, + StdinOnce: true, + TTY: true, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "host", + MountPath: "/host", + }, + }, + }, + }, + HostNetwork: true, + HostPID: true, + NodeName: n.node.Name, + RestartPolicy: corev1.RestartPolicyNever, + Volumes: []corev1.Volume{ + { + Name: "host", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/", + Type: &hostPathDirectoryType, + }, + }, + }, + }, + }, + } + + return n.clientSet.Pods(cmdPodSpec.Namespace).Create(context.TODO(), cmdPodSpec, metav1.CreateOptions{}) +} + +// Waits for the pod to become ready so we can exec into it. The long timeout +// is needed for when the pod takes longer to be scheduled on the node +// post-reboot. +func (n *NodeCmdRunner) waitForCmdPodToBeReady(pod *corev1.Pod) (*corev1.Pod, error) { + err := wait.Poll(1*time.Second, 3*time.Minute, func() (bool, error) { + p, err := n.clientSet.Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + + if p == nil { + return false, nil + } + + return isPodReady(p), nil + }) + + if err != nil { + return nil, fmt.Errorf("timed out while creating command pod: %w", err) + } + + return n.clientSet.Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) +} + +// Creates a new command pod and waits until it is ready. +func (n *NodeCmdRunner) getCmdPodAndWait() (*corev1.Pod, error) { + cmdPod, err := n.createCmdPod() + if err != nil { + return nil, fmt.Errorf("could not create command pod: %w", err) + } + + return n.waitForCmdPodToBeReady(cmdPod) +} + +// Runs the command, optionally retrying for as many times as is necessary. +// This will create a new pod, exec into it to run the command, then terminate +// the pod at the very end. If no RetryCheckFunc is provided, the default will +// be to run until the command no longer returns an error. +func (n *NodeCmdRunner) runAndMaybeRetry(runOpts NodeCmdOpts) (*CmdResult, error) { + // We can't run this command. + if len(runOpts.Command) == 0 { + return nil, fmt.Errorf("zero-length command passed") + } + + // Gets a pod and waits for it to be ready. + cmdPod, err := n.getCmdPodAndWait() + if err != nil { + return nil, fmt.Errorf("could not create command pod: %w", err) + } + + defer func() { + // Delete the pod when we're finished. + n.clientSet.Pods(cmdPod.Namespace).Delete(context.TODO(), cmdPod.Name, metav1.DeleteOptions{}) + }() + + // We don't have any retries, so just run the command. 
+ if runOpts.Retries <= 1 { + return n.run(runOpts, cmdPod) + } + + // Default is to keep retrying until we no longer get an error + retryFunc := func(_ int, _ *CmdResult, runErr error) bool { + return runErr == nil + } + + if runOpts.RetryCheckFunc != nil { + retryFunc = runOpts.RetryCheckFunc + } + + var result *CmdResult = nil + + // Retry the command for the specified retries. + for i := 1; i <= runOpts.Retries; i++ { + runResult, runErr := n.run(runOpts, cmdPod) + if retryFunc(i, runResult, runErr) { + return runResult, nil + } + } + + return result, fmt.Errorf("max retries (%d) reached", runOpts.Retries) +} + +// Actually runs the command via an exec. This implementation was mostly +// cribbed from +// https://github.com/kubernetes/kubectl/blob/master/pkg/cmd/exec/exec.go +func (n *NodeCmdRunner) run(runOpts NodeCmdOpts, cmdPod *corev1.Pod) (*CmdResult, error) { + restClient := n.clientSet.CoreV1Interface.RESTClient() + + execOpts := &corev1.PodExecOptions{ + Container: cmdPod.Spec.Containers[0].Name, + Command: getCommandToRun(runOpts.Command), + Stdin: runOpts.Stdin != nil, + Stdout: true, + Stderr: true, + TTY: false, + } + + req := restClient.Post(). + Resource("pods"). + Name(cmdPod.Name). + Namespace(cmdPod.Namespace). + SubResource("exec") + + req.VersionedParams(execOpts, scheme.ParameterCodec) + + // TODO: Figure out a better way to get the config from our clientset than + // having to read it back in. + kubeconfig, err := n.clientSet.GetKubeconfig() + if err != nil { + return nil, fmt.Errorf("could not get kubeconfig: %w", err) + } + + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, fmt.Errorf("could not get config: %w", err) + } + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return nil, fmt.Errorf("could not get command executor: %w", err) + } + + stdinBuf := bytes.NewBuffer([]byte{}) + stdoutBuf, stdout := getWriterAndBuffer(runOpts.Stdout) + stderrBuf, stderr := getWriterAndBuffer(runOpts.Stderr) + + streamOpts := remotecommand.StreamOptions{ + Stdout: stdout, + Stderr: stderr, + Tty: false, + } + + // Wire in stdin using io.TeeReader so its contents are available in the result object. + if runOpts.Stdin != nil { + streamOpts.Stdin = io.TeeReader(runOpts.Stdin, stdinBuf) + } + + // Run the actual command + start := time.Now() + err = exec.Stream(streamOpts) + end := time.Since(start) + + results := CmdResult{ + Command: runOpts.Command, + Duration: end, + Stdin: stdinBuf.Bytes(), + Stdout: stdoutBuf.Bytes(), + Stderr: stderrBuf.Bytes(), + } + + if err != nil { + err = &NodeCmdError{ + CmdResult: results, + execErr: err, + } + } + + return &results, err +} + +// Creates a new buffer and io.MultiWriter so we can collect stdin / stderr to +// multiple places simultaneously. +func getWriterAndBuffer(w io.Writer) (*bytes.Buffer, io.Writer) { + buf := bytes.NewBuffer([]byte{}) + if w == nil { + return buf, buf + } + + return buf, io.MultiWriter(buf, w) +} + +// Prepends chroot /host onto the command we want to run. +func getCommandToRun(cmd []string) []string { + if strings.HasPrefix(strings.Join(cmd, " "), "chroot /host") { + return cmd + } + + return append([]string{"chroot", "/host"}, cmd...) +} + +// Determines if the command pod is ready. These checks might be a little +// heavy-handed, but they work well. 
+func isPodReady(pod *corev1.Pod) bool { + // Check that the pod is not in the running phase + if pod.Status.Phase != corev1.PodRunning { + return false + } + + // Check all the pod conditions + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodInitialized && condition.Status != "True" { + return false + } + + if condition.Type == corev1.PodScheduled && condition.Status != "True" { + return false + } + + if condition.Type == corev1.PodReady && condition.Status != "True" { + return false + } + + if condition.Type == corev1.ContainersReady && condition.Status != "True" { + return false + } + } + + // Check all the pod container statuses + for _, status := range pod.Status.ContainerStatuses { + // The container status is still waiting + if status.State.Waiting != nil { + return false + } + + // The container was terminated + if status.State.Terminated != nil { + return false + } + + // The container isn't running + if status.State.Running == nil { + return false + } + + // This is nil, meaning we haven't started yet + if status.Started == nil { + return false + } + + // We haven't started the container yet + if *status.Started != true { + return false + } + + // We started the container but we're not ready yet + if status.Ready != true { + return false + } + } + + // If we've made it here, the pod is ready. + return true +}
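Usage note: a possible way to invoke the refactored layering suite. The flag names (-stream-build, -build-log, -delete-build, -delete-machine) come from the flags registered in layering_test.go above; the timeout value and log filename are illustrative, and this assumes the command is run from the repository root with KUBECONFIG pointing at a cluster that can run in-cluster builds:

    $ go test -v -count=1 -timeout=90m ./tests/layering/ -args \
        -stream-build -build-log=image_build.log -delete-build -delete-machine

The BRANCH environment variable, if set (for example release-4.10), selects the RHCOS base image tag via getBaseImageTag; otherwise the "latest" tag is used.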