Skip to content

Commit

Permalink
Move update command into its own TU and start adding subcommands
Browse files Browse the repository at this point in the history
support.
  • Loading branch information
sobomax committed Jan 8, 2025
1 parent c8127f1 commit a9b4de5
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 98 deletions.
29 changes: 9 additions & 20 deletions sippy/rtp_proxy/session/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,12 @@ type Rtp_proxy_session struct {
rtpp_wi chan *rtpp_cmd
}

type rtpproxy_update_result struct {
rtpproxy_address string
rtpproxy_port string
family string
sendonly bool
}

type rtpp_cmd struct {
cmd string
cb func(string)
rtp_proxy_client sippy_types.RtpProxyClient
}

func (self *rtpproxy_update_result) Address() string {
return self.rtpproxy_address
}

func NewRtp_proxy_session(config sippy_conf.Config, rtp_proxy_clients []sippy_types.RtpProxyClient, call_id, from_tag, to_tag, notify_socket, notify_tag string, session_lock sync.Locker) (*Rtp_proxy_session, error) {
self := &Rtp_proxy_session{
notify_socket : notify_socket,
Expand All @@ -91,8 +80,6 @@ func NewRtp_proxy_session(config sippy_conf.Config, rtp_proxy_clients []sippy_ty
}
self.caller.otherside = &self.callee
self.callee.otherside = &self.caller
self.caller.owner = self
self.callee.owner = self
self.caller.session_exists = false
self.callee.session_exists = false
online_clients := []sippy_types.RtpProxyClient{}
Expand Down Expand Up @@ -140,7 +127,7 @@ func NewRtp_proxy_session(config sippy_conf.Config, rtp_proxy_clients []sippy_ty
result_callback(result)
*/
func (self *Rtp_proxy_session) PlayCaller(prompt_name string, times int/*= 1*/, result_callback func(string)/*= nil*/, index int /*= 0*/) {
self.caller._play(prompt_name, times, result_callback, index)
self.caller._play(prompt_name, times, result_callback, index, self)
}

func (self *Rtp_proxy_session) send_command(cmd string, cb func(string)) {
Expand Down Expand Up @@ -175,12 +162,14 @@ func (self *Rtp_proxy_session) cmd_done(res string) {
}

func (self *Rtp_proxy_session) StopPlayCaller(result_callback func(string)/*= nil*/, index int/*= 0*/) {
self.caller._stop_play(result_callback, index)
self.caller._stop_play(result_callback, index, self)
}

func (self *Rtp_proxy_session) StartRecording(rname/*= nil*/ string, result_callback func(string)/*= nil*/, index int/*= 0*/) {
if ! self.caller.session_exists {
self.caller.update("0.0.0.0", "0", func(*rtpproxy_update_result) { self._start_recording(rname, result_callback, index) }, "", index, "IP4")
up_cb := func(*UpdateResult, *Rtp_proxy_session, sippy_types.SipHandlingError) { self._start_recording(rname, result_callback, index) }
up := NewUpdateParams(self, index, up_cb)
self.caller.update(up)
return
}
self._start_recording(rname, result_callback, index)
Expand Down Expand Up @@ -220,12 +209,12 @@ func (self *Rtp_proxy_session) Delete() {
self._rtp_proxy_client = nil
}

func (self *Rtp_proxy_session) OnCallerSdpChange(sdp_body sippy_types.MsgBody, result_callback func(sippy_types.MsgBody)) error {
return self.caller._on_sdp_change(sdp_body, result_callback)
func (self *Rtp_proxy_session) OnCallerSdpChange(sdp_body sippy_types.MsgBody, result_callback sippy_types.OnDelayedCB) error {
return self.caller._on_sdp_change(self, sdp_body, result_callback)
}

func (self *Rtp_proxy_session) OnCalleeSdpChange(sdp_body sippy_types.MsgBody, result_callback func(sippy_types.MsgBody)) error {
return self.callee._on_sdp_change(sdp_body, result_callback)
func (self *Rtp_proxy_session) OnCalleeSdpChange(sdp_body sippy_types.MsgBody, result_callback sippy_types.OnDelayedCB) error {
return self.callee._on_sdp_change(self, sdp_body, result_callback)
}

func rtp_proxy_session_destructor(self *Rtp_proxy_session) {
Expand Down
137 changes: 59 additions & 78 deletions sippy/rtp_proxy/session/side.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
package rtp_proxy_session

import (
"math"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -40,7 +39,6 @@ import (

type _rtpps_side struct {
otherside *_rtpps_side
owner *Rtp_proxy_session
session_exists bool
laddress string
raddress *sippy_net.HostPort
Expand All @@ -51,106 +49,76 @@ type _rtpps_side struct {
to_tag string
}

func (self *_rtpps_side) _play(prompt_name string, times int, result_callback func(string), index int) {
func (self *_rtpps_side) _play(prompt_name string, times int, result_callback func(string), index int, rtpps *Rtp_proxy_session) {
if ! self.session_exists {
return
}
if ! self.otherside.session_exists {
self.otherside.update("0.0.0.0", "0", func(*rtpproxy_update_result) { self.__play(prompt_name, times, result_callback, index) }, "", index, "IP4")
up_cb := func(*UpdateResult, *Rtp_proxy_session, sippy_types.SipHandlingError) { self.__play(prompt_name, times, result_callback, index, rtpps) }
up := NewUpdateParams(rtpps, index, up_cb)
self.otherside.update(up)
return
}
self.__play(prompt_name, times, result_callback, index)
self.__play(prompt_name, times, result_callback, index, rtpps)
}

func (self *_rtpps_side) __play(prompt_name string, times int, result_callback func(string), index int) {
command := "P" + strconv.Itoa(times) + " " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + prompt_name + " " + self.codecs + " " + self.from_tag + " " + self.to_tag
self.owner.send_command(command, func(r string) { self.owner.command_result(r, result_callback) })
func (self *_rtpps_side) __play(prompt_name string, times int, result_callback func(string), index int, rtpps *Rtp_proxy_session) {
command := "P" + strconv.Itoa(times) + " " + rtpps.call_id + "-" + strconv.Itoa(index) + " " + prompt_name + " " + self.codecs + " " + self.from_tag + " " + self.to_tag
rtpps.send_command(command, func(r string) { rtpps.command_result(r, result_callback) })
}

func (self *_rtpps_side) update(remote_ip string, remote_port string, result_callback func(*rtpproxy_update_result), options/*= ""*/ string, index /*= 0*/int, atype /*= "IP4"*/string) {
func max(a, b int) int {
if a >= b {return a}
return b
}

func (self *_rtpps_side) update(up *UpdateParams) {
var sbind_supported, is_local, tnot_supported bool
var err error

command := "U"
self.owner.max_index = int(math.Max(float64(self.owner.max_index), float64(index)))
if sbind_supported, err = self.owner.SBindSupported(); err != nil {
up.rtpps.max_index = max(up.rtpps.max_index, up.index)
if sbind_supported, err = up.rtpps.SBindSupported(); err != nil {
return
}
if is_local, err = self.owner.IsLocal(); err != nil {
if is_local, err = up.rtpps.IsLocal(); err != nil {
return
}
if tnot_supported, err = self.owner.TNotSupported(); err != nil {
if tnot_supported, err = up.rtpps.TNotSupported(); err != nil {
return
}
if sbind_supported {
if self.raddress != nil {
//if self.owner.IsLocal() && atype == "IP4" {
//if self.owner.IsLocal() && up.atype == "IP4" {
// options += "L" + self.laddress
//} else if ! self.owner.IsLocal() {
// options += "R" + self.raddress.Host.String()
//}
options += "R" + self.raddress.Host.String()
up.options += "R" + self.raddress.Host.String()
} else if self.laddress != "" && is_local {
options += "L" + self.laddress
up.options += "L" + self.laddress
}
}
command += options
command += up.options
if self.otherside.session_exists {
command += " " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + remote_ip + " " + remote_port + " " + self.from_tag + " " + self.to_tag
command += " " + up.rtpps.call_id + "-" + strconv.Itoa(up.index) + " " + up.remote_ip + " " + up.remote_port + " " + self.from_tag + " " + self.to_tag
} else {
command += " " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + remote_ip + " " + remote_port + " " + self.from_tag
command += " " + up.rtpps.call_id + "-" + strconv.Itoa(up.index) + " " + up.remote_ip + " " + up.remote_port + " " + self.from_tag
}
if self.owner.notify_socket != "" && index == 0 && tnot_supported {
command += " " + self.owner.notify_socket + " " + self.owner.notify_tag
if up.rtpps.notify_socket != "" && up.index == 0 && tnot_supported {
command += " " + up.rtpps.notify_socket + " " + up.rtpps.notify_tag
}
self.owner.send_command(command, func(r string) { self.update_result(r, remote_ip, atype, result_callback) })
up.rtpps.send_command(command, func(r string) { self.update_result(r, up) })
}

func (self *_rtpps_side) update_result(result, remote_ip, atype string, result_callback func(*rtpproxy_update_result)) {
func (self *_rtpps_side) update_result(result string, up *UpdateParams) {
//print "%s.update_result(%s)" % (id(self), result)
//result_callback, face, callback_parameters = args
self.session_exists = true
if result == "" {
result_callback(nil)
return
}
t1 := strings.Fields(result)
if t1[0][0] == 'E' {
result_callback(nil)
return
}
rtpproxy_port, err := strconv.Atoi(t1[0])
if err != nil || rtpproxy_port == 0 {
result_callback(nil)
return
}
family := "IP4"
rtpproxy_address := ""
if len(t1) > 1 {
rtpproxy_address = t1[1]
if len(t1) > 2 && t1[2] == "6" {
family = "IP6"
}
} else {
if rtpproxy_address, err = self.owner.GetProxyAddress(); err != nil {
return
}
}
sendonly := false
if atype == "IP4" && remote_ip == "0.0.0.0" {
sendonly = true
} else if atype == "IP6" && remote_ip == "::" {
sendonly = true
}
result_callback(&rtpproxy_update_result{
rtpproxy_address : rtpproxy_address,
rtpproxy_port : t1[0],
family : family,
sendonly : sendonly,
})
up.ProcessRtppResult(result)
}

func (self *_rtpps_side) _on_sdp_change(sdp_body sippy_types.MsgBody, result_callback func(sippy_types.MsgBody)) error {
func (self *_rtpps_side) _on_sdp_change(rtpps *Rtp_proxy_session, sdp_body sippy_types.MsgBody, result_callback sippy_types.OnDelayedCB) error {
parsed_body, err := sdp_body.GetSdp()
if err != nil {
return err
Expand All @@ -165,7 +133,7 @@ func (self *_rtpps_side) _on_sdp_change(sdp_body sippy_types.MsgBody, result_cal
}
if len(sects) == 0 {
sdp_body.SetNeedsUpdate(false)
result_callback(sdp_body)
result_callback(sdp_body, nil)
return nil
}
formats := sects[0].GetMHeader().GetFormats()
Expand All @@ -180,24 +148,37 @@ func (self *_rtpps_side) _on_sdp_change(sdp_body sippy_types.MsgBody, result_cal
if sect.GetCHeader().GetAType() == "IP6" {
sect_options = "6" + options
}
self.update(sect.GetCHeader().GetAddr(), sect.GetMHeader().GetPort(),
func (res *rtpproxy_update_result) { self._sdp_change_finish(res, sdp_body, parsed_body, sect, &sections_left, result_callback) },
sect_options, i, sect.GetCHeader().GetAType())
up_cb := func (ur *UpdateResult, rtpps *Rtp_proxy_session, ex sippy_types.SipHandlingError) { self._sdp_change_finish(sdp_body, parsed_body, sect, &sections_left, result_callback, ur, rtpps, ex) }
up := NewUpdateParams(rtpps, i, up_cb)
up.remote_ip = sect.GetCHeader().GetAddr()
up.remote_port = sect.GetMHeader().GetPort()
up.atype = sect.GetCHeader().GetAType()
up.options = sect_options

self.update(up)
}
return nil
}

func (self *_rtpps_side) _sdp_change_finish(cb_args *rtpproxy_update_result, sdp_body sippy_types.MsgBody, parsed_body sippy_types.Sdp, sect *sippy_sdp.SdpMediaDescription, sections_left *int64, result_callback func(sippy_types.MsgBody)) {
if cb_args != nil {
func (self *_rtpps_side) _sdp_change_finish(sdp_body sippy_types.MsgBody, parsed_body sippy_types.Sdp, sect *sippy_sdp.SdpMediaDescription, sections_left *int64, result_callback sippy_types.OnDelayedCB, ur *UpdateResult, rtpps *Rtp_proxy_session, ex sippy_types.SipHandlingError) {
if ! sdp_body.NeedsUpdate() {
return
}
if ex != nil {
sdp_body.SetNeedsUpdate(false)
result_callback(nil, ex)
return
}
if ur != nil {
if self.after_sdp_change != nil {
self.after_sdp_change(cb_args)
self.after_sdp_change(ur)
}
sect.GetCHeader().SetAType(cb_args.family)
sect.GetCHeader().SetAddr(cb_args.rtpproxy_address)
sect.GetCHeader().SetAType(ur.family)
sect.GetCHeader().SetAddr(ur.rtpproxy_address)
if sect.GetMHeader().GetPort() != "0" {
sect.GetMHeader().SetPort(cb_args.rtpproxy_port)
sect.GetMHeader().SetPort(strconv.Itoa(ur.rtpproxy_port))
}
if cb_args.sendonly {
if ur.sendonly {
sect.RemoveAHeader("sendrecv")
if ! sect.HasAHeader([]string{ "recvonly", "sendonly", "inactive" }) {
sect.AddHeader("a", "sendonly")
Expand All @@ -212,7 +193,7 @@ func (self *_rtpps_side) _sdp_change_finish(cb_args *rtpproxy_update_result, sdp
// more work is in progress
return
}
if self.owner.insert_nortpp {
if rtpps.insert_nortpp {
parsed_body.AppendAHeader("nortpproxy=yes")
}
sdp_body.SetNeedsUpdate(false)
Expand All @@ -228,13 +209,13 @@ func (self *_rtpps_side) _sdp_change_finish(cb_args *rtpproxy_update_result, sdp
origin.SetAddress("192.0.2.1")
origin.SetAddressType("IP4")
origin.SetNetworkType("IN")
result_callback(sdp_body)
result_callback(sdp_body, nil)
}

func (self *_rtpps_side) _stop_play(cb func(string), index int) {
func (self *_rtpps_side) _stop_play(cb func(string), index int, rtpps *Rtp_proxy_session) {
if ! self.otherside.session_exists {
return
}
command := "S " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + self.from_tag + " " + self.to_tag
self.owner.send_command(command, func(r string) { self.owner.command_result(r, cb) })
command := "S " + rtpps.call_id + "-" + strconv.Itoa(index) + " " + self.from_tag + " " + self.to_tag
rtpps.send_command(command, func(r string) { rtpps.command_result(r, cb) })
}
Loading

0 comments on commit a9b4de5

Please sign in to comment.