Skip to content

Commit

Permalink
fix: record instrumented application in all cases (#1416)
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Aug 4, 2024
1 parent feeec6f commit d201f50
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,80 +19,33 @@ package instrumentationdevice
import (
"context"

"github.com/go-logr/logr"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// CollectorsGroupReconciler reconciles a CollectorsGroup object
type CollectorsGroupReconciler struct {
client.Client
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=odigos.io,resources=collectorsgroups,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=odigos.io,resources=collectorsgroups/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=odigos.io,resources=collectorsgroups/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the CollectorsGroup object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *CollectorsGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if isDataCollectionReady(ctx, r.Client) {
logger.V(0).Info("data collection is ready, instrumenting selected applications")
var instApps odigosv1.InstrumentedApplicationList
if err := r.List(ctx, &instApps); err != nil {
logger.Error(err, "failed to list InstrumentedApps")
return ctrl.Result{}, err
}

for _, instApp := range instApps.Items {
err := instrument(logger, ctx, r.Client, &instApp)
if err != nil {
logger.Error(err, "failed to instrument application", "application", instApp.Name, "namespace", instApp.Namespace)
return ctrl.Result{}, err
}
}
} else {
err := r.removeAllInstrumentations(ctx, logger)
if err != nil {
logger.Error(err, "failed to remove instrumentations")
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}
isDataCollectionReady := isDataCollectionReady(ctx, r.Client)

func (r *CollectorsGroupReconciler) removeAllInstrumentations(ctx context.Context, logger logr.Logger) error {
var instApps odigosv1.InstrumentedApplicationList
if err := r.List(ctx, &instApps); err != nil {
return err
return ctrl.Result{}, err
}

for _, instApp := range instApps.Items {
name, kind, err := workload.GetWorkloadInfoRuntimeName(instApp.Name)
if err != nil {
return err
}

err = uninstrument(logger, ctx, r.Client, instApp.Namespace, name, kind, UnInstrumentReasonRemoveAll)
for _, runtimeDetails := range instApps.Items {
err := reconcileSingleWorkload(ctx, r.Client, &runtimeDetails, isDataCollectionReady)
if err != nil {
logger.Error(err, "failed to remove instrumentation", "application", name, "namespace", instApp.Namespace, "kind", kind)
return err
return ctrl.Result{}, err
}
}

return nil
return ctrl.Result{}, nil
}
87 changes: 62 additions & 25 deletions instrumentor/controllers/instrumentationdevice/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"

"github.com/go-logr/logr"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/common/consts"
"github.com/odigos-io/odigos/instrumentor/instrumentation"
Expand All @@ -13,19 +12,19 @@ import (
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type UnInstrumentReason string
type ApplyInstrumentationDeviceReason string

const (
UnInstrumentReasonDataCollectionNotReady UnInstrumentReason = "DataCollection not ready"
UnInstrumentReasonNoRuntimeDetails UnInstrumentReason = "No runtime details"
UnInstrumentReasonRemoveAll UnInstrumentReason = "Remove all"
ApplyInstrumentationDeviceReasonDataCollectionNotReady ApplyInstrumentationDeviceReason = "DataCollectionNotReady"
ApplyInstrumentationDeviceReasonNoRuntimeDetails ApplyInstrumentationDeviceReason = "NoRuntimeDetails"
ApplyInstrumentationDeviceReasonErrApplying ApplyInstrumentationDeviceReason = "ErrApplyingInstrumentationDevice"
ApplyInstrumentationDeviceReasonErrRemoving ApplyInstrumentationDeviceReason = "ErrRemovingInstrumentationDevice"
)

const (
Expand Down Expand Up @@ -65,8 +64,10 @@ func isDataCollectionReady(ctx context.Context, c client.Client) bool {
return false
}

func instrument(logger logr.Logger, ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication) error {
obj, err := getTargetObject(ctx, kubeClient, runtimeDetails)
func addInstrumentationDeviceToWorkload(ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication) error {

logger := log.FromContext(ctx)
obj, err := getWorkloadObject(ctx, kubeClient, runtimeDetails)
if err != nil {
return err
}
Expand All @@ -87,36 +88,30 @@ func instrument(logger logr.Logger, ctx context.Context, kubeClient client.Clien
})

if err != nil {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, "ErrApplyInstrumentationDevice", err.Error())
return err
}
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionTrue, appliedInstrumentationDeviceType, string(result), "Successfully applied instrumentation device to pod template")

if result != controllerutil.OperationResultNone {
logger.V(0).Info("instrumented application", "name", obj.GetName(), "namespace", obj.GetNamespace())
modified := result != controllerutil.OperationResultNone
if modified {
logger.V(0).Info("added instrumentation device to workload", "name", obj.GetName(), "namespace", obj.GetNamespace())
}

return nil
}

func uninstrument(logger logr.Logger, ctx context.Context, kubeClient client.Client, namespace string, name string, kind string, reason UnInstrumentReason) error {
obj, err := getObjectFromKindString(kind)
func removeInstrumentationDeviceFromWorkload(ctx context.Context, kubeClient client.Client, namespace string, workloadKind string, workloadName string, uninstrumentReason ApplyInstrumentationDeviceReason) error {

obj, err := getObjectFromKindString(workloadKind)
if err != nil {
logger.Error(err, "error getting object from kind string")
return err
}

err = kubeClient.Get(ctx, client.ObjectKey{
Namespace: namespace,
Name: name,
Name: workloadName,
}, obj)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}

logger.Error(err, "error getting object")
return err
return client.IgnoreNotFound(err)
}

result, err := controllerutil.CreateOrPatch(ctx, kubeClient, obj, func() error {
Expand All @@ -140,14 +135,16 @@ func uninstrument(logger logr.Logger, ctx context.Context, kubeClient client.Cli
return err
}

if result != controllerutil.OperationResultNone {
logger.V(0).Info("uninstrumented application", "name", obj.GetName(), "namespace", obj.GetNamespace(), "reason", reason)
modified := result != controllerutil.OperationResultNone
if modified {
logger := log.FromContext(ctx)
logger.V(0).Info("removed instrumentation device from workload", "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind(), "name", obj.GetName(), "reason", uninstrumentReason)
}

return nil
}

func getTargetObject(ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication) (client.Object, error) {
func getWorkloadObject(ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication) (client.Object, error) {
name, kind, err := workload.GetWorkloadInfoRuntimeName(runtimeDetails.Name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -194,3 +191,43 @@ func getObjectFromKindString(kind string) (client.Object, error) {
return nil, errors.New("unknown kind")
}
}

// reconciles a single workload, which might be triggered by a change in multiple resources.
// each time a relevant resource changes, this function is called to reconcile the workload
// and always writes the status into the InstrumentedApplication CR
func reconcileSingleWorkload(ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication, isNodeCollectorReady bool) error {

workloadName, workloadKind, err := workload.GetWorkloadInfoRuntimeName(runtimeDetails.Name)
if err != nil {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, string(ApplyInstrumentationDeviceReasonErrRemoving), err.Error())
return err
}

if !isNodeCollectorReady {
err := removeInstrumentationDeviceFromWorkload(ctx, kubeClient, runtimeDetails.Namespace, workloadKind, workloadName, ApplyInstrumentationDeviceReasonDataCollectionNotReady)
if err == nil {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, string(ApplyInstrumentationDeviceReasonDataCollectionNotReady), "OpenTelemetry pipeline not yet ready to receive data")
} else {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, string(ApplyInstrumentationDeviceReasonErrRemoving), err.Error())
}
return err
}

if len(runtimeDetails.Spec.RuntimeDetails) == 0 {
err := removeInstrumentationDeviceFromWorkload(ctx, kubeClient, runtimeDetails.Namespace, workloadKind, workloadName, ApplyInstrumentationDeviceReasonNoRuntimeDetails)
if err == nil {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, string(ApplyInstrumentationDeviceReasonNoRuntimeDetails), "No runtime details found")
} else {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, string(ApplyInstrumentationDeviceReasonErrRemoving), err.Error())
}
return err
}

err = addInstrumentationDeviceToWorkload(ctx, kubeClient, runtimeDetails)
if err == nil {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionTrue, appliedInstrumentationDeviceType, "InstrumentationDeviceApplied", "Instrumentation device applied successfully")
} else {
conditions.UpdateStatusConditions(ctx, kubeClient, runtimeDetails, &runtimeDetails.Status.Conditions, metav1.ConditionFalse, appliedInstrumentationDeviceType, string(ApplyInstrumentationDeviceReasonErrApplying), err.Error())
}
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package instrumentationdevice
import (
"context"

"github.com/go-logr/logr"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -35,85 +34,33 @@ type InstrumentedApplicationReconciler struct {
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=odigos.io,resources=instrumentedapplications,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=odigos.io,resources=instrumentedapplications/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=odigos.io,resources=instrumentedapplications/finalizers,verbs=update
//+kubebuilder:rbac:groups=odigos.io,resources=odigosconfigurations,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get;update;patch

// Reconcile is responsible for instrumenting deployment/statefulset/daemonset. In order for instrumentation to happen two things must be true:
// 1. InstrumentedApplication must have at least one language specified
// 2. Data collection pods must be running (DataCollection CollectorsGroup .status.ready == true)
func (r *InstrumentedApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

var runtimeDetails odigosv1.InstrumentedApplication
err := r.Client.Get(ctx, req.NamespacedName, &runtimeDetails)
if err != nil {
if client.IgnoreNotFound(err) != nil {
logger.Error(err, "error fetching instrumented application")

if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}

// runtime details deleted: remove instrumentation from resource requests
err = removeInstrumentation(logger, ctx, r.Client, req.NamespacedName, UnInstrumentReasonNoRuntimeDetails)
workloadName, workloadKind, err := workload.GetWorkloadInfoRuntimeName(req.Name)
if err != nil {
logger.Error(err, "error removing instrumentation")
logger.Error(err, "error parsing workload info from runtime object name")
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

err = reconcileSingleInstrumentedApplication(ctx, r.Client, &runtimeDetails)
return ctrl.Result{}, err
}

// this function is extracted so we can call it from other reconcilers like when odigos config changes
func reconcileSingleInstrumentedApplication(ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication) error {
logger := log.FromContext(ctx)

runtimeDetailsNamespacedName := client.ObjectKeyFromObject(runtimeDetails)

if len(runtimeDetails.Spec.RuntimeDetails) == 0 {
err := removeInstrumentation(logger, ctx, kubeClient, runtimeDetailsNamespacedName, UnInstrumentReasonNoRuntimeDetails)
err = removeInstrumentationDeviceFromWorkload(ctx, r.Client, req.Namespace, workloadKind, workloadName, ApplyInstrumentationDeviceReasonNoRuntimeDetails)
if err != nil {
logger.Error(err, "error removing instrumentation")
return err
}

return nil
}

if !isDataCollectionReady(ctx, kubeClient) {
err := removeInstrumentation(logger, ctx, kubeClient, runtimeDetailsNamespacedName, UnInstrumentReasonDataCollectionNotReady)
if err != nil {
logger.Error(err, "error removing instrumentation")
return err
}
} else {
err := instrument(logger, ctx, kubeClient, runtimeDetails)
if err != nil {
logger.Error(err, "error instrumenting")
return err
return ctrl.Result{}, err
}
}

return nil
}

func removeInstrumentation(logger logr.Logger, ctx context.Context, kubeClient client.Client, instrumentedApplicationName types.NamespacedName, reason UnInstrumentReason) error {
name, kind, err := workload.GetWorkloadInfoRuntimeName(instrumentedApplicationName.Name)
if err != nil {
return err
}

err = uninstrument(logger, ctx, kubeClient, instrumentedApplicationName.Namespace, name, kind, reason)
if err != nil {
logger.Error(err, "error removing instrumentation")
return err
return ctrl.Result{}, nil
}

return nil
isNodeCollectorReady := isDataCollectionReady(ctx, r.Client)
err = reconcileSingleWorkload(ctx, r.Client, &runtimeDetails, isNodeCollectorReady)
return ctrl.Result{}, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ func (r *OdigosConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, err
}

isNodeCollectorReady := isDataCollectionReady(ctx, r.Client)
logger.Info("reconciling all instrumented applications on odigos config change", "numInstrumentedApplications", len(instrumentedApplications.Items))

for _, instrumentedApplication := range instrumentedApplications.Items {
err = reconcileSingleInstrumentedApplication(ctx, r.Client, &instrumentedApplication)
err = reconcileSingleWorkload(ctx, r.Client, &instrumentedApplication, isNodeCollectorReady)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func reconcileSingleInstrumentedApplicationByName(ctx context.Context, k8sClient
var instrumentedApplication odigosv1.InstrumentedApplication
err := k8sClient.Get(ctx, types.NamespacedName{Name: instrumentedAppName, Namespace: namespace}, &instrumentedApplication)
if err != nil {
// changes in workload when there is no instrumented application is not interesting
return client.IgnoreNotFound(err)
}
return reconcileSingleInstrumentedApplication(ctx, k8sClient, &instrumentedApplication)
isNodeCollectorReady := isDataCollectionReady(ctx, k8sClient)
return reconcileSingleWorkload(ctx, k8sClient, &instrumentedApplication, isNodeCollectorReady)
}
2 changes: 1 addition & 1 deletion instrumentor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/odigos-io/odigos/instrumentor
go 1.22.0

require (
github.com/go-logr/logr v1.4.2
github.com/google/uuid v1.6.0
github.com/odigos-io/odigos/api v0.0.0
github.com/odigos-io/odigos/common v0.0.0
Expand All @@ -20,6 +19,7 @@ require (
github.com/agoda-com/opentelemetry-logs-go v0.4.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-yaml v1.11.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
Expand Down
Loading

0 comments on commit d201f50

Please sign in to comment.