diff --git a/.run/hboned.run.xml b/.run/hboned.run.xml new file mode 100644 index 0000000..747b11c --- /dev/null +++ b/.run/hboned.run.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.run/ugate-alice.run.xml b/.run/ugate-alice.run.xml index c0ea592..456bc11 100644 --- a/.run/ugate-alice.run.xml +++ b/.run/ugate-alice.run.xml @@ -9,7 +9,7 @@ - + diff --git a/.run/ugate-bob.run.xml b/.run/ugate-bob.run.xml index 41f4d8a..50fa2a8 100644 --- a/.run/ugate-bob.run.xml +++ b/.run/ugate-bob.run.xml @@ -9,7 +9,7 @@ - + diff --git a/cmd/hbone/hbone.go b/cmd/hbone/hbone.go index a0ba072..fd2ffe0 100644 --- a/cmd/hbone/hbone.go +++ b/cmd/hbone/hbone.go @@ -2,36 +2,32 @@ package main import ( "flag" - "io" "log" - "net/http" + "net" "os" - "github.com/costinm/ugate" "github.com/costinm/ugate/pkg/auth" "github.com/costinm/ugate/pkg/ugatesvc" ) var ( - // WIP: - // port = flag.Int("l", 0, "local port") - //debugPort = flag.Int("d", 0, "debug/status port") + port = flag.String("l", "", "local port") + tls = flag.Bool("s", false, "mTLS over hbone") ) - -var hc *http.Client - // Create a HBONE tunnel to a given URL. // -// Current client is authenticated for HBONE using local credentials, or a kube.json file. -// If no kube.json is found, one will be generated. +// Current client is authenticated for HBONE using local credentials, +// or a kube.json file. If no certs or kube.json is found, one will be generated. // // Example: // ssh -v -o ProxyCommand='hbone https://c1.webinf.info:443/dm/PZ5LWHIYFLSUZB7VHNAMGJICH7YVRU2CNFRT4TXFFQSXEITCJUCQ:22' root@PZ5LWHIYFLSUZB7VHNAMGJICH7YVRU2CNFRT4TXFFQSXEITCJUCQ +// ssh -v -o ProxyCommand='hbone https://%h:443/hbone/:22' root@fortio.app.run +// +// Note that SSH is converting %h to lowercase - the ID must be in this form // -// Bug: %h:%p doesn't work, ssh uses lower case and confuses the map. func main() { flag.Parse() @@ -40,47 +36,34 @@ func main() { ug := ugatesvc.New(config, authz, nil) - hc = &http.Client{ - Transport: ug, - } - if len(flag.Args()) == 0 { log.Fatal("Expecting URL") } url := flag.Arg(0) - err := Netcat(ug, url) - if err != nil { - log.Fatal(err) - } -} -// Netcat copies stdin/stdout to a HBONE stream. -func Netcat(ug *ugatesvc.UGate, s string) error { - i, o := io.Pipe() - r, _ := http.NewRequest("POST", s, i) - res, err := ug.RoundTrip(r) - if err != nil { - return err - } - nc := ugate.NewStreamRequestOut(r, o, res, nil) - go func() { - b1 := make([]byte, 1024) + if *port != "" { + l, err := net.Listen("tcp", *port) + if err != nil { + panic(err) + } for { - n, err := nc.Read(b1) + a, err := l.Accept() if err != nil { - log.Fatal("Tun read err", err) + panic(err) } - os.Stdout.Write(b1[0:n]) - } - }() - b1 := make([]byte, 1024) - for { - n, err := os.Stdin.Read(b1) - if err != nil { - return err + go func() { + err := ugatesvc.HboneCat(ug, url, *tls, a, a) + if err != nil { + log.Println(err) + } + }() } - nc.Write(b1[0:n]) } - return nil + + err := ugatesvc.HboneCat(ug, url, *tls, os.Stdin, os.Stdout) + if err != nil { + log.Fatal(err) + } } + diff --git a/cmd/hboned/hboned.go b/cmd/hboned/hboned.go new file mode 100644 index 0000000..69c5865 --- /dev/null +++ b/cmd/hboned/hboned.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + + "github.com/costinm/ugate" + "github.com/costinm/ugate/pkg/iptables" + "github.com/costinm/ugate/pkg/socks" + "github.com/costinm/ugate/pkg/ugatesvc" +) + + +// +// WIP: +// - Listens as H2C, using certs generated by krun +// - Forwards /hbone/PORT to port +// - Handles /hbone/mtls as mtls and forwards to 8080 as H2C +// - intercepts egress with iptables, forwards to a gate. +// +func main() { + config := ugatesvc.NewConf(".", "./var/lib/dmesh", + "./var/run/secrets/istio.io", "/var/run/secrets/istio.io") + + cfg := &ugate.GateCfg{ + BasePort: 14000, + } + + // Start a Gate. Basic H2 and H2R services enabled. + ug := ugatesvc.New(config, nil, cfg) + + // Inbound path + // TODO: create reverse tunnel to ugate + // go ug.H2Handler.UpdateReverseAccept() + + // Ingress using H2C, for example in CloudRun. + btscAddr := fmt.Sprintf("0.0.0.0:%d", cfg.BasePort+ugate.PORT_BTSC) + ug.StartListener(&ugate.Listener{ + Address: btscAddr, + Protocol: ugate.ProtoBTSC, + }) + + // WIP: egress side + // capture using iptables, socks, port + go iptables.IptablesCapture(ug, fmt.Sprintf("0.0.0.0:%d", cfg.BasePort + 1), false) + go iptables.IptablesCapture(ug, fmt.Sprintf("0.0.0.0:%d", cfg.BasePort + 6), true) + socks.New(ug) + ug.Config.Listeners[":10001"] = &ugate.Listener { + ForwardTo: "localhost:5201", + } + ug.Config.Listeners[":10002"] = &ugate.Listener { + ForwardTo: "localhost:8080", + } + + // + // TODO: get out endpoints using a metadata server ( backed by pilot-agent ) + // or XDS. + // TODO: configure an multi-network gateway, with SNI or BTS + + // VIPs for iptables capture test. Redirect to local host, to bench + // 1-hop iptables capture, in TCP mode + ug.Config.Routes["10.1.1.2:8080"] = &ugate.Route { + ForwardTo: "localhost:8080", + } + ug.Config.Routes[":8081"] = &ugate.Route { + ForwardTo: "localhost:8080", + } + ug.Config.Routes["10.1.1.3:5201"] = &ugate.Route { + ForwardTo: "localhost:5201", + } + + ug.Start() + + select {} +} diff --git a/cmd/ugate/go.mod b/cmd/ugate/go.mod index 6cc6d79..5c0e21f 100644 --- a/cmd/ugate/go.mod +++ b/cmd/ugate/go.mod @@ -10,6 +10,8 @@ replace github.com/costinm/ugate/ext/bootstrap => ../../ext/bootstrap replace github.com/costinm/ugate/ext/quic => ../../ext/quic +replace github.com/costinm/ugate/ext/h2r => ../../ext/h2r + replace github.com/costinm/ugate/ext/ssh => ../../ext/ssh require ( diff --git a/cmd/ugate/go.sum b/cmd/ugate/go.sum index 231252d..75b74c6 100644 --- a/cmd/ugate/go.sum +++ b/cmd/ugate/go.sum @@ -125,6 +125,7 @@ github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPg github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57 h1:eqyIo2HjKhKe/mJzTG8n4VqvLXIOEG+SLdDqX7xGtkY= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= @@ -189,16 +190,19 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86 h1:D6paGObi5Wu github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab h1:eFXv9Nu1lGbrNbj619aWwZfVF5HBrm9Plte8aNptuTI= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.1 h1:foqVmeWDD6yYpK+Yz3fHyNIxFYNxswxqNFjSKe+vI54= github.com/onsi/ginkgo v1.16.1/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= github.com/onsi/gomega v1.11.0 h1:+CqWgvj0OZycCaqclBD1pxKHAU+tOkHmQIWvDHq2aug= @@ -256,6 +260,7 @@ github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M github.com/pion/webrtc/v3 v3.0.8/go.mod h1:C5uzSMa9sGCtfVPLA+pB0eWoW/exZ0OV0KW7JJbkvp0= github.com/pion/webrtc/v3 v3.0.25 h1:AOsku4YIegyrGD6CQFDDGvNeS1p8KnwWlnDWaY3yTTM= github.com/pion/webrtc/v3 v3.0.25/go.mod h1:Qx9zd4xvIeFTN1hygyJ77XVi/YbElyjVitL6KyCEIpE= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -330,6 +335,7 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= @@ -354,6 +360,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= @@ -369,6 +376,7 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -481,6 +489,7 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -529,6 +538,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -541,6 +551,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/cmd/ugate/ugate_test.go b/cmd/ugate/ugate_test.go index 705e6c9..b0029b4 100644 --- a/cmd/ugate/ugate_test.go +++ b/cmd/ugate/ugate_test.go @@ -18,7 +18,7 @@ func TestFull(t *testing.T) { if err != nil { t.Fatal(err) } - alice.Add(&ugate.Listener{ + alice.StartListener(&ugate.Route{ Address: fmt.Sprintf("0.0.0.0:%d", 14011), Protocol: "tls", Handler: &ugatesvc.EchoHandler{}, @@ -29,7 +29,7 @@ func TestFull(t *testing.T) { bob, err := Run(config, &ugate.GateCfg{ BasePort: 14100, }) - bob.Add(&ugate.Listener{ + bob.StartListener(&ugate.Route{ Address: fmt.Sprintf("0.0.0.0:%d", 14111), Protocol: "tls", Handler: &ugatesvc.EchoHandler{}, diff --git a/cmd/ugatemin/ugatemin.go b/cmd/ugatemin/ugatemin.go index c4295ae..36bd49e 100644 --- a/cmd/ugatemin/ugatemin.go +++ b/cmd/ugatemin/ugatemin.go @@ -35,7 +35,7 @@ func main() { } // direct TCP connect to local iperf3 and fortio (or HTTP on default port) - ug.Add(&ugate.Listener{ + ug.StartListener(&ugate.Route{ Address: ":12011", ForwardTo: "localhost:5201", }) diff --git a/dns/dns.go b/dns/dns.go index b1b76a7..025b826 100644 --- a/dns/dns.go +++ b/dns/dns.go @@ -297,11 +297,11 @@ func (s *DmDns) DNSOverTCP(in io.ReadCloser, out io.Writer) error { // // Wrapps the real process method with stats gathering and builds a reverse map of IP to names func (s *DmDns) Process(req *dns.Msg) *dns.Msg { - //ClientMetrics.Total.Add(1) + //ClientMetrics.Total.StartListener(1) if len(req.Question) == 0 { m := new(dns.Msg) m.SetRcode(req, dns.RcodeServerFailure) - //ClientMetrics.Errors.Add(1) + //ClientMetrics.Errors.StartListener(1) return m } @@ -341,12 +341,12 @@ func (s *DmDns) Process(req *dns.Msg) *dns.Msg { if d > 1*time.Second { log.Println("DNS: ", req.Question[0].Name, d, res.Answer) } - //ClientMetrics.Latency.Add(time.Since(t0).Seconds()) + //ClientMetrics.Latency.StartListener(time.Since(t0).Seconds()) } if res == nil { m := new(dns.Msg) m.SetRcode(req, dns.RcodeServerFailure) - //ClientMetrics.Errors.Add(1) + //ClientMetrics.Errors.StartListener(1) return res } @@ -428,7 +428,7 @@ func (s *DmDns) process(req *dns.Msg) *dns.Msg { } //log.Printf("DNSE: '%s' %v", name, err) - //ClientMetrics.Errors.Add(1) + //ClientMetrics.Errors.StartListener(1) m := new(dns.Msg) m.SetRcode(req, dns.RcodeServerFailure) diff --git a/dns/dns_http.go b/dns/dns_http.go index 5be8e84..f9022ec 100644 --- a/dns/dns_http.go +++ b/dns/dns_http.go @@ -49,7 +49,7 @@ func sendRes(w http.ResponseWriter, res *dns.Msg, r *http.Request, m []byte) { ttl = t } } - //ServerMetrics.Total.Add(1) + //ServerMetrics.Total.StartListener(1) w.Header().Add("cache-control", "max-age="+strconv.Itoa(int(ttl))) resB, _ := res.PackBuffer(m) @@ -97,7 +97,7 @@ func (s *DmDns) ServeHTTP(w http.ResponseWriter, r *http.Request) { sendRes(w, res, r, m) if len(res.Answer) > 0 { //log.Println("HTTP DNS ", req.Question[0].Name, time.Since(t0)) - //ServerMetrics.Latency.Add(time.Since(t0).Seconds()) + //ServerMetrics.Latency.StartListener(time.Since(t0).Seconds()) } return } @@ -132,12 +132,12 @@ func (s *DmDns) ForwardHttp(req *dns.Msg) (*dns.Msg, error) { hreq = hreq.WithContext(ctx) res, err := s.H2.Do(hreq) if err != nil { - //ClientMetrics.Errors.Add(1) + //ClientMetrics.Errors.StartListener(1) //log.Print("DNS Error from http ", err) return nil, err } if res.StatusCode != 200 { - //ClientMetrics.Errors.Add(1) + //ClientMetrics.Errors.StartListener(1) log.Println("DNS Error from http ", s.BaseUrl, req.Question[0].Name, res.StatusCode) return nil, errors.New("Error") } diff --git a/docs/rate_protected.md b/docs/rate_protected.md new file mode 100644 index 0000000..db18ae0 --- /dev/null +++ b/docs/rate_protected.md @@ -0,0 +1,40 @@ + + +Assumptions: +- load tests against the workload in the given CPU/memory alloc shows it is capable +to handle 200 QPS with the required QOS (latency, etc). +- the service contains workloads in multiple clusters/VMs, with diverse +CPU allocs. + +Istio and other client load balancers can handle weighted load distribution, +but it is extremely hard to prevent distributed clients from creating more +than 200 QPS on a specific workload. + +The workload's sidecar - may be Istio Envoy or a lighter sidecer or 'native' +proxyless gRPC - can track exactly how many requests are active - as well +as the latency and CPU characteristics of the workload. +It is the best place to determine if the workload can handle the load - and +shed or rebalance the load. + +# Identity problem + +The main problem preventing Istio sidecar ingress (and alike systems) from +forwarding the request to another instance + +# Cross-replica communication + +Workloads may report the load to the control plane - and this can be +aggregated and distributed to all clients, resulting in a better +distribution of load. However the process is slow and expensive - it +needs to be spread to all clients. + +Having the load sharedacross the backend replicas is more efficient, +typically the number of endpoints in a service is smaller than the number +of client workload that may call the service. + +A side benefit of creating a cross-replica communication system is that +it may allow the replicas to exchange other information and implement +things like sharding and stickyness - a request can go to any replica +and be re-forwarded to the correct backend. + + diff --git a/ext/bootstrap/ugate_ssh.go b/ext/bootstrapx/ugate_ssh.go similarity index 94% rename from ext/bootstrap/ugate_ssh.go rename to ext/bootstrapx/ugate_ssh.go index 19374b0..4784348 100644 --- a/ext/bootstrap/ugate_ssh.go +++ b/ext/bootstrapx/ugate_ssh.go @@ -1,6 +1,6 @@ // +build !MIN -package bootstrap +package bootstrapx import ( "github.com/costinm/ugate/ext/ssh" diff --git a/ext/cfquiche/quiche.go b/ext/cfquiche/quiche.go index 9ed556e..331bed1 100644 --- a/ext/cfquiche/quiche.go +++ b/ext/cfquiche/quiche.go @@ -18,7 +18,7 @@ type Quiche struct { } -func (q Quiche) DialMux(ctx context.Context, node *ugate.DMNode, meta http.Header, ev func(t string, stream *ugate.Stream)) (ugate.Muxer, error) { +func (q Quiche) DialMux(ctx context.Context, node *ugate.DMNode, meta http.Header, ev func(t string, stream *ugate.Conn)) (ugate.Muxer, error) { //quiche.EnableDebugLogging() config, err := newConfig(quiche.ProtocolVersion) diff --git a/ext/gvisor/tun_capture_gvisor.go b/ext/gvisor/tun_capture_gvisor.go index 556fedb..a5bff5a 100644 --- a/ext/gvisor/tun_capture_gvisor.go +++ b/ext/gvisor/tun_capture_gvisor.go @@ -457,7 +457,7 @@ func (nt *GvisorTun) defUdpServer() error { for { // Will have the peer address //ep.SetSockOpt() - // Add is send address. Control should include the dest addr ( for raw ) + // StartListener is send address. Control should include the dest addr ( for raw ) bb := &bytes.Buffer{} rr, err := ep.Read(bb, ro) //v, _, err := ep.(UdpLocalReader).ReadLocal(&add) diff --git a/ext/h2r/h2r.go b/ext/h2r/h2r.go index 54805fa..10f33d0 100644 --- a/ext/h2r/h2r.go +++ b/ext/h2r/h2r.go @@ -84,7 +84,7 @@ func New(ug *ugatesvc.UGate) *H2R { type H2RMux struct { *http2.ClientConn - tlsStr *ugate.Stream + tlsStr *ugate.Conn dm *ugate.DMNode // Raw frame support @@ -98,7 +98,7 @@ type H2RMux struct { // DialMUX creates one connection to a mesh node, using one of the // supported multiplex protocols. -func (t *H2R) DialMux(ctx context.Context, dm *ugate.DMNode, meta http.Header, ev func(t string, stream *ugate.Stream)) (ugate.Muxer, error) { +func (t *H2R) DialMux(ctx context.Context, dm *ugate.DMNode, meta http.Header, ev func(t string, stream *ugate.Conn)) (ugate.Muxer, error) { // TODO: try all published addresses, including all protos addr := dm.Addr @@ -238,9 +238,14 @@ func (t *H2R) HandleH2R(w http.ResponseWriter, r *http.Request) { // //}() + //t.ug.RegisterEndpoint(n.ID) + // Wait until t.MarkDead is called - or the con is closed <-end + // TODO: + //t.ug.UnRegisterEndpoint(n.ID) + n.Muxer = nil return } diff --git a/ext/h2r/h2raw.go b/ext/h2r/h2raw.go index b234743..845b5de 100644 --- a/ext/h2r/h2raw.go +++ b/ext/h2r/h2raw.go @@ -175,7 +175,7 @@ func (h2s *H2RMux) stream(id uint32) *H2Stream { // H2Stream is a multiplexed stream. type H2Stream struct { - meta *ugate.Stream + meta *ugate.Conn id *uint32 s *H2RMux diff --git a/ext/quic/mux.go b/ext/quic/mux.go index 997fdf4..ad6a95d 100644 --- a/ext/quic/mux.go +++ b/ext/quic/mux.go @@ -62,7 +62,7 @@ func (q *Quic) handleRaw(qs quic.Stream) { } -func (ugs *QuicMUX) DialStream(ctx context.Context, addr string, inStream *ugate.Stream) (*ugate.Stream, error) { +func (ugs *QuicMUX) DialStream(ctx context.Context, addr string, inStream *ugate.Conn) (*ugate.Conn, error) { //if UseRawStream { s, err := ugs.s.OpenStream() if err != nil { diff --git a/ext/quic/quic.go b/ext/quic/quic.go index 946e119..56285b4 100644 --- a/ext/quic/quic.go +++ b/ext/quic/quic.go @@ -142,7 +142,7 @@ func (q *Quic) quicConfig() *quic.Config { } } -func (qd *Quic) DialMux(ctx context.Context, node *ugate.DMNode, meta http.Header, ev func(t string, stream *ugate.Stream)) (ugate.Muxer, error) { +func (qd *Quic) DialMux(ctx context.Context, node *ugate.DMNode, meta http.Header, ev func(t string, stream *ugate.Conn)) (ugate.Muxer, error) { tlsConf := &tls.Config{ // VerifyPeerCertificate used instead InsecureSkipVerify: true, @@ -184,7 +184,7 @@ func (qd *Quic) DialMux(ctx context.Context, node *ugate.DMNode, meta http.Heade // // // TODO: use MASQUE ( with extension headers ? ) // initReq, _ := http.NewRequest("GET", "https://"+node.ID+"/_dm/id/Q/"+qd.Auth.ID, nil) - // initReq.Header.Add("authorization", tok) + // initReq.Header.StartListener("authorization", tok) // res, err := rt.RoundTrip(initReq) // if err != nil { // return nil, err diff --git a/ext/webrtc/rtc.go b/ext/webrtc/rtc.go index cd0fc51..663f97f 100644 --- a/ext/webrtc/rtc.go +++ b/ext/webrtc/rtc.go @@ -8,7 +8,7 @@ import ( "github.com/pion/turn/v2" ) -// Add: +// StartListener: // - TURN - seems a much simpler and broader mechanism to create remote listeners // TCP is problematic, but QUIC seems possible. diff --git a/pkg/auth/auth.go b/pkg/auth/auth.go index 1b1a4a2..30f075b 100644 --- a/pkg/auth/auth.go +++ b/pkg/auth/auth.go @@ -84,6 +84,8 @@ type Auth struct { // Explicit certificates (lego), key is hostname from file // CertMap map[string]*tls.Certificate + + Trusted [][]byte } var certValidityPeriod = 100 * 365 * 24 * time.Hour @@ -161,6 +163,11 @@ func NewAuth(cs ugate.ConfStore, name, domain string) *Auth { edpub := PublicKey(priv) auth.PublicKey = edpub.(ed25519.PublicKey) pubkey = edpub + } else if priv, ok := pk.(*rsa.PrivateKey); ok { + edpub := PublicKey(priv) + auth.Priv = MarshalPrivateKey(priv) + auth.PublicKey = MarshalPublicKey(priv.Public()) + pubkey = edpub } auth.VIP6 = Pub2VIP(auth.PublicKey) @@ -409,6 +416,42 @@ func (auth *Auth) loadAuthCfg() { if auth.Config == nil { return } + rsaKey, _ := auth.Config.Get("key.pem") + rsaCert, _ := auth.Config.Get("cert-chain.pem") + // TODO: multiple roots + rootCert, _ := auth.Config.Get("root-cert.pem") + if rsaKey != nil && rsaCert != nil { + tlsCert, err := tls.X509KeyPair(rsaCert, rsaKey) + if err != nil { + log.Println("Invalid Istio cert ", err) + } else { + auth.Cert = &tlsCert + if rootCert != nil { + auth.Trusted = append(auth.Trusted, rootCert) + } + for n, c := range tlsCert.Certificate { + cert, err := x509.ParseCertificate(c) + if err != nil { + log.Println("Invalid Istio cert ", err) + continue + } + if n == 0 && len(cert.URIs) > 0 { + log.Println("ID ", cert.URIs[0], cert.Issuer, + cert.NotAfter) + // TODO: get cert fingerprint as well + + //log.Println("Cert: ", cert) + // TODO: extract domain, ns, name + } else { + // org and name are set + log.Println("Cert: ", cert.Subject.Organization, cert.NotAfter) + } + + } + return + } + } + // Single file - more convenient for upload // Java supports PKCS12 ( p12, pfx) kcfg, _ := auth.Config.Get("kube.json") @@ -446,10 +489,6 @@ func (auth *Auth) loadAuthCfg() { } // Load the primary cert - expects a PEM key file -// This will initialize all supported types of keys - current identity -// is based on ED25519. -// EC256 will be used for Webpush compat -// RSA for Istio or legacy compat. // // Rejected formats: // - PKCS12 (p12, pfx) - supported by Java. Too complex. @@ -466,7 +505,6 @@ func (auth *Auth) loadCert() error { // } // auth.EC256Cert = &tlsCert //} - //edKey, _ := auth.Config.Get("ed25519-key.pem") //edCert, _ := auth.Config.Get("ed25519-cert.pem") //if edKey != nil && edCert != nil { @@ -476,16 +514,6 @@ func (auth *Auth) loadCert() error { // } // auth.ED25519Cert = &tlsCert //} - // - //rsaKey, _ := auth.Config.Get("rsa-key.pem") - //rsaCert, _ := auth.Config.Get("rsa-cert.pem") - //if rsaKey != nil && rsaCert != nil { - // tlsCert, err := tls.X509KeyPair(rsaCert, rsaKey) - // if err != nil { - // return err - // } - // auth.RSACert = &tlsCert - //} return nil } @@ -746,18 +774,17 @@ func (auth *Auth) signCertDER(pub crypto.PublicKey, caPrivate crypto.PrivateKey, // Generate and save the primary self-signed Certificate func (auth *Auth) generateSelfSigned(prefix string, priv crypto.PrivateKey, sans ...string) (tls.Certificate, []byte, []byte) { - // Sign with the private key. + return auth.SignCert(priv, priv, sans...) +} + +func (auth *Auth) SignCert(priv crypto.PrivateKey, ca crypto.PrivateKey, sans ...string) (tls.Certificate, []byte, []byte) { pub := PublicKey(priv) - certDER := auth.signCertDER(pub, priv, sans...) + certDER := auth.signCertDER(pub, ca, sans...) certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) ecb, _ := x509.MarshalPKCS8PrivateKey(priv) keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: ecb}) - //if auth.Config != nil { - // auth.Config.Set(prefix+"-key.pem", keyPEM) - // auth.Config.Set(prefix+"-cert.pem", certPEM) - //} tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) if err != nil { log.Println("Error generating cert ", err) @@ -786,6 +813,15 @@ func MarshalPublicKey(key crypto.PublicKey) []byte { return nil } +func MarshalPrivateKey(key crypto.PrivateKey) []byte { + if k, ok := key.(*rsa.PrivateKey); ok { + bk := x509.MarshalPKCS1PrivateKey(k) + return bk + } + + return nil +} + // Convert a PublicKey to a marshalled format - in the raw format. // - 32 byte ED25519 // - 65 bytes EC256 ( 0x04 prefix ) diff --git a/pkg/auth/encrypt.go b/pkg/auth/encrypt.go index 3fb05db..1e476e0 100644 --- a/pkg/auth/encrypt.go +++ b/pkg/auth/encrypt.go @@ -390,7 +390,7 @@ func hkdf(salt, ikm, info []byte, length int) []byte { // Encrypt the plaintext message using AES128/GCM func encrypt128(plaintext, key, nonce []byte) ([]byte, error) { - // Add padding. There is a uint16 size followed by that number of bytes of + // StartListener padding. There is a uint16 size followed by that number of bytes of // padding. // TODO: Right now we leave the size at zero. We should add a padding option // that allows the payload size to be obscured. diff --git a/pkg/cfgfs/conf.go b/pkg/cfgfs/conf.go index 0c5b05a..5154a03 100644 --- a/pkg/cfgfs/conf.go +++ b/pkg/cfgfs/conf.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "log" "os" + "path/filepath" "strings" "sync" @@ -80,7 +81,7 @@ func (c *Conf) Get(name string) ([]byte, error) { } for _, b := range c.base { - l := b + name + l := filepath.Join(b, name) if _, err := os.Stat(l); err == nil { // || !os.IsNotExist(err) res, err := ioutil.ReadFile(l) diff --git a/pkg/http_proxy/httpproxy_capture.go b/pkg/http_proxy/httpproxy_capture.go index d0d4f8d..3e1b26e 100644 --- a/pkg/http_proxy/httpproxy_capture.go +++ b/pkg/http_proxy/httpproxy_capture.go @@ -138,7 +138,7 @@ func (gw *HTTPGate) handleConnect(w http.ResponseWriter, r *http.Request) { gw.gw.OnStream(str) str.Dest = host - str.Egress = true + str.Direction = ugate.StreamTypeOut str.PostDialHandler = func(conn net.Conn, err error) { if err != nil { w.WriteHeader(503) diff --git a/pkg/iptables/iptables.go b/pkg/iptables/iptables.go index dd86271..fb07791 100644 --- a/pkg/iptables/iptables.go +++ b/pkg/iptables/iptables.go @@ -15,12 +15,17 @@ import ( // func IptablesCapture(ug *ugatesvc.UGate, addr string, in bool) error { - // For http proxy we need a dedicated plain HTTP port nl, err := net.Listen("tcp", addr) if err != nil { log.Println("Failed to listen", err) return err } + ipo := &IptablesOut{ + Gate: ug, + } + ipi := &IptablesIn{ + Gate: ug, + } for { remoteConn, err := nl.Accept() ugate.VarzAccepted.Add(1) @@ -35,40 +40,76 @@ func IptablesCapture(ug *ugatesvc.UGate, addr string, in bool) error { log.Println("Accept error, closing iptables listener ", err) return err } - go handleAcceptedConn(ug, remoteConn, in) + + go func() { + str := ugate.GetStream(remoteConn, remoteConn) + ug.OnStream(str) + defer ug.OnStreamDone(str) + + if in { + ipi.Handle(str) + } else { + ipo.Handle(str) + } + }() } return nil } -// Mirroring handleAcceptedConn in UGate -func handleAcceptedConn(ug *ugatesvc.UGate, acceptedCon net.Conn, in bool) { - bconn := ugate.GetStream(acceptedCon, acceptedCon) - ug.OnStream(bconn) - defer ug.OnStreamDone(bconn) - str := bconn + +type IptablesOut struct { + Gate *ugatesvc.UGate +} + +type IptablesIn struct { + Gate *ugatesvc.UGate +} + +func (ipo *IptablesOut) Handle(str *ugate.Conn) error { + str.Dest, str.ReadErr = SniffIptables(str) + if str.ReadErr != nil { + return str.ReadErr + } + + // str.Dest is a VIP or real IP. It will be mapped to a real + // destination. + cfg := ipo.Gate.FindRouteOut(str) + if cfg.ForwardTo != "" { + str.Dest = cfg.ForwardTo + } + + str.Route = cfg + str.Type = cfg.Protocol + str.Direction = ugate.StreamTypeOut + + str.ReadErr = ipo.Gate.HandleStream(str) + return str.ReadErr +} + +// Similar with Istio ingress capture. Original DST is the intended +// addr and port. +func (ipo *IptablesIn) Handle(str *ugate.Conn) error { //case ugate.ProtoIPTablesIn: // // iptables is replacing the conn - process before creating the buffer // DestAddr is also set as a sideeffect str.Dest, str.ReadErr = SniffIptables(str) - if str.ReadErr != nil { - return + return str.ReadErr } - cfg := ug.FindCfgIptablesIn(bconn) + // Local routes - redirect or additional manipulation. + cfg := ipo.Gate.FindRouteIn(str) if cfg.ForwardTo != "" { str.Dest = cfg.ForwardTo } - str.Listener = cfg + str.Route = cfg str.Type = cfg.Protocol + str.Direction = ugate.StreamTypeIn - if !in { - str.Egress = true - } - - str.ReadErr = ug.HandleStream(str) + str.ReadErr = ipo.Gate.HandleStream(str) + return str.ReadErr } // Status: @@ -86,7 +127,7 @@ func handleAcceptedConn(ug *ugatesvc.UGate, acceptedCon net.Conn, in bool) { // https://github.com/ryanchapman/go-any-proxy/blob/master/any_proxy.go, // and other examples. // Based on REDIRECT. -func SniffIptables(str *ugate.Stream) (string, error) { +func SniffIptables(str *ugate.Conn) (string, error) { if _, ok := str.Out.(*net.TCPConn); !ok { return "", errors.New("invalid connection for iptbles") } diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go index 2ed4473..1440243 100644 --- a/pkg/pipe/pipe.go +++ b/pkg/pipe/pipe.go @@ -15,7 +15,7 @@ import ( // Useful utility from http2 package. // b is a bytes.RBuffer or databuffer. -// Deprecated, useless - no back pressure / flow control. +// No back pressure / flow control. // Pipe is a goroutine-safe io.Reader/io.Writer pair. It's like // io.Pipe except there are no PipeReader/PipeWriter halves, and the @@ -36,6 +36,11 @@ func New() *Pipe { return &Pipe{Buffer: new(bytes.Buffer)} } +func NewPipe() (*Pipe, *Pipe) { + p := &Pipe{Buffer: new(bytes.Buffer)} + return p, p +} + type PipeBuffer interface { Len() int io.Writer diff --git a/pkg/socks/socks5_capture.go b/pkg/socks/socks5_capture.go index de7147e..9f17a24 100644 --- a/pkg/socks/socks5_capture.go +++ b/pkg/socks/socks5_capture.go @@ -84,15 +84,13 @@ const ( */ func New(ug *ugatesvc.UGate) { p := ug.Config.BasePort+ugate.PORT_SOCKS - ll := &ugatesvc.PortListener{ - Listener: ugate.Listener{ - Address: fmt.Sprintf("127.0.0.1:%d", p), - Protocol: ugate.ProtoSocks, - }, + ll := &ugate.Listener{ + Address: fmt.Sprintf("127.0.0.1:%d", p), + Protocol: ugate.ProtoSocks, PortHandler: &Socks{ug: ug}, } - err := ll.Start(ug) + err := ug.StartListener(ll) if err != nil { log.Println("Failed to start SOCKS, continue ", err) } @@ -106,9 +104,8 @@ func (s *Socks) String() string { return "socks" } -func (s *Socks) Handle(bconn *ugate.Stream) error{ - - bconn.Egress = true +func (s *Socks) Handle(bconn *ugate.Conn) error{ + bconn.Direction = ugate.StreamTypeOut _, bconn.ReadErr = Unmarshal(bconn) if bconn.ReadErr != nil { @@ -120,7 +117,7 @@ func (s *Socks) Handle(bconn *ugate.Stream) error{ return bconn.ReadErr } -func Unmarshal(s *ugate.Stream) (done bool, err error) { +func Unmarshal(s *ugate.Conn) (done bool, err error) { // Fill the read buffer with one Read. // Typically 3-4 bytes unless client is eager. diff --git a/pkg/udp/udpproxy.go b/pkg/udp/udpproxy.go index 67bc537..82ea766 100644 --- a/pkg/udp/udpproxy.go +++ b/pkg/udp/udpproxy.go @@ -123,7 +123,7 @@ type UdpNat struct { // External address DestAddr *net.UDPAddr - //ugate.Stream + //ugate.Conn // bound to a local port (on the real network). UDP *net.UDPConn @@ -425,7 +425,7 @@ func (udpg *UDPGate) HandleUdp(dstAddr net.IP, dstPort uint16, localAddr net.IP, UDP: udpCon, } - l := udpg.cfg.FindListener(dstAddr, dstPort, "udp://") + l := udpg.cfg.FindRoutePrefix(dstAddr, dstPort, "udp://") if l.ForwardTo != "" { udpN.DestAddr, err = net.ResolveUDPAddr("udp", l.ForwardTo) if err != nil { diff --git a/pkg/uds/stream.go b/pkg/uds/stream.go index 9785c8b..175a15c 100644 --- a/pkg/uds/stream.go +++ b/pkg/uds/stream.go @@ -35,7 +35,7 @@ import ( // first line: METHOD param:val ... // 'subject', ' -//type Stream struct { +//type Conn struct { // Reader *bufio.Reader // // delim byte diff --git a/pkg/uds/uds.go b/pkg/uds/uds.go index 63ad763..7b172d9 100644 --- a/pkg/uds/uds.go +++ b/pkg/uds/uds.go @@ -485,7 +485,7 @@ func (uds *UdsConn) File() *os.File { return fd } -func processUnixConn(bc *ugate.Stream) error { +func processUnixConn(bc *ugate.Conn) error { uc, ok := bc.Out.(*net.UnixConn) if !ok { return errors.New("Unexpected con") diff --git a/pkg/uds/uds_test.go b/pkg/uds/uds_test.go index 9d19bb9..e714def 100644 --- a/pkg/uds/uds_test.go +++ b/pkg/uds/uds_test.go @@ -8,7 +8,7 @@ import ( "strings" "testing" - "github.com/costinm/ugate/pkg/msgs" + msgs "github.com/costinm/ugate/webpush" ) func TestUDS(t *testing.T) { diff --git a/pkg/ugatesvc/accept.go b/pkg/ugatesvc/accept.go index 492121e..55aff56 100644 --- a/pkg/ugatesvc/accept.go +++ b/pkg/ugatesvc/accept.go @@ -1,6 +1,7 @@ package ugatesvc import ( + "fmt" "log" "net" "strings" @@ -41,7 +42,7 @@ import ( // New style, based on lwip. Blocks until connect, proxy runs in background. func (ug *UGate) HandleTUN(conn net.Conn, ra *net.TCPAddr, la *net.TCPAddr) error { bconn := ugate.GetStream(conn, conn) - bconn.Egress = true + bconn.Direction = ugate.StreamTypeOut ug.OnStream(bconn) defer ug.OnStreamDone(bconn) @@ -52,7 +53,6 @@ func (ug *UGate) HandleTUN(conn net.Conn, ra *net.TCPAddr, la *net.TCPAddr) erro bconn.Dest = ra.String() log.Println("LTUN ", ra, la, bconn.Dest) } - bconn.Egress = true log.Println("TUN TCP ", bconn) // TODO: config ? Could be shared with iptables port @@ -62,36 +62,59 @@ func (ug *UGate) HandleTUN(conn net.Conn, ra *net.TCPAddr, la *net.TCPAddr) erro // Handle a virtual (multiplexed) stream, received over // another connection, for example H2 POST/CONNECT, etc // The connection will have metadata, may include identify of the caller. -func (ug *UGate) HandleVirtualIN(bconn *ugate.Stream) error { +func (ug *UGate) HandleVirtualIN(bconn *ugate.Conn) error { ug.OnStream(bconn) defer ug.OnStreamDone(bconn) return ug.HandleStream(bconn) } +// handleSNI is intended for a dedicated SNI port. +// Will use the Config.Routes to map the SNI host to a ForwardTo address. If not found, +// will use a callback for a dynamic route. +func (ug *UGate) handleSNI(str *ugate.Conn) error { + // Used to present the right cert + _, str.ReadErr = ParseTLS(str) + if str.ReadErr != nil { + return str.ReadErr + } + + + route := ug.Config.Routes[str.Dest] + if route != nil { + if route.ForwardTo != "" { + str.Dest = route.ForwardTo + } + if route.Handler != nil { + // SOCKS and others need to send something back - we don't + // have a real connection, faking it. + str.PostDial(str, nil) + str.Dest = fmt.Sprintf("%v", route.Handler) + err:= route.Handler.Handle(str) + str.Close() + return err + } + } + + // Default stream handling is proxy to the SNI dest. + // Note that SNI does not include port number. + str.ReadErr = ug.DialAndProxy(str) + return nil +} + // handles a directly accepted TCP connection for a TLS port. // May SNI-forward or terminate, based on listener config. // // If terminating, based on ALPN and domain will route the stream. // For SNI - will use the SNI name to route the stream. -func (ug *UGate) handleTLS(l *ugate.Listener, acceptedCon net.Conn) { - // Attempt to determine the Listener and target - // Ingress mode, forward to an IP - - rawStream := ugate.GetStream(acceptedCon, acceptedCon) - - // Track the original stream. Report error and bytes on the original. - ug.OnStream(rawStream) - defer ug.OnStreamDone(rawStream) - - rawStream.Listener = l - rawStream.Type = l.Protocol - +// +// TODO: depreacte, too complex. Dedicated port for SNI is better and cleaner. +func (ug *UGate) handleTLSorSNI(rawStream *ugate.Conn) error { // Used to present the right cert _, rawStream.ReadErr = ParseTLS(rawStream) if rawStream.ReadErr != nil { - return + return rawStream.ReadErr } sni := rawStream.Dest @@ -117,7 +140,7 @@ func (ug *UGate) handleTLS(l *ugate.Listener, acceptedCon net.Conn) { } else { // Check certs defined for the listener wild := "" - for cn, k := range l.Certs { + for cn, k := range rawStream.Listener.Certs { if cn == "*" { wild = k } else { @@ -139,7 +162,12 @@ func (ug *UGate) handleTLS(l *ugate.Listener, acceptedCon net.Conn) { } } - sniCfg := ug.Listeners[rawStream.Dest] + // At this point, if tlsTerm is true it means we should terminate + // and handle the connection. + // Else - SNI forward + + // TODO: local routes if tlsTerm + sniCfg := ug.Config.Routes[rawStream.Dest] if sniCfg != nil { if sniCfg.ForwardTo != "" { rawStream.Dest = sniCfg.ForwardTo @@ -156,144 +184,57 @@ func (ug *UGate) handleTLS(l *ugate.Listener, acceptedCon net.Conn) { // TODO: tlsCfg = ug.Auth.GetServerConfig(cert) } // TODO: present the right ALPN for the port ( if not set, use default) - tc, err := ug.NewTLSConnIn(rawStream.Context(),l, rawStream, tlsCfg) + tc, err := ug.NewTLSConnIn(rawStream.Context(), rawStream.Listener, rawStream, tlsCfg) if err != nil { rawStream.ReadErr = err - log.Println("TLS: ", rawStream.RemoteAddr(), rawStream.Dest, rawStream.Listener, err) - return + log.Println("TLS: ", rawStream.RemoteAddr(), rawStream.Dest, rawStream.Route, err) + return err } // Handshake done. Now we have access to the ALPN. - rawStream.ReadErr = ug.HandleBTSStream(tc) + tc.PostDial(tc, nil) + rawStream.ReadErr = ug.H2Handler.HandleHTTPS(tc) } else { - // Default stream handling is proxy. - rawStream.ReadErr = ug.HandleStream(rawStream) - } -} - -// Dedicated BTS handler, for accepted connections with TLS. -// Port 443 (if root or redirected), or BASE + 7 -// -// curl https://$NAME/ --connect-to $NAME:443:127.0.0.1:15007 -func (ug *UGate) handleBTS(l *ugate.Listener, acceptedCon net.Conn) { - // Attempt to determine the Listener and target - // Ingress mode, forward to an IP - - rawStream := ugate.GetStream(acceptedCon, acceptedCon) - - // Track the original stream. Report error and bytes on the original. - ug.OnStream(rawStream) - defer ug.OnStreamDone(rawStream) + // Default stream handling is proxy to the SNI dest. + // Note that SNI does not include port number. - rawStream.Listener = l - rawStream.Type = l.Protocol - - // Used to present the right cert - _, rawStream.ReadErr = ParseTLS(rawStream) - if rawStream.ReadErr != nil { - return - } - - sni := rawStream.Dest - - // 2 main cases: - // - terminated here - if we have a cert - // - SNI routed - no termination. - tlsTerm := false - cert := "" - - if rawStream.Dest == "" { - // No explicit destination - terminate here - tlsTerm = true - } else if rawStream.Dest == ug.Auth.ID { - tlsTerm = true - } else { - // Not sure if this is worth it, may be too complex - // TODO: try to find certificate for domain or parent. - //if ug.Auth.Config.Get("key/" + dest) { - // - //} - _, ok := ug.Auth.CertMap[sni] - if ok { - tlsTerm = true - cert = sni - } - } - - sniCfg := ug.Listeners[rawStream.Dest] - if sniCfg == nil { - idx := strings.Index(rawStream.Dest, ".") - if idx > 0 { - sniCfg = ug.Listeners[rawStream.Dest[0:idx]] - } - } - if sniCfg != nil { - if sniCfg.ForwardTo != "" { - rawStream.Dest = sniCfg.ForwardTo - } - } - - if l.ForwardTo != "" { - // Explicit override - this is for listeners with type TLS and explicit - // forward, for example terminating MySQL. - // We still terminate TLS if we have a cert. - rawStream.Dest = l.ForwardTo - } - - - // Terminate TLS if the stream is detected as TLS and the matched config - // is configured for termination. - // Else it's just a proxied SNI connection. - if tlsTerm { - tlsCfg := ug.TLSConfig - if cert != "" { - // explicit cert based on listener config - // TODO: tlsCfg = ug.Auth.GetServerConfig(cert) - } - // TODO: present the right ALPN for the port ( if not set, use default) - tc, err := ug.NewTLSConnIn(rawStream.Context(),l, rawStream, tlsCfg) - if err != nil { - rawStream.ReadErr = err - log.Println("TLS: ", rawStream.RemoteAddr(), rawStream.Dest, rawStream.Listener, err) - return - } - - // Handshake done. Now we have access to the ALPN. - - rawStream.ReadErr = ug.HandleBTSStream(tc) - } else { - // Default stream handling is proxy. - rawStream.ReadErr = ug.HandleSNIStream(rawStream) + rawStream.ReadErr = ug.HandleStream(rawStream) } + return nil } + // Hamdle implements the common interface for handling accepted streams. // Will init and log the stream, then handle. // -func (ug *UGate) Handle(s *ugate.Stream) { +func (ug *UGate) Handle(s *ugate.Conn) { ug.OnStream(s) defer ug.OnStreamDone(s) ug.HandleStream(s) } -// A real accepted connection from port_listener - a real port, typically for -// legacy protocols and 'whitebox'. -func (ug *UGate) handleAcceptedConn(cfg *ugate.Listener, acceptedCon net.Conn) { +// A real accepted connection on a 'legacy' port. Will be forwarded to +// the mesh. +func (ug *UGate) handleTCPForward(bconn *ugate.Conn) error { + bconn.Direction = ugate.StreamTypeForward - bconn := ugate.GetStream(acceptedCon, acceptedCon) - bconn.Listener = cfg - bconn.Type = cfg.Protocol + if bconn.Listener.ForwardTo != "" { + bconn.Dest = bconn.Listener.ForwardTo + } - ug.OnStream(bconn) - defer ug.OnStreamDone(bconn) + bconn.ReadErr = ug.HandleStream(bconn) + return bconn.ReadErr +} - if cfg.ForwardTo != "" { - bconn.Dest = cfg.ForwardTo - } +func (ug *UGate) handleTCPEgress(bconn *ugate.Conn) error { + bconn.Direction = ugate.StreamTypeOut + + bconn.Dest = bconn.Listener.ForwardTo bconn.ReadErr = ug.HandleStream(bconn) + return bconn.ReadErr } // Auto-detect protocol on the wire, so routing info can be @@ -351,7 +292,7 @@ func (ug *UGate) handleAcceptedConn(cfg *ugate.Listener, acceptedCon net.Conn) { // case ProtoTLS: // pl.gate.sniffSNI(br) // } -// br.Stream.Type = proto +// br.Conn.Type = proto // // return nil //} diff --git a/pkg/ugatesvc/handlers.go b/pkg/ugatesvc/handlers.go index a1752dd..e6928f4 100644 --- a/pkg/ugatesvc/handlers.go +++ b/pkg/ugatesvc/handlers.go @@ -29,7 +29,7 @@ func (eh *EchoHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() // H2 requests require write to be flushed - buffering happens ! - // Wrap w.Body into Stream which does this automatically + // Wrap w.Body into Conn which does this automatically str := ugate.NewStreamRequest(r, w, nil) eh.handle(str, false) @@ -51,7 +51,7 @@ type StreamInfo struct { } -func GetStreamInfo(str *ugate.Stream) *StreamInfo { +func GetStreamInfo(str *ugate.Conn) *StreamInfo { si := &StreamInfo{ LocalAddr: str.LocalAddr(), RemoteAddr: str.RemoteAddr(), @@ -68,7 +68,7 @@ func GetStreamInfo(str *ugate.Stream) *StreamInfo { return si } -func (*EchoHandler) handle(str *ugate.Stream, serverFirst bool) error { +func (*EchoHandler) handle(str *ugate.Conn, serverFirst bool) error { d := make([]byte, 2048) si := GetStreamInfo(str) si.RemoteID= RemoteID(str) @@ -80,7 +80,7 @@ func (*EchoHandler) handle(str *ugate.Stream, serverFirst bool) error { if serverFirst { str.Write(b.Bytes()) } - //ac.SetDeadline(time.Now().Add(5 * time.Second)) + //ac.SetDeadline(time.Now().StartListener(5 * time.Second)) n, err := str.Read(d) if err != nil { return err @@ -102,7 +102,7 @@ func (*EchoHandler) handle(str *ugate.Stream, serverFirst bool) error { func (eh *EchoHandler) String() string { return "Echo" } -func (eh *EchoHandler) Handle(ac *ugate.Stream) error { +func (eh *EchoHandler) Handle(ac *ugate.Conn) error { if DebugEcho { log.Println("ECHOS ", ac) } @@ -306,3 +306,49 @@ func (gw *UGate) HandleTCPProxy(w http.ResponseWriter, r *http.Request) { } } + +// Inbound TLS over H2C. +// Used in KNative or similar environmnets that get a H2 POST or CONNECT +// Dest is local. +func (gw *UGate) HandleTLSoverH2(w http.ResponseWriter, r *http.Request) { + // Create a stream, used for proxy with caching. + str := ugate.NewStreamRequest(r, w, nil) + + parts := strings.Split(r.RequestURI, "/") + str.Dest = parts[2] + + str.PostDialHandler = func(conn net.Conn, err error) { + if err != nil { + w.Header().Add("Error", err.Error()) + w.WriteHeader(500) + w.(http.Flusher).Flush() + return + } + //w.Header().Set("Trailer", "X-Close") + w.WriteHeader(200) + w.(http.Flusher).Flush() + } + defer func() { + // Handler is done - even if it didn't call close, prevent calling it again. + if ugate.DebugClose { + log.Println("HTTP.Close - handler done, no close ", str.Dest) + } + str.ServerClose = true + }() + + tlsCfg := gw.TLSConfig + tc, err := gw.NewTLSConnIn(str.Context(), nil, str, tlsCfg) + if err != nil { + str.ReadErr = err + log.Println("TLS: ", str.RemoteAddr(), str.Dest, str.Route, err) + return + } + + // Treat it as regular stream forwarding + gw.HandleVirtualIN(tc) + + if ugate.DebugClose { + log.Println("Handler closed for ", r.RequestURI) + } + +} diff --git a/pkg/ugatesvc/hbone.go b/pkg/ugatesvc/hbone.go new file mode 100644 index 0000000..4ad9a66 --- /dev/null +++ b/pkg/ugatesvc/hbone.go @@ -0,0 +1,200 @@ +package ugatesvc + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "runtime/debug" + "strings" + "time" + + "github.com/costinm/ugate" + "golang.org/x/net/http2" +) + +// Dedicated BTS handler, for accepted connections with TLS. +// +// Port 443 (if root or redirected), or BASE + 7 +// +// curl https://$NAME/ --connect-to $NAME:443:127.0.0.1:15007 +func (ug *UGate) acceptedHbone(rawStream *ugate.Conn) error { + + + tlsCfg := ug.TLSConfig + tc, err := ug.NewTLSConnIn(rawStream.Context(),rawStream.Listener, rawStream, tlsCfg) + if err != nil { + rawStream.ReadErr = err + log.Println("TLS: ", rawStream.RemoteAddr(), rawStream.Dest, rawStream.Route, err) + return nil + } + + // Handshake done. Now we have access to the ALPN. + tc.PostDial(tc, nil) + + // http2 and http expect a net.Listener, and do their own accept() + ug.H2Handler.h2Server.ServeConn( + tc, + &http2.ServeConnOpts{ + Handler: ug.H2Handler, // Also plain text, needs to be upgraded + Context: tc.Context(), // associated with the stream, with cancel + + //Context: // can be used to cancel, pass meta. + // h2 adds http.LocalAddrContextKey(NetAddr), ServerContextKey (*Server) + }) + return nil +} + +// Handles an accepted connection with plain text h2, intended for +// hbone protocol. +func (ug *UGate) acceptedHboneC(tc *ugate.Conn) error { + tc.PostDial(tc, nil) + ug.H2Handler.h2Server.ServeConn( + tc, + &http2.ServeConnOpts{ + Handler: http.HandlerFunc(ug.H2Handler.httpHandleHboneC), // Also plain text, needs to be upgraded + Context: tc.Context(), // associated with the stream, with cancel + //Context: // can be used to cancel, pass meta. + // h2 adds http.LocalAddrContextKey(NetAddr), ServerContextKey (*Server) + }) + return nil +} + + +func (l *H2Transport) httpHandleHboneC(w http.ResponseWriter, r *http.Request) { + t0 := time.Now() + defer func() { + // TODO: add it to an event buffer + l.ug.OnHClose("Hbone", "", "", r, time.Since(t0)) + + if r := recover(); r != nil { + fmt.Println("Recovered in hbone", r) + + debug.PrintStack() + + // find out exactly what the error was and set err + var err error + + switch x := r.(type) { + case string: + err = errors.New(x) + case error: + err = x + default: + err = errors.New("Unknown panic") + } + if err != nil { + fmt.Println("ERRROR: ", err) + } + } + }() + + // TODO: parse Envoy / hbone headers. + + if ugate.DebugClose { + log.Println("Hbone-Start ", r.Method, r.URL, r.Proto, r.Header, RemoteID, "", r.RemoteAddr) + } + + // TCP proxy for SSH + if r.Method == "CONNECT" || strings.HasPrefix(r.RequestURI, "/_hbone/") { + l.ug.HandleTCPProxy(w, r) + } + + tlsCfg := l.ug.TLSConfig + + // Create a stream, used for proxy with caching. + rawStream := ugate.NewStreamRequest(r, w, nil) + rawStream.Listener = &ugate.Listener{ + ALPN: []string{"h2"}, + } + + tc, err := l.ug.NewTLSConnIn(rawStream.Context(),rawStream.Listener, rawStream, + tlsCfg) + if err != nil { + rawStream.ReadErr = err + log.Println("TLS: ", rawStream.RemoteAddr(), rawStream.Dest, rawStream.Route, err) + return + } + + // TODO: All Istio checks go here. The TLS handshake doesn't check + // root cert or anything - this is proof of concept only, to eval + // perf. + + if tc.TLS.NegotiatedProtocol == "h2" { + // http2 and http expect a net.Listener, and do their own accept() + l.ug.H2Handler.h2Server.ServeConn( + tc, + &http2.ServeConnOpts{ + Handler: http.HandlerFunc(l.ug.H2Handler.httpHandleHboneCHTTP), + Context: tc.Context(), // associated with the stream, with cancel + }) + } else { + // HTTP/1.1 + // TODO. Typically we want to upgrade over the wire to H2 + } +} + +// At this point we got the TLS stream over H2, and forward to the app +// We still need to know if the app is H2C or HTTP/1.1 +func (l *H2Transport) httpHandleHboneCHTTP(w http.ResponseWriter, r *http.Request) { + l.ForwardHTTP(w,r, "127.0.0.1:8080") +} + +// HboneCat copies stdin/stdout to a HBONE stream. +func HboneCat(ug *UGate, urlOrHost string, tls bool, stdin io.ReadCloser, + stdout io.WriteCloser) error { + i, o := io.Pipe() + + + if !strings.HasPrefix(urlOrHost, "https://") { + h, p, err := net.SplitHostPort(urlOrHost) + if err != nil { + return err + } + urlOrHost = "https://" + h + "/hbone/:" + p + } + r, _ := http.NewRequest("POST", urlOrHost, i) + res, err := ug.RoundTrip(r) + if err != nil { + return err + } + + var nc *ugate.Conn + if tls { + plain := ugate.NewStreamRequestOut(r, o, res, nil) + nc, err = ug.NewTLSConnOut(context.Background(), plain, ug.TLSConfig, + "", []string{"h2"}) + if err != nil { + return err + } + } else { + nc = ugate.NewStreamRequestOut(r, o, res, nil) + } + go func() { + b1 := make([]byte, 1024) + for { + n, err := nc.Read(b1) + if err != nil { + stdout.Close() + stdin.Close() + log.Println("Tun read err", err) + return + } + stdout.Write(b1[0:n]) + } + }() + + b1 := make([]byte, 1024) + for { + n, err := stdin.Read(b1) + if err != nil { + return err + } + nc.Write(b1[0:n]) + } + return nil +} + diff --git a/pkg/ugatesvc/headers.go b/pkg/ugatesvc/headers.go index bd32f63..7a1799d 100644 --- a/pkg/ugatesvc/headers.go +++ b/pkg/ugatesvc/headers.go @@ -12,11 +12,28 @@ import ( "github.com/costinm/ugate" ) + +// There are few options on how to pass stream metadata around: +// +// Mux proto: +// - H2 - clear protocol, but has overhead and complexity and because +// of muxing we can't splice +// +// Splice-able proto: +// - use HTTP/1.1 CONNECT - and mime headers +// - use HA Proxy - custom binary +// - use a proto (possibly with simplified proto parsing), like Istio +// - ??? +// +// A mixed mode is also possible - proto in a h2 or http header. +// Encoding/decoding speed and memory use are the key - technically +// all options can be supported. + type MimeEncoder struct { } -func (*MimeEncoder) Unmarshal(s *ugate.Stream) (done bool, err error) { +func (*MimeEncoder) Unmarshal(s *ugate.Conn) (done bool, err error) { buf, err := s.Fill(5) if err != nil { return false, err @@ -40,16 +57,17 @@ func (*MimeEncoder) Unmarshal(s *ugate.Stream) (done bool, err error) { s.Skip(5 + int(len)) if ugate.DebugClose { - log.Println("Stream.receiveHeaders ", s.StreamId, s.InHeader, s.RBuffer().Size()) + log.Println("Conn.receiveHeaders ", s.StreamId, s.InHeader, s.RBuffer().Size()) } return true, nil } -func (*MimeEncoder) Marshal(s *ugate.Stream) error { +func (*MimeEncoder) Marshal(s *ugate.Conn) error { bb := s.WBuffer() h := s.InHeader - if s.Egress { + if s.Direction == ugate.StreamTypeOut || + s.Direction == ugate.StreamTypeUnknown { h = s.OutHeader } @@ -70,7 +88,7 @@ func (*MimeEncoder) Marshal(s *ugate.Stream) error { } if ugate.DebugClose { - log.Println("Stream.sendHeaders ", s.StreamId, h) + log.Println("Conn.sendHeaders ", s.StreamId, h) } return nil @@ -98,7 +116,7 @@ type BEncoder struct { } -func (*BEncoder) AddHeader(s *ugate.Stream, k, v []byte) { +func (*BEncoder) AddHeader(s *ugate.Conn, k, v []byte) { bb := s.WBuffer() if bb.Size() == 0 { bb.WriteByte(0) @@ -113,7 +131,7 @@ func (*BEncoder) AddHeader(s *ugate.Stream, k, v []byte) { bb.Write(v) } -func (*BEncoder) Unmarshal(s *ugate.Stream) (done bool, err error) { +func (*BEncoder) Unmarshal(s *ugate.Conn) (done bool, err error) { h, err := s.Fill(5) if err != nil { return false, err @@ -128,10 +146,11 @@ func (*BEncoder) Unmarshal(s *ugate.Stream) (done bool, err error) { } -func (*BEncoder) Marshal(s *ugate.Stream) error { +func (*BEncoder) Marshal(s *ugate.Conn) error { bb := s.WBuffer() h := s.InHeader - if s.Egress { + if s.Direction == ugate.StreamTypeOut || + s.Direction == ugate.StreamTypeUnknown { h = s.OutHeader } // TODO: leave 5 bytes at Start to reproduce streaming gRPC format @@ -152,7 +171,7 @@ func (*BEncoder) Marshal(s *ugate.Stream) error { bb.WriteByte(2) if ugate.DebugClose { - log.Println("Stream.sendHeaders ", s.StreamId, h) + log.Println("Conn.sendHeaders ", s.StreamId, h) } return nil diff --git a/pkg/ugatesvc/http.go b/pkg/ugatesvc/http.go index 6671e57..25abbf7 100644 --- a/pkg/ugatesvc/http.go +++ b/pkg/ugatesvc/http.go @@ -2,7 +2,6 @@ package ugatesvc import ( "bytes" - "context" "crypto/tls" "errors" "fmt" @@ -67,97 +66,17 @@ func NewH2Transport(ug *UGate) (*H2Transport, error) { ug.Mux.HandleFunc("/_dm/", ug.HandleID) ug.Mux.HandleFunc("/dm/", ug.HandleTCPProxy) + ug.Mux.HandleFunc("/hbone/", ug.HandleTCPProxy) + ug.Mux.HandleFunc("/hbonec/", ug.HandleTLSoverH2) + ug.Mux.HandleFunc("/_hb/", ug.HandleTCPProxy) + ug.Mux.HandleFunc("/_hbc/", ug.HandleTLSoverH2) + // Plain HTTP requests - we only care about CONNECT/ws go http.Serve(h2.httpListener, h2) return h2, nil } -// UpdateReverseAccept updates the upstream accept connections, based on config. -// Should be called when the config changes -func (t *H2Transport) UpdateReverseAccept() { - ev := make(chan string) - for addr, key := range t.ug.Config.H2R { - // addr is a hostname - dm := t.ug.GetOrAddNode(addr) - if dm.Addr == "" { - if key == "" { - dm.Addr = net.JoinHostPort(addr, "443") - } else { - dm.Addr = net.JoinHostPort(addr, "15007") - } - } - - go t.maintainPinnedConnection(dm, ev) - } - <- ev - log.Println("maintainPinned connected for ", t.ug.Auth.VIP6) - -} - -// Reverse Accept dials a connection to addr, and registers a H2 SERVER -// conn on it. The other end will register a H2 client, and create streams. -// The client cert will be used to associate incoming streams, based on config or direct mapping. -// TODO: break it in 2 for tests to know when accept is in effect. -func (t *H2Transport) maintainPinnedConnection(dm *ugate.DMNode, ev chan string) { - // maintain while the host is in the 'pinned' list - if _, f := t.ug.Config.H2R[dm.ID]; !f { - return - } - - //ctx := context.Background() - if dm.Backoff == 0 { - dm.Backoff = 1000 * time.Millisecond - } - - ctx := context.TODO() - //ctx, ctxCancel := context.WithTimeout(ctx, 5*time.Second) - //defer ctxCancel() - - protos := t.ug.Config.TunProto - if len(protos) == 0 { - protos = []string{"quic", "h2r"} - } - var err error - var muxer ugate.Muxer - for _, k := range protos { - muxer, err = t.ug.DialMUX(ctx, k, dm, nil) - if err == nil { - break; - } - } - if err == nil { - log.Println("UP: ", dm.Addr, muxer) - // wait for mux to be closed - dm.Backoff = 1000 * time.Millisecond - return - } - - log.Println("UP: err", dm.Addr, err, dm.Backoff) - // Failed to connect - if dm.Backoff < 15*time.Minute { - dm.Backoff = 2 * dm.Backoff - } - - time.AfterFunc(dm.Backoff, func() { - t.maintainPinnedConnection(dm, ev) - }) - - // p := str.TLS.NegotiatedProtocol - //if p == "h2r" { - // // Old code used the 'raw' TLS connection to create a server connection - // t.h2Server.ServeConn( - // str, - // &http2.ServeConnOpts{ - // Handler: t, // Also plain text, needs to be upgraded - // Context: str.Context(), - // - // //Context: // can be used to cancel, pass meta. - // // h2 adds http.LocalAddrContextKey(NetAddr), ServerContextKey (*Server) - // }) - //} -} - // Common entry point for H1, H2 - both plain and tls // Will do the 'common' operations - authn, authz, logging, metrics for all BTS and regular HTTP. @@ -223,7 +142,7 @@ func (l *H2Transport) ServeHTTP(w http.ResponseWriter, r *http.Request) { tls := r.TLS // If the request was handled by normal uGate listener. us := r.Context().Value("ugate.stream") - if ugs, ok := us.(*ugate.Stream); ok { + if ugs, ok := us.(*ugate.Conn); ok { tls = ugs.TLS r.TLS = tls } @@ -277,10 +196,7 @@ func (l *H2Transport) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Will sniff H2 and http/1.1 and use the right handler. // // Ex: curl localhost:9080/debug/vars --http2-prior-knowledge -func (t *H2Transport) handleHTTPListener(pl *ugate.Listener, acceptedCon net.Conn) error { - bconn := ugate.GetStream(acceptedCon, acceptedCon) - t.ug.OnStream(bconn) - defer t.ug.OnStreamDone(bconn) +func (t *H2Transport) handleHTTPListener(bconn *ugate.Conn) error { err := SniffH2(bconn) if err != nil { @@ -321,7 +237,7 @@ var ( ) -func SniffH2(s *ugate.Stream) error { +func SniffH2(s *ugate.Conn) error { var proto string for { @@ -354,15 +270,14 @@ func SniffH2(s *ugate.Stream) error { // listeners. // // Blocking. -func (t *H2Transport) HandleHTTPS(c *ugate.Stream) error { +func (t *H2Transport) HandleHTTPS(str *ugate.Conn) error { // http2 and http expect a net.Listener, and do their own accept() - str := c if str.TLS != nil && str.TLS.NegotiatedProtocol == "h2" { t.h2Server.ServeConn( - c, + str, &http2.ServeConnOpts{ Handler: t, // Also plain text, needs to be upgraded - Context: c.Context(), // associated with the stream, with cancel + Context: str.Context(), // associated with the stream, with cancel //Context: // can be used to cancel, pass meta. // h2 adds http.LocalAddrContextKey(NetAddr), ServerContextKey (*Server) @@ -371,13 +286,14 @@ func (t *H2Transport) HandleHTTPS(c *ugate.Stream) error { } // Else: HTTP/1.1 - t.httpListener.incoming <- c + t.httpListener.incoming <- str // TODO: wait for connection to be closed. <-str.Context().Done() return nil } + func (l *H2Transport) Close() error { return nil } @@ -433,26 +349,26 @@ func (l *listener) Accept() (net.Conn, error) { } } -type HttpClientStream struct { - ugate.Stream - *http.Response - request *http.Request -} - -func NewHttpClientStream(s *ugate.Stream) *HttpClientStream { - h := &HttpClientStream{ - } - return h -} - -type HttpServerStream struct { - ugate.Stream - http.ResponseWriter - request *http.Request -} - -func NewHttpServerStream(s *ugate.Stream) *HttpServerStream { - h := &HttpServerStream{ - } - return h -} +//type HttpClientStream struct { +// ugate.Conn +// *http.Response +// request *http.Request +//} +// +//func NewHttpClientStream(s *ugate.Conn) *HttpClientStream { +// h := &HttpClientStream{ +// } +// return h +//} +// +//type HttpServerStream struct { +// ugate.Conn +// http.ResponseWriter +// request *http.Request +//} +// +//func NewHttpServerStream(s *ugate.Conn) *HttpServerStream { +// h := &HttpServerStream{ +// } +// return h +//} diff --git a/pkg/ugatesvc/http_proxy.go b/pkg/ugatesvc/http_proxy.go index a4295be..a997d59 100644 --- a/pkg/ugatesvc/http_proxy.go +++ b/pkg/ugatesvc/http_proxy.go @@ -20,14 +20,12 @@ func (ht *H2Transport) ForwardHTTP(w http.ResponseWriter, r *http.Request, pathH // will be used by RoundTrip. r1.URL.Host = pathH - // can add more headers - // can add headers to the response - // This uses the BTS/H2 protocol or reverse path. // Forward to regular sites not supported. res, err := ht.ug.RoundTrip(r1) SendBackResponse(w, r, res, err) } + // Used by both ForwardHTTP and ForwardMesh, after RoundTrip is done. // Will copy response headers and body func SendBackResponse(w http.ResponseWriter, r *http.Request, @@ -51,7 +49,7 @@ func SendBackResponse(w http.ResponseWriter, r *http.Request, CopyResponseHeaders(w.Header(), res.Header) w.WriteHeader(res.StatusCode) - stats := &ugate.Stream{} + stats := &ugate.Conn{} n, err := stats.CopyBuffered(w, res.Body, true) log.Println("Done: ", r.URL, res.StatusCode, n, err) diff --git a/pkg/ugatesvc/port_listener.go b/pkg/ugatesvc/port_listener.go index 268f86c..6b4bd72 100644 --- a/pkg/ugatesvc/port_listener.go +++ b/pkg/ugatesvc/port_listener.go @@ -1,6 +1,7 @@ package ugatesvc import ( + "errors" "log" "net" "strings" @@ -56,38 +57,45 @@ import ( // - use websocket - no multiplexing. // - binary messages, using websocket frames -type PortListener struct { - ugate.Listener - NetListener net.Listener - PortHandler ugate.Handler +// StartListener and Start a real port listener on a port. +// Virtual listeners can be added to ug.Conf or the mux. +func (ug *UGate) StartListener(ll *ugate.Listener) error { + + err := ug.startPortListener(ll) + if err != nil { + return err + } + + return nil } + // Creates a raw (port) TCP listener. Accepts connections // on a local port, forwards to a remote destination. -func (cfg *PortListener) Start(gate *UGate) error { - ll := cfg +func (gate *UGate) startPortListener(pl *ugate.Listener) error { + ll := pl - if cfg.Address == "" { - cfg.Address = ":0" + if pl.Address == "" { + pl.Address = ":0" } - if cfg.Address[0] == '-' { + if pl.Address[0] == '-' { return nil // virtual listener } - if cfg.NetListener == nil { - if strings.HasPrefix(cfg.Address, "/") || - strings.HasPrefix(cfg.Address, "@") { + if pl.NetListener == nil { + if strings.HasPrefix(pl.Address, "/") || + strings.HasPrefix(pl.Address, "@") { us, err := net.ListenUnix("unix", &net.UnixAddr{ - Name: cfg.Address, + Name: pl.Address, Net: "unix", }) if err != nil { return err } - cfg.NetListener = us + pl.NetListener = us } else { // Not supported: RFC: address "" means all families, 0.0.0.0 IP4, :: IP6, localhost IP4/6, etc listener, err := net.Listen("tcp", ll.Address) @@ -100,19 +108,43 @@ func (cfg *PortListener) Start(gate *UGate) error { return err } } - cfg.NetListener = listener + pl.NetListener = listener } - + } + switch pl.Protocol { + case ugate.ProtoTLS: + pl.PortHandler = ugate.HandlerFunc(gate.handleTLSorSNI) + case ugate.ProtoSNI: + pl.PortHandler = ugate.HandlerFunc(gate.handleSNI) + case ugate.ProtoBTS: + pl.PortHandler = ugate.HandlerFunc(gate.acceptedHbone) + case ugate.ProtoBTSC: + pl.PortHandler = ugate.HandlerFunc(gate.acceptedHboneC) + case ugate.ProtoHTTP: + pl.PortHandler = ugate.HandlerFunc(gate.H2Handler.handleHTTPListener) + case ugate.ProtoTCPOut: + if pl.ForwardTo == "" { + return errors.New("invalid TCPOut, missing ForwardTo") + } + pl.PortHandler = ugate.HandlerFunc(gate.handleTCPEgress) + case ugate.ProtoTCPIn: + if pl.ForwardTo == "" { + return errors.New("invalid TCPIn, missing ForwardTo") + } + pl.PortHandler = ugate.HandlerFunc(gate.handleTCPForward) + default: + log.Println("Unspecified port, default to forward (in)") + pl.PortHandler = ugate.HandlerFunc(gate.handleTCPForward) } - go ll.serve(gate) + go serve(ll, gate) return nil } -func (pl *PortListener) Close() error { - pl.NetListener.Close() - return nil -} +//func (pl *PortListener) Close() error { +// pl.NetListener.Close() +// return nil +//} //func (pl PortListener) Accept() (net.Conn, error) { // return pl.NetListener.Accept() @@ -128,7 +160,7 @@ func (pl *PortListener) Close() error { // For -R, runs on the remote ssh server to accept connections and forward back to client, which in turn // will forward to a Port/app. // Blocking. -func (pl *PortListener) serve(gate *UGate) { +func serve(pl *ugate.Listener, gate *UGate) { log.Println("uGate: listen ", pl.Address, pl.NetListener.Addr(), pl.ForwardTo, pl.Protocol, pl.Handler, pl.PortHandler) for { remoteConn, err := pl.NetListener.Accept() @@ -147,7 +179,7 @@ func (pl *PortListener) serve(gate *UGate) { if pl.PortHandler != nil { go func() { bconn := ugate.GetStream(remoteConn, remoteConn) - bconn.Listener = &pl.Listener + bconn.Listener = pl bconn.Type = pl.Protocol gate.OnStream(bconn) defer gate.OnStreamDone(bconn) @@ -156,16 +188,6 @@ func (pl *PortListener) serve(gate *UGate) { }() return } - switch pl.Protocol { - case ugate.ProtoTLS: - go gate.handleTLS(&pl.Listener, remoteConn) - case ugate.ProtoBTS: - go gate.handleBTS(&pl.Listener, remoteConn) - case ugate.ProtoHTTP: - go gate.H2Handler.handleHTTPListener(&pl.Listener, remoteConn) - default: - go gate.handleAcceptedConn(&pl.Listener, remoteConn) - } } } diff --git a/pkg/ugatesvc/reverse.go b/pkg/ugatesvc/reverse.go new file mode 100644 index 0000000..28f7f9d --- /dev/null +++ b/pkg/ugatesvc/reverse.go @@ -0,0 +1,103 @@ +package ugatesvc + +import ( + "context" + "log" + "net" + "time" + + "github.com/costinm/ugate" +) + +// Reverse tunnel: create a persistent connection to a gateway, and +// accept connections over that connection. +// +// The gateway registers the current endpoint with it's own IP:port +// (for example WorkloadEntry or Endpoint or in-memory ), and forwards accepted requests over the +// established connection. + +// UpdateReverseAccept updates the upstream accept connections, based on config. +// Should be called when the config changes +func (t *H2Transport) UpdateReverseAccept() { + ev := make(chan string) + for addr, key := range t.ug.Config.H2R { + // addr is a hostname + dm := t.ug.GetOrAddNode(addr) + if dm.Addr == "" { + if key == "" { + dm.Addr = net.JoinHostPort(addr, "443") + } else { + dm.Addr = net.JoinHostPort(addr, "15007") + } + } + + go t.maintainPinnedConnection(dm, ev) + } + <- ev + log.Println("maintainPinned connected for ", t.ug.Auth.VIP6) + +} + +// Reverse Accept dials a connection to addr, and registers a H2 SERVER +// conn on it. The other end will register a H2 client, and create streams. +// The client cert will be used to associate incoming streams, based on config or direct mapping. +// TODO: break it in 2 for tests to know when accept is in effect. +func (t *H2Transport) maintainPinnedConnection(dm *ugate.DMNode, ev chan string) { + // maintain while the host is in the 'pinned' list + if _, f := t.ug.Config.H2R[dm.ID]; !f { + return + } + + //ctx := context.Background() + if dm.Backoff == 0 { + dm.Backoff = 1000 * time.Millisecond + } + + ctx := context.TODO() + //ctx, ctxCancel := context.WithTimeout(ctx, 5*time.Second) + //defer ctxCancel() + + protos := t.ug.Config.TunProto + if len(protos) == 0 { + protos = []string{"quic", "h2r"} + } + var err error + var muxer ugate.Muxer + for _, k := range protos { + muxer, err = t.ug.DialMUX(ctx, k, dm, nil) + if err == nil { + break; + } + } + if err == nil { + log.Println("UP: ", dm.Addr, muxer) + // wait for mux to be closed + dm.Backoff = 1000 * time.Millisecond + return + } + + log.Println("UP: err", dm.Addr, err, dm.Backoff) + // Failed to connect + if dm.Backoff < 15*time.Minute { + dm.Backoff = 2 * dm.Backoff + } + + time.AfterFunc(dm.Backoff, func() { + t.maintainPinnedConnection(dm, ev) + }) + + // p := str.TLS.NegotiatedProtocol + //if p == "h2r" { + // // Old code used the 'raw' TLS connection to create a server connection + // t.h2Server.ServeConn( + // str, + // &http2.ServeConnOpts{ + // Handler: t, // Also plain text, needs to be upgraded + // Context: str.Context(), + // + // //Context: // can be used to cancel, pass meta. + // // h2 adds http.LocalAddrContextKey(NetAddr), ServerContextKey (*Server) + // }) + //} +} + diff --git a/pkg/ugatesvc/routing.go b/pkg/ugatesvc/routing.go index 8ad7d9c..56b159c 100644 --- a/pkg/ugatesvc/routing.go +++ b/pkg/ugatesvc/routing.go @@ -14,7 +14,7 @@ import ( "golang.org/x/net/http2" ) -// After a Stream ( TCP+meta or HTTP ) is accepted/captured, we need to route it based on +// After a Conn ( TCP+meta or HTTP ) is accepted/captured, we need to route it based on // the config. // // Use cases: @@ -37,7 +37,7 @@ func (ug *UGate) Dial(netw, addr string) (net.Conn, error) { // // Used internally to create the raw TLS connections to both mesh // and non-mesh nodes. -func (ug *UGate) DialTLS(ctx context.Context, addr string, alpn []string) (*ugate.Stream, error) { +func (ug *UGate) DialTLS(ctx context.Context, addr string, alpn []string) (*ugate.Conn, error) { ctx1, cf := context.WithTimeout(ctx, 5*time.Second) defer cf() tcpC, err := ug.parentDialer.DialContext(ctx1, "tcp", addr) @@ -63,7 +63,7 @@ func (ug *UGate) DialTLS(ctx context.Context, addr string, alpn []string) (*ugat // str.Dest is the destination hostname:port or hostname. // -func (ug *UGate) DialAndProxy(str *ugate.Stream) error { +func (ug *UGate) DialAndProxy(str *ugate.Conn) error { nc, err := ug.dial(context.Background(), str.Dest, str) str.PostDial(nc, err) @@ -76,7 +76,7 @@ func (ug *UGate) DialAndProxy(str *ugate.Stream) error { // Dial may begin streaming from input connection to the dialed. // When dial return, the headers from dialed con are received. - if ncs, ok := nc.(*ugate.Stream) ; ok { + if ncs, ok := nc.(*ugate.Conn) ; ok { if ncs.OutHeader != nil { CopyResponseHeaders(str.Header(), ncs.OutHeader) } @@ -88,9 +88,9 @@ func (ug *UGate) DialAndProxy(str *ugate.Stream) error { return str.ProxyTo(nc) } -// Stream received via a BTS SNI route. +// Conn received via a BTS SNI route. // Similar with dialAndProxy -func (ug *UGate) HandleSNIStream(str *ugate.Stream) error { +func (ug *UGate) HandleSNIStream(str *ugate.Conn) error { idx := strings.Index(str.Dest, ".") if idx > 0 { // Only SNI route to mesh nodes, ignore the domain name @@ -147,7 +147,7 @@ func (ug *UGate) DialContext(ctx context.Context, netw, addr string) (net.Conn, // // If it has real endpoint address - we can use the associated protocol. // Else we can try all supported protos. -func (ug *UGate) DialMUX(ctx context.Context, net string, node *ugate.DMNode, ev func(t string, stream *ugate.Stream)) (ugate.Muxer, error) { +func (ug *UGate) DialMUX(ctx context.Context, net string, node *ugate.DMNode, ev func(t string, stream *ugate.Conn)) (ugate.Muxer, error) { // TODO: list, try them all. rd := ug.MuxDialers[net] if rd == nil { @@ -171,7 +171,7 @@ func (ug *UGate) OnMUX(node *ugate.DMNode) error { const usePipe = false // Dial creates a stream to the given address. -func (ug *UGate) dial(ctx context.Context, addr string, s *ugate.Stream) (net.Conn, error) { +func (ug *UGate) dial(ctx context.Context, addr string, s *ugate.Conn) (net.Conn, error) { // sets clientEventContextKey - if ctx is used for a round trip, will // set all data. // Will also make sure DNSStart, Connect, etc are set (if we want to) @@ -317,7 +317,7 @@ func (t *H2Transport) GetClientConn(req *http.Request, addr string) (*http2.Clie // Real address - addr = dmn.Addr } - var tc *ugate.Stream + var tc *ugate.Conn var err error // TODO: use local announces // TODO: use VPN server for all or for mesh diff --git a/pkg/ugatesvc/tls_conn.go b/pkg/ugatesvc/tls_conn.go index a8fb1a6..4372ba8 100644 --- a/pkg/ugatesvc/tls_conn.go +++ b/pkg/ugatesvc/tls_conn.go @@ -19,23 +19,23 @@ import ( type TLSConn struct { // Raw TCP connection, for remote address and stats // TODO: for H2-over-TLS-over-WS, it will be a WS conn - *ugate.Stream + *ugate.Conn // wrapps the original conn for Local/RemoteAddress and deadlines // Implements CloseWrite, ConnectionState, tls *tls.Conn } -func (ug *UGate) NewTLSConnOut(ctx context.Context, nc net.Conn, cfg *tls.Config, peerID string, alpn []string) (*ugate.Stream, error) { +func (ug *UGate) NewTLSConnOut(ctx context.Context, nc net.Conn, cfg *tls.Config, peerID string, alpn []string) (*ugate.Conn, error) { lc := &TLSConn{ } - if mc, ok := nc.(*ugate.Stream); ok { - lc.Stream = mc + if mc, ok := nc.(*ugate.Conn); ok { + lc.Conn = mc if rnc, ok := lc.Out.(net.Conn); ok { nc = rnc } } else { - lc.Stream = ugate.NewStream() + lc.Conn = ugate.NewStream() } config, keyCh := ConfigForPeer(ug.Auth,cfg, peerID) @@ -47,34 +47,35 @@ func (ug *UGate) NewTLSConnOut(ctx context.Context, nc net.Conn, cfg *tls.Config if err != nil { return nil, err } - lc.Stream.In = lc.tls - lc.Stream.Out = lc.tls + lc.Conn.In = lc.tls + lc.Conn.Out = lc.tls //lc.tls.ConnectionState().DidResume tcs := lc.tls.ConnectionState() - lc.Stream.TLS = &tcs + lc.Conn.TLS = &tcs lc.tls = cs - return lc.Stream, err + return lc.Conn, err } // SecureInbound runs the TLS handshake as a server. // Accepts connections without client certificate - alternate form of auth will be used, either // an inner TLS connection or JWT in metadata. -func (ug *UGate) NewTLSConnIn(ctx context.Context, l *ugate.Listener, nc net.Conn, cfg *tls.Config) (*ugate.Stream, error) { +func (ug *UGate) NewTLSConnIn(ctx context.Context, l *ugate.Listener, nc net.Conn, cfg *tls.Config) (*ugate.Conn, error) { config, keyCh := ConfigForPeer(ug.Auth, cfg, "") - if l.ALPN == nil { - config.NextProtos = []string{"h2r", "h2"} - } else { - config.NextProtos = l.ALPN + if l != nil { + if l.ALPN == nil { + config.NextProtos = []string{"h2r", "h2"} + } else { + config.NextProtos = l.ALPN + } } - tc := &TLSConn{} - tc.Stream = ugate.NewStream() - if mc, ok := nc.(*ugate.Stream); ok { + tc.Conn = ugate.NewStream() + if mc, ok := nc.(*ugate.Conn); ok { m := mc // Sniffed, etc - tc.Listener = m.Listener + tc.Route = m.Route tc.Dest = m.Dest } @@ -85,11 +86,11 @@ func (ug *UGate) NewTLSConnIn(ctx context.Context, l *ugate.Listener, nc net.Con return nil, err } tcs := tc.tls.ConnectionState() - tc.Stream.TLS = &tcs - tc.Stream.In = tc.tls - tc.Stream.Out = tc.tls + tc.Conn.TLS = &tcs + tc.Conn.In = tc.tls + tc.Conn.Out = tc.tls - return tc.Stream, err + return tc.Conn, err } func ConfigForPeer(a *auth.Auth, cfg *tls.Config, remotePeerID string) (*tls.Config, <-chan []*x509.Certificate) { @@ -218,7 +219,7 @@ const ( // // TODO: in mesh, use one cypher suite (TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256) // maybe 2 ( since keys are ECDSA ) -func ParseTLS(acc *ugate.Stream) (*clientHelloMsg,error) { +func ParseTLS(acc *ugate.Conn) (*clientHelloMsg,error) { buf, err := acc.Fill(5) if err != nil { return nil, err diff --git a/pkg/ugatesvc/ugate.go b/pkg/ugatesvc/ugate.go index f2e4f39..91aa554 100644 --- a/pkg/ugatesvc/ugate.go +++ b/pkg/ugatesvc/ugate.go @@ -27,10 +27,6 @@ var InitHooks []func(gate *UGate) StartFunc type UGate struct { - // Actual (real) port listeners, key is the host:port - // Typically host is 0.0.0.0 or 127.0.0.1 - may also be one of the - // local addresses. - Listeners map[string]*PortListener Config *ugate.GateCfg @@ -41,10 +37,8 @@ type UGate struct { // integration. parentDialer ugate.ContextDialer - // Configurations, keyed by host + port and URL. - Conf map[string]*ugate.Listener - DefaultListener *ugate.Listener + DefaultRoute *ugate.Route // Handlers for incoming connections - local accept or forwarding. Mux *http.ServeMux @@ -63,17 +57,18 @@ type UGate struct { // Active connection by internal stream ID. // Tracks incoming Streams - if the stream is getting proxied to - // a net.Conn or Stream, the dest will not be tracked here. - ActiveTcp map[int]*ugate.Stream + // a net.Conn or Conn, the dest will not be tracked here. + ActiveTcp map[int]*ugate.Conn m sync.RWMutex + Auth *auth.Auth Msg *msgs.Mux - MuxDialers map[string]ugate.MuxDialer - DNS ugate.UDPHandler - UDPHandler ugate.UDPHandler + MuxDialers map[string]ugate.MuxDialer + DNS ugate.UDPHandler + UDPHandler ugate.UDPHandler } func NewConf(base ...string) ugate.ConfStore { @@ -101,6 +96,9 @@ func NewGate(d ugate.ContextDialer, a *auth.Auth, cfg *ugate.GateCfg, cs ugate.C if d != nil { ug.parentDialer = d } + go ug.H2Handler.UpdateReverseAccept() + ug.DefaultPorts(ug.Config.BasePort) + ug.Start() return ug } @@ -125,6 +123,9 @@ func New(cs ugate.ConfStore, a *auth.Auth, cfg *ugate.GateCfg) *UGate { if cfg.Listeners == nil { cfg.Listeners = map[string]*ugate.Listener{} } + if cfg.Routes == nil { + cfg.Routes = map[string]*ugate.Route{} + } // Merge 'ugate' JSON config, from config store. Get(cs, "ugate", cfg) @@ -141,23 +142,22 @@ func New(cs ugate.ConfStore, a *auth.Auth, cfg *ugate.GateCfg) *UGate { } ug := &UGate{ - Listeners: map[string]*PortListener{}, parentDialer: &net.Dialer{}, MuxDialers: map[string]ugate.MuxDialer{}, Config: cfg, NodesByID: map[string]*ugate.DMNode{}, Nodes: map[uint64]*ugate.DMNode{}, - Conf: map[string]*ugate.Listener{}, Mux: http.NewServeMux(), Auth: a, - ActiveTcp: map[int]*ugate.Stream{}, - DefaultListener: &ugate.Listener{ + ActiveTcp: map[int]*ugate.Conn{}, + DefaultRoute: &ugate.Route{ + Protocol: "OriginalDST", }, Msg: msgs.DefaultMux, } - if l, ok := cfg.Listeners["*"]; ok { - ug.DefaultListener = l + if l, ok := cfg.Routes["*"]; ok { + ug.DefaultRoute = l } @@ -204,23 +204,19 @@ func New(cs ugate.ConfStore, a *auth.Auth, cfg *ugate.GateCfg) *UGate { // Start listening on all configured ports. func (ug *UGate) Start() { - go ug.H2Handler.UpdateReverseAccept() - ug.DefaultPorts(ug.Config.BasePort) - // Explicit TCP forwarders. for k, t := range ug.Config.Listeners { if strings.HasPrefix(k, "udp://") { continue } t.Address = k - ug.Add(t) + ug.StartListener(t) } log.Println("Starting uGate ", ug.Config.Name, ug.Config.BasePort, auth.IDFromPublicKey(auth.PublicKey(ug.Auth.Cert.PrivateKey)), ug.Auth.VIP6) - } @@ -284,7 +280,7 @@ func (ug *UGate) GetOrAddNode(id string) *ugate.DMNode { } // All streams must call this method, and defer OnStreamDone -func (ug *UGate) OnStream(s *ugate.Stream) { +func (ug *UGate) OnStream(s *ugate.Conn) { ug.m.Lock() ug.ActiveTcp[s.StreamId] = s ug.m.Unlock() @@ -296,7 +292,7 @@ func (ug *UGate) OnStream(s *ugate.Stream) { // Called at the end of the connection handling. After this point // nothing should use or refer to the connection, both proxy directions // should already be closed for write or fully closed. -func (ug *UGate) OnStreamDone(str *ugate.Stream) { +func (ug *UGate) OnStreamDone(str *ugate.Conn) { ug.m.Lock() delete(ug.ActiveTcp, str.StreamId) @@ -350,7 +346,7 @@ func (ug *UGate) OnStreamDone(str *ugate.Stream) { // RemoteID returns the node ID based on authentication. // -func RemoteID(s *ugate.Stream) string { +func RemoteID(s *ugate.Conn) string { if s.TLS == nil { return "" } @@ -365,33 +361,27 @@ func RemoteID(s *ugate.Stream) string { return auth.IDFromPublicKey(pk) } - -// Add and Start a real port listener on a port. -// Virtual listeners can be added to ug.Conf or the mux. -func (ug *UGate) Add(cfg *ugate.Listener) error { - ll := &PortListener{Listener: *cfg} - err := ll.Start(ug) - if err != nil { - return err - } - ug.Listeners[cfg.Address] = ll - return nil -} - func (ug *UGate) Close() error { var err error - for _, p := range ug.Listeners { - e := p.Close() - if e != nil { - err = err + for _, p := range ug.Config.Listeners { + if p.NetListener != nil { + e := p.NetListener.Close() + if e != nil { + err = err + } + p.NetListener = nil } - delete(ug.Listeners, p.Address) } return err } // Setup default ports, using base port. -// For Istio, should be 15000 +// For Istio, should be 15000. If running in Knative, use PORT and start +// only a H2 listener. +// +// Will run: +// - plaintext HTTP/1 or H2 - on PORT or base (15000) +// - BTS on 15007 (or 443 if running as root) func (ug *UGate) DefaultPorts(base int) error { // Set if running in a knative env, or if an Envoy runs as a sidecar to handle // TLS, QUIC, H2. In this mode only standard H2/MASQUE are supported, with @@ -405,84 +395,106 @@ func (ug *UGate) DefaultPorts(base int) error { } // ProtoHTTP detects H1/H2 and sends to ug.H2Handler // That deals with auth and dispatches to ugate.Mux - ug.Add(&ugate.Listener{ + ug.StartListener(&ugate.Listener{ Address: haddr, Protocol: ugate.ProtoHTTP, }) // KNative doesn't support other ports by default - but still register them btsAddr := fmt.Sprintf("0.0.0.0:%d", base+ugate.PORT_BTS) - if os.Getuid() == 0 { - btsAddr = ":443" - } + btscAddr := fmt.Sprintf("0.0.0.0:%d", base+ugate.PORT_BTSC) // Main BTS port, with TLS certificates // Normally should be 443 for SNI gateways, when running as root // Use iptables to redirect, or an explicit config for port 443 if running as root. - ug.Add(&ugate.Listener{ + ug.StartListener(&ugate.Listener{ Address: btsAddr, Protocol: ugate.ProtoBTS, ALPN: []string{"h2","h2r"}, }) + ug.StartListener(&ugate.Listener{ + Address: btscAddr, + Protocol: ugate.ProtoBTSC, + }) + if os.Getuid() == 0 { + ug.StartListener(&ugate.Listener{ + Address: "0.0.0.0:443", + Protocol: ugate.ProtoTLS, + ALPN: []string{"h2","h2r"}, + }) + ug.StartListener(&ugate.Listener{ + Address: "0.0.0.0:80", + Protocol: ugate.ProtoHTTP, + }) + } return nil } // Based on the port in the Dest, find the Listener config. // Used when the dest IP:port is extracted from the metadata -func (ug *UGate) FindCfgIptablesIn(m *ugate.Stream) *ugate.Listener { - l := ug.Config.Listeners[m.Dest] +func (ug *UGate) FindRouteIn(m *ugate.Conn) *ugate.Route { + //_, p, _ := net.SplitHostPort(m.Dest) + //l := ug.Config.Listeners[":"+p] + //if l != nil { + // return l + //} + + //l := ug.Config.Listeners["-:"+p] + //if l != nil { + // return &l.Route + //} + return ug.DefaultRoute +} + +// FindRouteOut will use the IP in Dest, and find the cluster +// and endpoints. +func (ug *UGate) FindRouteOut(m *ugate.Conn) *ugate.Route { + l := ug.Config.Routes[m.Dest] if l != nil { return l } - _, p, _ := net.SplitHostPort(m.Dest) - l = ug.Config.Listeners[":"+p] + h, p, _ := net.SplitHostPort(m.Dest) + l = ug.Config.Routes[h] if l != nil { return l } - - l = ug.Config.Listeners["-:"+p] + l = ug.Config.Routes[":"+p] if l != nil { return l } - return ug.DefaultListener + return ug.DefaultRoute } -func (ug *UGate) FindListener(dstaddr net.IP, p uint16, prefix string) *ugate.Listener { +func (ug *UGate) FindRoutePrefix(dstaddr net.IP, p uint16, prefix string) *ugate.Route { port := ":" + strconv.Itoa(int(p)) - l := ug.Config.Listeners[prefix + dstaddr.String() + port] + l := ug.Config.Routes[prefix + dstaddr.String() + port] if l != nil { return l } - l = ug.Config.Listeners[prefix + port] + l = ug.Config.Routes[prefix + port] if l != nil { return l } - l = ug.Config.Listeners[prefix + "-" + port] + l = ug.Config.Routes[prefix + "-" + port] if l != nil { return l } - return ug.DefaultListener + return ug.DefaultRoute } -func (ug *UGate) HandleBTSStream(str *ugate.Stream) error { - if str.Listener == nil { - str.Listener = ug.FindCfgIptablesIn(str) - } - str.PostDial(str, nil) - return ug.H2Handler.HandleHTTPS(str) -} // HandleStream is called for accepted (incoming) streams. // -// Multiplexed streams ( H2 ) also call this method. +// Multiplexed streams ( H2, SNI ) also call this method. // // At this point the stream has the metadata: // -// - Dest and Listener are set. +// - Listener - actual port that accepted connection. +// - Dest - SNI, Host, original dest for iptables, listener's forward addr // - RequestURI // - Host // - Headers @@ -490,36 +502,38 @@ func (ug *UGate) HandleBTSStream(str *ugate.Stream) error { // // In addition TrackStreamIn has been called. // This is a blocking method. -func (ug *UGate) HandleStream(str *ugate.Stream) error { - if str.Listener == nil { - str.Listener = ug.FindCfgIptablesIn(str) +func (ug *UGate) HandleStream(str *ugate.Conn) error { + if str.Route == nil { + str.Route = ug.FindRouteOut(str) } - cfg := str.Listener + route := str.Route - if cfg.Protocol == ugate.ProtoBTS { - str.PostDial(str, nil) - return ug.H2Handler.HandleHTTPS(str) - } + //if route.Protocol == ugate.ProtoBTS || route.Protocol == ugate.ProtoBTSC { + // panic("Should not happen") + // str.PostDial(str, nil) + // // TLS is already wrapped for BTS + // return ug.H2Handler.HandleHTTPS(str) + //} - if cfg.ForwardTo != "" { - str.Dest = cfg.ForwardTo + if route.ForwardTo != "" { + str.Dest = route.ForwardTo } - if cfg.Handler == nil && strings.HasPrefix(cfg.ForwardTo, "/") { + if route.Handler == nil && strings.HasPrefix(route.ForwardTo, "/") { // TODO: register handlers - if cfg.ForwardTo == "/echo" { - cfg.Handler = &EchoHandler{} + if route.ForwardTo == "/echo" { + route.Handler = &EchoHandler{} } } // Config has an in-process handler - not forwarding (or the handler may // forward). - if cfg.Handler != nil { + if route.Handler != nil { // SOCKS and others need to send something back - we don't // have a real connection, faking it. str.PostDial(str, nil) - str.Dest = fmt.Sprintf("%v", cfg.Handler) - err:= cfg.Handler.Handle(str) + str.Dest = fmt.Sprintf("%v", route.Handler) + err:= route.Handler.Handle(str) str.Close() return err } @@ -547,7 +561,7 @@ func (gw *UGate) OnHClose(s string, id string, san string, r *http.Request, sinc } } -func (gw *UGate) OnSClose(str *ugate.Stream, addr net.Addr) { +func (gw *UGate) OnSClose(str *ugate.Conn, addr net.Addr) { if !gw.Config.NoAccessLog { if str.ReadErr != nil || str.WriteErr != nil { log.Printf("%d AC: src=%s://%v dst=%s rcv=%d/%d snd=%d/%d la=%v ra=%v op=%v %v %v", diff --git a/pkg/ugatesvc/ugate_test.go b/pkg/ugatesvc/ugate_test.go index e7141ad..17c013d 100644 --- a/pkg/ugatesvc/ugate_test.go +++ b/pkg/ugatesvc/ugate_test.go @@ -77,7 +77,7 @@ func TestSrv(t *testing.T) { if err != nil { t.Fatal(err) } - mc := ab.(*ugate.Stream) + mc := ab.(*ugate.Conn) log.Println("Result ", res, mc) }) diff --git a/stream.go b/stream.go index 4b54369..a97fe61 100644 --- a/stream.go +++ b/stream.go @@ -19,8 +19,7 @@ import ( "time" ) - -// Stream is the main abstraction, representing a connection with metadata and additional +// Conn is the main abstraction, representing a connection with metadata and additional // helpers. // // The connection is typically: @@ -40,7 +39,7 @@ import ( // // Implements net.Conn - but does not implement ConnectionState(), so the // stream can be used with H2 library. -type Stream struct { +type Conn struct { // StreamId is based on a counter, it is the key in the Active table. // Streams may also have local ids associated with the transport. @@ -75,12 +74,11 @@ type Stream struct { // or if the stream is originated locally and sent to a HTTP dest. // // For streams associated with HTTP server handlers, Out is the ResponseWriter, - // can be retrieved with Stream.ResponseWriter. + // can be retrieved with Conn.ResponseWriter. // @Deprecated - use separate structure when using the h2/h3 stack. Request *http.Request `json:"-"` - - // Metadata to send. Stream implements http.ResponseWriter. + // Metadata to send. Conn implements http.ResponseWriter. // For streams without metadata - will be ignored. // Incoming metadata is set in Request. // TODO: without a request, use a buffer, append headers in serialized format directly, flush on first Write @@ -98,7 +96,7 @@ type Stream struct { // Session is set for all multiplexed streams. May be a quic session, h2 mux, etc. // // nil after close. - // TODO: session.Stream == stream if the stream is used for a session + // TODO: session.Conn == stream if the stream is used for a session Session *Session // Set if the connection finished a TLS handshake. @@ -113,7 +111,6 @@ type Stream struct { // Remote mesh ID, in byte form. Remote [32]byte - // VIP is the internal ID used in dmesh, based on the SHA of address or public key. RemoteVIP uint64 @@ -147,7 +144,7 @@ type Stream struct { // --------------------- // Additional closer, to be called after the proxy function is done and both client and remote closed. - Closer func() `json:"-"` + Closer func() `json:"-"` // Methods to call when the stream is closed on the read side, i.e. received a FIN or RST or // the context was canceled. @@ -172,26 +169,26 @@ type Stream struct { ProxyReadErr error `json:"-"` ProxyWriteErr error `json:"-"` - // Context and cancel funciton for this stream. - ctx context.Context `json:"-"` + ctx context.Context `json:"-"` // Close will invoke this method if set, and cancel the context. ctxCancel context.CancelFunc `json:"-"` // Set for accepted stream, with the config associated with the listener. - Listener *Listener `json:"-"` + Route *Route `json:"-"` // Optional function to call after dial (proxied streams) or after a stream handling has started for local handlers. // Used to send back metadata or finish the handshake. // // For example in SOCKS it sends back the IP/port of the remote. - // net.Conn may be a Stream or a regular TCP/TLS connection. + // net.Conn may be a Conn or a regular TCP/TLS connection. PostDialHandler func(net.Conn, error) `json:"-"` - // True if the Stream is originated from local machine, i.e. - // SOCKS/iptables/TUN capture or dialed from local process - Egress bool + // + // + // + Direction StreamType // If the stream is multiplexed, this is the Mux. MUX *Muxer `json:"-"` @@ -203,7 +200,9 @@ type Stream struct { rbuffer *StreamBuffer // If not nil, this stream has a pooled write attached. - wbuffer *StreamBuffer + wbuffer *StreamBuffer + + Listener *Listener // TODO: add wbuffer // Use Flush() to write the wbuffer. @@ -214,6 +213,22 @@ type Stream struct { } +type StreamType int +const ( + StreamTypeUnknown = 0 + + // Ingress - received on the HBONE mux for the local process, on + // a 'sidecar'. + StreamTypeIn = 1 + + // Egress - indicates if is originated from local machine, i.e. + // SOCKS/iptables/TUN capture or dialed from local process + StreamTypeOut = 2 + + // Forward - received on HBONE mux to forward to a workload + StreamTypeForward = 3 +) + // --------- Buffering and sniffing -------------- // TODO: benchmark different sizes. var bufSize = 32 * 1024 @@ -235,7 +250,7 @@ var bufferedConPool = sync.Pool{New: func() interface{} { // GetStream should be used to get a (recycled) stream. // Streams will be tracked, and must be closed and recycled. -func GetStream(out io.Writer, in io.ReadCloser) *Stream { +func GetStream(out io.Writer, in io.ReadCloser) *Conn { s := NewStream() s.In = in s.Out = out @@ -246,7 +261,7 @@ func GetStream(out io.Writer, in io.ReadCloser) *Stream { // headers or sniffing. The 'Read' and 'WriteTo' methods are aware of the // buffer, and will use the first consume buffered data, but if the buffer is // IsEmpty will use directly In. -func (s *Stream) RBuffer() *StreamBuffer { +func (s *Conn) RBuffer() *StreamBuffer { if s.rbuffer != nil { return s.rbuffer } @@ -259,7 +274,7 @@ func (s *Stream) RBuffer() *StreamBuffer { return s.rbuffer } -func (s *Stream) WBuffer() *StreamBuffer { +func (s *Conn) WBuffer() *StreamBuffer { if s.wbuffer != nil { return s.wbuffer } @@ -278,7 +293,7 @@ func (s *Stream) WBuffer() *StreamBuffer { // // // Future calls to Read() will use the remaining data in the buffer. -func (s *Stream) Fill(nb int) ([]byte, error) { +func (s *Conn) Fill(nb int) ([]byte, error) { b := s.RBuffer() if b.IsEmpty() { b.off = 0 @@ -305,7 +320,7 @@ func (s *Stream) Fill(nb int) ([]byte, error) { } // Skip only implemented for buffer -func (s *Stream) Skip(n int) { +func (s *Conn) Skip(n int) { b := s.rbuffer if n > b.Size() { n -= b.Size() @@ -339,7 +354,7 @@ func (s *Stream) Skip(n int) { } } -func (s *Stream) ReadByte() (byte, error) { +func (s *Conn) ReadByte() (byte, error) { b := s.RBuffer() if b.IsEmpty() { _, err := s.Fill(0) @@ -355,8 +370,8 @@ func (s *Stream) ReadByte() (byte, error) { // ---------------------------------------------- // NewStream create a new stream. This stream is not tracked. -func NewStream() *Stream { - return &Stream{ +func NewStream() *Conn { + return &Conn{ StreamId: int(atomic.AddUint32(&StreamId, 1)), Stats: Stats{Open: time.Now(),}, } @@ -368,8 +383,8 @@ func NewStream() *Stream { // Server validates method, path and scheme=http|https. Req.Body is a pipe - similar with what we use for egress. // Request context is based on stream context, which is a 'with cancel' based on the serverConn baseCtx. // -func NewStreamRequest(r *http.Request, w http.ResponseWriter, con *Stream) *Stream { - return &Stream{ +func NewStreamRequest(r *http.Request, w http.ResponseWriter, con *Conn) *Conn { + return &Conn{ StreamId: int(atomic.AddUint32(&StreamId, 1)), Stats: Stats{Open: time.Now(),}, @@ -381,8 +396,8 @@ func NewStreamRequest(r *http.Request, w http.ResponseWriter, con *Stream) *Stre } } -func NewStreamRequestOut(r *http.Request, out io.Writer, w *http.Response, con *Stream) *Stream { - return &Stream{ +func NewStreamRequestOut(r *http.Request, out io.Writer, w *http.Response, con *Conn) *Conn { + return &Conn{ StreamId: int(atomic.AddUint32(&StreamId, 1)), Stats: Stats{Open: time.Now(),}, OutHeader: w.Header, @@ -394,7 +409,7 @@ func NewStreamRequestOut(r *http.Request, out io.Writer, w *http.Response, con * } } -//func (s *Stream) Reset() { +//func (s *Conn) Reset() { // s.Open = time.Now() // s.LastRead = time.Time{} // s.LastWrite = time.Time{} @@ -417,7 +432,7 @@ const ContextKey = "ugate.stream" // Also auth is more flexibile then mTLS. //// Used by H2 server to populate TLS in accepted requests. //// For 'fake' TLS (raw HTTP) it must be populated. -//func (s *Stream) ConnectionState() tls.ConnectionState { +//func (s *Conn) ConnectionState() tls.ConnectionState { // if s.TLS == nil { // return tls.ConnectionState{Version: tls.VersionTLS12} // } @@ -430,7 +445,7 @@ const ContextKey = "ugate.stream" // // This is NOT associated with the context of the original H2 request, // there is a lot of complexity and strange behaviors in the stack. -func (s *Stream) Context() context.Context { +func (s *Conn) Context() context.Context { //if s.Request != nil { // return s.Request.Context() //} @@ -441,7 +456,7 @@ func (s *Stream) Context() context.Context { return s.ctx } -func (s *Stream) Write(b []byte) (n int, err error) { +func (s *Conn) Write(b []byte) (n int, err error) { n, err = s.Out.Write(b) if err != nil { s.WriteErr = err @@ -457,7 +472,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { return } -func (s *Stream) Flush() { +func (s *Conn) Flush() { if f, ok := s.Out.(http.Flusher); ok { f.Flush() } @@ -474,7 +489,7 @@ func CanSplice(in io.Reader, out io.Writer) bool { return false } -func (s *Stream) Read(out []byte) (int, error) { +func (s *Conn) Read(out []byte) (int, error) { if s.rbuffer != nil { // Duplicated - the other method may be removed. b:=s.rbuffer @@ -510,7 +525,7 @@ func (s *Stream) Read(out []byte) (int, error) { // Must be called at the end. It is expected CloseWrite has been called, for graceful FIN. // -func (s *Stream) Close() error { +func (s *Conn) Close() error { if s.Closed { return nil } @@ -548,7 +563,7 @@ func (s *Stream) Close() error { return s.In.Close() } -func (s *Stream) CloseWrite() error { +func (s *Conn) CloseWrite() error { if s.ServerClose { log.Println("Double CloseWrite") return nil @@ -589,28 +604,28 @@ func (s *Stream) CloseWrite() error { return nil } -func (s *Stream) SetDeadline(t time.Time) error { +func (s *Conn) SetDeadline(t time.Time) error { if cw, ok := s.Out.(net.Conn); ok { cw.SetDeadline(t) } return nil } -func (s *Stream) SetReadDeadline(t time.Time) error { +func (s *Conn) SetReadDeadline(t time.Time) error { if cw, ok := s.Out.(net.Conn); ok { cw.SetReadDeadline(t) } return nil } -func (s *Stream) SetWriteDeadline(t time.Time) error { +func (s *Conn) SetWriteDeadline(t time.Time) error { if cw, ok := s.Out.(net.Conn); ok { cw.SetWriteDeadline(t) } return nil } -func (s *Stream) Header() http.Header { +func (s *Conn) Header() http.Header { if rw, ok := s.Out.(http.ResponseWriter); ok { return rw.Header() } @@ -620,7 +635,7 @@ func (s *Stream) Header() http.Header { return s.OutHeader } -func (s *Stream) WriteHeader(statusCode int) { +func (s *Conn) WriteHeader(statusCode int) { if rw, ok := s.Out.(http.ResponseWriter); ok { rw.WriteHeader(statusCode) return @@ -637,7 +652,7 @@ func (s *Stream) WriteHeader(statusCode int) { // // srcIsRemote indicates that the connection is from the server to client. (remote to local) // If false, the connection is from client to server ( local to remote ) -func (s *Stream) CopyBuffered(dst io.Writer, src io.Reader, srcIsRemote bool) (written int64, err error) { +func (s *Conn) CopyBuffered(dst io.Writer, src io.Reader, srcIsRemote bool) (written int64, err error) { buf1 := bufferPoolCopy.Get().([]byte) defer bufferPoolCopy.Put(buf1) bufCap := cap(buf1) @@ -708,7 +723,7 @@ func (s *Stream) CopyBuffered(dst io.Writer, src io.Reader, srcIsRemote bool) (w } // Send will marshall the metadata (headers) and start sending the body to w. -func (s *Stream) SendHeader(w io.WriteCloser, h http.Header) error { +func (s *Conn) SendHeader(w io.WriteCloser, h http.Header) error { // First format: TAG(=2), 4B LEN, Text headers. Required len, buffer bb := s.WBuffer() @@ -736,12 +751,12 @@ func (s *Stream) SendHeader(w io.WriteCloser, h http.Header) error { return err } if DebugClose { - log.Println("Stream.sendHeaders ", s.StreamId, h) + log.Println("Conn.sendHeaders ", s.StreamId, h) } return nil } -func (s *Stream) ReadHeader(in io.Reader) error { +func (s *Conn) ReadHeader(in io.Reader) error { // TODO: move to buffered stream, unify buf1 := bufferPoolCopy.Get().([]byte) defer bufferPoolCopy.Put(buf1) @@ -762,12 +777,12 @@ func (s *Stream) ReadHeader(in io.Reader) error { s.InHeader = http.Header(mh) if DebugClose { - log.Println("Stream.receiveHeaders ", s.StreamId, s.InHeader) + log.Println("Conn.receiveHeaders ", s.StreamId, s.InHeader) } return nil } -func (s *Stream) LocalAddr() net.Addr { +func (s *Conn) LocalAddr() net.Addr { if s.Session != nil && s.Session.LocalAddr != nil { return s.Session.LocalAddr } @@ -783,7 +798,7 @@ func (s *Stream) LocalAddr() net.Addr { // RemoteAddr is the client (for accepted) or server (for originated). // It should be the real IP, extracted from connection or metadata. // RemoteID returns the authenticated ID. -func (s *Stream) RemoteAddr() net.Addr { +func (s *Conn) RemoteAddr() net.Addr { if s.Session != nil && s.Session.RemoteAddr != nil { return s.Session.RemoteAddr } @@ -812,7 +827,7 @@ func (s *Stream) RemoteAddr() net.Addr { // Reads data from cin (the client/dialed con) until EOF or error // TCP Connections typically implement this, using io.Copy(). -func (s *Stream) ReadFrom(cin io.Reader) (n int64, err error) { +func (s *Conn) ReadFrom(cin io.Reader) (n int64, err error) { //if wt, ok := cin.(io.WriterTo); ok { // return wt.WriteTo(s.ServerOut) @@ -821,7 +836,7 @@ func (s *Stream) ReadFrom(cin io.Reader) (n int64, err error) { //if _, ok := cin.(*os.File); ok { // if _, ok := b.ServerOut.(*net.TCPConn); ok { // if wt, ok := b.ServerOut.(io.ReaderFrom); ok { - // VarzReadFromC.Add(1) + // VarzReadFromC.StartListener(1) // n, err = wt.ReadFrom(cin) // return // } @@ -875,7 +890,7 @@ func (s *Stream) ReadFrom(cin io.Reader) (n int64, err error) { return } -func (b *Stream) PostDial(nc net.Conn, err error) { +func (b *Conn) PostDial(nc net.Conn, err error) { if b.PostDialHandler != nil { b.PostDialHandler(nc, err) } @@ -887,19 +902,19 @@ const DebugClose = true // Proxy the accepted connection to a dialed connection. // Blocking, will wait for both sides to FIN or RST. -func (s *Stream) ProxyTo(nc net.Conn) error { +func (s *Conn) ProxyTo(nc net.Conn) error { errCh := make(chan error, 2) go s.proxyFromClient(nc, errCh) // Blocking, returns when all data is read from In, or error var err1 error - if ncs, ok := nc.(*Stream); ok { + if ncs, ok := nc.(*Conn); ok { if ncs.Out != nil { - err1 = s.proxyToClient(nc, errCh) + err1 = s.proxyToClient(nc) } // TODO: we need to wait for the request to consume the stream. } else { - err1 = s.proxyToClient(nc, errCh) + err1 = s.proxyToClient(nc) } @@ -921,7 +936,7 @@ func (s *Stream) ProxyTo(nc net.Conn) error { // Read from the Reader, send to the cout client. // Updates ReadErr and ProxyWriteErr -func (s *Stream) proxyToClient(cout io.WriteCloser, errch chan error) error { +func (s *Conn) proxyToClient(cout io.WriteCloser) error { s.WriteTo(cout) // errors are preserved in stats, 4 kinds possible // At this point an error or graceful EOF from our Reader has been received. @@ -964,7 +979,7 @@ func (s *Stream) proxyToClient(cout io.WriteCloser, errch chan error) error { } // WriteTo implements the interface, using the read buffer. -func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { +func (s *Conn) WriteTo(w io.Writer) (n int64, err error) { // Finish up the buffer first if s.rbuffer != nil && !s.rbuffer.IsEmpty() { b := s.rbuffer @@ -1043,7 +1058,7 @@ func NoEOF(err error) error { // proxyFromClient reads from cin, writes to the stream. Should be in a go routine. // Updates ProxyReadErr and WriteErr -func (s *Stream) proxyFromClient(cin io.ReadCloser, errch chan error) { +func (s *Conn) proxyFromClient(cin io.ReadCloser, errch chan error) { _, err := s.ReadFrom(cin) // At this point cin either returned an EOF (FIN), or error (RST from remote, or error writing) if NoEOF(s.ProxyReadErr) != nil || s.WriteErr != nil { diff --git a/test/e2e.go b/test/e2e.go index aa41d53..55242dd 100644 --- a/test/e2e.go +++ b/test/e2e.go @@ -52,7 +52,7 @@ func InitEcho(port int) *ugatesvc.UGate { }, cs) // Echo - TCP - ug.Add(&ugate.Listener{ + ug.StartListener(&ugate.Route{ Address: fmt.Sprintf("0.0.0.0:%d", port + 12), Handler: &ugatesvc.EchoHandler{}, }) @@ -75,12 +75,12 @@ func InitTestServer(kubecfg string, cfg *ugate.GateCfg, ext func(*ugatesvc.UGate } // Echo - TCP - ug.Add(&ugate.Listener{ + ug.StartListener(&ugate.Route{ Address: fmt.Sprintf("0.0.0.0:%d", basePort+11), Protocol: "tls", Handler: &ugatesvc.EchoHandler{}, }) - ug.Add(&ugate.Listener{ + ug.StartListener(&ugate.Route{ Address: fmt.Sprintf("0.0.0.0:%d", basePort+12), Handler: &ugatesvc.EchoHandler{}, }) @@ -104,7 +104,7 @@ func CheckEcho(in io.Reader, out io.Writer) (string, error) { return "", err } - //ab.SetDeadline(time.Now().Add(5 * time.Second)) + //ab.SetDeadline(time.Now().StartListener(5 * time.Second)) n, err := in.Read(d) if err != nil { return "", err diff --git a/tools/setup/cluster.sh b/tools/setup/cluster.sh new file mode 100644 index 0000000..3ebdcd9 --- /dev/null +++ b/tools/setup/cluster.sh @@ -0,0 +1,74 @@ +# Example command to create a regular cluster. +function create_cluster() { + + cloud beta container --project "${PROJECT_ID}" clusters create \ + "${CLUSTER_NAME}" --zone "${CLUSTER_LOCATION}" \ + --no-enable-basic-auth \ + --cluster-version "1.20.8-gke.700" \ + --release-channel "rapid" \ + --machine-type "e2-standard-8" \ + --image-type "COS_CONTAINERD" \ + --disk-type "pd-standard" \ + --disk-size "100" \ + --metadata disable-legacy-endpoints=true \ + --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \ + --max-pods-per-node "110" \ + --num-nodes "1" \ + --enable-stackdriver-kubernetes \ + --enable-ip-alias \ + --network "projects/${PROJECT_ID}/global/networks/default" \ + --subnetwork "projects/${PROJECT_ID}/regions/${REGION}/subnetworks/default" \ + --no-enable-intra-node-visibility \ + --default-max-pods-per-node "110" \ + --enable-autoscaling \ + --min-nodes "0" \ + --max-nodes "9" \ + --enable-network-policy \ + --no-enable-master-authorized-networks \ + --addons HorizontalPodAutoscaling,HttpLoadBalancing,GcePersistentDiskCsiDriver \ + --enable-autoupgrade \ + --enable-autorepair \ + --max-surge-upgrade 1 \ + --max-unavailable-upgrade 0 \ + --workload-pool "${PROJECT_ID}.svc.id.goog" \ + --enable-shielded-nodes \ + --node-locations "${CLUSTER_LOCATION}" + +} + +# WIP: using an autopilot cluster for configurations. Note that only gateways can run right now inside the +# autopilot - other workloads should be in regular clusters (iptables) +function create_cluster_autopilot() { + gcloud beta container --project "${PROJECT_ID}" clusters create-auto \ + "${CLUSTER_NAME}" --region "${CLUSTER_LOCATION}" \ + --release-channel "regular" \ + --network "projects/${PROJECT_ID}/global/networks/default" \ + --subnetwork "projects/${PROJECT_ID}/regions/${REGION}/subnetworks/default" \ + --cluster-ipv4-cidr "/17" \ + --services-ipv4-cidr "/22" + + gcloud container clusters get-credentials ${CLUSTER_NAME} --zone ${CLUSTER_LOCATION} --project ${PROJECT_ID} + kubectl create ns istio-system + +} + +function setup_asm() { + curl https://storage.googleapis.com/csm-artifacts/asm/install_asm_1.10 > install_asm + chmod +x install_asm + + # Managed CP: + ./install_asm --mode install --output_dir ${CLUSTER_NAME} --enable_all --managed +} + +# Per project setup +function setup_project() { + gcloud services enable --project ${PROJECT_ID} vpcaccess.googleapis.com + gcloud compute networks vpc-access connectors create serverlesscon \ + --project ${PROJECT_ID} \ + --region ${REGION} \ + --subnet default \ + --subnet-project ${PROJECT_ID} \ + --min-instances 2 \ + --max-instances 10 +} + diff --git a/ugate.go b/ugate.go index 0dcd06c..68025d0 100644 --- a/ugate.go +++ b/ugate.go @@ -56,16 +56,20 @@ type GateCfg struct { Domain string `json:"domain,omitempty"` // TODO: other trusted domains/meshes (federation, migration, etc) - //DomainAliases []string `json:"domainAliases,omitempty"` + DomainAliases []string `json:"domainAliases,omitempty"` // Additional port listeners. - // Egress: listen on 127.0.0.1:port + // Routes: listen on 127.0.0.1:port // Ingress: listen on 0.0.0.0:port (or actual IP) // // Port proxies: will register a listener for each port, forwarding to the // given address. Listeners map[string]*Listener `json:"listeners,omitempty"` + //Local map[string]*Route `json:"local,omitempty"` + + Routes map[string]*Route `json:"routes,omitempty"` + // Configured hosts, key is a domain name without port. // This includes public keys, active addresses. Discovery and on-demand // are also used to load this info. @@ -75,7 +79,7 @@ type GateCfg struct { // secure - Wireguard or IPsec equivalent). Hosts map[string]*DMNode `json:"hosts,omitempty"` - // Egress gateways for reverse H2 - key is the domain name, value is the + // Routes gateways for reverse H2 - key is the domain name, value is the // pubkey or root CA. If IsEmpty, public certificates are required. H2R map[string]string `json:"remoteAccept,omitempty"` @@ -103,6 +107,9 @@ const ( // H2, HTTPS, H2R PORT_BTS = 7 + + // H2C + PORT_BTSC = 8 ) // Statistics for streams and hosts @@ -249,12 +256,17 @@ type DMNode struct { } const ProtoTLS = "tls" +const ProtoSNI = "sni" // autodetected for TLS. const ProtoH2 = "h2" const ProtoHTTP = "http" // 1.1 const ProtoHTTPTrusted = "httpTrusted" // behind envoy or trusted gateway const ProtoBTS = "bts" +const ProtoBTSC = "btsc" + +const ProtoTCPOut = "tcpOut" +const ProtoTCPIn = "tcpIn" const ProtoHAProxy = "haproxy" @@ -263,8 +275,7 @@ const ProtoSocks = "socks5" const ProtoIPTables = "iptables" const ProtoIPTablesIn = "iptables-in" -// Listener represents the configuration for accepted streams. -// +// Listener represents the configuration for a real port listener. // uGate has a set of special listeners that multiplex requests: // - socks5 dest // - iptables original dst ( may be combined with DNS interception ) @@ -275,9 +286,6 @@ const ProtoIPTablesIn = "iptables-in" // // Multiplexed channels do an additional lookup to find the listener // based on the channel address. -// -// uGate doesn't use a 'route' - a real gateway should be used for -// low level routes, or a handler. type Listener struct { // Address address (ex :8080). This is the requested address. @@ -299,7 +307,6 @@ type Listener struct { ForwardTo string `json:"forwardTo,omitempty"` // Must block until the connection is fully handled ! - // @Deprecated - use ForwardTo -:NAME and register handlers Handler Handler `json:-` // ALPN to announce, for TLS listeners @@ -310,6 +317,39 @@ type Listener struct { // Certificates to use. // Key is a domain, *.domain or *. Certs map[string]string + + NetListener net.Listener `json:-` + PortHandler Handler `json:-` +} + +// Route controls the routing in the gate. +// Address is used to match the destination of a stream: +// - VIP or IP from iptables capture +// - +type Route struct { + // Address address (ex :8080). This is the requested address. + // + // BTS, SOCKS, HTTP_PROXY and IPTABLES have default ports and bindings, don't + // need to be configured here. + Address string `json:"address,omitempty"` + + // How to connect. Default: original dst + Protocol string `json:"proto,omitempty"` + + // ForwardTo where to forward the proxied connections. + // Used for accepting on a dedicated port. Will be set as Dest in + // the stream, can be mesh node. + // host:port format. + ForwardTo string `json:"forwardTo,omitempty"` + + // Must block until the connection is fully handled ! + // @Deprecated - use ForwardTo -:NAME and register handlers + Handler Handler `json:-` + + + //SAN []string + + //Endpoints []string } @@ -338,7 +378,7 @@ type Listener struct { // ContextDialer is same with x.net.proxy.ContextDialer // Used to create the actual connection to an address using the mesh. -// The result may have metadata, and be an instance of ugate.Stream. +// The result may have metadata, and be an instance of ugate.Conn. // // A uGate implements this interface, it is the primary interface // for creating streams where the caller does not want to pass custom @@ -368,7 +408,7 @@ type StreamDialer interface { // inStream.In, if not nil, will be automatically forwarded to new dialed stream. // inStream in headers, if set, will be forwarded to the remote host. // - DialStream(ctx context.Context, addr string, inStream *Stream) (*Stream, error) + DialStream(ctx context.Context, addr string, inStream *Conn) (*Conn, error) } // Muxer is the interface implemented by a multiplexed connection with metadata @@ -389,7 +429,7 @@ type MuxDialer interface { // For non-mesh nodes the H2 connection may not allow incoming streams or // messages. Mesh nodes emulate incoming streams using /h2r/ and send/receive // messages using /.dm/msg/ - DialMux(ctx context.Context, node *DMNode, meta http.Header, ev func(t string, stream *Stream)) (Muxer, error) + DialMux(ctx context.Context, node *DMNode, meta http.Header, ev func(t string, stream *Conn)) (Muxer, error) } @@ -409,44 +449,44 @@ type Session struct { // StreamDialer is similar with RoundTrip, makes a single connection using a MUX. // -// Unlike ContextDialer, also takes 'meta' and returns a Stream ( which implements net.Conn). +// Unlike ContextDialer, also takes 'meta' and returns a Conn ( which implements net.Conn). // // UGate implements ContextDialer, so it can be used in other apps as a library without // dependencies to the API. The context can be used for passing metadata. // It also implements RoundTripper, since streams are mapped to HTTP. //type StreamDialer interface { -// DialStream(ctx context.Context, netw string, addr string, meta http.Header) (*Stream, error) +// DialStream(ctx context.Context, netw string, addr string, meta http.Header) (*Conn, error) //} // Handler is a handler for net.Conn with metadata. // Lighter alternative to http.Handler type Handler interface { - Handle(conn *Stream) error + Handle(conn *Conn) error } // Wrap a function as a stream handler. -type HandlerFunc func(conn *Stream) error +type HandlerFunc func(conn *Conn) error -func (c HandlerFunc) Handle(conn *Stream) error { +func (c HandlerFunc) Handle(conn *Conn) error { return c(conn) } // HeaderEncoder abstracts the encoding of metadata. // Standard HTTP/2 or QUIC, proto, other formats may be used. // -// This interface uses the Stream buffer and encodes/decodes the +// This interface uses the Conn buffer and encodes/decodes the // metadata associated with the stream. type HeaderEncoder interface { // Marshall will encode the headers into the wBuffer. // Flush will need to be called to send to s.Out - Marshal(s *Stream) error + Marshal(s *Conn) error // AddHeader directly to the stream buffer - without adding it to the meta. - AddHeader(s *Stream, k, v []byte) + AddHeader(s *Conn, k, v []byte) // Unmarshall will decode from s.rBuffer. s.Fill() may be called to get // additional data. The decoded headers will be set on the stream. - Unmarshal(s *Stream) (done bool, err error) + Unmarshal(s *Conn) (done bool, err error) } @@ -586,7 +626,7 @@ type NodeAnnounce struct { // // On return, the stream ServerOut and ServerIn will be // // populated, and connected to stream Dest. // // deprecated: use CreateStream -// DialProxy(tp *Stream) error +// DialProxy(tp *Conn) error // // // The VIP of the remote host, after authentication. // RemoteVIP() net.IP diff --git a/webpush/http.go b/webpush/http.go index d0377c1..5629ef7 100644 --- a/webpush/http.go +++ b/webpush/http.go @@ -251,7 +251,7 @@ func (mux *Mux) SubscribeHandler(res http.ResponseWriter, req *http.Request) { // May provide support for set: should be enabled if a // set interface is present, want to test without set as well - //res.Header().Add("Link", "

;rel=\"urn:ietf:params:push:set\"") diff --git a/webpush/mux.go b/webpush/mux.go index 77c4ab3..b875777 100644 --- a/webpush/mux.go +++ b/webpush/mux.go @@ -299,7 +299,7 @@ func (r *rw) WriteHeader(statusCode int) { r.Code = statusCode } -// Add a local handler for a specific message type. +// StartListener a local handler for a specific message type. // Special topics: *, /open, /close func (mux *Mux) AddHandler(path string, cp MessageHandler) { mux.mutex.Lock() diff --git a/webpush/push.go b/webpush/push.go index a1238f9..dce7e95 100644 --- a/webpush/push.go +++ b/webpush/push.go @@ -30,10 +30,10 @@ import ( // } // // // TODO: Make the TTL variable -// req.Header.Add("TTL", "0") +// req.Header.StartListener("TTL", "0") // // if token != "" { -// req.Header.Add("Authorization", fmt.Sprintf(`key=%s`, token)) +// req.Header.StartListener("Authorization", fmt.Sprintf(`key=%s`, token)) // } // // // If there is no payload then we don't actually need encryption @@ -48,9 +48,9 @@ import ( // // req.Body = ioutil.NopCloser(bytes.NewReader(payload.Ciphertext)) // req.ContentLength = int64(len(payload.Ciphertext)) -// req.Header.Add("Encryption", headerField("salt", payload.Salt)) -// req.Header.Add("Crypto-Key", headerField("dh", payload.ServerPublicKey)) -// req.Header.Add("Content-Encoding", "aesgcm") +// req.Header.StartListener("Encryption", headerField("salt", payload.Salt)) +// req.Header.StartListener("Crypto-Key", headerField("dh", payload.ServerPublicKey)) +// req.Header.StartListener("Content-Encoding", "aesgcm") // // return req, nil //} @@ -142,7 +142,7 @@ func SubscriptionFromJSON(b []byte) (*Subscription, error) { // // req, err := NewPushRequest(sub, message, token) // // Default TTL -// req.Header.Add("ttl", "0") +// req.Header.StartListener("ttl", "0") // if err != nil { // return nil, err // }