Skip to content

Commit

Permalink
Reorganize system tests for CAN (mdabrowski1990#290)
Browse files Browse the repository at this point in the history
- abstraction layer for system tests defined for easier reuse
- system tests grouped in test classes
- fixed issue with default value of N_Bs timeout always being used
- added a method to CAN Addressing Information class get the other's end Addressing Information
  • Loading branch information
mdabrowski1990 authored Oct 29, 2024
1 parent 8a42282 commit f42d5a5
Show file tree
Hide file tree
Showing 16 changed files with 2,779 additions and 1,978 deletions.
7 changes: 1 addition & 6 deletions examples/can/python-can/kvaser/send_and_receive_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ def main():
rx_physical={"can_id": 0x612},
tx_functional={"can_id": 0x6FF},
rx_functional={"can_id": 0x6FE})
ai_send = CanAddressingInformation(
addressing_format=ai_receive.addressing_format,
tx_physical={"can_id": 0x612},
rx_physical={"can_id": 0x611},
tx_functional={"can_id": 0x6FE},
rx_functional={"can_id": 0x6FF})
ai_send = ai_receive.get_other_end()

# create Transport Interface objects for UDS communication
can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ async def main():
rx_physical={"can_id": 0x612},
tx_functional={"can_id": 0x6FF},
rx_functional={"can_id": 0x6FE})
ai_send = CanAddressingInformation(
addressing_format=ai_receive.addressing_format,
tx_physical={"can_id": 0x612},
rx_physical={"can_id": 0x611},
tx_functional={"can_id": 0x6FE},
rx_functional={"can_id": 0x6FF})
ai_send = ai_receive.get_other_end()

# create Transport Interface objects for UDS communication
can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
Expand Down
7 changes: 1 addition & 6 deletions examples/can/python-can/kvaser/send_and_receive_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ def main():
rx_physical={"can_id": 0x612},
tx_functional={"can_id": 0x6FF},
rx_functional={"can_id": 0x6FE})
ai_send = CanAddressingInformation(
addressing_format=ai_receive.addressing_format,
tx_physical={"can_id": 0x612},
rx_physical={"can_id": 0x611},
tx_functional={"can_id": 0x6FE},
rx_functional={"can_id": 0x6FF})
ai_send = ai_receive.get_other_end()

# create Transport Interface objects for UDS communication
can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ async def main():
rx_physical={"can_id": 0x612},
tx_functional={"can_id": 0x6FF},
rx_functional={"can_id": 0x6FE})
ai_send = CanAddressingInformation(
addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
tx_physical={"can_id": 0x612},
rx_physical={"can_id": 0x611},
tx_functional={"can_id": 0x6FE},
rx_functional={"can_id": 0x6FF})
ai_send = ai_receive.get_other_end()

# create Transport Interface objects for UDS communication
can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
Expand Down
15 changes: 13 additions & 2 deletions tests/software_tests/can/test_abstract_addressing_information.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from mock import Mock, patch
from mock import MagicMock, Mock, patch

from uds.can.abstract_addressing_information import AbstractCanAddressingInformation, AddressingType

Expand All @@ -15,7 +15,11 @@ def setup_method(self):
ADDRESSING_TYPE_NAME="addressing_type",
TARGET_ADDRESS_NAME="target_address",
SOURCE_ADDRESS_NAME="source_address",
ADDRESS_EXTENSION_NAME="address_extension")
ADDRESS_EXTENSION_NAME="address_extension",
tx_packets_physical_ai=MagicMock(),
rx_packets_physical_ai=MagicMock(),
tx_packets_functional_ai=MagicMock(),
rx_packets_functional_ai=MagicMock())
# patching
self._patcher_deepcopy = patch(f"{SCRIPT_LOCATION}.deepcopy")
self.mock_deepcopy = self._patcher_deepcopy.start()
Expand Down Expand Up @@ -108,3 +112,10 @@ def test_tx_packets_functional_ai__set(self, value):
self.mock_addressing_information.validate_packet_ai(addressing_type=AddressingType.FUNCTIONAL, **value)
assert self.mock_addressing_information._AbstractCanAddressingInformation__tx_packets_functional_ai \
== self.mock_addressing_information.validate_packet_ai.return_value

# get_other_end

def test_get_other_end(self):
assert (AbstractCanAddressingInformation.get_other_end(self.mock_addressing_information)
== self.mock_deepcopy.return_value)
self.mock_deepcopy.assert_called_once_with(self.mock_addressing_information)
7 changes: 6 additions & 1 deletion tests/software_tests/can/test_addressing_information.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,12 @@ class TestCanAddressingInformationIntegration:
"source_address": 0xFF,
"address_extension": 0xA1}}),
])
def test_new(self, input_params, expected_attributes):
def test_new_and_other_end(self, input_params, expected_attributes):
ai = CanAddressingInformation(**input_params)
for attr_name, attr_value in expected_attributes.items():
assert getattr(ai, attr_name) == attr_value
ai_other_end = ai.get_other_end()
assert ai.rx_packets_physical_ai == ai_other_end.tx_packets_physical_ai
assert ai.tx_packets_physical_ai == ai_other_end.rx_packets_physical_ai
assert ai.rx_packets_functional_ai == ai_other_end.tx_packets_functional_ai
assert ai.tx_packets_functional_ai == ai_other_end.rx_packets_functional_ai
32 changes: 16 additions & 16 deletions tests/software_tests/transport_interface/can/test_python_can.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ def test_send_message__multiple_packets__st_min__block_size_0(self, message, st_
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.receive_packet.assert_called_once_with(
timeout=self.mock_can_transport_interface.N_BS_TIMEOUT)
timeout=self.mock_can_transport_interface.n_bs_timeout)
self.mock_can_transport_interface._send_cf_packets_block.assert_called_once_with(
cf_packets_block=mock_segmented_message[1:],
delay=self.mock_can_st_min_handler.decode.return_value)
Expand Down Expand Up @@ -1207,7 +1207,7 @@ def test_send_message__multiple_packets__n_cs__block_size_1(self, message, n_cs,
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.receive_packet.assert_has_calls([
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT) for _ in mock_segmented_message[1:]
call(timeout=self.mock_can_transport_interface.n_bs_timeout) for _ in mock_segmented_message[1:]
])
self.mock_can_transport_interface._send_cf_packets_block.assert_has_calls([
call(cf_packets_block=[packet], delay=n_cs) for packet in mock_segmented_message[1:]
Expand Down Expand Up @@ -1245,8 +1245,8 @@ def test_send_message__multiple_packets__wait(self, message):
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.receive_packet.assert_has_calls([
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT),
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT)
call(timeout=self.mock_can_transport_interface.n_bs_timeout),
call(timeout=self.mock_can_transport_interface.n_bs_timeout)
])
self.mock_uds_message_record.assert_called_once_with([
self.mock_can_transport_interface.send_packet.return_value,
Expand Down Expand Up @@ -1282,8 +1282,8 @@ def test_send_message__multiple_packets__unexpected_packet(self, message):
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.receive_packet.assert_has_calls([
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT),
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT)
call(timeout=self.mock_can_transport_interface.n_bs_timeout),
call(timeout=self.mock_can_transport_interface.n_bs_timeout)
])
self.mock_uds_message_record.assert_called_once_with([
self.mock_can_transport_interface.send_packet.return_value,
Expand Down Expand Up @@ -1311,7 +1311,7 @@ def test_send_message__multiple_packets__overflow(self, message):
PyCanTransportInterface.send_message(self.mock_can_transport_interface, message)
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.receive_packet.assert_called_once_with(
timeout=self.mock_can_transport_interface.N_BS_TIMEOUT)
timeout=self.mock_can_transport_interface.n_bs_timeout)
self.mock_warn.assert_not_called()
self.mock_can_transport_interface._update_n_bs_measured.assert_not_called()

Expand All @@ -1332,7 +1332,7 @@ def test_send_message__multiple_packets__unknown_flow_status(self, message):
PyCanTransportInterface.send_message(self.mock_can_transport_interface, message)
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.receive_packet.assert_called_once_with(
timeout=self.mock_can_transport_interface.N_BS_TIMEOUT)
timeout=self.mock_can_transport_interface.n_bs_timeout)
self.mock_warn.assert_not_called()
self.mock_can_transport_interface._update_n_bs_measured.assert_not_called()

Expand Down Expand Up @@ -1379,7 +1379,7 @@ async def test_async_send_message__multiple_packets__st_min__block_size_0(self,
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_receive_packet.assert_called_once_with(
timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None)
timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None)
self.mock_can_transport_interface._async_send_cf_packets_block.assert_called_once_with(
cf_packets_block=mock_segmented_message[1:],
delay=self.mock_can_st_min_handler.decode.return_value,
Expand Down Expand Up @@ -1421,7 +1421,7 @@ async def test_async_send_message__multiple_packets__n_cs__block_size_1(self, me
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_receive_packet.assert_has_calls([
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None) for _ in mock_segmented_message[1:]
call(timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None) for _ in mock_segmented_message[1:]
])
self.mock_can_transport_interface._async_send_cf_packets_block.assert_has_calls([
call(cf_packets_block=[packet], delay=n_cs, loop=None) for packet in mock_segmented_message[1:]
Expand Down Expand Up @@ -1460,8 +1460,8 @@ async def test_async_send_message__multiple_packets__wait(self, message):
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_receive_packet.assert_has_calls([
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None),
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None)
call(timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None),
call(timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None)
])
self.mock_uds_message_record.assert_called_once_with([
self.mock_can_transport_interface.async_send_packet.return_value,
Expand Down Expand Up @@ -1497,8 +1497,8 @@ async def test_async_send_message__multiple_packets__unexpected_packet(self, mes
self.mock_can_transport_interface, message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_receive_packet.assert_has_calls([
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None),
call(timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None)
call(timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None),
call(timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None)
])
self.mock_uds_message_record.assert_called_once_with([
self.mock_can_transport_interface.async_send_packet.return_value,
Expand Down Expand Up @@ -1528,7 +1528,7 @@ async def test_async_send_message__multiple_packets__overflow(self, message):
await PyCanTransportInterface.async_send_message(self.mock_can_transport_interface, message)
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_receive_packet.assert_called_once_with(
timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None)
timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None)
self.mock_warn.assert_not_called()
self.mock_can_transport_interface._update_n_bs_measured.assert_not_called()

Expand All @@ -1551,7 +1551,7 @@ async def test_async_send_message__multiple_packets__unknown_flow_status(self, m
await PyCanTransportInterface.async_send_message(self.mock_can_transport_interface, message)
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_receive_packet.assert_called_once_with(
timeout=self.mock_can_transport_interface.N_BS_TIMEOUT, loop=None)
timeout=self.mock_can_transport_interface.n_bs_timeout, loop=None)
self.mock_warn.assert_not_called()
self.mock_can_transport_interface._update_n_bs_measured.assert_not_called()

Expand Down
19 changes: 19 additions & 0 deletions tests/system_tests/transport_interface/can/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod

from uds.utilities import TimeMillisecondsAlias


class AbstractCanTests(ABC):
"""Abstract class for system tests (with hardware) for Diagnostic over CAN (DoCAN)."""

TASK_TIMING_TOLERANCE: TimeMillisecondsAlias = 30. # ms
DELAY_AFTER_RECEIVING_FRAME: TimeMillisecondsAlias # ms
DELAY_AFTER_RECEIVING_MESSAGE: TimeMillisecondsAlias # ms

@abstractmethod
def setup_class(self):
"""Configure CAN bus connections."""

@abstractmethod
def teardown_class(self):
"""Safely close CAN bus connections."""
Empty file.
Loading

0 comments on commit f42d5a5

Please sign in to comment.