how to use this repo with gin? #73

Closed
Jekinnnnnn opened this issue Jul 31, 2021 · 12 comments

@Jekinnnnnn

Our project uses gin as its HTTP router and upgrades websocket requests inside gin handlers with a websocket lib,
but this repo seems to require registering the HTTP handler with the repo's own server.
Is there a way to use this repo like other websocket libs?
Besides, I noticed the OnDataFrame API. Does the server have to deal with partial data when a client sends a large message split across frames?
Can I assume the data is complete when the OnMessage handler is called?

@lesismal
Owner

lesismal commented Jul 31, 2021

For http, we can use gin router like this:

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/lesismal/nbio/nbhttp"
)

func main() {
	router := gin.New()
	router.GET("/hello", func(c *gin.Context) {
		c.String(http.StatusOK, "hello")
	})

	svr := nbhttp.NewServer(nbhttp.Config{
		Network: "tcp",
		Addrs:   []string{"localhost:8080"},
	}, router, nil)

	err := svr.Start()
	if err != nil {
		fmt.Printf("nbio.Start failed: %v\n", err)
		return
	}

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	svr.Shutdown(ctx)

}

For websocket, this repo cannot be combined with gin (or another std-based router) plus another std-based websocket framework, because those frameworks do not support non-blocking IO. Their Upgrade and Read/Write interfaces still block, which can tie up the application layer's goroutine pool and make the service slow or even unavailable.
You can use nbio/nbhttp/websocket with gin like this:

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/lesismal/nbio/nbhttp"
	"github.com/lesismal/nbio/nbhttp/websocket"
)

type HttpHandler struct {
	router *gin.Engine
}

func (h *HttpHandler) OnWebsocket(w http.ResponseWriter, r *http.Request) {
	upgrader := websocket.NewUpgrader()
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		panic(err)
	}
	wsConn := conn.(*websocket.Conn)
	wsConn.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		// echo
		fmt.Println("websocket:", messageType, string(data), len(data))
		c.WriteMessage(messageType, data)
	})
	wsConn.OnClose(func(c *websocket.Conn, err error) {
		fmt.Println("OnClose:", c.RemoteAddr().String(), err)
	})
	fmt.Println("OnOpen:", wsConn.RemoteAddr().String())
}

func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/ws":
		h.OnWebsocket(w, r)
	default:
		h.router.ServeHTTP(w, r)
	}
}

func main() {
	router := gin.New()
	router.GET("/hello", func(c *gin.Context) {
		c.String(http.StatusOK, "hello")
		fmt.Println("http: hello")
	})

	svr := nbhttp.NewServer(nbhttp.Config{
		Network: "tcp",
		Addrs:   []string{"localhost:8080"},
	}, &HttpHandler{router}, nil)

	err := svr.Start()
	if err != nil {
		fmt.Printf("nbio.Start failed: %v\n", err)
		return
	}

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	svr.Shutdown(ctx)

}
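The path-dispatch idea in the wrapper above can be tried on its own with only the standard library. This is a minimal sketch, not code from nbio: `pathSwitch`, `newDemoHandler`, and `get` are hypothetical names, a plain `http.ServeMux` stands in for the gin router, and the "/ws" handler is a placeholder where the real code would call `upgrader.Upgrade`.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// pathSwitch sends one path to a dedicated handler and everything else to a
// fallback router. In the example above the fallback is a *gin.Engine; here a
// plain http.ServeMux stands in so the sketch needs no third-party packages.
type pathSwitch struct {
	wsPath   string
	ws       http.Handler
	fallback http.Handler
}

func (p *pathSwitch) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path == p.wsPath {
		p.ws.ServeHTTP(w, r)
		return
	}
	p.fallback.ServeHTTP(w, r)
}

// newDemoHandler wires a placeholder websocket handler and a one-route mux.
func newDemoHandler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "hello")
	})
	ws := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Placeholder: a real handler would upgrade the connection here.
		fmt.Fprint(w, "ws placeholder")
	})
	return &pathSwitch{wsPath: "/ws", ws: ws, fallback: mux}
}

// get runs one in-memory GET request against h and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	h := newDemoHandler()
	fmt.Println(get(h, "/hello"))
	fmt.Println(get(h, "/ws"))
}
```

Because the wrapper only depends on `http.Handler`, any router that implements that interface can be used as the fallback.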

Alternatively, if there is a way to get the original ResponseWriter from gin.Context (I'm not sure how to get it), we can use nbio/nbhttp/websocket like this:

r := c.Request
w := c.SomeGetOriginHttpResponseWriterInterface()
upgrader := websocket.NewUpgrader()
conn, err := upgrader.Upgrade(w, r, nil)
...

@lesismal
Owner

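From Guangzhou? Let's switch to Chinese.

In other websocket frameworks, such as gorilla/websocket, Upgrade/Read/Write are blocking. If the data available on a given poller read contains only half of an HTTP Upgrade request or half of a websocket frame, their Upgrade/Read will fail.

The same goes for gobwas/ws combined with the netpoll approach its authors describe: it does not implement asynchronous stream parsing either, so its Upgrade/Read/Write also block, which can tie up or even exhaust the application-layer goroutine pool and make the service slow or even unavailable. A post and repository about one million websocket connections in Go that was popular a few years ago is based on gobwas/ws, and that approach has this problem. I pointed this out in their repositories; if you are interested, search their issue lists for my related issues.

The purpose of an asynchronous non-blocking library is to reduce the number of goroutines under high concurrency, and with it memory usage, GC pressure, scheduling pressure, and so on.
The key to this approach is implementing:

  1. a poller-based network library
  2. asynchronous stream parsing
  3. sensible use of an application-layer goroutine pool

nbio currently implements all of these, and also supports asynchronous TLS.

Among the other existing solutions, only gev's websocket implements asynchronous stream parsing (though the implementation is fairly simple and incomplete). No other known Go asynchronous non-blocking network library implements it, and the other std-based frameworks all have blocking Upgrade/Read/Write, so they cannot be used with a poller-based approach.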
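To make "asynchronous stream parsing" concrete, here is a minimal sketch of an incremental websocket frame parser. It is not nbio's implementation: `Frame`, `FrameParser`, and `Feed` are hypothetical names, and the sketch ignores fragmentation, control-frame rules, and size limits. The point it illustrates is that a parser fed from a poller must keep leftover bytes and simply return "not complete yet" instead of blocking when a read delivers only part of a frame.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Frame is one fully received websocket frame.
type Frame struct {
	Fin     bool
	Opcode  byte
	Payload []byte
}

// FrameParser keeps unparsed bytes between reads so that a frame split
// across several poller events can be completed later without blocking.
type FrameParser struct {
	buf []byte
}

// Feed appends newly read bytes and returns every frame that is now complete.
func (p *FrameParser) Feed(data []byte) []Frame {
	p.buf = append(p.buf, data...)
	var frames []Frame
	for {
		f, n, ok := parseFrame(p.buf)
		if !ok {
			return frames // need more bytes; keep what we have
		}
		frames = append(frames, f)
		p.buf = p.buf[n:]
	}
}

// parseFrame decodes one frame from b using the RFC 6455 base framing layout.
// It reports ok=false when b does not yet hold a whole frame.
func parseFrame(b []byte) (f Frame, n int, ok bool) {
	if len(b) < 2 {
		return f, 0, false
	}
	masked := b[1]&0x80 != 0
	length := uint64(b[1] & 0x7F)
	pos := 2
	switch length {
	case 126: // 16-bit extended payload length
		if len(b) < pos+2 {
			return f, 0, false
		}
		length = uint64(binary.BigEndian.Uint16(b[pos:]))
		pos += 2
	case 127: // 64-bit extended payload length
		if len(b) < pos+8 {
			return f, 0, false
		}
		length = binary.BigEndian.Uint64(b[pos:])
		pos += 8
	}
	var key [4]byte
	if masked {
		if len(b) < pos+4 {
			return f, 0, false
		}
		copy(key[:], b[pos:pos+4])
		pos += 4
	}
	if uint64(len(b)-pos) < length {
		return f, 0, false
	}
	payload := make([]byte, length)
	copy(payload, b[pos:pos+int(length)])
	if masked {
		for i := range payload {
			payload[i] ^= key[i%4]
		}
	}
	return Frame{Fin: b[0]&0x80 != 0, Opcode: b[0] & 0x0F, Payload: payload}, pos + int(length), true
}

func main() {
	// Masked text frame carrying "Hello" (the example from RFC 6455, section 5.7).
	raw := []byte{0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58}
	var p FrameParser
	fmt.Println("frames after partial read:", len(p.Feed(raw[:4])))
	for _, f := range p.Feed(raw[4:]) {
		fmt.Printf("opcode=%d fin=%v payload=%q\n", f.Opcode, f.Fin, f.Payload)
	}
}
```

A blocking reader would have to park a goroutine until the rest of the frame arrives; here the caller just returns to the poller and calls `Feed` again on the next read event.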

@lesismal
Owner

> Besides, I noticed the OnDataFrame API. Does the server have to deal with partial data when a client sends a large message split across frames?
> Can I assume the data is complete when the OnMessage handler is called?

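OnDataFrame is called when a frame is received. When you use it, the goroutine pool should process the frames in order. The goroutine pool nbhttp uses by default is unordered, so if you use OnDataFrame, pair it with an ordered goroutine pool, for example:
https://github.com/lesismal/nbio/blob/master/examples/websocket/server_autobahn/server.go#L69

Messages delivered to OnMessage are complete, so you do not need to reassemble frames yourself. The default goroutine pool that runs OnMessage is unordered; if you need the websocket messages of a single connection to be handled in order, you can use the same ordered goroutine pool:
https://github.com/lesismal/nbio/blob/master/examples/websocket/server_autobahn/server.go#L69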
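The idea of ordered handling can be sketched with the standard library alone. This is not the pool from the linked example: `OrderedExecutor`, `Submit`, and `collectInOrder` are hypothetical names, and the sketch assumes one executor per connection. Tasks submitted to the same executor run one at a time in submission order, while different executors (connections) can still run concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// OrderedExecutor runs submitted tasks one at a time, in submission order.
// A goroutine exists only while there is queued work, so idle connections
// cost no goroutine.
type OrderedExecutor struct {
	mu      sync.Mutex
	queue   []func()
	running bool
}

// Submit queues a task and starts a drain goroutine if none is active.
func (e *OrderedExecutor) Submit(task func()) {
	e.mu.Lock()
	e.queue = append(e.queue, task)
	if e.running {
		e.mu.Unlock()
		return
	}
	e.running = true
	e.mu.Unlock()
	go e.drain()
}

// drain executes queued tasks until the queue is empty, then exits.
func (e *OrderedExecutor) drain() {
	for {
		e.mu.Lock()
		if len(e.queue) == 0 {
			e.running = false
			e.mu.Unlock()
			return
		}
		task := e.queue[0]
		e.queue = e.queue[1:]
		e.mu.Unlock()
		task()
	}
}

// collectInOrder submits n tasks that each record their index and returns
// the order in which they actually ran.
func collectInOrder(n int) []int {
	var (
		exec OrderedExecutor
		wg   sync.WaitGroup
		out  []int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		i := i
		exec.Submit(func() {
			defer wg.Done()
			out = append(out, i) // tasks never overlap, so no extra lock is needed
		})
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(collectInOrder(5))
}
```

With an unordered pool, two frames of the same connection could be handled by different workers at the same time, which is why frame-level callbacks need this kind of serialization.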

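Also, when combining an asynchronous network framework with synchronous business logic, pay attention to how parameters such as the size of the business goroutine pool interact with other blocking-IO scenarios. For example, the pool size can be set according to measurements of your own workload: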

messageHandlerExecutePool := taskpool.NewFixedPool(100, 1000)

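The concrete impact of an asynchronous network layer combined with synchronous business logic was discussed in a gev thread; have a look if you are interested: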
Allenxuxu/gev#4 (comment)
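What a fixed-size pool bounds can be shown with a small standard-library sketch. It is not the `taskpool` package: `fixedPool`, `newFixedPool`, and `sumWithPool` are hypothetical names, and treating the two numbers as a worker count and a queue capacity is an assumption, since the thread does not spell out what the arguments of `taskpool.NewFixedPool(100, 1000)` mean.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fixedPool runs tasks on a fixed number of worker goroutines fed by a
// bounded queue. Both limits are assumptions made for this sketch.
type fixedPool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

// newFixedPool starts `workers` goroutines reading from a queue that can
// hold `queueSize` pending tasks.
func newFixedPool(workers, queueSize int) *fixedPool {
	p := &fixedPool{tasks: make(chan func(), queueSize)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// Go queues a task. It blocks while the queue is full, which pushes
// back-pressure onto the caller instead of spawning more goroutines.
func (p *fixedPool) Go(task func()) { p.tasks <- task }

// Stop closes the queue and waits until the workers have run every queued task.
func (p *fixedPool) Stop() {
	close(p.tasks)
	p.wg.Wait()
}

// sumWithPool adds 1..n using the pool, as a stand-in for real handlers.
func sumWithPool(workers, queueSize, n int) int64 {
	pool := newFixedPool(workers, queueSize)
	var total int64
	for i := 1; i <= n; i++ {
		v := int64(i)
		pool.Go(func() { atomic.AddInt64(&total, v) })
	}
	pool.Stop()
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumWithPool(4, 16, 100))
}
```

If handlers block on slow IO, every worker can end up waiting and the queue fills, so the worker count and queue size are worth tuning against measured handler latency.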

@lesismal
Owner

lesismal commented Aug 1, 2021

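I have pushed and released some new code. The latest version no longer needs the HttpHandler wrapper shown above, but websocket still has to use nbhttp's implementation. A complete gin example: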
https://github.com/lesismal/nbio_with_other_frameworks/blob/master/gin_server/gin_server.go

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/lesismal/nbio/nbhttp"
	"github.com/lesismal/nbio/nbhttp/websocket"
)

func main() {
	router := gin.New()
	router.GET("/hello", func(c *gin.Context) {
		c.String(http.StatusOK, "hello")
		fmt.Println("http: hello")
	})
	router.GET("/ws", func(c *gin.Context) {
		w := c.Writer
		r := c.Request
		upgrader := websocket.NewUpgrader()
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			panic(err)
		}
		wsConn := conn.(*websocket.Conn)
		wsConn.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
			// echo
			fmt.Println("websocket:", messageType, string(data), len(data))
			c.WriteMessage(messageType, data)
		})
		wsConn.OnClose(func(c *websocket.Conn, err error) {
			fmt.Println("OnClose:", c.RemoteAddr().String(), err)
		})
		fmt.Println("OnOpen:", wsConn.RemoteAddr().String())
	})

	svr := nbhttp.NewServer(nbhttp.Config{
		Network: "tcp",
		Addrs:   []string{"localhost:8888"},
	}, router, nil)

	err := svr.Start()
	if err != nil {
		fmt.Printf("nbio.Start failed: %v\n", err)
		return
	}

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	svr.Shutdown(ctx)

}

@Jekinnnnnn
Author

Thank you for your reply.
As a side note, it's good to use English when introducing your repo to the whole world; that's why I submitted the issue in English first.
Back to the main topic, I have a new question: how can the server proactively close a single websocket? It looks like there is no such interface.
I've read most of the code, and I think it could be more concise, e.g. separate the configuration, add another layer of abstraction for the connection layer (websocket to conn could perhaps look like conn_unix; this part is a bit confusing to me), and replace Gopher with a more meaningful name.

@lesismal
Owner

lesismal commented Aug 2, 2021

> As a side note, it's good to use English when introducing your repo to the whole world; that's why I submitted the issue in English first.

Some things are hard for me to explain in English because my English is poor, so I replied in Chinese 😂

> Back to the main topic, I have a new question: how can the server proactively close a single websocket? It looks like there is no such interface.

Just use:

wsConn.Close()

> I've read most of the code, and I think it could be more concise, e.g. separate the configuration, add another layer of abstraction for the connection layer (websocket to conn could perhaps look like conn_unix; this part is a bit confusing to me)

I'm afraid it is not as easy as you think... 😂
You can try it, show me some examples of your design, or make a pull request. I will be happy to adopt better code.

> and replace Gopher with a more meaningful name.

I did spend time trying to think of a good name, but I couldn't come up with one...

@randree

randree commented Apr 28, 2022

Hi @lesismal, I used your example to combine gin with nbio and it seems to work great. It's super fast and scalable. There is only one issue I'm trying to figure out: how can I make use of c *gin.Context inside OnMessage? I use gin to retrieve some state, let's say a userName, and that name should be attached to the message sent by c.WriteMessage. Basically I need per-connection user state with several parameters. How can I do this?
...and thanks for your work!

@lesismal
Owner

lesismal commented Apr 28, 2022

> Hi @lesismal, I used your example to combine gin with nbio and it seems to work great. It's super fast and scalable. There is only one issue I'm trying to figure out: how can I make use of c *gin.Context inside OnMessage? I use gin to retrieve some state, let's say a userName, and that name should be attached to the message sent by c.WriteMessage. Basically I need per-connection user state with several parameters. How can I do this? ...and thanks for your work!

Thank you for your attention and feedback!
You can save the session info with websocket.Conn.SetSession. Please try something like this:

func onWebsocket(c *gin.Context) {
	// YourType and getFromGinContext are placeholders for your own code.
	var userInfo *YourType = getFromGinContext(c)

	upgrader := websocket.NewUpgrader()
	upgrader.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		// step 2: get your session info
		session := c.Session()
		if session == nil {
			// no session has been saved on this connection, so there is no user to work with
			c.Close()
			return
		}

		user := session.(*YourType)
		// do something with user
	})

	w := c.Writer
	r := c.Request
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// ...
		return
	}
	wsConn := conn.(*websocket.Conn)
	wsConn.OnClose(func(c *websocket.Conn, err error) {
		// ...
	})

	// step 1: save your session info
	wsConn.SetSession(userInfo)
}
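The session pattern above boils down to storing one opaque value on the connection and asserting its type in the callback. A minimal standard-library sketch of that pattern follows; `conn`, `User`, and `greet` are hypothetical stand-ins, not nbio types, and the mutex is an assumption made so the sketch is safe when the setter and the callback run on different goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// User is a hypothetical per-connection state type.
type User struct {
	Name string
}

// conn is a stand-in for a connection that carries one opaque session value.
type conn struct {
	mu      sync.RWMutex
	session interface{}
}

// SetSession stores the per-connection state.
func (c *conn) SetSession(s interface{}) {
	c.mu.Lock()
	c.session = s
	c.mu.Unlock()
}

// Session returns the stored state, or nil if none has been set.
func (c *conn) Session() interface{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.session
}

// greet plays the role of an OnMessage callback: it looks up the user saved
// on the connection and prefixes the message with the user's name.
func greet(c *conn, data []byte) (string, bool) {
	// The comma-ok form avoids a panic when the session is missing or has
	// an unexpected type.
	user, ok := c.Session().(*User)
	if !ok || user == nil {
		return "", false
	}
	return user.Name + ": " + string(data), true
}

func main() {
	c := &conn{}
	if _, ok := greet(c, []byte("hi")); !ok {
		fmt.Println("no session yet")
	}
	c.SetSession(&User{Name: "alice"})
	msg, _ := greet(c, []byte("hi"))
	fmt.Println(msg)
}
```

Checking for a missing session matters because a message can arrive before the session has been saved on the connection.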

@Jekinnnnnn Jekinnnnnn reopened this Apr 28, 2022
@randree

randree commented Apr 28, 2022

@lesismal thank you so much! I tried it. Perfect! It is working like a charm.

So with that, it is possible to combine gin and nbio when a combination of REST and websockets is needed at large scale.

@lesismal
Owner

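@Jekinnnnnn I went back over my earlier replies:

> OnDataFrame is called when a frame is received. When you use it, the goroutine pool should process the frames in order. The goroutine pool nbhttp uses by default is unordered, so if you use OnDataFrame, pair it with an ordered goroutine pool.

In the current version, OnDataFrame and OnMessage are executed in order by default for each Conn.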

@github-actions

This issue is stale because it has been open for 30 days with no activity.

@github-actions github-actions bot added the stale label May 29, 2022
@github-actions

This issue was closed because it has been inactive for 14 days since being marked as stale.
