Skip to content

Commit

Permalink
SIG can reload configs (#1322)
Browse files Browse the repository at this point in the history
  • Loading branch information
scrye authored Nov 16, 2017
1 parent d5ff4d7 commit 90cf7bf
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 136 deletions.
5 changes: 3 additions & 2 deletions go/lib/pathmgr/pathmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ func (r *PR) WatchFilter(src, dst *addr.ISD_AS, filter *PathPredicate) (*SyncPat

// UnwatchFilter deletes a previously registered filter.
func (r *PR) UnwatchFilter(src, dst *addr.ISD_AS, filter *PathPredicate) error {
// FIXME(scrye): Implement this
return common.NewCError("Function UnwatchFilter not implemented")
r.Lock()
defer r.Unlock()
return r.cache.removeWatch(src, dst, filter)
}

// Revoke asynchronously informs SCIOND about a revocation and flushes any
Expand Down
182 changes: 130 additions & 52 deletions go/sig/base/as.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/netsec-ethz/scion/go/lib/addr"
"github.com/netsec-ethz/scion/go/lib/common"
liblog "github.com/netsec-ethz/scion/go/lib/log"
"github.com/netsec-ethz/scion/go/sig/config"
"github.com/netsec-ethz/scion/go/sig/egress"
"github.com/netsec-ethz/scion/go/sig/sigcmn"
"github.com/netsec-ethz/scion/go/sig/siginfo"
Expand All @@ -38,16 +39,16 @@ const sigMgrTick = 10 * time.Second
// ASEntry contains all of the information required to interact with a remote AS.
type ASEntry struct {
sync.RWMutex
log.Logger
Nets map[string]*NetEntry
Sigs *siginfo.SigMap
IA *addr.ISD_AS
IAString string
Nets map[string]*NetEntry
Sigs siginfo.SigMap
Session *egress.Session
DevName string
tunLink netlink.Link
tunIO io.ReadWriteCloser
sigMgrStop chan struct{}
log.Logger
}

func newASEntry(ia *addr.ISD_AS) (*ASEntry, error) {
Expand All @@ -56,34 +57,67 @@ func newASEntry(ia *addr.ISD_AS) (*ASEntry, error) {
IA: ia,
IAString: ia.String(),
Nets: make(map[string]*NetEntry),
Sigs: make(siginfo.SigMap),
Sigs: &siginfo.SigMap{},
DevName: fmt.Sprintf("scion-%s", ia),
sigMgrStop: make(chan struct{}),
}
var err error
if ae.Session, err = egress.NewSession(ia, 0, ae.SigMap, ae.Logger); err != nil {
if ae.Session, err = egress.NewSession(ia, 0, ae.Sigs, ae.Logger); err != nil {
return nil, err
}
return ae, nil
}

func (ae *ASEntry) setupNet() error {
var err error
ae.tunLink, ae.tunIO, err = xnet.ConnectTun(ae.DevName)
if err != nil {
return err
func (ae *ASEntry) ReloadConfig(cfg *config.ASEntry) bool {
ae.Lock()
defer ae.Unlock()
// Method calls first to prevent skips due to logical short-circuit
s := ae.addNewSIGS(cfg.Sigs)
s = ae.delOldSIGS(cfg.Sigs) && s
s = ae.addNewNets(cfg.Nets) && s
return ae.delOldNets(cfg.Nets) && s
}

// addNewNets adds the networks in ipnets that are not currently configured.
func (ae *ASEntry) addNewNets(ipnets []*config.IPNet) bool {
s := true
for _, ipnet := range ipnets {
err := ae.addNet(ipnet.IPNet())
if err != nil {
ae.Error("Unable to add network", "net", ipnet, "err", err)
s = false
}
}
ae.Info("Network setup done")
go egress.NewDispatcher(ae.DevName, ae.tunIO, ae.Session).Run()
go ae.sigMgr()
ae.Session.Start()
return nil
return s
}

// delOldNets deletes currently configured networks that are not in ipnets.
func (ae *ASEntry) delOldNets(ipnets []*config.IPNet) bool {
s := true
Top:
for _, ne := range ae.Nets {
for _, ipnet := range ipnets {
if ne.Net.String() == ipnet.IPNet().String() {
continue Top
}
}
err := ae.delNet(ne.Net)
if err != nil {
ae.Error("Unable to delete network", "NetEntry", ne, "err", err)
s = false
}
}
return s
}

// AddNet idempotently adds a network for the remote IA.
func (ae *ASEntry) AddNet(ipnet *net.IPNet) error {
ae.Lock()
defer ae.Unlock()
return ae.addNet(ipnet)
}

func (ae *ASEntry) addNet(ipnet *net.IPNet) error {
if ae.tunLink == nil {
// Ensure that the network setup is done, as otherwise route entries can't be added.
if err := ae.setupNet(); err != nil {
Expand All @@ -106,23 +140,67 @@ func (ae *ASEntry) AddNet(ipnet *net.IPNet) error {
// DelIA removes a network for the remote IA.
func (ae *ASEntry) DelNet(ipnet *net.IPNet) error {
ae.Lock()
defer ae.Unlock()
return ae.delNet(ipnet)
}

// DelIA removes a network for the remote IA.
func (ae *ASEntry) delNet(ipnet *net.IPNet) error {
key := ipnet.String()
ne, ok := ae.Nets[key]
if !ok {
ae.Unlock()
return common.NewCError("DelNet: no network found", "ia", ae.IA, "net", ipnet)
}
delete(ae.Nets, key)
ae.Unlock() // Do cleanup outside the lock.
ae.Info("Removed network", "net", ipnet)
return ne.Cleanup()
}

// addNewSIGS adds the SIGs in sigs that are not currently configured.
func (ae *ASEntry) addNewSIGS(sigs config.SIGSet) bool {
s := true
for _, sig := range sigs {
ctrlPort := int(sig.CtrlPort)
if ctrlPort == 0 {
ctrlPort = sigcmn.DefaultCtrlPort
}
encapPort := int(sig.EncapPort)
if encapPort == 0 {
encapPort = sigcmn.DefaultEncapPort
}
err := ae.AddSig(sig.Id, sig.Addr, ctrlPort, encapPort, true)
if err != nil {
ae.Error("Unable to add SIG", "sig", sig, "err", err)
s = false
}
}
return s
}

// delOldSIGS deletes the currently configured SIGs that are not in sigs.
func (ae *ASEntry) delOldSIGS(sigs config.SIGSet) bool {
s := true
ae.Sigs.Range(func(id siginfo.SigIdType, sig *siginfo.Sig) bool {
if !sig.Static {
return true
}
if _, ok := sigs[sig.Id]; !ok {
err := ae.DelSig(sig.Id)
if err != nil {
ae.Error("Unable to delete SIG", "err", err)
s = false
}
}
return true
})
return s
}

// AddSig idempotently adds a SIG for the remote IA.
func (ae *ASEntry) AddSig(id siginfo.SigIdType, ip net.IP,
ctrlPort, encapPort int, static bool) error {
ae.Lock()
defer ae.Unlock()
func (ae *ASEntry) AddSig(id siginfo.SigIdType, ip net.IP, ctrlPort, encapPort int,
static bool) error {
// ae.Sigs is thread safe, no master lock needed
if len(id) == 0 {
return common.NewCError("AddSig: SIG id empty", "ia", ae.IA)
}
Expand All @@ -137,60 +215,52 @@ func (ae *ASEntry) AddSig(id siginfo.SigIdType, ip net.IP,
cerr := err.(*common.CError)
return cerr.AddCtx(cerr.Ctx, "ia", ae.IA, "id", id)
}
if _, ok := ae.Sigs[id]; ok {
// FIXME(kormat): support updating SIG entry.
return nil
if sig, ok := ae.Sigs.Load(id); ok {
sig.Host = addr.HostFromIP(ip)
sig.CtrlL4Port = ctrlPort
sig.EncapL4Port = encapPort
ae.Info("Updated SIG", "sig", sig)
} else {
sig := siginfo.NewSig(ae.IA, id, addr.HostFromIP(ip), ctrlPort, encapPort, static)
ae.Sigs.Store(id, sig)
ae.Info("Added SIG", "sig", sig)
}
ae.Sigs[id] = siginfo.NewSig(ae.IA, id, addr.HostFromIP(ip), ctrlPort, encapPort, static)
ae.Info("Added SIG", "sig", ae.Sigs[id])
return nil
}

// DelSIG removes an SIG for the remote IA.
func (ae *ASEntry) DelSig(id siginfo.SigIdType) error {
ae.Lock()
se, ok := ae.Sigs[id]
// ae.Sigs is thread safe, no master lock needed
se, ok := ae.Sigs.Load(id)
if !ok {
ae.Unlock()
return common.NewCError("DelSig: no SIG found", "ia", ae.IA, "id", id)
}
delete(ae.Sigs, id)
ae.Unlock() // Do cleanup outside the lock.
ae.Sigs.Delete(id)
ae.Info("Removed SIG", "id", id)
return se.Cleanup()
}

// Internal method to return a *copy* of the ASEntry's SigMap
func (ae *ASEntry) SigMap() siginfo.SigMap {
ae.Lock()
defer ae.Unlock()
smap := make(siginfo.SigMap)
for k, v := range ae.Sigs {
smap[k] = v
}
return smap
}

// manage the Sig map
func (ae *ASEntry) sigMgr() {
defer liblog.LogPanicAndExit()
ticker := time.NewTicker(sigMgrTick)
defer ticker.Stop()
log.Info("sigMgr starting")
ae.Info("sigMgr starting")
Top:
for {
// TODO(kormat): handle adding new SIGs from discovery, and updating existing ones.
select {
case <-ae.sigMgrStop:
break Top
case <-ticker.C:
smap := ae.SigMap()
for _, sig := range smap {
ae.Sigs.Range(func(id siginfo.SigIdType, sig *siginfo.Sig) bool {
sig.ExpireFails()
}
return true
})
}
}
log.Info("sigMgr stopping")
close(ae.sigMgrStop)
ae.Info("sigMgr stopping")
}

func (ae *ASEntry) Cleanup() error {
Expand All @@ -204,12 +274,7 @@ func (ae *ASEntry) Cleanup() error {
}
// Clean up sessions, and associated workers.
ae.cleanSessions()
for _, ne := range ae.Nets {
if err := ne.Cleanup(); err != nil {
cerr := err.(*common.CError)
ae.Error(cerr.Desc, cerr.Ctx...)
}
}
// The operating system also removes the routes when deleting the link.
if err := netlink.LinkDel(ae.tunLink); err != nil {
// Only return this error, as it's the only critical one.
return common.NewCError("Error removing TUN link",
Expand All @@ -223,3 +288,16 @@ func (ae *ASEntry) cleanSessions() {
ae.Session.Error("Error cleaning up session", "err", err)
}
}

func (ae *ASEntry) setupNet() error {
var err error
ae.tunLink, ae.tunIO, err = xnet.ConnectTun(ae.DevName)
if err != nil {
return err
}
ae.Info("Network setup done")
go egress.NewDispatcher(ae.DevName, ae.tunIO, ae.Session).Run()
go ae.sigMgr()
ae.Session.Start()
return nil
}
Loading

0 comments on commit 90cf7bf

Please sign in to comment.