Skip to content

Commit

Permalink
Revise error handling to avoid duplicate failed validation messages (…
Browse files Browse the repository at this point in the history
…arising from retries when exceptions prevent message acknowledgement)
  • Loading branch information
ndebuhr committed Oct 18, 2022
1 parent 34a7a39 commit 41e382e
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions orchestration/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import os
import re
import requests
Expand Down Expand Up @@ -295,34 +296,38 @@ def interpolate(self, input_message, output_message, regex_expressions):
def fail_interpolation(self):
if "interpolation failure message" in self.action.keys():
self.send_message(self.action["interpolation failure message"])
abort(400)
raise Exception


def callback(message):
orchestration = Orchestration(message)
orchestration.insert_post()
orchestration.confirmation()
if not orchestration.confirmed:
orchestration.get_action()
orchestration.send_confirmation()
elif orchestration.user_is_confirming():
orchestration.get_action()
if orchestration.is_gbash_async():
orchestration.send_gbash_async()
elif orchestration.is_gbash_sync():
orchestration.send_gbash_sync()
elif orchestration.is_link():
orchestration.send_link()
elif orchestration.is_repeater():
orchestration.send_repeater()
elif orchestration.is_kubebash():
orchestration.send_kubebash()
# Fallback is to defer to async task system
elif orchestration.is_async():
orchestration.send_task()
elif not orchestration.user_is_confirming():
orchestration.send_rejection()
message.ack()
try:
orchestration = Orchestration(message)
orchestration.insert_post()
orchestration.confirmation()
if not orchestration.confirmed:
orchestration.get_action()
orchestration.send_confirmation()
elif orchestration.user_is_confirming():
orchestration.get_action()
if orchestration.is_gbash_async():
orchestration.send_gbash_async()
elif orchestration.is_gbash_sync():
orchestration.send_gbash_sync()
elif orchestration.is_link():
orchestration.send_link()
elif orchestration.is_repeater():
orchestration.send_repeater()
elif orchestration.is_kubebash():
orchestration.send_kubebash()
# Fallback is to defer to async task system
elif orchestration.is_async():
orchestration.send_task()
elif not orchestration.user_is_confirming():
orchestration.send_rejection()
message.ack()
except Exception as e:
logging.error("Error at %s", "subscription", exc_info=e)
message.ack()


streaming_pull_future = subscriber.subscribe(
Expand Down

0 comments on commit 41e382e

Please sign in to comment.