Skip to content

Commit

Permalink
Start breaking out rtp_proxy client interface to be more modular
Browse files Browse the repository at this point in the history
in lieu of changes done to the Python code. No functional changes
so far.
  • Loading branch information
sobomax committed Jan 6, 2025
1 parent 25d0976 commit 29a4085
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 82 deletions.
5 changes: 3 additions & 2 deletions cmd/b2bua_radius/call_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"time"

"github.com/sippy/go-b2bua/sippy"
"github.com/sippy/go-b2bua/sippy/rtp_proxy/session"
"github.com/sippy/go-b2bua/sippy/headers"
"github.com/sippy/go-b2bua/sippy/net"
"github.com/sippy/go-b2bua/sippy/time"
Expand All @@ -59,7 +60,7 @@ type callController struct {
cld string
caller_name string
challenge *sippy_header.SipWWWAuthenticate
rtp_proxy_session *sippy.Rtp_proxy_session
rtp_proxy_session *rtp_proxy_session.Rtp_proxy_session
eTry *sippy.CCEventTry
huntstop_scodes []int
acctA Accounting
Expand Down Expand Up @@ -167,7 +168,7 @@ func (self *callController) RecvEvent(event sippy_types.CCEvent, ua sippy_types.
}
if len(self.cmap.rtp_proxy_clients) > 0 {
var err error
self.rtp_proxy_session, err = sippy.NewRtp_proxy_session(self.global_config, self.cmap.rtp_proxy_clients, self.cId.CallId, "", "", self.global_config.B2bua_socket, /*notify_tag*/ fmt.Sprintf("r%%20%d", self.id), self.lock)
self.rtp_proxy_session, err = rtp_proxy_session.NewRtp_proxy_session(self.global_config, self.cmap.rtp_proxy_clients, self.cId.CallId, "", "", self.global_config.B2bua_socket, /*notify_tag*/ fmt.Sprintf("r%%20%d", self.id), self.lock)
if err != nil {
self.uaA.RecvEvent(sippy.NewCCEventFail(500, "Internal Server Error (4)", event.GetRtime(), ""))
self.state = CCStateDead
Expand Down
5 changes: 3 additions & 2 deletions cmd/b2bua_radius/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/sippy/go-b2bua/sippy/cli"
"github.com/sippy/go-b2bua/sippy/net"
"github.com/sippy/go-b2bua/sippy/types"
"github.com/sippy/go-b2bua/sippy/rtp_proxy"
"github.com/sippy/go-b2bua/sippy/utils"
)

Expand Down Expand Up @@ -64,14 +65,14 @@ func main() {
}
rtp_proxy_clients := make([]sippy_types.RtpProxyClient, len(global_config.Rtp_proxy_clients_arr))
for i, address := range global_config.Rtp_proxy_clients_arr {
opts, err := sippy.NewRtpProxyClientOpts(address, nil /*bind_address*/, global_config, global_config.ErrorLogger())
opts, err := rtp_proxy.NewRtpProxyClientOpts(address, nil /*bind_address*/, global_config, global_config.ErrorLogger())
if err != nil {
println("Cannot initialize rtpproxy client: " + err.Error())
return
}
opts.SetHeartbeatInterval(global_config.Hrtb_ival_dur)
opts.SetHeartbeatRetryInterval(global_config.Hrtb_retr_ival_dur)
rtpp := sippy.NewRtpProxyClient(opts)
rtpp := rtp_proxy.NewRtpProxyClient(opts)
err = rtpp.Start()
if err != nil {
println("Cannot initialize rtpproxy client: " + err.Error())
Expand Down
40 changes: 17 additions & 23 deletions sippy/rtp_proxy_client.go → sippy/rtp_proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package sippy

package rtp_proxy

import (
"bufio"
"net"
"strconv"
"strings"

"github.com/sippy/go-b2bua/sippy"
"github.com/sippy/go-b2bua/sippy/rtp_proxy/types"
"github.com/sippy/go-b2bua/sippy/net"
"github.com/sippy/go-b2bua/sippy/types"
)
Expand All @@ -43,7 +46,7 @@ func NewRtpProxyClient(opts *rtpProxyClientOpts) sippy_types.RtpProxyClient {
type Rtp_proxy_client_base struct {
heir sippy_types.RtpProxyClient
opts *rtpProxyClientOpts
transport rtp_proxy_transport
transport rtp_proxy_types.RtpProxyTransport
online bool
sbind_supported bool
tnot_supported bool
Expand All @@ -59,17 +62,8 @@ type Rtp_proxy_client_base struct {
ptransmitted int64
}

type rtp_proxy_transport interface {
address() net.Addr
get_rtpc_delay() float64
is_local() bool
send_command(string, func(string))
shutdown()
reconnect(net.Addr, *sippy_net.HostPort)
}

func (self *Rtp_proxy_client_base) IsLocal() bool {
return self.transport.is_local()
return self.transport.Is_local()
}

func (self *Rtp_proxy_client_base) IsOnline() bool {
Expand Down Expand Up @@ -100,7 +94,7 @@ func (self *Rtp_proxy_client_base) me() sippy_types.RtpProxyClient {
}

func (self *Rtp_proxy_client_base) Address() net.Addr {
return self.transport.address()
return self.transport.Address()
}

func NewRtp_proxy_client_base(heir sippy_types.RtpProxyClient, opts *rtpProxyClientOpts) *Rtp_proxy_client_base {
Expand Down Expand Up @@ -130,18 +124,18 @@ func (self *Rtp_proxy_client_base) Start() error {
}

func (self *Rtp_proxy_client_base) SendCommand(cmd string, cb func(string)) {
self.transport.send_command(cmd, cb)
self.transport.Send_command(cmd, cb)
}

func (self *Rtp_proxy_client_base) Reconnect(addr net.Addr, bind_addr *sippy_net.HostPort) {
self.transport.reconnect(addr, bind_addr)
self.transport.Reconnect(addr, bind_addr)
}

func (self *Rtp_proxy_client_base) version_check() {
if self.shut_down {
return
}
self.transport.send_command("V", self.version_check_reply)
self.transport.Send_command("V", self.version_check_reply)
}

func (self *Rtp_proxy_client_base) version_check_reply(version string) {
Expand All @@ -153,7 +147,7 @@ func (self *Rtp_proxy_client_base) version_check_reply(version string) {
} else if self.online {
self.me().GoOffline()
} else {
StartTimeoutWithSpread(self.version_check, nil, self.opts.hrtb_retr_ival, 1, self.opts.logger, 0.1)
sippy.StartTimeoutWithSpread(self.version_check, nil, self.opts.hrtb_retr_ival, 1, self.opts.logger, 0.1)
}
}

Expand All @@ -162,7 +156,7 @@ func (self *Rtp_proxy_client_base) heartbeat() {
if self.shut_down {
return
}
self.transport.send_command("Ib", self.heartbeat_reply)
self.transport.Send_command("Ib", self.heartbeat_reply)
}

func (self *Rtp_proxy_client_base) heartbeat_reply(stats string) {
Expand Down Expand Up @@ -198,7 +192,7 @@ func (self *Rtp_proxy_client_base) heartbeat_reply(stats string) {
}
self.me().UpdateActive(active_sessions, sessions_created, active_streams, preceived, ptransmitted)
}
StartTimeoutWithSpread(self.heartbeat, nil, self.opts.hrtb_ival, 1, self.opts.logger, 0.1)
sippy.StartTimeoutWithSpread(self.heartbeat, nil, self.opts.hrtb_ival, 1, self.opts.logger, 0.1)
}

func (self *Rtp_proxy_client_base) GoOnline() {
Expand All @@ -222,7 +216,7 @@ func (self *Rtp_proxy_client_base) GoOffline() {
//print "go_offline", self.address, self.online
if self.online {
self.online = false
StartTimeoutWithSpread(self.version_check, nil, self.opts.hrtb_retr_ival, 1, self.opts.logger, 0.1)
sippy.StartTimeoutWithSpread(self.version_check, nil, self.opts.hrtb_retr_ival, 1, self.opts.logger, 0.1)
}
}

Expand Down Expand Up @@ -259,7 +253,7 @@ func (self *Rtp_proxy_client_base) Shutdown() {
return
}
self.shut_down = true
self.transport.shutdown()
self.transport.Shutdown()
}

func (self *Rtp_proxy_client_base) IsShutDown() bool {
Expand All @@ -271,7 +265,7 @@ func (self *Rtp_proxy_client_base) GetOpts() sippy_types.RtpProxyClientOpts {
}

func (self *Rtp_proxy_client_base) GetRtpcDelay() float64 {
return self.transport.get_rtpc_delay()
return self.transport.Get_rtpc_delay()
}

type rtppCapsChecker struct {
Expand All @@ -297,7 +291,7 @@ func newRtppCapsChecker(rtpc *Rtp_proxy_client_base) *rtppCapsChecker {
attr := it.attr // For some reason the it.attr cannot be passed into the following
// function directly - the resulting value is always that of the
// last 'it.attr' value.
rtpc.transport.send_command("VF " + it.vers, func(res string) { self.caps_query_done(res, attr) })
rtpc.transport.Send_command("VF " + it.vers, func(res string) { self.caps_query_done(res, attr) })
}
return self
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package sippy

package rtp_proxy_client

import (
"fmt"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/sippy/go-b2bua/sippy/time"
"github.com/sippy/go-b2bua/sippy/types"
"github.com/sippy/go-b2bua/sippy/utils"
"github.com/sippy/go-b2bua/sippy/rtp_proxy/types"
)

const (
Expand Down Expand Up @@ -144,7 +146,7 @@ type Rtp_proxy_client_stream struct {
global_config sippy_conf.Config
}

func newRtp_proxy_client_stream(owner sippy_types.RtpProxyClient, global_config sippy_conf.Config, address net.Addr, bind_address *sippy_net.HostPort) (rtp_proxy_transport, error) {
func NewRtp_proxy_client_stream(owner sippy_types.RtpProxyClient, global_config sippy_conf.Config, address net.Addr, bind_address *sippy_net.HostPort) (rtp_proxy_types.RtpProxyTransport, error) {
var err error
if address == nil {
address, err = net.ResolveUnixAddr("unix", "/var/run/rtpproxy.sock")
Expand Down Expand Up @@ -178,23 +180,23 @@ func newRtp_proxy_client_stream(owner sippy_types.RtpProxyClient, global_config
return self, nil
}

func (self *Rtp_proxy_client_stream) is_local() bool {
func (self *Rtp_proxy_client_stream) Is_local() bool {
return self._is_local
}

func (self *Rtp_proxy_client_stream) address() net.Addr {
func (self *Rtp_proxy_client_stream) Address() net.Addr {
return self._address
}

func (self *Rtp_proxy_client_stream) send_command(command string, result_callback func(string)) {
func (self *Rtp_proxy_client_stream) Send_command(command string, result_callback func(string)) {
if command[len(command)-1] != '\n' {
command += "\n"
}
self.wi <- &rtpp_req_stream{ command, result_callback }
}

func (self *Rtp_proxy_client_stream) reconnect(address net.Addr, bind_addr *sippy_net.HostPort) {
self.shutdown()
func (self *Rtp_proxy_client_stream) Reconnect(address net.Addr, bind_addr *sippy_net.HostPort) {
self.Shutdown()
self._address = address
self.workers = make([]*_RTPPLWorker, self.nworkers)
for i := 0; i < self.nworkers; i++ {
Expand All @@ -203,7 +205,7 @@ func (self *Rtp_proxy_client_stream) reconnect(address net.Addr, bind_addr *sipp
self.delay_flt = sippy_math.NewRecFilter(0.95, 0.25)
}

func (self *Rtp_proxy_client_stream) shutdown() {
func (self *Rtp_proxy_client_stream) Shutdown() {
self.wi <- nil
for _, rworker := range self.workers {
<-rworker.shutdown_chan
Expand All @@ -216,7 +218,7 @@ func (self *Rtp_proxy_client_stream) register_delay(rtpc_delay time.Duration) {
self.delay_flt.Apply(rtpc_delay.Seconds())
}

func (self *Rtp_proxy_client_stream) get_rtpc_delay() float64 {
func (self *Rtp_proxy_client_stream) Get_rtpc_delay() float64 {
return self.delay_flt.GetLastval()
}
/*
Expand Down
Loading

0 comments on commit 29a4085

Please sign in to comment.