Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1194,11 +1194,16 @@ async def reboot(
(self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

await asyncio.gather(
results = await asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
return_exceptions=True,
)

for result in results:
if isinstance(result, Exception):
self.log.exception('A pre-reboot event listener failed', exc_info=result)

if not self.configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

Expand Down
55 changes: 55 additions & 0 deletions tests/unit/actor/test_actor_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import logging
import warnings
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -344,3 +345,57 @@ async def test_get_remaining_time_returns_positive_when_timeout_in_future() -> N
assert result is not None
assert result > timedelta(0)
assert result <= timedelta(minutes=5)


async def test_reboot_runs_all_listeners_even_when_one_fails(
apify_client_async_patcher: ApifyClientAsyncPatcher,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that a failing pre-reboot event listener does not prevent other listeners from running.

Directly injects raw async callables into the event manager's _listeners_to_wrappers
to simulate exceptions escaping the wrapper layer (the scenario return_exceptions=True guards against).
"""
apify_client_async_patcher.patch('run', 'reboot', return_value=None)

persist_state_called = False
migrating_called = False

async def failing_listener(*_args: object) -> None:
raise RuntimeError('persist_state listener error')

async def successful_persist_state_listener(*_args: object) -> None:
nonlocal persist_state_called
persist_state_called = True

async def successful_migrating_listener(*_args: object) -> None:
nonlocal migrating_called
migrating_called = True

async with Actor:
Actor.configuration.is_at_home = True
Actor.configuration.actor_run_id = 'some-run-id'

# Inject raw listeners directly into the event manager's internal structure,
# bypassing crawlee's wrapper that would catch exceptions on its own.
listeners_map = Actor.event_manager._listeners_to_wrappers
listeners_map[Event.PERSIST_STATE] = {
failing_listener: [failing_listener],
successful_persist_state_listener: [successful_persist_state_listener],
}
listeners_map[Event.MIGRATING] = {
successful_migrating_listener: [successful_migrating_listener],
}

with caplog.at_level(logging.ERROR):
await Actor.reboot(custom_after_sleep=timedelta(milliseconds=1))

# All listeners ran despite the failure in one of them.
assert persist_state_called
assert migrating_called

# The exception was logged.
assert any('A pre-reboot event listener failed' in r.message for r in caplog.records)

# The reboot API call was still made.
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1
Loading