
data loss with io/decode-stream #60

Closed
bo-tato opened this issue Jan 12, 2023 · 9 comments


@bo-tato
Contributor

bo-tato commented Jan 12, 2023

I was coding this simple problem to learn Clojure: it's just a server that reads newline-separated JSON requests over TCP and responds to them. My code using gloss is here. I found gloss because that's what the Aleph examples used, though I later realized that for this task there's a simpler to-line-seq in byte-streams. I ran into a bug with decode-stream that seems to be already known.
I'm totally new to Clojure, Manifold streams and everything, so this is my understanding of the bug and my fix, which may be totally wrong:
decode-stream reads from some stream and writes to out, and as soon as the input stream is drained, it calls close! on out. As calls to put! on out are non-blocking, maybe the last put! on out hasn't been written yet at the time we close out.
Edit: I think my understanding was wrong. It seems that if you call put! or put-all! on a stream and then immediately call close! on it, those puts are still written before the stream is closed. I think the race condition is that it reads bytes from src, parses them, and then calls put! on dst, so src can be drained and trigger the close! call on dst while it is still parsing, before it has called put!.
Here is the code that closes it:

(s/connect-via src f dst {:downstream? false})
(s/on-drained src #(do (f []) (s/close! dst)))

As far as I understand Manifold streams, the only effect of the :downstream? option is whether Manifold will automatically close the stream for us. When Manifold does it automatically, it waits for the input stream to be drained or closed, and for the pending writes to out to finish, before closing out. I just deleted those two lines and replaced them with:

(s/connect-via src f dst)

Like I said, I don't know Clojure or Manifold, so I'm not sure whether there is an undesired consequence of leaving :downstream? at its default of true, but for my program it seems to fix this bug. All the Protohackers tests pass after I made that change.

Also, with this little program I ran into this bug, which is already fixed in git but not in the published version of the package. It seems weird that with a very simple use case (so I assume it also happens in plenty of real programs) I ran into two bugs that are already known, which in a real program could cause subtle, hard-to-debug issues. It seems important to at least add a note to the documentation if there isn't time to make a proper fix.

Other than the painful debugging experience, I really like the library! It looks very nice for simply dealing with binary protocols.

@KingMob
Collaborator

KingMob commented Jan 15, 2023

@bo-tato Thanks for using gloss and looking into this. Sorry you ran into problems.

Unfortunately, your suggested change is something I already considered, and it breaks a bunch of stuff. I talk about it in more detail in #53 (comment), but the gist is that a bunch of code relies on that final empty vector being sent when the source is drained, so it can't easily be deleted.

Unfortunately, there are two major reasons it's not fixed: gloss is not very popular, so it's lower priority, but worse, truly fixing the bug may require a massive overhaul of some of Manifold's internals.

> Edit: I think my understanding was wrong it seems if you call put! or put-all! on a stream and then immediately close! on a stream those puts still get written before it is closed. I think the race condition is that it reads bytes from src, parses it, and then calls put on dst, so src can be drained and trigger close call on dst, while it is still parsing and before it's called put!

Your intuition is correct. The solution for gloss might be as simple as locking the dst so it can't be closed while the callback is running, but that's tricky to do without hurting performance. Like I said, this is not an easy fix.

@KingMob
Collaborator

KingMob commented Jan 15, 2023

You're right that I should probably cut a new release that includes the buffer-underflow fix. I'll get on that.

@KingMob
Collaborator

KingMob commented Jan 16, 2023

OK, 0.3.3 is available with the buffer underflow fix: https://clojars.org/org.clj-commons/gloss/versions/0.3.3

@bo-tato
Contributor Author

bo-tato commented Jan 19, 2023

Thanks! Yeah, it looks tricky. I realize now that my original "fix" breaks other cases and doesn't even pass the tests.
I have another idea for a fix, which might also be wrong since I'm quite new to Clojure and to this and the Manifold libraries, but I did spend some time trying to understand what's going on, and this one does pass all the tests, so it might work. I just replaced the line:

(s/on-drained src #(do (f []) (s/close! dst)))

with:

(s/on-drained src #(let [state @state-ref]
                     (binding [complete? (s/drained? src)]
                       (let [[leftover] (decode-byte-sequence
                                         (:codecs state)
                                         (:bytes state))]
                         (if leftover
                           (s/connect leftover dst)
                           (s/close! dst))))))

My (maybe wrong) understanding of the original code and the proposed fix:
Basically, in the original code, when src is drained there is still some stuff pending in the state atom to decode and pass on to dst. So it calls f one last time with an empty list, and f decodes whatever remains in the state and sends it on to dst; but it also calls close! on dst, possibly before f has written to it.
So this fix has two cases. In the first, there is nothing left in the state atom, so we just close dst.
For the other case I just copy-pasted from f, which is probably not the cleanest way to write it. It's basically f, except that instead of calling put! to write the decoded data to dst, it calls s/connect to stream the remaining decoded data to dst, and in this case we don't call close! on dst: Manifold will close dst automatically once it's done writing.
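The behaviour this fix relies on, that s/connect closes the sink once the source is exhausted, can be checked in isolation. Below is a minimal sketch, assuming Manifold's default of :downstream? true for connect; stream-leftover is a hypothetical helper, and the plain seq passed to it stands in for whatever decode-byte-sequence actually returns as leftover.

```clojure
(ns connect-close-sketch
  (:require [manifold.stream :as s]))

(defn stream-leftover
  "Hypothetical helper: pipes a finite seq of leftover values into a fresh
   buffered stream and returns everything that stream yields."
  [leftover]
  (let [dst (s/stream 8)]
    ;; connect defaults to :downstream? true, so dst is closed once the
    ;; source is drained; no explicit s/close! is needed here.
    (s/connect (s/->source leftover) dst)
    ;; stream->seq blocks on each take and ends once dst is closed and empty.
    (vec (s/stream->seq dst))))
```

For example, (stream-leftover [:a :b :c]) should return every value and then terminate, which it could not do if dst were never closed.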

@KingMob
Collaborator

KingMob commented Jan 25, 2023

This looks plausible. Let me try it out.

@KingMob
Collaborator

KingMob commented Jan 25, 2023

I extracted it to a shared fn like:

(defn- decode-cleanup
  "When a decoding stream is drained, this handles properly flushing out the remaining
   bytes from intermediate streams."
  [src state-ref dst]
  (let [state @state-ref]
    (binding [complete? (s/drained? src)]
      (let [[leftover _ _] (decode-byte-sequence
                             (:codecs state)
                             (:bytes state))]
        (if leftover
          (s/connect leftover dst)
          (s/close! dst))))))

and used it like (s/on-drained src #(decode-cleanup src state-ref dst))

However, the proposed test from #53 still fails. Your proposed change doesn't call (f []) when there are no leftovers, but even if I add that to the fn like so:

(defn- decode-cleanup
  "When a decoding stream is drained, this handles properly flushing out the remaining
   bytes from intermediate streams."
  [src dst state-ref f]
  (let [state @state-ref]
    (binding [complete? (s/drained? src)]
      (let [[leftover _ _] (decode-byte-sequence
                             (:codecs state)
                             (:bytes state))]
        (if leftover
          (s/connect leftover dst)
          (do
            (f [])
            (s/close! dst)))))))

It's still failing, probably because dst is being closed before the intermediate streams have processed the []. I tried dereferencing the deferred returned by (f []), but that didn't work either.

I think this is a good first attempt, though!

@bo-tato
Contributor Author

bo-tato commented Jan 29, 2023

Hm, this is tricky; I think my idea was off track again. The reason it fails that test is that sometimes, instead of doing:
1 2 3 4 5 6 7 8 9 []
it does:
1 2 3 4 5 6 7 8 [] 9
The on-drained event can fire before the last 9 has been written to the stream. My idea only addressed the case where close! is called before the last call to put!; it still has the problem that the on-drained event fires and leftover/[] is written before the last 9.

I have a new idea that's kind of ugly, but it passes the test from #53 and all the existing tests.
Change:
(let [src (s/->source src)...
to
(let [src (s/concat [(s/->source src) (s/->source [[]])])...
Rather than calling (f []) after src is drained, we make a new src that is the original with a [] appended, and then we can just do:
(s/connect-via src f dst)
without the {:downstream? false}, and Manifold takes care of closing dst.
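To make the shape of this change concrete, here is a minimal, self-contained sketch of the sentinel idea. It is not gloss's actual decode-stream: make-callback, with-sentinel and run-pipeline are hypothetical names, and the callback is a toy stand-in for gloss's f that emits :flushed where gloss would flush its leftover decoder state. It assumes, as described above, that connect-via with the default :downstream? true processes messages one at a time and closes dst only after src is drained. Because the [] travels through the same connect-via loop as the data, it should be handled after the final chunk rather than racing it from a separate on-drained callback.

```clojure
(ns sentinel-sketch
  (:require [manifold.stream :as s]))

(defn make-callback
  "Hypothetical stand-in for gloss's decode callback f."
  [dst]
  (fn [chunk]
    (if (empty? chunk)
      ;; The [] sentinel: in gloss this is where leftover state is flushed.
      (s/put! dst :flushed)
      ;; Ordinary chunk: forward its values; the deferred yields true/false.
      (s/put-all! dst chunk))))

(defn with-sentinel
  "Returns a source yielding every message of src, followed by one []."
  [src]
  (s/concat (s/->source [(s/->source src) (s/->source [[]])])))

(defn run-pipeline
  "Feeds chunks through the callback and returns everything dst receives."
  [chunks]
  (let [dst (s/stream 16)]
    ;; No {:downstream? false} and no on-drained hook: Manifold closes dst.
    (s/connect-via (with-sentinel chunks) (make-callback dst) dst)
    (vec (s/stream->seq dst))))
```

The toy callback has no intermediate streams, so it cannot reproduce the original race; it only illustrates the ordering argument, namely that the sentinel is queued behind the real data.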

@KingMob
Copy link
Collaborator

KingMob commented Jan 30, 2023

Hmm, I hadn't considered that... have you proven that the 9 is coming after the []?

I think your idea is intriguing, can you make a PR so I can try it out? (Please include the test from #53)

@bo-tato
Contributor Author

bo-tato commented Jan 30, 2023

> have you proven that the 9 is coming after the []?

I added (println s) in f before (s/put-all! dst s), but I realized that writing to standard out can also have race conditions in its ordering, so I changed it to (tap> s). Viewing the tap in the FlowStorm debugger shows 8, then [], then 9 when the test is failing.
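The same tap>-based check can be done without a debugger by recording taps into an atom. This is a minimal sketch assuming Clojure 1.10+ (where tap>, add-tap and remove-tap exist); record-taps and the ::done end marker are hypothetical names. It relies on current Clojure delivering taps from a single background thread in the order they were enqueued, so the recorded vector reflects the order of the tap> calls.

```clojure
(ns tap-order-sketch)

(defn record-taps
  "Hypothetical helper: installs a tap, runs thunk f, and returns every value
   tapped while f ran, in the order the taps were delivered."
  [f]
  (let [seen     (atom [])
        done     (promise)
        recorder (fn [x]
                   (if (= x ::done)
                     (deliver done true)      ; end marker: stop waiting
                     (swap! seen conj x)))]
    (add-tap recorder)
    (try
      (f)
      ;; Taps are delivered in FIFO order, so ::done arrives after every
      ;; value tapped by f. Wait up to 1 s for it.
      (tap> ::done)
      (deref done 1000 nil)
      @seen
      (finally
        (remove-tap recorder)))))
```

In the failing case from this thread, wrapping the test run as (record-taps #(...)) would give a vector where [] appears before the final 9.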

> can you make a PR so I can try it out?

OK, I just made a PR.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants