From e73414d9a945ff53229220ba3d428cd3a943eb5c Mon Sep 17 00:00:00 2001 From: shaohuzhang1 <80892890+shaohuzhang1@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:05:27 +0800 Subject: [PATCH] perf: Optimize workflow logic (#1996) (cherry picked from commit 3e327d52d6b83cde854c0f7554e412ccfa8127f2) --- apps/application/flow/workflow_manage.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/application/flow/workflow_manage.py b/apps/application/flow/workflow_manage.py index 9cf5a625a29..5bc31b4188f 100644 --- a/apps/application/flow/workflow_manage.py +++ b/apps/application/flow/workflow_manage.py @@ -342,15 +342,19 @@ def run_stream(self, current_node, node_result_future): self.run_chain_async(current_node, node_result_future) return tools.to_stream_response_simple(self.await_result()) - def is_run(self, timeout=0.1): - self.lock.acquire() + def is_run(self, timeout=0.5): + future_list_len = len(self.future_list) try: r = concurrent.futures.wait(self.future_list, timeout) - return len(r.not_done) > 0 + if len(r.not_done) > 0: + return True + else: + if future_list_len == len(self.future_list): + return False + else: + return True except Exception as e: return True - finally: - self.lock.release() def await_result(self): try: @@ -403,12 +407,8 @@ def run_chain_manage(self, current_node, node_result_future): # 获取到可执行的子节点 result_list = [{'node': node, 'future': executor.submit(self.run_chain_manage, node, None)} for node in sorted_node_run_list] - try: - self.lock.acquire() - for r in result_list: - self.future_list.append(r.get('future')) - finally: - self.lock.release() + for r in result_list: + self.future_list.append(r.get('future')) def run_chain(self, current_node, node_result_future=None): if node_result_future is None: