Inconsistent results with deferred/onto #151

Open
kingpong opened this issue Feb 21, 2018 · 28 comments

@kingpong

kingpong commented Feb 21, 2018

I'm trying to ensure that my chained callbacks always run on a specific thread pool, so I'm using d/onto followed by d/chain to set up the callbacks. What I'm seeing, though, is that the callbacks sometimes run directly on the derefing thread. I'm able to reproduce this behavior reliably (Clojure 1.8, Java 1.8, Manifold 0.1.6).

(require '[manifold.deferred :as d])
(import '(java.util.concurrent ForkJoinPool))

(defn tid []
  (let [t (Thread/currentThread)]
    (format "Thread %d: %s" (.getId t) (.getName t))))

(defn chained-onto-threads
  "Tries n times to run f (which should produce a deferred) then chains the
   result into a callback function that should run on the ForkJoinPool.
   Returns a map containing the frequencies of threads that were actually used."
  [n f]
  (let [forkjoin-pool (ForkJoinPool/commonPool)
        seen-threads  (atom {})]
    (run! deref
          (for [i (range n)]
            (-> (f)
                (d/onto forkjoin-pool)
                (d/chain
                  (fn [_]
                    ;; Expecting this to be ForkJoinPool, but isn't always.
                    (swap! seen-threads update (tid) #(inc (or % 0))))))))
    @seen-threads))

When I run chained-onto-threads with an f that produces a manifold future, sometimes the chained function runs on an nREPL thread, not a ForkJoinPool thread as expected:

user=> (require '[clojure.pprint :as pp])
nil
user=> (pp/pprint (chained-onto-threads 1000 #(d/future "any value")))
{"Thread 33: nREPL-worker-3" 7,
 "Thread 35: ForkJoinPool.commonPool-worker-0" 142,
 "Thread 37: ForkJoinPool.commonPool-worker-1" 123,
 "Thread 38: ForkJoinPool.commonPool-worker-2" 160,
 "Thread 39: ForkJoinPool.commonPool-worker-3" 144,
 "Thread 40: ForkJoinPool.commonPool-worker-4" 143,
 "Thread 43: ForkJoinPool.commonPool-worker-5" 163,
 "Thread 45: ForkJoinPool.commonPool-worker-6" 118}

With an f that produces a success-deferred, the chained function always runs on an nREPL thread, not on the executor that was specified:

user=>   (pp/pprint
  #_=>     (chained-onto-threads 1000 #(d/success-deferred "any value")))
{"Thread 15: nREPL-worker-1" 1000}

Can you shed any light on what's happening? Am I doing something wrong or is this a bug?

@dm3
Contributor

dm3 commented Feb 22, 2018

I'd say it's an inconsistency. This stems from

https://github.com/ztellman/manifold/blob/193f5f48972c8e20dd0a9fc41a1311566a9f7bdd/src/manifold/deferred.clj#L378 - when the deferred is already realized, the listener is executed immediately on the thread that attaches it.

@kingpong
Author

Here's a more practical example of why this is a problem:

  (let [d  (-> 100
               (d/success-deferred)
               (d/onto (ForkJoinPool/commonPool)))
        t0 (System/currentTimeMillis)
        _  (d/chain d #(Thread/sleep %))  ;; want to run in parallel with ↓
        _  (d/chain d #(Thread/sleep %))  ;; want to run in parallel with ↑
        t1 (System/currentTimeMillis)]
    (println "Chaining took" (- t1 t0) "ms"))

; ==> Chaining took 209 ms

I would have expected the calls to d/chain to be async, and the sleeps to run in parallel, on the ForkJoinPool. But instead they run in sequence and block each other.

It's also a problem if the code in the chained callbacks requires thread-local context that is provided by the executor, for example MDC.

@kingpong
Author

@dm3 since I've explicitly set the executor with (d/onto ...), doesn't the anonymous function you mention run on the executor (line 384)?
https://github.com/ztellman/manifold/blob/eabe63e42af4ee9454751ad272fb418701e90f53/src/manifold/deferred.clj#L376-L385

@ztellman
Collaborator

As @dm3 said, if you chain on a deferred which is already realized, it executes immediately and does not register any sort of callback. Since you are simply using (d/future "a value") without sleeping or anything, at least some of the time the future has executed before being passed into d/chain.

@kingpong
Author

@ztellman Is this the desired behavior, though? In my second example, code that I would expect not to block at all actually takes 200ms. Imagine that this is happening in a UI thread, and I want to ensure that the callbacks, which may block, are run in a separate thread to avoid hanging the UI. The initial deferred came from a library function, and it may (or may not) be realized by the time it gets to me.

@ztellman
Collaborator

It's a fair question. This code is effectively using deferreds as a way to convey blocking work onto another thread, which I would accomplish with d/future-with, but I can see why you might expect that would be done for you. Let me give it some thought.
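
For readers following along, here is a minimal sketch of what the d/future-with approach could look like for the sleep example above. The helper names `sleep-on` and `chain-sleeps` are made up for illustration and are not part of Manifold. Each chained callback still runs inline on the calling thread, since the deferred is already realized, but it only submits the blocking work to the pool and returns a deferred.

```clojure
(require '[manifold.deferred :as d])
(import '(java.util.concurrent ForkJoinPool))

(defn sleep-on
  "Illustrative helper (not part of Manifold): runs a blocking sleep on
   `executor` and returns a deferred that yields `ms` once it finishes."
  [executor ms]
  (d/future-with executor
    (Thread/sleep (long ms))
    ms))

(defn chain-sleeps
  "Chains two blocking sleeps onto an already-realized deferred. Returns the
   time spent setting up the chains and the two eventual results."
  []
  (let [pool (ForkJoinPool/commonPool)
        d    (d/success-deferred 100)
        t0   (System/currentTimeMillis)
        ;; The callbacks run inline, but they only hand work to the pool.
        a    (d/chain d #(sleep-on pool %))
        b    (d/chain d #(sleep-on pool %))
        t1   (System/currentTimeMillis)]
    {:chaining-ms (- t1 t0)
     ;; Block only here, when the results are actually needed.
     :results     [@a @b]}))

(println (chain-sleeps))
```

Whether the two sleeps actually overlap depends on how many threads the pool has available. The point of the sketch is only that setting up the chains no longer waits for them.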

@kingpong
Author

Thanks for considering. The actual problem that is biting me is that the code in my callbacks requires thread-local context that is automatically provided by the executor I specified, and the thread-local context isn't always there. I think the unexpected blocking case is more compelling to most users of the library, though.

@ztellman
Collaborator

ztellman commented Feb 23, 2018 via email

@mping

mping commented May 6, 2020

@ztellman any update on this?

@KingMob added and removed the bug (Confirmed bug) label on Jul 4, 2021

@ferdinand-beyer

I've stumbled over this as well, trying to use an executor to set up a thread-local context.

On my machine, this snippet consistently yields the REPL thread, so the executor passed to onto is essentially ignored:

(-> (d/future 1)
    (d/onto (executor/fixed-thread-executor 2))
    (d/chain (fn [_] (.getName (Thread/currentThread)))))

This means that the onto examples in the documentation are mostly wrong (probably depending on the computer).

I guess this is not really an issue if all one cares about is fast execution, but this means that one cannot use custom executors for other purposes.

@KingMob -- if you decide this is not a bug, maybe we should document this, stating that there is no guarantee? The current docstring, "Returns a deferred whose callbacks will be run on executor.", is definitely misleading.

@arnaudgeiser
Collaborator

Yes, if the deferred is already realized, it will continue on the same thread and the specified Executor (onto) won't be used.
Here is a bit of context on Aleph regarding this behaviour (I'm not saying the documentation cannot be improved).

clj-commons/aleph#427 (comment)

@arnaudgeiser
Collaborator

arnaudgeiser commented Feb 21, 2023

(-> (d/future 1)
    (d/onto (e/fixed-thread-executor 2))
    (d/chain (fn [_] (prn (.getName (Thread/currentThread))))))

=> "nREPL-session-8ba7d6cf-90f9-428b-9505-bba48850ff05"
(or sometimes if unlucky) => "manifold-execute-3"
(-> (d/future (Thread/sleep 100) 1)
    (d/onto (e/fixed-thread-executor 2))
    (d/chain (fn [_] (prn (.getName (Thread/currentThread))))))

=> "manifold-pool-37-1"

@ferdinand-beyer

Yep, tried the same with Thread/sleep.

I'm not saying this is necessarily wrong behaviour, but certainly surprising and misleading. I lost two days trying to use custom executors to trace my async program. :)

@KingMob
Collaborator

KingMob commented Feb 22, 2023

@ferdinand-beyer

At a bare minimum, we should improve the docs, since someone asks this question at least twice a year.

Would you be willing to contribute a PR improving them? For both the docstrings and the markdown files? Can you document the current behavior of chain, add a caveat to onto, and mention using future-with if you want to guarantee everything will run on a custom executor? (Unfortunately, we don't have access to aleph.io, so we can't update it, which is why we've removed all references to it.)

I'm not in love with the current state of things, but it's risky to try and make it more consistent.

Let's ignore the discussion in clj-commons/aleph#427 about trying to improve performance by detecting when you wish to schedule on the same executor, and consider only whether we can safely alter the chain fns to always schedule the callback on an executor. The immediate issues I see are:

  1. May need to use bound-fn again to avoid callback breakage when relying on dynamic vars, though to be fair, people should be binding their own fns in that case
  2. Using bound-fn, while correct, would be slower, sometimes by a lot, as Arnaud has found.
  3. Executor overhead will make things slower

I think we can skip using bound-fn ourselves, but we need to measure the executor overhead before we can implement this. Anyone with a lot of chained fns may see their CPU usage go up. Hopefully, if we set the executor early on in the chain, and rely on a nil executor for later chained fns to mean "continue on this thread", it should be ok.

Thoughts? Anyone have an Aleph server in a staging env they can volunteer as a guinea pig?

@ferdinand-beyer

> Would you be willing to contribute a PR improving them? For both the docstrings and the markdown files?

Sure, happy to give this a try!

> Thoughts? Anyone have an Aleph server in a staging env they can volunteer as a guinea pig?

Unfortunately I cannot volunteer as a guinea pig right now. In general, I think it is a good idea to not use bound-fn but require users to use it when they need it. Pay for what you need.

@arnaudgeiser
Collaborator

> Let's ignore the discussion in clj-commons/aleph#427 about trying to improve performance by detecting when you wish to schedule on the same executor, and consider only whether we can safely alter the chain fns to always schedule the callback on an executor. The immediate issues I see are:

I don't think it's a good idea... It means that every deferred will be rescheduled on the Executor using Aleph [1], right?
It might impact the performance of some workloads in a bad way.

[1] : https://github.com/exoscale/aleph/blob/master/src/aleph/http/server.clj#L595-L598

@KingMob
Collaborator

KingMob commented Feb 23, 2023

@arnaudgeiser I'm not clear how your linked code sample relates.

What I'm contemplating is this:

We alter the code in chain-/chain'- to stop invoking the callback immediately in (let [x'' (f x')] if the incoming deferred parameter is already realized. Instead, we make it use the same logic as the deferred implementations: if the executor is nil, run on the same thread, and if not, dispatch to the executor. Everywhere else in manifold already uses this logic; chain is the only major exception. (And catch/finally, which use chain).

In terms of performance, I don't think it will be as bad as we fear. For starters, it won't change the perf of any chained fn where the previous link in the chain is usually unrealized at call time, because those will already be scheduled on an executor anyway (or same thread if executor is nil). And for slower fns in a chain, the executor overhead is probably not that bad. This will mostly show up as an issue for tiny fns, but Aleph doesn't chain too many of those.

We can also mitigate any performance impact by using (onto d nil) or (future-with nil..., which will force the subsequent deferreds to run on the same thread. It's work, but it turns something hidden into something explicit. We can also combine tiny fns if needed.

FWIW, I took a stab at converting chain to see how it performs, and at least with a crude time lein test, there was no noticeable difference on my machine.


Regardless, I'm not convinced it'll be worth all the work in the end, as compared to just documenting the existing behavior.

One issue is, chained fns run on unwrapped values, so I had to change unwrap to also record the last seen executor. But it's not clear that's the right thing to do, since the chained fns aren't technically deferred listeners. It could also choose to use the current thread local executor value. I'm not sure the complication is worth it.

@arnaudgeiser
Collaborator

> @arnaudgeiser I'm not clear how your linked code sample relates.

Sorry for the delay.
It's not all fresh in my mind, but as soon as you create your executor with manifold.executor/thread-factory, you end up having an executor on your thread-local. [1]
Then, every deferred created with manifold.deferred/deferred will have this executor set [2].

Today, if the deferred is realized, you continue on the current thread regardless of the presence of that executor on the thread-local. This means you can blindly write (-> d (d/chain ...) (d/chain ...) (d/chain ...)) for synchronous functions with almost no impact performance-wise.
Tomorrow, every d/chain will start rescheduling that synchronous function on the executor. That's not something I would really like.

[1] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L69
[2] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L69

@arnaudgeiser
Collaborator

On a side note, I was having a look at how @mpenet's auspex library implemented the manifold support for qbits.auspex/then when an executor is provided.

And it seems he just ignored the parameter and calls manifold.deferred/chain [1].
We cannot suggest that he use d/onto, since it won't guarantee the deferred will be scheduled on the provided executor. So I'm wondering: what is the recommended way to schedule a realized deferred on a specific executor?

future-with was mentioned, but it's not doing what some would expect there as everything is now running on the provided executor.

(-> (d/future 1)
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread))))) ;; manifold-pool-1-1
    (->> (d/future-with (manifold.executor/fixed-thread-executor 2)))
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))) ;; manifold-pool-1-1

How should the following (non-working) code be expressed to make the second
chain run on the provided executor?

(-> (d/future 1)
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread))))) ;; current-thread
    (d/onto (manifold.executor/fixed-thread-executor 2))
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))) ;; manifold-pool-1-1

[1] : https://github.com/mpenet/auspex/blob/master/src/qbits/auspex/manifold.clj#L55-L56

@ztellman
Collaborator

ztellman commented Feb 28, 2023

Hi, just to weigh in on this, the behavior in Manifold is shaped by two considerations:

  • for performance reasons, we don't want to have each callback be re-enqueued onto some thread-pool if it can be invoked immediately
  • we want to retain some control on where computation is occurring

I don't think the current behavior properly balances these two considerations, for the reasons already demonstrated in this thread. Luckily, I think the behavior here can be pretty easily addressed by some minor changes to the success-error-unrealized macro (https://github.com/clj-commons/manifold/blob/master/src/manifold/deferred.clj#L62), such that if the deferred in question has an affinity to a thread pool and the macro isn't being executed within that thread pool, then you simply call the unrealized clause rather than executing the callback inline. I should note that it's been quite some time since I last messed around in this codebase, so it's entirely possible there's some issue I'm overlooking, but the point of the macro was to have there be a single place that deals with these sorts of "immediate execution" semantics, so I'm optimistic.

@arnaudgeiser
Collaborator

arnaudgeiser commented Mar 1, 2023

Hello Zach (and welcome back)!
Thanks for the insights, those are precious.

What you propose more or less matches what Matthew had in mind initially when we were talking about this on Aleph [1]:

  1. Select an executor
    a. If an executor is specified on the deferred, choose that.
    b. Otherwise, if an executor is set on the executor threadlocal, choose that.
  2. If the deferred is not realized, schedule the callback on the executor
  3. If the deferred IS realized:
    a. Execute immediately on the current thread if it belongs to the selected executor
    b. Otherwise, schedule the callback to run on the executor

Assuming the selected executor is a thread pool, is there a Java API that tells you whether a java.lang.Thread belongs to a given java.util.concurrent.ExecutorService?
java.lang.Thread/getThreadGroup [2] exists, but how does it correlate to an ExecutorService?

If this can be easily addressed on the success-error-unrealized macro with minor modifications, I would definitely give it a shot.

[1] : clj-commons/aleph#427 (comment)
[2] : https://docs.oracle.com/en/java/javase/17/docs/api/index.html#getThreadGroup()

@KingMob
Collaborator

KingMob commented Mar 1, 2023

Zach, thanks for chiming in! You're always welcome here.

I think the proposed change of checking the executor and continuing to run in the current thread if it's already in that executor's pool is good, but in the past, we've always held off out of caution and uncertainty about whether the benefits are worth it. Is anything different?

To recap what it will entail:

The first major change would be how with-executor behaves. Currently, it (inefficiently) forces each fn in a chain to be submitted to the executor instead of running them all in one thread. This is slower, but maybe someone's code relies on it, even if unintentionally. @alexander-yakushev alluded to cases where this is undesirable, but the conversation went elsewhere before we got his example.

The next major change is that callbacks on already-realized deferreds would switch from running on the current thread (no matter where it is) to only running on the current thread if it's in the same pool (or nil). This has always been unexpected behavior, and someone complains about it at least twice a year.

Generally, I would be most happy if people's first guess as to which pools things are running on is correct. I would love to say "If you specify a pool, it will always run on that pool. If you're already in that pool, it will continue on the current thread. If you need to submit a job to the pool you're already on, use future/future-with/$SOME_ESCAPE_HATCH"


@arnaudgeiser FYI, @alexander-yakushev suggested the following for determining the executor in clj-commons/aleph#427 (comment):

> I suppose, if you compare Thread.currentThread().getThreadGroup() to the executor's ThreadGroup from its ThreadFactory, you can avoid redundant rescheduling in Manifold itself, and thus keep with-executor as is.

I didn't see the APIs required to do this with Netty pools, but it should work just fine for determining if a thread is in one of our pools.

@arnaudgeiser
Collaborator

arnaudgeiser commented Mar 1, 2023

> I suppose, if you compare Thread.currentThread().getThreadGroup() to the executor's ThreadGroup from its ThreadFactory, you can avoid redundant rescheduling in Manifold itself, and thus keep with-executor as is.

This is where I'm a bit confused.
For starters, the threads we are creating don't have any ThreadGroup attached to them [1]. So I guess it will require modifications, even on our own pools. This is the same for all Executors created directly from the JDK [2]. Manifold users might not want to use the Dirigiste Executors but plain JDK Executors.

Regarding :

> to the executor's ThreadGroup from its ThreadFactory

You cannot get the ThreadFactory from an ExecutorService, but from a ThreadPoolExecutor [3]. Dirigiste executor doesn't give you access to its internal ThreadFactory [4].
Even so... having access to the ThreadFactory is not giving us access to the ThreadGroup the threads are created with.

Pretty sure I'm missing the whole thing there. I understand what we are trying to achieve but not how.
If you can shed some light, that would be great.

I was initially thinking about that slight change on success-error-unrealized :

(if (inside-executor? (.executor d))
   ~success-clause
   ~unrealized-clause)

EDIT :

> We alter the code in chain-/chain'- to stop invoking the callback immediately in (let [x'' (f x')] if the incoming deferred parameter is already realized.

I tend to concur; this is where the decision is taken to call f synchronously.
It seems that success-error-unrealized is only called in one special arity of chain'-.
If the thread factories from manifold were not setting that thread-local executor, it would be simpler....

[1] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L71
[2] : https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/Executors.java#L687-L689
[3] : https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1519
[4] : https://github.com/clj-commons/dirigiste/blob/master/src/io/aleph/dirigiste/Executor.java#L121

@KingMob
Collaborator

KingMob commented Mar 2, 2023 via email

@ztellman
Collaborator

ztellman commented Mar 2, 2023

I stand corrected on success-error-unrealized being the thing we'd need to change. As already stated elsewhere, we'd need to add an additional check to the various arities of chain- and chain'-. This means the change will be a bit more verbose (sorry), but otherwise pretty straightforward.

Having taken a longer look at everything, here's what I believe needs to change:

  • add a second thread-local to manifold.executor which isn't contingent on onto? being true in the executor constructor
  • add a mechanism in manifold.executor for wrapping a preexisting executor such that submitted Runnables are wrapped with thread-local handlers (ideally this will be idempotent and not double-wrap executors like instrumented-executor which already has the necessary stuff)
  • make the necessary changes to check (identical? (manifold.deferred/executor d) (.get manifold.executor/second-thread-local)) before directly invoking functions
  • in Aleph, make the various client/server constructors wrap the provided executor

Is there anything I'm overlooking?
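
To make the first three bullets concrete, here is a rough sketch that uses only JDK classes. Everything named here (`current-executor`, `wrap-executor`, `inside-executor?`, `check-membership`) is hypothetical and does not exist in manifold.executor. It also only wraps `Executor.execute`; a real version would presumably need to cover the full ExecutorService interface and avoid double-wrapping.

```clojure
(import '(java.util.concurrent Executor Executors))

;; Hypothetical second thread-local: the executor the current thread is
;; actually running a task for (nil outside any wrapped executor).
(def ^ThreadLocal current-executor (ThreadLocal.))

(defn wrap-executor
  "Hypothetical wrapper: returns an Executor that delegates to `executor` and
   records itself in `current-executor` while each submitted task runs."
  [^Executor executor]
  (reify Executor
    (execute [this task]
      (.execute executor
                (fn []
                  (let [previous (.get current-executor)]
                    (.set current-executor this)
                    (try
                      (.run ^Runnable task)
                      (finally
                        ;; Restore whatever was there before the task ran.
                        (.set current-executor previous)))))))))

(defn inside-executor?
  "True when the calling thread is currently running a task for `executor`."
  [executor]
  (identical? executor (.get current-executor)))

(defn check-membership
  "Returns [inside outside]: the membership check evaluated inside a pool
   task and on the calling thread."
  []
  (let [pool             (Executors/newFixedThreadPool 2)
        ^Executor wrapped (wrap-executor pool)
        inside           (promise)]
    (try
      (.execute wrapped #(deliver inside (inside-executor? wrapped)))
      [(deref inside 1000 :timeout) (inside-executor? wrapped)]
      (finally
        (.shutdown pool)))))

(println (check-membership))
```

One consequence of the identity check in this sketch: the deferred would have to be associated with the wrapper itself (not the underlying pool) for `identical?` to match.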

@KingMob
Collaborator

KingMob commented Mar 4, 2023

Zach, IIUC, you're suggesting the second thread local contain the executor the thread is actually running in, and that it will never change (Which is unlike how the instrumented-executor's :onto? param can set the first thread local, and d/onto changes it). Thus, the first thread local will represent what executor we want new jobs/threads/deferreds to run on, and the second what it's actually on.

Let's call it something like current-executor.

We'll wrap non-Manifold/dirigiste executors so their .execute fns set current-executor.

Then, we need to check the code to find out where it'll have to change. A search for calls to .execute turns up these likely candidates:

  • success-error-unrealized
  • set-deferred
  • addListener
  • onRealized in SuccessDeferred and ErrorDeferred

I think the future-with, wait-for, and go-off-with macros should always execute a new job, even on the same executor, so they should be left alone.

If we change the above locations, I don't think we also have to fool with the chain* fns, since they bottom out in calls to on-realized, whose bodies will all be handled by one of the above.

Is this a good summary?

@arnaudgeiser
Collaborator

arnaudgeiser commented Mar 4, 2023

I think we are overlooking that people using Aleph/Manifold/Dirigiste might also use other kinds of asynchronous clients which use JDK Executors. The behavior of the Deferreds will then depend on which world the threads were created in, which will raise even more questions related to this.

I'm scared about the following question:

  • When I use d/onto here, it's working as expected, but two lines below, after my HTTP/Cassandra/GRPC call, I use the exact same pattern and the second call is not rescheduled on that specific Executor. Why?

This kind of behavior is something you can experience today using Aleph. If you ever replace the Dirigiste Executor (the thread-factory, to be precise) [1] with a JDK Executor, you will lose the onto [2] and thus the automatic rescheduling on that Executor (if not realized already). Your code will behave completely differently, and you may end up blocking threads from your various application Executors (Netty EventLoopGroup, tiny FixedThreadPool) that you don't want to block...

And then... you realize that some of Aleph's defaults are actually preventing you from shooting yourself in the foot.

(def manifold-executor (ex/fixed-thread-executor 4 {:onto? true}))

(.submit manifold-executor
         #(-> (d/future (Thread/sleep 100))
              (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))))

;; => "manifold-pool-13-2"

(def executor (java.util.concurrent.Executors/newFixedThreadPool 4))

(.submit executor
         #(-> (d/future (Thread/sleep 100))
              (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))))

;; => "manifold-execute-9"

Do we really want more of this?

Anyway, just wanted to share my concerns. But implementation-wise, what you propose will probably work the way we expect.


I would propose to explore another solution to not have this split between the Manifold/JDK world.
We do not change anything but we introduce a manifold.deferred/chain-with function.

(defn chain-with
  ([x executor f]
   ...)
  ([x executor f g]
   ...)
  ([x executor f g & fs]
   ...))

And then, we "just" force the application of f on that executor as such :

(-> (d/future 1)
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread))))) ;; current-thread
    (d/chain-with' (manifold.executor/fixed-thread-executor 2) (fn [_] (prn (.getName (Thread/currentThread)))))) ;; manifold-pool-1-1

With that, chain stays :

  • Select an executor
    a. If an executor is specified on the deferred, choose that.
    b. If an executor is set on the executor threadlocal, choose that.
    c. Otherwise, ignore the executor
  • If the deferred is not realized, schedule the callback on the executor (or continue on the thread that realized the deferred if no executor defined)
  • If the deferred IS realized, execute immediately on the current thread

chain-with comes with :

  • Schedule the callback on the provided executor regardless of the state of the deferred (realized or not)

And onto is still doing the same :

  • Schedule the callback on the provided executor if the deferred is not realized

I would be happier with this proposal.
Thoughts?
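
As a rough illustration of the single-function arity, one way to approximate this in user code today is to combine d/chain with d/future-with. This is only a sketch of one possible reading of the proposal: `chain-with` and `callback-thread-name` are hypothetical names, not Manifold functions, and the executor here is a plain JDK pool chosen for the example.

```clojure
(require '[manifold.deferred :as d])
(import '(java.util.concurrent Executors))

(defn chain-with
  "Hypothetical helper, not part of Manifold: applies `f` to the value of `x`
   on `executor`, whether or not `x` is already realized."
  [x executor f]
  ;; The outer fn may run inline on the calling thread, but all it does is
  ;; submit `(f v)` to the executor and return the resulting deferred.
  (d/chain x (fn [v] (d/future-with executor (f v)))))

(defn callback-thread-name
  "Chains onto an already-realized deferred and returns the name of the
   thread that ran the callback."
  []
  (let [pool (Executors/newFixedThreadPool 2)]
    (try
      @(chain-with (d/success-deferred 1) pool
                   (fn [_] (.getName (Thread/currentThread))))
      (finally
        (.shutdown pool)))))

(println (callback-thread-name))
```

The trade-off is the one discussed above: every call pays the cost of a submission to the executor, even when the caller is already running on it.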

[1] : https://github.com/clj-commons/aleph/blob/master/src/aleph/http/server.clj#L612
[2] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L69

@KingMob
Collaborator

KingMob commented Mar 12, 2023

FYI, bit busy at the moment, I won't be able to get back to this until the following week.


7 participants