Skip to content
This repository has been archived by the owner on Oct 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #574 from karimra/tunnel-target
Browse files Browse the repository at this point in the history
properly handle tunnel targets with the same ID
  • Loading branch information
karimra authored Apr 3, 2022
2 parents 23928f2 + 5a39809 commit b34da47
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 90 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ Documentation available at [https://gnmic.kmrd.dev](https://gnmic.kmrd.dev)
* **Flexible collector deployment**
`gnmic` can be deployed as a gNMI collector that supports multiple output types ([NATS](https://gnmic.kmrd.dev/user_guide/outputs/nats_output/), [Kafka](https://gnmic.kmrd.dev/user_guide/outputs/kafka_output/), [Prometheus](https://gnmic.kmrd.dev/user_guide/outputs/prometheus_output/), [InfluxDB](https://gnmic.kmrd.dev/user_guide/outputs/influxdb_output/),...).
The collector can be deployed either as a [single instance](https://gnmic.kmrd.dev/deployments/deployments_intro/#single-instance), as part of a [cluster](https://gnmic.kmrd.dev/user_guide/HA/), or used to form [data pipelines](https://gnmic.kmrd.dev/deployments/deployments_intro/#pipelines).
* **gNMI data manipulation**
* **Support gRPC tunnel based dialout telemetry**
`gnmic` can be deployed as a gNMI collector with an [embedded tunnel server](https://gnmic.kmrd.dev/user_guide/tunnel_server/).
* **gNMI data manipulation**
`gnmic` collector has [data transformation](https://gnmic.kmrd.dev/user_guide/event_processors/intro/) capabilities that can be used to adapt the collected data to your specific use case.
* **Dynamic targets loading**
`gnmic` support [target loading at runtime](https://gnmic.kmrd.dev/user_guide/target_discovery/discovery_intro/) based on input from external systems.
Expand Down
16 changes: 10 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type App struct {
grpcTunnelSrv *grpc.Server
tunServer *tunnel.Server
ttm *sync.RWMutex
tunTargets map[string]tunnel.Target
tunTargetCfn map[string]context.CancelFunc
tunTargets map[tunnel.Target]struct{}
tunTargetCfn map[tunnel.Target]context.CancelFunc
}

func New() *App {
Expand Down Expand Up @@ -133,8 +133,8 @@ func New() *App {
c: cache.New(nil),
// tunnel server
ttm: new(sync.RWMutex),
tunTargets: make(map[string]tunnel.Target),
tunTargetCfn: make(map[string]context.CancelFunc),
tunTargets: make(map[tunnel.Target]struct{}),
tunTargetCfn: make(map[tunnel.Target]context.CancelFunc),
}
a.router.StrictSlash(true)
a.router.Use(headersMiddleware, a.loggingMiddleware)
Expand Down Expand Up @@ -468,8 +468,11 @@ func (a *App) GetTargets() (map[string]*types.TargetConfig, error) {
time.Sleep(a.Config.TunnelServer.TargetWaitTime)
a.ttm.RLock()
defer a.ttm.RUnlock()
for _, tt := range a.tunTargets {
for tt := range a.tunTargets {
tc := a.getTunnelTargetMatch(tt)
if tc == nil {
continue
}
err = a.Config.SetTargetConfigDefaults(tc)
if err != nil {
return nil, err
Expand All @@ -494,10 +497,11 @@ func (a *App) CreateGNMIClient(ctx context.Context, t *target.Target) error {
targetDialOpts := a.dialOpts
if a.Config.UseTunnelServer {
targetDialOpts = append(targetDialOpts,
grpc.WithContextDialer(a.tunDialerFn(ctx, t.Config.Name)),
grpc.WithContextDialer(a.tunDialerFn(ctx, t.Config)),
)
t.Config.Address = t.Config.Name
}
a.Logger.Printf("creating gRPC client for target %q", t.Config.Name)
if err := t.CreateGNMIClient(ctx, targetDialOpts...); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("failed to create a gRPC client for target %q, timeout (%s) reached", t.Config.Name, t.Config.Timeout)
Expand Down
8 changes: 7 additions & 1 deletion app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ func (a *App) StartCollector(ctx context.Context) {
if t == nil {
continue
}
if _, ok := a.activeTargets[t.Config.Name]; ok {
a.operLock.RLock()
_, ok := a.activeTargets[t.Config.Name]
a.operLock.RUnlock()
if ok {
if a.Config.Debug {
a.Logger.Printf("target %q listener already active", t.Config.Name)
}
continue
}
a.operLock.Lock()
a.activeTargets[t.Config.Name] = struct{}{}
a.operLock.Unlock()

a.Logger.Printf("starting target %q listener", t.Config.Name)
go func(t *target.Target) {
numOnceSubscriptions := t.NumberOfOnceSubscriptions()
Expand Down
1 change: 1 addition & 0 deletions app/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (a *App) getRequest(ctx context.Context, tc *types.TargetConfig, req *gnmi.
}
return response, nil
}

func (a *App) filterModels(ctx context.Context, tc *types.TargetConfig, modelsNames []string) (map[string]*gnmi.ModelData, []string, error) {
supModels, err := a.GetModels(ctx, tc)
if err != nil {
Expand Down
29 changes: 6 additions & 23 deletions app/gnmi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package app

import (
"context"
"errors"
"fmt"

"github.com/karimra/gnmic/types"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/proto/gnmi_ext"
"google.golang.org/grpc"
)

func (a *App) ClientCapabilities(ctx context.Context, tc *types.TargetConfig, ext ...*gnmi_ext.Extension) (*gnmi.CapabilityResponse, error) {
Expand All @@ -21,9 +19,8 @@ func (a *App) ClientCapabilities(ctx context.Context, tc *types.TargetConfig, ex
}
// acquire reader lock
a.operLock.RLock()
defer a.operLock.RUnlock()

err = a.CreateGNMIClient(ctx, t)
a.operLock.RUnlock()
if err != nil {
return nil, err
}
Expand All @@ -46,9 +43,8 @@ func (a *App) ClientGet(ctx context.Context, tc *types.TargetConfig, req *gnmi.G
}
// acquire reader lock
a.operLock.RLock()
defer a.operLock.RUnlock()

err = a.CreateGNMIClient(ctx, t)
a.operLock.RUnlock()
if err != nil {
return nil, err
}
Expand All @@ -70,23 +66,10 @@ func (a *App) ClientSet(ctx context.Context, tc *types.TargetConfig, req *gnmi.S
}
// acquire reader lock
a.operLock.RLock()
defer a.operLock.RUnlock()

if t.Client == nil {
targetDialOpts := a.dialOpts
if a.Config.UseTunnelServer {
targetDialOpts = append(targetDialOpts,
grpc.WithContextDialer(a.tunDialerFn(ctx, tc.Name)),
)
// overwrite target address
t.Config.Address = t.Config.Name
}
if err := t.CreateGNMIClient(ctx, targetDialOpts...); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("failed to create a gRPC client for target %q, timeout (%s) reached", t.Config.Name, t.Config.Timeout)
}
return nil, fmt.Errorf("failed to create a gRPC client for target %q : %v", t.Config.Name, err)
}
err = a.CreateGNMIClient(ctx, t)
a.operLock.RUnlock()
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, t.Config.Timeout)
defer cancel()
Expand Down
12 changes: 7 additions & 5 deletions app/gnmi_client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/karimra/gnmic/target"
"github.com/karimra/gnmic/types"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/grpctunnel/tunnel"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -176,15 +177,16 @@ CRCLIENT:
targetDialOpts := a.dialOpts
if a.Config.UseTunnelServer {
a.ttm.Lock()
a.tunTargetCfn[tc.Name] = cancel
a.tunTargetCfn[tunnel.Target{ID: tc.Name, Type: tc.TunnelTargetType}] = cancel
a.ttm.Unlock()
targetDialOpts = append(targetDialOpts,
grpc.WithContextDialer(a.tunDialerFn(gnmiCtx, tc.Name)),
grpc.WithContextDialer(a.tunDialerFn(gnmiCtx, tc)),
)
// overwrite target address
t.Config.Address = t.Config.Name
}
if err := t.CreateGNMIClient(ctx, targetDialOpts...); err != nil {
err := t.CreateGNMIClient(ctx, targetDialOpts...)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
a.Logger.Printf("failed to initialize target %q timeout (%s) reached", tc.Name, t.Config.Timeout)
} else {
Expand Down Expand Up @@ -236,10 +238,10 @@ CRCLIENT:
targetDialOpts := a.dialOpts
if a.Config.UseTunnelServer {
a.ttm.Lock()
a.tunTargetCfn[tc.Name] = cancel
a.tunTargetCfn[tunnel.Target{ID: tc.Name, Type: tc.TunnelTargetType}] = cancel
a.ttm.Unlock()
targetDialOpts = append(targetDialOpts,
grpc.WithContextDialer(a.tunDialerFn(gnmiCtx, tc.Name)),
grpc.WithContextDialer(a.tunDialerFn(gnmiCtx, tc)),
)
// overwrite target address
t.Config.Address = t.Config.Name
Expand Down
2 changes: 1 addition & 1 deletion app/gnmi_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (a *App) Set(ctx context.Context, req *gnmi.SetRequest) (*gnmi.SetResponse,
targetDialOpts := a.dialOpts
if a.Config.UseTunnelServer {
targetDialOpts = append(targetDialOpts,
grpc.WithContextDialer(a.tunDialerFn(ctx, name)),
grpc.WithContextDialer(a.tunDialerFn(ctx, tc)),
)
t.Config.Address = t.Config.Name
}
Expand Down
20 changes: 7 additions & 13 deletions app/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,19 +281,13 @@ func (a *App) startIO() {
limiter = time.NewTicker(a.Config.LocalFlags.SubscribeBackoff)
}

a.wg.Add(len(a.Config.Targets))
for _, tc := range a.Config.Targets {
// check if target is a tunnel discovered target.
// in which case, do not (re)subscribe
a.ttm.RLock()
_, ok := a.tunTargets[tc.Name]
a.ttm.RUnlock()
if ok {
continue
}
go a.subscribeStream(a.ctx, tc)
if limiter != nil {
<-limiter.C
if !a.Config.UseTunnelServer {
for _, tc := range a.Config.Targets {
a.wg.Add(1)
go a.subscribeStream(a.ctx, tc)
if limiter != nil {
<-limiter.C
}
}
}
if limiter != nil {
Expand Down
17 changes: 5 additions & 12 deletions app/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (a *App) stopTarget(ctx context.Context, name string) error {
}
a.operLock.Lock()
defer a.operLock.Unlock()
if !a.targetExists(name) {
if _, ok := a.Targets[name]; !ok {
return fmt.Errorf("target %q does not exist", name)
}

Expand Down Expand Up @@ -87,17 +87,17 @@ func (a *App) DeleteTarget(ctx context.Context, name string) error {
// AddTargetConfig adds a *TargetConfig to the configuration map
func (a *App) AddTargetConfig(tc *types.TargetConfig) {
a.Logger.Printf("adding target %+v", tc)
_, ok := a.Config.Targets[tc.Name]
if ok {
return
}
if tc.BufferSize <= 0 {
tc.BufferSize = a.Config.TargetBufferSize
}
if tc.RetryTimer <= 0 {
tc.RetryTimer = a.Config.Retry
}

if _, ok := a.Config.Targets[tc.Name]; ok {
return
}

a.configLock.Lock()
defer a.configLock.Unlock()
a.Config.Targets[tc.Name] = tc
Expand All @@ -123,13 +123,6 @@ func (a *App) parseProtoFiles(t *target.Target) error {
return nil
}

func (a *App) targetExists(name string) bool {
//a.operLock.RLock()
_, ok := a.Targets[name]
//a.operLock.RUnlock()
return ok
}

func (a *App) targetConfigExists(name string) bool {
a.configLock.RLock()
_, ok := a.Config.Targets[name]
Expand Down
Loading

0 comments on commit b34da47

Please sign in to comment.