Skip to content

Commit

Permalink
anapaya-hackathon: introduce generic happy eyeballs
Browse files Browse the repository at this point in the history
Introduce a generic happy eyeballs implementation that can be reused
by all our clients. This allows us to reduce code duplication and
simplifies implementations.
  • Loading branch information
oncilla committed Dec 13, 2023
1 parent c6109d8 commit a15fa19
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 262 deletions.
134 changes: 31 additions & 103 deletions control/beaconing/happy/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package happy
import (
"context"
"net"
"sync"
"time"

"github.com/scionproto/scion/control/beaconing"
"github.com/scionproto/scion/pkg/addr"
"github.com/scionproto/scion/pkg/log"
"github.com/scionproto/scion/pkg/connect/happy"
"github.com/scionproto/scion/pkg/private/serrors"
seg "github.com/scionproto/scion/pkg/segment"
)
Expand Down Expand Up @@ -46,58 +44,20 @@ type BeaconSender struct {
}

func (s BeaconSender) Send(ctx context.Context, b *seg.PathSegment) error {
abortCtx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(2)

errs := [2]error{}
successCh := make(chan struct{}, 2)

go func() {
defer log.HandlePanic()
defer wg.Done()
err := s.Connect.Send(abortCtx, b)
if err == nil {
successCh <- struct{}{}
log.Info("Sent beacon via connect")
cancel()
} else {
log.Info("Failed to send beacon via connect", "err", err)
}
errs[0] = err
}()

go func() {
defer log.HandlePanic()
defer wg.Done()
select {
case <-abortCtx.Done():
return
case <-time.After(500 * time.Millisecond):
}
err := s.Grpc.Send(abortCtx, b)
if err == nil {
successCh <- struct{}{}
log.Info("Sent beacon via gRPC")
cancel()
} else {
log.Info("Failed to send beacon via gRPC", "err", err)
}
errs[1] = err
}()

wg.Wait()
var combinedErrs serrors.List
for _, err := range errs {
if err != nil {
combinedErrs = append(combinedErrs, err)
}
}
// Only report error if both sends were unsuccessful.
if len(combinedErrs) == 2 {
return combinedErrs.ToError()
}
return nil
_, err := happy.Happy(
ctx,
happy.Call1[*seg.PathSegment, struct{}]{
Call: happy.NoReturn1[*seg.PathSegment](s.Connect.Send).Call,
Input1: b,
Typ: "connect",
},
happy.Call1[*seg.PathSegment, struct{}]{
Call: happy.NoReturn1[*seg.PathSegment](s.Connect.Send).Call,
Input1: b,
Typ: "grpc",
},
)
return err
}

