Skip to content

Commit

Permalink
refactor commands, add subcommand support
Browse files Browse the repository at this point in the history
  • Loading branch information
arily committed Dec 10, 2024
1 parent f9263dd commit 2d8efe3
Showing 1 changed file with 121 additions and 29 deletions.
150 changes: 121 additions & 29 deletions app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,21 @@ class Command(NamedTuple):


class CommandSet:
trigger: str
doc: str
commands: list[Command]
subcommands: list[CommandSet]
parent: CommandSet | None

def __init__(self, trigger: str, doc: str) -> None:
self.trigger = trigger
self.doc = doc

self.commands: list[Command] = []

self.subcommands: list[CommandSet] = []
self.parent = None

def add(
self,
priv: Privileges,
Expand All @@ -110,9 +119,10 @@ def wrapper(f: Callback) -> Callback:
Command(
# NOTE: this method assumes that functions without any
# triggers will be named like '{self.trigger}_{trigger}'.
triggers=(
[f.__name__.removeprefix(f"{self.trigger}_").strip()] + aliases
),
triggers=[
self.remove_prefix(f.__name__),
*aliases,
],
callback=f,
priv=priv,
hidden=hidden,
Expand All @@ -124,6 +134,47 @@ def wrapper(f: Callback) -> Callback:

return wrapper

def subcommand(self, set: CommandSet) -> CommandSet:
self.subcommands.append(set)
set.parent = self

return set

def remove_prefix(self, name: str) -> str:
_name = (
self.parent.remove_prefix(name).removeprefix(self.trigger + "_").strip()
if self.parent is not None
else name.removeprefix(f"{self.trigger}_").strip()
)

return _name


def help_pure(
ctx: Context,
cmd_or_sets: list[Command | CommandSet],
prefix: str = app.settings.COMMAND_PREFIX,
) -> str:
"""Show all documented streamer commands the player can access."""
cb = []
cmds = []

for cmd in cmd_or_sets:
match cmd:
case Command():
if not cmd.doc or ctx.player.priv & cmd.priv != cmd.priv:
# no doc, or insufficient permissions.
continue
if cmd.callback in cb:
continue

cmds.append(f"{prefix} {cmd.triggers[0]}: {cmd.doc}")
cb.append(cmd.callback)
case CommandSet():
cmds.append(f"{prefix} {cmd.trigger}: {cmd.doc}")

return "\n".join(cmds)


mp_commands = CommandSet("mp", "Multiplayer commands.")
pool_commands = CommandSet("pool", "Mappool commands.")
Expand Down Expand Up @@ -2497,37 +2548,78 @@ async def process_commands(
# case-insensitive triggers
trigger = trigger.lower()

commands = cmd_set.commands
commands = [*cmd_set.subcommands, *cmd_set.commands]
break
else:
# no set commands matched, check normal commands.
commands = regular_commands

for cmd in commands:
if trigger in cmd.triggers and player.priv & cmd.priv == cmd.priv:
# found matching trigger with sufficient privs
try:
res = await cmd.callback(
Context(
player=player,
trigger=trigger,
args=args,
recipient=target,
),
)
except Exception:
# print exception info to the console,
# but do not break the player's session.
traceback.print_exc()
res, cmd = await process_commands_pure(
player=player,
target=target,
msg=msg,
head=trigger,
args=args,
cmd_or_sets=[*commands],
)
if res is not None:
# we have a message to return, include elapsed time
elapsed = app.logging.magnitude_fmt_time(clock_ns() - start_time)
return {
"resp": f"{res} | Elapsed: {elapsed}",
"hidden": cmd.hidden if cmd else False,
}
else:
# no message to return
return {"resp": None, "hidden": False}

res = "An exception occurred when running the command."

if res is not None:
# we have a message to return, include elapsed time
elapsed = app.logging.magnitude_fmt_time(clock_ns() - start_time)
return {"resp": f"{res} | Elapsed: {elapsed}", "hidden": cmd.hidden}
else:
# no message to return
return {"resp": None, "hidden": False}
async def process_commands_pure(
player: Player,
target: Channel | Player,
msg: str,
head: str,
args: list[str],
cmd_or_sets: list[Command | CommandSet],
) -> tuple[str | None, Command | None]:
for cmd in cmd_or_sets:
match cmd:
case Command():
if head in cmd.triggers and player.priv & cmd.priv == cmd.priv:
# found matching trigger with sufficient privs
try:
res = await cmd.callback(
Context(
player=player,
trigger=head,
args=args,
recipient=target,
),
)
return (res, cmd)
except Exception:
# print exception info to the console,
# but do not break the player's session.
traceback.print_exc()

res = "An exception occurred when running the command."
return (res, None)
case CommandSet():
if head != cmd.trigger:
continue

# found matching command set
if not args:
args = [""]
head, *args = args

return await process_commands_pure(
player=player,
target=target,
msg=msg,
head=head,
args=args,
cmd_or_sets=[*cmd.subcommands, *cmd.commands],
)

return None
return (None, None)

0 comments on commit 2d8efe3

Please sign in to comment.