Skip to content

Commit

Permalink
Fix ruff suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
micahflee authored and legoktm committed May 23, 2024
1 parent 98f2669 commit 59155c6
Show file tree
Hide file tree
Showing 27 changed files with 114 additions and 146 deletions.
5 changes: 4 additions & 1 deletion dom0/remove-tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
Removes tags used for exempting VMs from default SecureDrop Workstation
RPC policies from all VMs (including non-SecureDrop ones).
"""

import sys

import qubesadmin

q = qubesadmin.Qubes()
Expand All @@ -21,7 +24,7 @@ def main():
except Exception as error:
print(f"Error removing tag: '{error}'")
print("Aborting.")
exit(1)
sys.exit(1)
tags_removed = True

if tags_removed is False:
Expand Down
4 changes: 3 additions & 1 deletion files/destroy-vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Utility to quickly destroy a Qubes VM managed by the Workstation
salt config, for use in repeated builds during development.
"""

import argparse
import subprocess
import sys
Expand Down Expand Up @@ -38,7 +39,8 @@ def destroy_vm(vm):
Destroys a single VM instance. Requires arg to be
QubesVM object.
"""
assert SDW_DEFAULT_TAG in vm.tags
if SDW_DEFAULT_TAG not in vm.tags:
raise Exception("VM does not have the 'sd-workstation' tag")
if vm.is_running():
vm.kill()
print(f"Destroying VM '{vm.name}'... ", end="")
Expand Down
10 changes: 4 additions & 6 deletions files/sdw-admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
packages only puts the files in place `/srv/salt` but does not apply the state, nor
does it handle the config.
"""

import argparse
import os
import subprocess
Expand Down Expand Up @@ -59,9 +60,7 @@ def parse_args():
action="store_true",
help="During uninstall action, don't prompt for confirmation, proceed immediately",
)
args = parser.parse_args()

return args
return parser.parse_args()


def install_pvh_support():
Expand All @@ -70,9 +69,9 @@ def install_pvh_support():
TODO: install this via package requirements instead if possible
"""
try:
subprocess.run(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"], check=False)
subprocess.check_call(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"])
except subprocess.CalledProcessError:
raise SDWAdminException("Error installing grub2-xen-pvah: local PVH not available.")
raise SDWAdminException("Error installing grub2-xen-pvh: local PVH not available.")


def copy_config():
Expand Down Expand Up @@ -141,7 +140,6 @@ def refresh_salt():


def perform_uninstall(keep_template_rpm=False):

try:
subprocess.check_call(["sudo", "qubesctl", "state.sls", "sd-clean-default-dispvm"])
print("Destroying all VMs")
Expand Down
3 changes: 1 addition & 2 deletions files/validate_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ def confirm_submission_privkey_fingerprint(self):

def read_config_file(self):
with open(self.config_filepath) as f:
j = json.load(f)
return j
return json.load(f)

def validate_existing_size(self):
"""This method checks for existing private volume size and new
Expand Down
3 changes: 1 addition & 2 deletions launcher/tests/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def test_warning_shown_if_updater_never_ran(mocked_info, mocked_warning, mocked_
"""
# We're going to look for a nonexistent file in an existing temporary directoryr
with mock.patch("sdw_notify.Notify.LAST_UPDATED_FILE", tmp_path / "not-a-file"):

warning_should_be_shown = Notify.is_update_check_necessary()

# No handled errors should occur
Expand All @@ -61,7 +60,7 @@ def test_warning_shown_if_updater_never_ran(mocked_info, mocked_warning, mocked_


@pytest.mark.parametrize(
"uptime,warning_expected",
("uptime", "warning_expected"),
[(Notify.UPTIME_GRACE_PERIOD + 1, True), (Notify.UPTIME_GRACE_PERIOD - 1, False)],
)
@mock.patch("sdw_notify.Notify.sdlog.error")
Expand Down
10 changes: 6 additions & 4 deletions launcher/tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ def test_write_last_updated_flags_to_disk(mocked_info, mocked_error, mocked_call
mocked_call.assert_called_once_with(subprocess_command)
assert not mocked_error.called
assert os.path.exists(flag_file_dom0)
contents = open(flag_file_dom0).read()
with open(flag_file_dom0) as f:
contents = f.read()

assert contents == current_time


Expand Down Expand Up @@ -294,7 +296,7 @@ def test_apply_updates_vms(mocked_info, mocked_error, mocked_call, vm):
result = Updater._apply_updates_vm(vm)
assert result == UpdateStatus.UPDATES_OK

if vm.startswith("fedora") or vm.startswith("whonix"):
if vm.startswith(("fedora", "whonix")):
expected_salt_state = "update.qubes-vm"
else:
expected_salt_state = "fpf-apt-repo"
Expand Down Expand Up @@ -638,7 +640,7 @@ def test_last_required_reboot_performed_not_required(mocked_info, mocked_error,


@pytest.mark.parametrize(
"status, rebooted, expect_status_change, expect_updater",
("status", "rebooted", "expect_status_change", "expect_updater"),
[
(UpdateStatus.UPDATES_OK, True, False, True),
(UpdateStatus.UPDATES_REQUIRED, True, False, True),
Expand Down Expand Up @@ -674,7 +676,7 @@ def test_should_run_updater_status_interval_expired(


@pytest.mark.parametrize(
"status, rebooted, expect_status_change, expect_updater",
("status", "rebooted", "expect_status_change", "expect_updater"),
[
(UpdateStatus.UPDATES_OK, True, False, False),
(UpdateStatus.UPDATES_REQUIRED, True, False, True),
Expand Down
14 changes: 5 additions & 9 deletions launcher/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def test_obtain_lock(mocked_info, mocked_warning, mocked_error, tmp_path):
Test whether we can successfully obtain an exclusive lock
"""
with mock.patch("sdw_util.Util.LOCK_DIRECTORY", tmp_path):

basename = "test-obtain-lock.lock"
pid_str = str(os.getpid())
lh = Util.obtain_lock(basename)
Expand Down Expand Up @@ -67,7 +66,6 @@ def test_cannot_obtain_exclusive_lock_when_busy(
from being instantiated.
"""
with mock.patch("sdw_util.Util.LOCK_DIRECTORY", tmp_path):

basename = "test-exclusive-lock.lock"
Util.obtain_lock(basename)

Expand All @@ -94,7 +92,6 @@ def test_cannot_obtain_shared_lock_when_busy(mocked_info, mocked_warning, mocked
from being displayed when the preflight updater is already open.
"""
with mock.patch("sdw_util.Util.LOCK_DIRECTORY", tmp_path):

basename = "test-conflict.lock"
Util.obtain_lock(basename)

Expand Down Expand Up @@ -147,7 +144,6 @@ def test_stale_lockfile_has_no_effect(mocked_info, mocked_warning, mocked_error,
is accessing it.
"""
with mock.patch("sdw_util.Util.LOCK_DIRECTORY", tmp_path):

# Because we're not assigning the return value, it will be immediately released
basename = "test-stale.lock"
Util.obtain_lock(basename)
Expand All @@ -172,7 +168,7 @@ def test_log(tmp_path):
assert count == 3


@pytest.mark.parametrize("return_code,expected_result", [(0, True), (1, False)])
@pytest.mark.parametrize(("return_code", "expected_result"), [(0, True), (1, False)])
@mock.patch("sdw_util.Util.sdlog.error")
@mock.patch("sdw_util.Util.sdlog.warning")
@mock.patch("sdw_util.Util.sdlog.info")
Expand All @@ -199,7 +195,7 @@ def test_for_conflicting_process(


@pytest.mark.parametrize(
"os_release_fixture,version_contains",
("os_release_fixture", "version_contains"),
[
("os-release-qubes-4.1", "4.1"),
("os-release-ubuntu", None),
Expand Down Expand Up @@ -240,7 +236,7 @@ def test_get_logger():


@pytest.mark.parametrize(
"os_release_fixture,version_contains",
("os_release_fixture", "version_contains"),
[
("os-release-qubes-4.1", "4.1"),
("os-release-ubuntu", None),
Expand All @@ -265,7 +261,7 @@ def test_is_sdapp_halted_yes(os_release_fixture, version_contains):


@pytest.mark.parametrize(
"os_release_fixture,version_contains",
("os_release_fixture", "version_contains"),
[
("os-release-qubes-4.1", "4.1"),
("os-release-ubuntu", None),
Expand All @@ -290,7 +286,7 @@ def test_is_sdapp_halted_no(os_release_fixture, version_contains):


@pytest.mark.parametrize(
"os_release_fixture,version_contains",
("os_release_fixture", "version_contains"),
[
("os-release-qubes-4.1", "4.1"),
("os-release-ubuntu", None),
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ select = [
]
ignore = [
# code complexity checks that we fail
"PLR0911",
"PLR0913",
"PLR0915",
# magic-value-comparison, too many violations for now
"PLR2004",
# hardcoded passwords, false positive
"S105",
# it's fine to use /tmp in dom0, since it's not a multiuser environment
"S108",
# flags every instance of subprocess
"S603",
# we trust $PATH isn't hijacked
Expand All @@ -77,6 +80,8 @@ ignore = [
"**/tests/**.py" = [
# Use of `assert` detected
"S101",
# Tests use /tmp
"S108",
# Use a regular `assert` instead of unittest-style `assertEqual`
"PT009",
]
Expand Down
29 changes: 14 additions & 15 deletions sdw_notify/Notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def is_update_check_necessary():
"Updater may never have run. Showing security warning."
)
return True
elif updated_seconds_ago > WARNING_THRESHOLD:
if updated_seconds_ago > WARNING_THRESHOLD:
if uptime_seconds > UPTIME_GRACE_PERIOD:
sdlog.warning(
f"Last successful update ({updated_hours_ago:.1f} hours ago) is above "
Expand All @@ -87,25 +87,24 @@ def is_update_check_necessary():
"Showing security warning."
)
return True
else:
sdlog.info(
f"Last successful update ({updated_hours_ago:.1f} hours ago) is above "
f"warning threshold ({warning_threshold_hours:.1f} hours). Uptime grace period "
f"of {grace_period_hours:.1f} hours has not elapsed yet (uptime: {uptime_hours:.1f} "
"hours). Exiting without warning."
)
return False
else:

sdlog.info(
f"Last successful update ({updated_hours_ago:.1f} hours ago) "
f"is below the warning threshold ({warning_threshold_hours:.1f} hours). "
"Exiting without warning."
f"Last successful update ({updated_hours_ago:.1f} hours ago) is above "
f"warning threshold ({warning_threshold_hours:.1f} hours). Uptime grace period "
f"of {grace_period_hours:.1f} hours has not elapsed yet (uptime: {uptime_hours:.1f} "
"hours). Exiting without warning."
)
return False

sdlog.info(
f"Last successful update ({updated_hours_ago:.1f} hours ago) "
f"is below the warning threshold ({warning_threshold_hours:.1f} hours). "
"Exiting without warning."
)
return False


def get_uptime_seconds():
# Obtain current uptime
with open("/proc/uptime") as f:
uptime_seconds = float(f.readline().split()[0])
return uptime_seconds
return float(f.readline().split()[0])
22 changes: 10 additions & 12 deletions sdw_notify/NotifyApp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,16 @@ def run(self) -> NotifyStatus:
result = self.exec_()

if result == QMessageBox.Ok:
sdlog.info(
f"NotfyDialog returned {result}, user has opted to check for updates"
)
sdlog.info(f"NotfyDialog returned {result}, user has opted to check for updates")
return NotifyStatus.CHECK_UPDATES
elif result == QMessageBox.No:
if result == QMessageBox.No:
sdlog.info(f"NotfyDialog returned {result}, user has opted to defer updates")
return NotifyStatus.DEFER_UPDATES
else:
# Should not occur, as this dialog which can only return
# one of two states.
sdlog.error(
f"NotfyDialog returned unexpected value {result}; consult "
"QMessageBox API for more information"
)
return NotifyStatus.ERROR_UNKNOWN

# Should not occur, as this dialog which can only return
# one of two states.
sdlog.error(
f"NotfyDialog returned unexpected value {result}; consult "
"QMessageBox API for more information"
)
return NotifyStatus.ERROR_UNKNOWN
Loading

0 comments on commit 59155c6

Please sign in to comment.