Skip to content

Commit

Permalink
Replace cache.ListWatch with informers.NewSharedInformerFactory
Browse files Browse the repository at this point in the history
Signed-off-by: Davanum Srinivas <[email protected]>
  • Loading branch information
dims committed Jan 13, 2025
1 parent d03e737 commit 328f503
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 198 deletions.
51 changes: 29 additions & 22 deletions internal/aws/k8s/k8sclient/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -44,18 +41,24 @@ type daemonSetClient struct {
stopChan chan struct{}
stopped bool

store *ObjStore
store *ObjStore
informer cache.SharedIndexInformer

syncChecker initialSyncChecker

mu sync.RWMutex
daemonSetInfos []*DaemonSetInfo
logger *zap.Logger
}

func (d *daemonSetClient) refresh() {
d.mu.Lock()
defer d.mu.Unlock()

if !cache.WaitForCacheSync(d.stopChan, d.informer.HasSynced) {
d.logger.Warn("Daemonset informer cache sync timeout")
}

var daemonSetInfos []*DaemonSetInfo
objsList := d.store.List()
for _, obj := range objsList {
Expand Down Expand Up @@ -87,26 +90,23 @@ func newDaemonSetClient(clientSet kubernetes.Interface, logger *zap.Logger, opti
option(d)
}

ctx := context.Background()
if _, err := clientSet.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
return nil, fmt.Errorf("cannot list DaemonSets. err: %w", err)
}

d.store = NewObjStore(transformFuncDaemonSet, logger)
lw := createDaemonSetListWatch(clientSet, metav1.NamespaceAll)
reflector := cache.NewReflector(lw, &appsv1.DaemonSet{}, d.store, 0)
d.logger = logger

go reflector.Run(d.stopChan)
d.informer = createSharedDaemonsetsInformer(clientSet, d.store)
go d.informer.Run(d.stopChan)

if d.syncChecker != nil {
// check the init sync for potential connection issue
d.syncChecker.Check(reflector, "DaemonSet initial sync timeout")
d.syncChecker.Check(d.informer, "DaemonSet initial sync timeout")
}

return d, nil
}

func (d *daemonSetClient) shutdown() {
d.mu.Lock()
defer d.mu.Unlock()
close(d.stopChan)
d.stopped = true
}
Expand All @@ -128,14 +128,21 @@ func transformFuncDaemonSet(obj any) (any, error) {
return info, nil
}

func createDaemonSetListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
ctx := context.Background()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
return client.AppsV1().DaemonSets(ns).List(ctx, opts)
// createSharedDaemonsetsInformer creates a shared informer for daemonsets
func createSharedDaemonsetsInformer(clientSet kubernetes.Interface, store *ObjStore) cache.SharedIndexInformer {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
sharedIndexInformer := informerFactory.Apps().V1().DaemonSets().Informer()

sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
store.Add(obj)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
return client.AppsV1().DaemonSets(ns).Watch(ctx, opts)
UpdateFunc: func(oldObj, newObj interface{}) {
store.Update(newObj)
},
}
DeleteFunc: func(obj interface{}) {
store.Delete(obj)
},
})
return sharedIndexInformer
}
2 changes: 1 addition & 1 deletion internal/aws/k8s/k8sclient/daemonset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func TestDaemonSetClient(t *testing.T) {

fakeClientSet := fake.NewSimpleClientset(daemonSetObjects...)
client, _ := newDaemonSetClient(fakeClientSet, zap.NewNop(), options)
client.refresh()

daemonSets := make([]any, len(daemonSetObjects))
for i := range daemonSetObjects {
daemonSets[i] = daemonSetObjects[i]
}
assert.NoError(t, client.store.Replace(daemonSets, ""))

expected := []*DaemonSetInfo{
{
Expand Down
51 changes: 29 additions & 22 deletions internal/aws/k8s/k8sclient/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -44,18 +41,24 @@ type deploymentClient struct {
stopChan chan struct{}
stopped bool

store *ObjStore
store *ObjStore
informer cache.SharedIndexInformer

syncChecker initialSyncChecker

mu sync.RWMutex
deploymentInfos []*DeploymentInfo
logger *zap.Logger
}

func (d *deploymentClient) refresh() {
d.mu.Lock()
defer d.mu.Unlock()

if !cache.WaitForCacheSync(d.stopChan, d.informer.HasSynced) {
d.logger.Warn("Deployments informer cache sync timeout")
}

var deploymentInfos []*DeploymentInfo
objsList := d.store.List()
for _, obj := range objsList {
Expand Down Expand Up @@ -87,26 +90,23 @@ func newDeploymentClient(clientSet kubernetes.Interface, logger *zap.Logger, opt
option(d)
}

ctx := context.Background()
if _, err := clientSet.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
return nil, fmt.Errorf("cannot list Deployments. err: %w", err)
}

d.store = NewObjStore(transformFuncDeployment, logger)
lw := createDeploymentListWatch(clientSet, metav1.NamespaceAll)
reflector := cache.NewReflector(lw, &appsv1.Deployment{}, d.store, 0)
d.logger = logger

go reflector.Run(d.stopChan)
d.informer = createSharedDeploymentsInformer(clientSet, d.store)
go d.informer.Run(d.stopChan)

if d.syncChecker != nil {
// check the init sync for potential connection issue
d.syncChecker.Check(reflector, "Deployment initial sync timeout")
d.syncChecker.Check(d.informer, "Deployment initial sync timeout")
}

return d, nil
}

func (d *deploymentClient) shutdown() {
d.mu.Lock()
defer d.mu.Unlock()
close(d.stopChan)
d.stopped = true
}
Expand All @@ -131,14 +131,21 @@ func transformFuncDeployment(obj any) (any, error) {
return info, nil
}

func createDeploymentListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
ctx := context.Background()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
return client.AppsV1().Deployments(ns).List(ctx, opts)
// createSharedDeploymentsInformer creates a shared informer for deployments
func createSharedDeploymentsInformer(clientSet kubernetes.Interface, store *ObjStore) cache.SharedIndexInformer {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
sharedIndexInformer := informerFactory.Apps().V1().Deployments().Informer()

sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
store.Add(obj)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
return client.AppsV1().Deployments(ns).Watch(ctx, opts)
UpdateFunc: func(oldObj, newObj interface{}) {
store.Update(newObj)
},
}
DeleteFunc: func(obj interface{}) {
store.Delete(obj)
},
})
return sharedIndexInformer
}
43 changes: 25 additions & 18 deletions internal/aws/k8s/k8sclient/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -50,6 +47,7 @@ func epSyncCheckerOption(checker initialSyncChecker) epClientOption {
type epClient struct {
stopChan chan struct{}
store *ObjStore
informer cache.SharedIndexInformer

stopped bool

Expand All @@ -58,6 +56,7 @@ type epClient struct {
mu sync.RWMutex
podKeyToServiceNamesMap map[string][]string
serviceToPodNumMap map[Service]int // only running pods will show behind endpoints
logger *zap.Logger
}

func (c *epClient) PodKeyToServiceNames() map[string][]string {
Expand Down Expand Up @@ -132,20 +131,21 @@ func newEpClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...
}

c.store = NewObjStore(transformFuncEndpoint, logger)
lw := c.createEndpointListWatch(clientSet, metav1.NamespaceAll)
reflector := cache.NewReflector(lw, &v1.Endpoints{}, c.store, 0)
c.logger = logger

go reflector.Run(c.stopChan)
c.informer = createSharedEndpointsInformer(clientSet, c.store)
go c.informer.Run(c.stopChan)

if c.syncChecker != nil {
// check the init sync for potential connection issue
c.syncChecker.Check(reflector, "Endpoint initial sync timeout")
if !cache.WaitForCacheSync(c.stopChan, c.informer.HasSynced) {
c.logger.Warn("Endpoint informer cache sync timeout")
}

return c
}

func (c *epClient) shutdown() {
c.mu.Lock()
defer c.mu.Unlock()
close(c.stopChan)
c.stopped = true
}
Expand Down Expand Up @@ -177,14 +177,21 @@ func transformFuncEndpoint(obj any) (any, error) {
return info, nil
}

func (c *epClient) createEndpointListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
ctx := context.Background()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Endpoints(ns).List(ctx, opts)
// createSharedEndpointsInformer creates a shared informer for endpoints
func createSharedEndpointsInformer(clientSet kubernetes.Interface, store *ObjStore) cache.SharedIndexInformer {
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
sharedIndexInformer := informerFactory.Core().V1().Endpoints().Informer()

sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
store.Add(obj)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Endpoints(ns).Watch(ctx, opts)
UpdateFunc: func(oldObj, newObj interface{}) {
store.Update(newObj)
},
}
DeleteFunc: func(obj interface{}) {
store.Delete(obj)
},
})
return sharedIndexInformer
}
13 changes: 7 additions & 6 deletions internal/aws/k8s/k8sclient/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,13 @@ var endpointsArray = []runtime.Object{
}

func setUpEndpointClient() (*epClient, chan struct{}) {
stopChan := make(chan struct{})
options := epSyncCheckerOption(&mockReflectorSyncChecker{})

client := &epClient{
stopChan: stopChan,
store: NewObjStore(transformFuncEndpoint, zap.NewNop()),
}
return client, stopChan
fakeClientSet := fake.NewSimpleClientset()
client := newEpClient(fakeClientSet, zap.NewNop(), options)
client.refresh()

return client, client.stopChan
}

func TestEpClient_PodKeyToServiceNames(t *testing.T) {
Expand Down Expand Up @@ -415,6 +415,7 @@ func TestNewEndpointClient(t *testing.T) {

fakeClientSet := fake.NewSimpleClientset(endpointsArray...)
client := newEpClient(fakeClientSet, zap.NewNop(), setOption)
client.refresh()
assert.NotNil(t, client)
client.shutdown()
removeTempKubeConfig()
Expand Down
Loading

0 comments on commit 328f503

Please sign in to comment.