Skip to content

Commit

Permalink
test(core): don't fetch DebugLinkState by default
Browse files Browse the repository at this point in the history
In case the main workflow is restarting after a `DebugLinkDecision`,
the next `DebugLinkGetState` handling becomes inherently racy.

I would suggest making the state fetching explicit,
in order to avoid the "restart" race condition.

[no changelog]
  • Loading branch information
romanz committed Feb 4, 2025
1 parent 9d1d062 commit 12790ee
Show file tree
Hide file tree
Showing 20 changed files with 269 additions and 326 deletions.
4 changes: 2 additions & 2 deletions core/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ test_emu_persistence_ui: ## run persistence tests with UI testing

test_emu_ui: ## run ui integration tests
$(EMU_TEST) $(PYTEST) $(TESTPATH)/device_tests $(TESTOPTS) \
--ui=test --ui-check-missing --record-text-layout --do-master-diff \
--ui=test --ui-check-missing --do-master-diff \
--lang=$(TEST_LANG)

test_emu_ui_multicore: ## run ui integration tests using multiple cores
$(PYTEST) -n $(MULTICORE) $(TESTPATH)/device_tests $(TESTOPTS) --timeout $(PYTEST_TIMEOUT) \
--ui=test --ui-check-missing --record-text-layout --do-master-diff \
--ui=test --ui-check-missing --do-master-diff \
--control-emulators --model=core --random-order-seed=$(RANDOM) \
--lang=$(TEST_LANG)

Expand Down
135 changes: 29 additions & 106 deletions python/src/trezorlib/debuglink.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class InputFunc(Protocol):
def __call__(
self,
hold_ms: int | None = None,
wait: bool | None = None,
) -> "LayoutContent": ...
) -> "None": ...

InputFlowType = Generator[None, messages.ButtonRequest, None]

Expand Down Expand Up @@ -416,11 +415,10 @@ def _make_input_func(
def input_func(
self: "DebugLink",
hold_ms: int | None = None,
wait: bool | None = None,
) -> LayoutContent:
) -> None:
__tracebackhide__ = True # for pytest # pylint: disable=W0612
decision.hold_ms = hold_ms
return self._decision(decision, wait=wait)
self._decision(decision)

return input_func # type: ignore [Parameter name mismatch]

Expand All @@ -442,12 +440,7 @@ def __init__(self, transport: "Transport", auto_interact: bool = True) -> None:
self.t1_screenshot_directory: Path | None = None
self.t1_screenshot_counter = 0

# Optional file for saving text representation of the screen
self.screen_text_file: Path | None = None
self.last_screen_content = ""

self.waiting_for_layout_change = False
self.layout_dirty = True

self.input_wait_type = DebugWaitType.IMMEDIATE

Expand Down Expand Up @@ -477,11 +470,6 @@ def layout_type(self) -> LayoutType:
assert self.model is not None
return LayoutType.from_model(self.model)

def set_screen_text_file(self, file_path: Path | None) -> None:
if file_path is not None:
file_path.write_bytes(b"")
self.screen_text_file = file_path

def open(self) -> None:
self.transport.begin_session()

Expand Down Expand Up @@ -543,8 +531,19 @@ def state(self, wait_type: DebugWaitType | None = None) -> messages.DebugLinkSta
raise TrezorFailure(result)
return result

def read_layout(self) -> LayoutContent:
return LayoutContent(self.state().tokens)
def read_layout(self, wait: bool | None = None) -> LayoutContent:
"""
Force waiting for the layout by setting `wait=True`. Force not waiting by
setting `wait=False` -- useful when, e.g., you are causing the next layout to be
deliberately delayed.
"""
if wait is True:
wait_type = DebugWaitType.CURRENT_LAYOUT
elif wait is False:
wait_type = DebugWaitType.IMMEDIATE
else:
wait_type = None
return LayoutContent(self.state(wait_type=wait_type).tokens)

def wait_layout(self, wait_for_external_change: bool = False) -> LayoutContent:
# Next layout change will be caused by external event
Expand All @@ -558,18 +557,12 @@ def wait_layout(self, wait_for_external_change: bool = False) -> LayoutContent:
obj = self._call(
messages.DebugLinkGetState(wait_layout=DebugWaitType.NEXT_LAYOUT)
)
self.layout_dirty = True
if isinstance(obj, messages.Failure):
raise TrezorFailure(obj)
return LayoutContent(obj.tokens)

@contextmanager
def wait_for_layout_change(self) -> Iterator[LayoutContent]:
# set up a dummy layout content object to be yielded
layout_content = LayoutContent(
["DUMMY CONTENT, WAIT UNTIL THE END OF THE BLOCK :("]
)

def wait_for_layout_change(self) -> Iterator[None]:
# make sure some current layout is up by issuing a dummy GetState
self.state()

Expand All @@ -579,18 +572,14 @@ def wait_for_layout_change(self) -> Iterator[LayoutContent]:
# allow the block to proceed
self.waiting_for_layout_change = True
try:
yield layout_content
yield
finally:
self.waiting_for_layout_change = False
self.layout_dirty = True

# wait for the reply
resp = self._read()
assert isinstance(resp, messages.DebugLinkState)

# replace contents of the yielded object with the new thing
layout_content.__init__(resp.tokens)

def reset_debug_events(self) -> None:
# Only supported on TT and above certain version
if (self.model is not models.T1B1) and not self.legacy_debug:
Expand Down Expand Up @@ -634,44 +623,20 @@ def read_reset_word(self) -> str:
state = self._call(messages.DebugLinkGetState(wait_word_list=True))
return state.reset_word

def _decision(
self, decision: messages.DebugLinkDecision, wait: bool | None = None
) -> LayoutContent:
"""Send a debuglink decision and returns the resulting layout.
def _decision(self, decision: messages.DebugLinkDecision) -> None:
"""Send a debuglink decision.
If hold_ms is set, an additional 200ms is added to account for processing
delays. (This is needed for hold-to-confirm to trigger reliably.)
If `wait` is unset, the following wait mode is used:
- `IMMEDIATE`, when in normal tests, which never deadlocks the device, but may
return an empty layout in case the next one didn't come up immediately. (E.g.,
in SignTx flow, the device is waiting for more TxRequest/TxAck exchanges
before showing the next UI layout.)
- `CURRENT_LAYOUT`, when in tests running through a `DeviceHandler`. This mode
returns the current layout or waits for some layout to come up if there is
none at the moment. The assumption is that wirelink is communicating on
another thread and won't be blocked by waiting on debuglink.
Force waiting for the layout by setting `wait=True`. Force not waiting by
setting `wait=False` -- useful when, e.g., you are causing the next layout to be
deliberately delayed.
"""
if not self.allow_interactions:
return self.wait_layout()
self.wait_layout()
return

if decision.hold_ms is not None:
decision.hold_ms += 200

self._write(decision)
self.layout_dirty = True
if wait is True:
wait_type = DebugWaitType.CURRENT_LAYOUT
elif wait is False:
wait_type = DebugWaitType.IMMEDIATE
else:
wait_type = self.input_wait_type
return self._snapshot_core(wait_type)

press_yes = _make_input_func(button=messages.DebugButton.YES)
"""Confirm current layout. See `_decision` for more details."""
Expand All @@ -698,58 +663,14 @@ def _decision(
)
"""Press right button. See `_decision` for more details."""

def input(self, word: str, wait: bool | None = None) -> LayoutContent:
def input(self, word: str) -> None:
"""Send text input to the device. See `_decision` for more details."""
return self._decision(messages.DebugLinkDecision(input=word), wait)
self._decision(messages.DebugLinkDecision(input=word))

def click(
self,
click: Tuple[int, int],
hold_ms: int | None = None,
wait: bool | None = None,
) -> LayoutContent:
def click(self, click: Tuple[int, int], hold_ms: int | None = None) -> None:
"""Send a click to the device. See `_decision` for more details."""
x, y = click
return self._decision(
messages.DebugLinkDecision(x=x, y=y, hold_ms=hold_ms), wait
)

def _snapshot_core(
self, wait_type: DebugWaitType = DebugWaitType.IMMEDIATE
) -> LayoutContent:
"""Save text and image content of the screen to relevant directories."""
# skip the snapshot if we are on T1
if self.model is models.T1B1:
return LayoutContent([])

# take the snapshot
state = self.state(wait_type)
layout = LayoutContent(state.tokens)

if state.tokens and self.layout_dirty:
# save it, unless we already did or unless it's empty
self.save_debug_screen(layout.visible_screen())
self.layout_dirty = False

# return the layout
return layout

def save_debug_screen(self, screen_content: str) -> None:
if self.screen_text_file is None:
return

if not self.screen_text_file.exists():
self.screen_text_file.write_bytes(b"")

# Not writing the same screen twice
if screen_content == self.last_screen_content:
return

self.last_screen_content = screen_content

with open(self.screen_text_file, "a") as f:
f.write(screen_content)
f.write("\n" + 80 * "/" + "\n")
self._decision(messages.DebugLinkDecision(x=x, y=y, hold_ms=hold_ms))

def stop(self) -> None:
self._write(messages.DebugLinkStop())
Expand Down Expand Up @@ -882,7 +803,9 @@ def _default_input_flow(self, br: messages.ButtonRequest) -> None:
# Paginating (going as further as possible) and pressing Yes
if br.pages is not None:
for _ in range(br.pages - 1):
self.debuglink.swipe_up(wait=True)
self.debuglink.swipe_up()
self.debuglink.state(DebugWaitType.CURRENT_LAYOUT)

if self.debuglink.model is models.T3T1:
layout = self.debuglink.read_layout()
if "PromptScreen" in layout.all_components():
Expand Down
29 changes: 17 additions & 12 deletions tests/click_tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ def get_char_category(char: str) -> PassphraseCategory:

def go_next(debug: "DebugLink") -> LayoutContent:
if debug.layout_type is LayoutType.Bolt:
return debug.click(buttons.OK)
debug.click(buttons.OK)
elif debug.layout_type is LayoutType.Caesar:
return debug.press_right()
debug.press_right()
elif debug.layout_type is LayoutType.Delizia:
return debug.swipe_up()
debug.swipe_up()
else:
raise RuntimeError("Unknown model")
return debug.read_layout()


def tap_to_confirm(debug: "DebugLink") -> LayoutContent:
Expand All @@ -64,21 +65,23 @@ def tap_to_confirm(debug: "DebugLink") -> LayoutContent:
elif debug.layout_type is LayoutType.Caesar:
return debug.read_layout()
elif debug.layout_type is LayoutType.Delizia:
return debug.click(buttons.TAP_TO_CONFIRM)
debug.click(buttons.TAP_TO_CONFIRM)
return debug.read_layout()
else:
raise RuntimeError("Unknown model")


def go_back(debug: "DebugLink", r_middle: bool = False) -> LayoutContent:
if debug.layout_type in (LayoutType.Bolt, LayoutType.Delizia):
return debug.click(buttons.CANCEL)
debug.click(buttons.CANCEL)
elif debug.layout_type is LayoutType.Caesar:
if r_middle:
return debug.press_middle()
debug.press_middle()
else:
return debug.press_left()
debug.press_left()
else:
raise RuntimeError("Unknown model")
return debug.read_layout()


def navigate_to_action_and_press(
Expand Down Expand Up @@ -108,13 +111,14 @@ def navigate_to_action_and_press(

if steps < 0:
for _ in range(-steps):
layout = debug.press_left()
debug.press_left()
else:
for _ in range(steps):
layout = debug.press_right()
debug.press_right()

# Press or hold
debug.press_middle(hold_ms=hold_ms)
debug.read_layout() # TODO: make sure the press above takes action


def _carousel_steps(current_index: int, wanted_index: int, length: int) -> int:
Expand All @@ -125,13 +129,14 @@ def _carousel_steps(current_index: int, wanted_index: int, length: int) -> int:

def unlock_gesture(debug: "DebugLink") -> LayoutContent:
if debug.layout_type is LayoutType.Bolt:
return debug.click(buttons.OK)
debug.click(buttons.OK)
elif debug.layout_type is LayoutType.Caesar:
return debug.press_right()
debug.press_right()
elif debug.layout_type is LayoutType.Delizia:
return debug.click(buttons.TAP_TO_CONFIRM)
debug.click(buttons.TAP_TO_CONFIRM)
else:
raise RuntimeError("Unknown model")
return debug.read_layout()


def _get_action_index(wanted_action: str, all_actions: AllActionsType) -> int:
Expand Down
Loading

0 comments on commit 12790ee

Please sign in to comment.