Skip to content

Commit

Permalink
perf: Optimize workflow logic (#1996)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3e327d5)
shaohuzhang1 committed Jan 8, 2025
1 parent c6e9a99 commit e73414d
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions apps/application/flow/workflow_manage.py
Original file line number Diff line number Diff line change
@@ -342,15 +342,19 @@ def run_stream(self, current_node, node_result_future):
self.run_chain_async(current_node, node_result_future)
return tools.to_stream_response_simple(self.await_result())

def is_run(self, timeout=0.1):
self.lock.acquire()
def is_run(self, timeout=0.5):
future_list_len = len(self.future_list)
try:
r = concurrent.futures.wait(self.future_list, timeout)
return len(r.not_done) > 0
if len(r.not_done) > 0:
return True
else:
if future_list_len == len(self.future_list):
return False
else:
return True
except Exception as e:
return True
finally:
self.lock.release()

def await_result(self):
try:
@@ -403,12 +407,8 @@ def run_chain_manage(self, current_node, node_result_future):
# 获取到可执行的子节点
result_list = [{'node': node, 'future': executor.submit(self.run_chain_manage, node, None)} for node in
sorted_node_run_list]
try:
self.lock.acquire()
for r in result_list:
self.future_list.append(r.get('future'))
finally:
self.lock.release()
for r in result_list:
self.future_list.append(r.get('future'))

def run_chain(self, current_node, node_result_future=None):
if node_result_future is None:

0 comments on commit e73414d

Please sign in to comment.