func (s BeaconSender) Close() error {
Expand All @@ -117,52 +77,20 @@ type Registrar struct {
}

func (r *Registrar) RegisterSegment(ctx context.Context, meta seg.Meta, remote net.Addr) error {
abortCtx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(2)

errs := [2]error{}
go func() {
defer log.HandlePanic()
defer wg.Done()
err := r.Connect.RegisterSegment(abortCtx, meta, remote)
if err == nil {
log.Info("Registered segments via connect")
cancel()
} else {
log.Info("Failed to register segments via connect", "err", err)
}
errs[0] = err
}()

go func() {
defer log.HandlePanic()
defer wg.Done()
select {
case <-abortCtx.Done():
return
case <-time.After(500 * time.Millisecond):
}
err := r.Grpc.RegisterSegment(abortCtx, meta, remote)
if err == nil {
log.Info("Registered segments via gRPC")
cancel()
} else {
log.Info("Failed to register segments via gRPC", "err", err)
}
errs[1] = err
}()

wg.Wait()
var combinedErrs serrors.List
for _, err := range errs {
if err != nil {
combinedErrs = append(combinedErrs, err)
}
}
// Only report error if both sends were unsuccessful.
if len(combinedErrs) == 2 {
return combinedErrs.ToError()
}
return nil
_, err := happy.Happy(
ctx,
happy.Call2[seg.Meta, net.Addr, struct{}]{
Call: happy.NoReturn2[seg.Meta, net.Addr](r.Connect.RegisterSegment).Call,
Input1: meta,
Input2: remote,
Typ: "connect",
},
happy.Call2[seg.Meta, net.Addr, struct{}]{
Call: happy.NoReturn2[seg.Meta, net.Addr](r.Connect.RegisterSegment).Call,
Input1: meta,
Input2: remote,
Typ: "grpc",
},
)
return err
}
12 changes: 12 additions & 0 deletions pkg/connect/happy/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("//tools/lint:go.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["happy.go"],
importpath = "github.com/scionproto/scion/pkg/connect/happy",
visibility = ["//visibility:public"],
deps = [
"//pkg/log:go_default_library",
"//pkg/private/serrors:go_default_library",
],
)
116 changes: 116 additions & 0 deletions pkg/connect/happy/happy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package happy

import (
"context"
"sync"
"time"

"github.com/scionproto/scion/pkg/log"
"github.com/scionproto/scion/pkg/private/serrors"
)

type Caller[R any] interface {
Invoke(context.Context) (R, error)
Type() string
}

type NoReturn1[I1 any] func(context.Context, I1) error

func (d NoReturn1[I1]) Call(ctx context.Context, i1 I1) (struct{}, error) {
return struct{}{}, d(ctx, i1)
}

type Call1[I1 any, R any] struct {
Call func(context.Context, I1) (R, error)
Input1 I1
Typ string
}

func (c Call1[I1, R]) Invoke(ctx context.Context) (R, error) {
return c.Call(ctx, c.Input1)
}

func (c Call1[I1, R]) Type() string {
return c.Typ
}

type Call2[I1 any, I2, R any] struct {
Call func(context.Context, I1, I2) (R, error)
Input1 I1
Input2 I2
Typ string
}

func (c Call2[I1, I2, R]) Invoke(ctx context.Context) (R, error) {
return c.Call(ctx, c.Input1, c.Input2)
}

func (c Call2[I1, I2, R]) Type() string {
return c.Typ
}

type NoReturn2[I1, I2 any] func(context.Context, I1, I2) error

func (d NoReturn2[I1, I2]) Call(ctx context.Context, i1 I1, i2 I2) (struct{}, error) {
return struct{}{}, d(ctx, i1, i2)
}

func Happy[R any](ctx context.Context, fast, slow Caller[R]) (R, error) {
logger := log.FromCtx(ctx)

var (
wg sync.WaitGroup
reps [2]R
errs [2]error
)

wg.Add(2)
abortCtx, cancel := context.WithCancel(ctx)
go func() {
defer log.HandlePanic()
defer wg.Done()
rep, err := fast.Invoke(abortCtx)
if err == nil {
reps[0] = rep
logger.Debug("Received response via fast path", "type", fast.Type())
cancel()
} else {
logger.Debug("Failed to receive on fast path", "type", fast.Type(), "err", err)
}
errs[0] = err
}()

go func() {
defer log.HandlePanic()
defer wg.Done()
select {
case <-abortCtx.Done():
return
case <-time.After(500 * time.Millisecond):
}
rep, err := slow.Invoke(abortCtx)
if err == nil {
reps[0] = rep
logger.Debug("Received response via slow path", "type", slow.Type())
cancel()
} else {
logger.Debug("Failed to receive on slow path", "type", slow.Type(), "err", err)
}
errs[1] = err
}()

wg.Wait()

var zero R
switch {
// Both requests failed.
case errs[0] != nil && errs[1] != nil:
return zero, serrors.List(errs[:]).ToError()
// Fast request failed. Return slow.
case errs[0] != nil:
return reps[1], nil
// Fast succeeded. Return fast (even if slow succeeded too)
default:
return reps[0], nil
}
}
70 changes: 16 additions & 54 deletions private/segment/segfetcher/happy/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ package happy
import (
"context"
"net"
"sync"
"time"

"github.com/scionproto/scion/pkg/log"
"github.com/scionproto/scion/pkg/private/serrors"
"github.com/scionproto/scion/pkg/connect/happy"
"github.com/scionproto/scion/private/segment/segfetcher"
)

Expand All @@ -20,54 +17,19 @@ type Requester struct {
func (f *Requester) Segments(ctx context.Context, req segfetcher.Request,
server net.Addr) (segfetcher.SegmentsReply, error) {

abortCtx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(2)

reps := [2]segfetcher.SegmentsReply{}
errs := [2]error{}

go func() {
defer log.HandlePanic()
defer wg.Done()
rep, err := f.Connect.Segments(abortCtx, req, server)
if err == nil {
reps[0] = rep
log.Info("Received segments via connect")
cancel()
} else {
log.Info("Failed to fetch segments via connect", "err", err)
}
errs[0] = err
}()

go func() {
defer log.HandlePanic()
defer wg.Done()
select {
case <-abortCtx.Done():
return
case <-time.After(500 * time.Millisecond):
}
rep, err := f.Grpc.Segments(abortCtx, req, server)
if err == nil {
reps[0] = rep
log.Info("Received segments via gRPC")
cancel()
} else {
log.Info("Failed to fetch segments via gRPC", "err", err)
}
errs[1] = err
}()

wg.Wait()
var combinedErrs serrors.List
for i := range reps {
if errs[i] != nil {
combinedErrs = append(combinedErrs, errs[i])
continue
}
return reps[i], nil
}
return segfetcher.SegmentsReply{}, combinedErrs.ToError()
return happy.Happy(
ctx,
happy.Call2[segfetcher.Request, net.Addr, segfetcher.SegmentsReply]{
Call: f.Connect.Segments,
Input1: req,
Input2: server,
Typ: "connect",
},
happy.Call2[segfetcher.Request, net.Addr, segfetcher.SegmentsReply]{
Call: f.Grpc.Segments,
Input1: req,
Input2: server,
Typ: "grpc",
},
)
}
Loading

0 comments on commit a15fa19

Please sign in to comment.