-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
on-closed not fired when client closes connection #375
Comments
|
Thanks for this info, it's very useful. My initial theory as to why I'm not seeing the on-closed callback called must be wrong. Why do you think the call back isn't called? Here's a test case: https://github.com/juxt/aleph-issue I'm seeing this with alpha5. I can try with older releases. |
@kachayev, thanks again. I now see that the stream is indeed an atom, which contains nil when Netty calls :channel-inactive
([_ ctx]
(when-let [s @stream]
(s/close! s))
(doseq [b @buffer]
(netty/release b))) The symbol I guess I need to go back and think about this some more. |
@malcolmsparks The code you're referring both in case of If you're looking Hope that reveals some useful insights. But I cannot reproduce the issue. Using (require '[aleph.http :as http])
=> nil
(require '[manifold.stream :as s])
=> nil
(defn handler [req]
(let [s' (s/stream)]
(s/on-closed s' (fn [] (println "Stream closed!")))
(s/on-drained s' (fn [] (println "Stream is drained!")))
(future
(loop [n 1]
(when (and (< n 100)
@(s/put! s' (str "data: " n "\r\n\r\n")))
(Thread/sleep 100)
(recur (inc n)))))
{:status 200
:headers {"content-type" "text/event-stream"}
:body s'}))
=> #'user/handler
(def server (http/start-server handler {:port 8080}))
=> #'user/server
Stream closed!
Stream is drained! |
Thanks @kachayev , I now understand the point about the request stream versus the response stream. The problem I'm trying to solve can be demonstrated if you comment out the I understand that the response stream is closed upon |
Ah, I see now. It works this way: (defn handler [s' req]
(s/on-closed s' (fn [] (println "Stream closed!")))
(s/on-drained s' (fn [] (println "Stream is drained!")))
{:status 200
:headers {"content-type" "text/event-stream"}
:body s'})
=> #'user/handler
(def s' (s/stream))
=> #'user/s'
(def server (http/start-server (partial handler s') {:port 8080}))
=> #'user/server
(s/put! s' "Hello")
Stream closed!
Stream is drained!
=> << true >> So, the idea here is that closing src does not propagate to the body, but when you're trying to put something onto the channel it figures out that it should be closed (due to internal of We could probably close |
I think changing There is an open issue on Manifold to allow for upstream propagation, which I started poking at a while back, but didn't finish. Linking it here for cross-referencing: clj-commons/manifold#128. |
@ztellman I think it's safe now to close this issue 🤔 |
My context is that I'm trying to implement GraphQL subscriptions for Lacinia with aleph (with WS and SSE, but that's not relevant). A GraphQL subscription event might be for, say, my age changing. That happens once a year, so is rare. Ideally, if the client gets bored of waiting for that event they should be able to close the connection which should trigger the unsubscription of the event source.
Let's consider the following request handler:
When I connect using
curl
, I see an empty event-stream, the connection remains alive (as it should) but when aCtrl-c
I don't get the expected "Stream closed, so I can unsubscribe!" message.Let's amend the handler by adding some events:
Now, if I connect with curl, I do see the println message. To understand why, look at
aleph.http.server/ring-handler
.When the client closes the channel we do indeed get the
:channel-inactive
event. However, the implementation then derefs the stream, which blocks until the next event (which in my contrived example might be a year away!). In the case I've got bored and closed the connection myself, the deref returns nil and so the(s/close! s)
body isn't executed.So I don't get any notification to unsubscribe my subscriber. Of course, I can wait (maybe a year) until the next event is generated and try to
put!
on the closed channel, and unsubscribing upon it returning false. I can also live with the assumption that GraphQL events will be produced fairly frequently, but it doesn't feel at all efficient/optimal.Is there a better way of signalling the closing client connection rather than attempting to
put!
on the stream? It seems that given the fact that:channel-inactive
is called by Netty to aleph, there should be a way of signalling 'user-space' code.The text was updated successfully, but these errors were encountered: