Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, events: reduce memory usage when batch deleting objects #12436

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

rissson
Copy link
Member

@rissson rissson commented Dec 20, 2024

Details

REPLACE ME


Checklist

  • Local tests pass (ak test authentik/)
  • The code has been formatted (make lint-fix)

If an API change has been made

  • The API schema has been updated (make gen-build)

If changes to the frontend have been made

  • The code has been formatted (make web)

If applicable

  • The documentation has been updated
  • The documentation has been formatted (make website)

@rissson rissson self-assigned this Dec 20, 2024
@rissson rissson requested a review from a team as a code owner December 20, 2024 10:58
Copy link

netlify bot commented Dec 20, 2024

Deploy Preview for authentik-docs canceled.

Name Link
🔨 Latest commit 7d0abbf
🔍 Latest deploy log https://app.netlify.com/sites/authentik-docs/deploys/67654dbc84fbad0008ef7843

Copy link

netlify bot commented Dec 20, 2024

Deploy Preview for authentik-storybook canceled.

Name Link
🔨 Latest commit 7d0abbf
🔍 Latest deploy log https://app.netlify.com/sites/authentik-storybook/deploys/67654dbccae31b000889e1a7

Copy link

codecov bot commented Dec 20, 2024

❌ 7 Tests Failed:

Tests completed Failed Passed Skipped
1648 7 1641 2
View the top 3 failed tests by shortest run time
authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests::test_user_create_delete
Stack Traces | 0.245s run time
self = <unittest.case._Outcome object at 0x7fa008e172f0>
test_case = <authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests testMethod=test_user_create_delete>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.12.8............/x64/lib/python3.12/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests testMethod=test_user_create_delete>
result = <TestCaseFunction test_user_create_delete>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.12.8............/x64/lib/python3.12/unittest/case.py:634: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests testMethod=test_user_create_delete>
method = <bound method GoogleWorkspaceUserTests.test_user_create_delete of <authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests testMethod=test_user_create_delete>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.12.8............/x64/lib/python3.12/unittest/case.py:589: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests testMethod=test_user_create_delete>

    def test_user_create_delete(self):
        """Test user deletion"""
        uid = generate_id()
        http = MockHTTP()
        http.add_response(
            f"https://admin.googleapis..../customer/my_customer/domains?key={self.api_key}&alt=json",
            domains_list_v1_mock,
        )
        http.add_response(
            f"https://admin.googleapis..../directory/v1/users?key={self.api_key}&alt=json",
            method="POST",
            body={"primaryEmail": f"{uid}@goauthentik.io"},
        )
        http.add_response(
            f"https://admin.googleapis..../directory/v1/users/{uid}%40goauthentik.io?key={self.api_key}",
            method="DELETE",
        )
        with patch(
            "authentik.enterprise.providers.google_workspace.models.GoogleWorkspaceProvider.google_credentials",
            MagicMock(return_value={"developerKey": self.api_key, "http": http}),
        ):
            user = User.objects.create(
                username=uid,
                name=f"{uid} {uid}",
                email=f"{uid}@goauthentik.io",
            )
            google_user = GoogleWorkspaceProviderUser.objects.filter(
                provider=self.provider, user=user
            ).first()
            self.assertIsNotNone(google_user)
    
            user.delete()
>           self.assertFalse(Event.objects.filter(action=EventAction.SYSTEM_EXCEPTION).exists())

.../google_workspace/tests/test_users.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.providers.google_workspace.tests.test_users.GoogleWorkspaceUserTests testMethod=test_user_create_delete>
expr = True, msg = 'True is not false'

    def assertFalse(self, expr, msg=None):
        """Check that the expression is false."""
        if expr:
            msg = self._formatMessage(msg, "%s is not false" % safe_repr(expr))
>           raise self.failureException(msg)
E           AssertionError: True is not false

.../hostedtoolcache/Python/3.12.8............/x64/lib/python3.12/unittest/case.py:721: AssertionError
authentik.events.tests.test_tasks.TestSystemTasks::test_tasks
Stack Traces | 0.388s run time
self = <unittest.case._Outcome object at 0x7fa00a2734a0>
test_case = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.12.8........./x64/lib/python3.12/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks>
result = <TestCaseFunction test_tasks>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.12.8........./x64/lib/python3.12/unittest/case.py:634: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks>
method = <bound method TestSystemTasks.test_tasks of <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.12.8........./x64/lib/python3.12/unittest/case.py:589: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks>

    def test_tasks(self):
        """Test Task API"""
>       clean_expired_models.delay().get()

.../events/tests/test_tasks.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <EagerResult: a29bff88-a8f8-46d4-a5f7-8113c0f04670>, timeout = None
propagate = True, disable_sync_subtasks = True, kwargs = {}

    def get(self, timeout=None, propagate=True,
            disable_sync_subtasks=True, **kwargs):
        if disable_sync_subtasks:
            assert_will_not_block()
    
        if self.successful():
            return self.result
        elif self.state in states.PROPAGATE_STATES:
            if propagate:
>               raise self.result if isinstance(
                    self.result, Exception) else Exception(self.result)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../site-packages/celery/result.py:1026: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

uuid = 'a29bff88-a8f8-46d4-a5f7-8113c0f04670', args = (), kwargs = {}
request = {'callbacks': None, 'delivery_info': {'exchange': None, 'is_eager': True, 'priority': None, 'routing_key': None}, 'errbacks': None, 'headers': {'_schema_name': 'public'}, ...}

    def trace_task(uuid, args, kwargs, request=None):
        # R      - is the possibly prepared return value.
        # I      - is the Info object.
        # T      - runtime
        # Rstr   - textual representation of return value
        # retval - is the always unmodified return value.
        # state  - is the resulting task state.
    
        # This function is very long because we've unrolled all the calls
        # for performance reasons, and because the function is so long
        # we want the main variables (I, and R) to stand out visually from the
        # the rest of the variables, so breaking PEP8 is worth it ;)
        R = I = T = Rstr = retval = state = None
        task_request = None
        time_start = monotonic()
        try:
            try:
                kwargs.items
            except AttributeError:
                raise InvalidTaskError(
                    'Task keyword arguments is not a mapping')
    
            task_request = Context(request or {}, args=args,
                                   called_directly=False, kwargs=kwargs)
    
            redelivered = (task_request.delivery_info
                           and task_request.delivery_info.get('redelivered', False))
            if deduplicate_successful_tasks and redelivered:
                if task_request.id in successful_requests:
                    return trace_ok_t(R, I, T, Rstr)
                r = AsyncResult(task_request.id, app=app)
    
                try:
                    state = r.state
                except BackendGetMetaError:
                    pass
                else:
                    if state == SUCCESS:
                        info(LOG_IGNORED, {
                            'id': task_request.id,
                            'name': get_task_name(task_request, name),
                            'description': 'Task already completed successfully.'
                        })
                        return trace_ok_t(R, I, T, Rstr)
    
            push_task(task)
            root_id = task_request.root_id or uuid
            task_priority = task_request.delivery_info.get('priority') if \
                inherit_parent_priority else None
            push_request(task_request)
            try:
                # -*- PRE -*-
                if prerun_receivers:
                    send_prerun(sender=task, task_id=uuid, task=task,
                                args=args, kwargs=kwargs)
                loader_task_init(uuid, task)
                if track_started:
                    task.backend.store_result(
                        uuid, {'pid': pid, 'hostname': hostname}, STARTED,
                        request=task_request,
                    )
    
                # -*- TRACE -*-
                try:
                    if task_before_start:
                        task_before_start(uuid, args, kwargs)
    
>                   R = retval = fun(*args, **kwargs)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../celery/app/trace.py:453: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12....../site-packages/sentry_sdk/utils.py:1860: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @ensure_integration_enabled(CeleryIntegration, f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            with sentry_sdk.start_span(
                op=OP.QUEUE_PROCESS,
                name=task.name,
                origin=CeleryIntegration.origin,
            ) as span:
                _set_messaging_destination_name(task, span)
    
                latency = None
                with capture_internal_exceptions():
                    if (
                        task.request.headers is not None
                        and "sentry-task-enqueued-time" in task.request.headers
                    ):
                        latency = _now_seconds_since_epoch() - task.request.headers.pop(
                            "sentry-task-enqueued-time"
                        )
    
                if latency is not None:
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
    
                with capture_internal_exceptions():
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
                    )
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_SYSTEM,
                        task.app.connection().transport.driver_type,
                    )
    
                return f(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            with capture_internal_exceptions():
                _capture_exception(task, exc_info)
>           reraise(*exc_info)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../integrations/celery/__init__.py:416: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def reraise(tp, value, tb=None):
        # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[Any]) -> NoReturn
        assert value is not None
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
>       raise value

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12....../site-packages/sentry_sdk/utils.py:1795: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @ensure_integration_enabled(CeleryIntegration, f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            with sentry_sdk.start_span(
                op=OP.QUEUE_PROCESS,
                name=task.name,
                origin=CeleryIntegration.origin,
            ) as span:
                _set_messaging_destination_name(task, span)
    
                latency = None
                with capture_internal_exceptions():
                    if (
                        task.request.headers is not None
                        and "sentry-task-enqueued-time" in task.request.headers
                    ):
                        latency = _now_seconds_since_epoch() - task.request.headers.pop(
                            "sentry-task-enqueued-time"
                        )
    
                if latency is not None:
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
    
                with capture_internal_exceptions():
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
                    )
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_SYSTEM,
                        task.app.connection().transport.driver_type,
                    )
    
>               return f(*args, **kwargs)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../integrations/celery/__init__.py:411: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @CELERY_APP.task(bind=True, base=SystemTask)
    @prefill_task
    def clean_expired_models(self: SystemTask):
        """Remove expired objects"""
        messages = []
        for cls in ExpiringModel.__subclasses__():
            cls: ExpiringModel
            objects = (
                cls.objects.all().exclude(expiring=False).exclude(expiring=True, expires__gt=now())
            )
            amount = objects.count()
>           for obj in qs_batch_iter(objects):

authentik/core/tasks.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def qs_batch_iter(qs: QuerySet, batch_size: int = 10_000, gc_collect: bool = True):
        pk_iter = qs.values_list("pk", flat=True).order_by("pk").distinct().iterator()
        eof = False
        while not eof:
            pk_buffer = []
            i = 0
            try:
                while i < batch_size:
>                   pk_buffer.append(pk_iter.next())
E                   AttributeError: 'generator' object has no attribute 'next'

.../lib/utils/db.py:16: AttributeError
authentik.events.tests.test_tasks.TestSystemTasks::test_tasks_run
Stack Traces | 0.392s run time
self = <unittest.case._Outcome object at 0x7fa00a00aa80>
test_case = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks_run>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.12.8........./x64/lib/python3.12/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks_run>
result = <TestCaseFunction test_tasks_run>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.12.8........./x64/lib/python3.12/unittest/case.py:634: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks_run>
method = <bound method TestSystemTasks.test_tasks_run of <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks_run>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.12.8........./x64/lib/python3.12/unittest/case.py:589: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.events.tests.test_tasks.TestSystemTasks testMethod=test_tasks_run>

    def test_tasks_run(self):
        """Test Task API (run)"""
>       clean_expired_models.delay().get()

.../events/tests/test_tasks.py:85: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <EagerResult: 69a885b3-fca9-4b85-9d99-d616bb63b365>, timeout = None
propagate = True, disable_sync_subtasks = True, kwargs = {}

    def get(self, timeout=None, propagate=True,
            disable_sync_subtasks=True, **kwargs):
        if disable_sync_subtasks:
            assert_will_not_block()
    
        if self.successful():
            return self.result
        elif self.state in states.PROPAGATE_STATES:
            if propagate:
>               raise self.result if isinstance(
                    self.result, Exception) else Exception(self.result)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../site-packages/celery/result.py:1026: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

uuid = '69a885b3-fca9-4b85-9d99-d616bb63b365', args = (), kwargs = {}
request = {'callbacks': None, 'delivery_info': {'exchange': None, 'is_eager': True, 'priority': None, 'routing_key': None}, 'errbacks': None, 'headers': {'_schema_name': 'public'}, ...}

    def trace_task(uuid, args, kwargs, request=None):
        # R      - is the possibly prepared return value.
        # I      - is the Info object.
        # T      - runtime
        # Rstr   - textual representation of return value
        # retval - is the always unmodified return value.
        # state  - is the resulting task state.
    
        # This function is very long because we've unrolled all the calls
        # for performance reasons, and because the function is so long
        # we want the main variables (I, and R) to stand out visually from the
        # the rest of the variables, so breaking PEP8 is worth it ;)
        R = I = T = Rstr = retval = state = None
        task_request = None
        time_start = monotonic()
        try:
            try:
                kwargs.items
            except AttributeError:
                raise InvalidTaskError(
                    'Task keyword arguments is not a mapping')
    
            task_request = Context(request or {}, args=args,
                                   called_directly=False, kwargs=kwargs)
    
            redelivered = (task_request.delivery_info
                           and task_request.delivery_info.get('redelivered', False))
            if deduplicate_successful_tasks and redelivered:
                if task_request.id in successful_requests:
                    return trace_ok_t(R, I, T, Rstr)
                r = AsyncResult(task_request.id, app=app)
    
                try:
                    state = r.state
                except BackendGetMetaError:
                    pass
                else:
                    if state == SUCCESS:
                        info(LOG_IGNORED, {
                            'id': task_request.id,
                            'name': get_task_name(task_request, name),
                            'description': 'Task already completed successfully.'
                        })
                        return trace_ok_t(R, I, T, Rstr)
    
            push_task(task)
            root_id = task_request.root_id or uuid
            task_priority = task_request.delivery_info.get('priority') if \
                inherit_parent_priority else None
            push_request(task_request)
            try:
                # -*- PRE -*-
                if prerun_receivers:
                    send_prerun(sender=task, task_id=uuid, task=task,
                                args=args, kwargs=kwargs)
                loader_task_init(uuid, task)
                if track_started:
                    task.backend.store_result(
                        uuid, {'pid': pid, 'hostname': hostname}, STARTED,
                        request=task_request,
                    )
    
                # -*- TRACE -*-
                try:
                    if task_before_start:
                        task_before_start(uuid, args, kwargs)
    
>                   R = retval = fun(*args, **kwargs)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../celery/app/trace.py:453: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12....../site-packages/sentry_sdk/utils.py:1860: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @ensure_integration_enabled(CeleryIntegration, f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            with sentry_sdk.start_span(
                op=OP.QUEUE_PROCESS,
                name=task.name,
                origin=CeleryIntegration.origin,
            ) as span:
                _set_messaging_destination_name(task, span)
    
                latency = None
                with capture_internal_exceptions():
                    if (
                        task.request.headers is not None
                        and "sentry-task-enqueued-time" in task.request.headers
                    ):
                        latency = _now_seconds_since_epoch() - task.request.headers.pop(
                            "sentry-task-enqueued-time"
                        )
    
                if latency is not None:
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
    
                with capture_internal_exceptions():
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
                    )
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_SYSTEM,
                        task.app.connection().transport.driver_type,
                    )
    
                return f(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            with capture_internal_exceptions():
                _capture_exception(task, exc_info)
>           reraise(*exc_info)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../integrations/celery/__init__.py:416: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def reraise(tp, value, tb=None):
        # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[Any]) -> NoReturn
        assert value is not None
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
>       raise value

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12....../site-packages/sentry_sdk/utils.py:1795: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @ensure_integration_enabled(CeleryIntegration, f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            with sentry_sdk.start_span(
                op=OP.QUEUE_PROCESS,
                name=task.name,
                origin=CeleryIntegration.origin,
            ) as span:
                _set_messaging_destination_name(task, span)
    
                latency = None
                with capture_internal_exceptions():
                    if (
                        task.request.headers is not None
                        and "sentry-task-enqueued-time" in task.request.headers
                    ):
                        latency = _now_seconds_since_epoch() - task.request.headers.pop(
                            "sentry-task-enqueued-time"
                        )
    
                if latency is not None:
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
    
                with capture_internal_exceptions():
                    span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
                    )
    
                with capture_internal_exceptions():
                    span.set_data(
                        SPANDATA.MESSAGING_SYSTEM,
                        task.app.connection().transport.driver_type,
                    )
    
>               return f(*args, **kwargs)

../../../..../pypoetry/virtualenvs/authentik-xvtLQ9eE-py3.12/lib/python3.12.../integrations/celery/__init__.py:411: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @CELERY_APP.task(bind=True, base=SystemTask)
    @prefill_task
    def clean_expired_models(self: SystemTask):
        """Remove expired objects"""
        messages = []
        for cls in ExpiringModel.__subclasses__():
            cls: ExpiringModel
            objects = (
                cls.objects.all().exclude(expiring=False).exclude(expiring=True, expires__gt=now())
            )
            amount = objects.count()
>           for obj in qs_batch_iter(objects):

authentik/core/tasks.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def qs_batch_iter(qs: QuerySet, batch_size: int = 10_000, gc_collect: bool = True):
        pk_iter = qs.values_list("pk", flat=True).order_by("pk").distinct().iterator()
        eof = False
        while not eof:
            pk_buffer = []
            i = 0
            try:
                while i < batch_size:
>                   pk_buffer.append(pk_iter.next())
E                   AttributeError: 'generator' object has no attribute 'next'

.../lib/utils/db.py:16: AttributeError

To view more test analytics, go to the Test Analytics Dashboard
📢 Thoughts on this report? Let us know!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant