diff --git a/pikatasks/settings.py b/pikatasks/settings.py index 6717e8fbd2ece509743cfef8350a7c42dec2217a..d1a04cc8af2be81f85b753c33093a2a0e465d075 100644 --- a/pikatasks/settings.py +++ b/pikatasks/settings.py @@ -35,7 +35,6 @@ WORKER_GRACEFUL_STOP_TIMEOUT = timedelta(seconds=60) # stuff you probably don't want to touch: BLOCKED_CONNECTION_TIMEOUT = timedelta(seconds=20) # weird stuff to avoid deadlocks, see pika documentation -WORKER_CHECK_SUBPROCESSES_PERIOD = timedelta(seconds=2) # merge these settings with django.conf.settings diff --git a/pikatasks/worker.py b/pikatasks/worker.py index 651f2be0e33fda989b8c7321a6db65a8e49f94ed..0f7e28c0c352fe365ca6a93761330a27362d5686 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -6,13 +6,14 @@ import os import pika from pika.exceptions import AMQPChannelError, AMQPConnectionError from queue import Empty -from datetime import datetime +from datetime import datetime, timedelta from . import settings from . import utils from . import django_compat -SIGNAL_CHECK_FREQUENCY = 2 # seconds +MASTER_IPC_PERIOD = timedelta(seconds=0.2) # how often master checks own signals and minion processes +MINION_IPC_PERIOD = timedelta(seconds=0.2) # how often minions check their signals class _SignalHandler: @@ -45,8 +46,10 @@ def start(tasks=utils.all_tasks, number_of_processes=None): """ logger = logging.getLogger("pikatasks.worker.master") + processes = list() - def remove_ended_processes(processes, expect_exited_processes): + def remove_ended_processes(expect_exited_processes): + nonlocal processes alive = [p for p in processes if p.is_alive()] exited = set(processes) - set(alive) for p in exited: @@ -68,22 +71,25 @@ def start(tasks=utils.all_tasks, number_of_processes=None): logger.info("Started new minion (PID={0}).".format(p.pid)) return p - def stop_minion(processes): - deadline = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT - while datetime.now() < deadline: + def stop_minions(): + nonlocal processes + last_reminder_dt = datetime.now() + deadline_dt = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT + while processes and datetime.now() < deadline_dt: for p in processes: - os.kill(p.pid, signal.SIGTERM) - time.sleep(SIGNAL_CHECK_FREQUENCY) - remove_ended_processes(processes, expect_exited_processes=True) - if processes: - logger.info("Stopping... Minions still running: {n}. Deadline in: {d}.".format(d=deadline - datetime.now(), n=len(processes))) - else: - break - - def force_kill_minion(processes): + os.kill(p.pid, signal.SIGTERM) # SIGTERM = ask nicely + time.sleep((MINION_IPC_PERIOD / 2).total_seconds()) + remove_ended_processes(expect_exited_processes=True) + if datetime.now() > last_reminder_dt + timedelta(seconds=5): + last_reminder_dt = datetime.now() + logger.info("Stopping... Minions still running: {n}. Deadline in: {d}.".format(d=deadline_dt - datetime.now(), n=len(processes))) + + def force_kill_minions(): + nonlocal processes for p in processes: logger.warning("Killing minion (PID={pid})".format(pid=p.pid)) os.kill(p.pid, signal.SIGKILL) + processes.remove(p) def queue_exists(queue_name): conn = None @@ -111,22 +117,21 @@ def start(tasks=utils.all_tasks, number_of_processes=None): logger.info("Starting pikatasks worker...") tasks = filter_tasks(tasks=tasks) logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks]))) - processes = list() if not tasks: raise ValueError("Empty task list.") # the main loop (exits with SIGINT) watches worker processes signal_handler = _SignalHandler(logger=logger, this_process_name="master") while not signal_handler.stop_is_requested: - remove_ended_processes(processes, expect_exited_processes=False) + remove_ended_processes(expect_exited_processes=False) while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES): processes.append(create_minion(tasks)) - time.sleep(settings.WORKER_CHECK_SUBPROCESSES_PERIOD.total_seconds()) + time.sleep(MASTER_IPC_PERIOD.total_seconds()) # stopping logger.info("Stopping minions...") - stop_minion(processes) + stop_minions() if processes: logger.error("{n} minions failed to stop gracefully.".format(n=len(processes))) - force_kill_minion(processes) + force_kill_minions() else: logger.info("All minions have stopped gracefully.") logger.info("Stopped pikatasks worker.") @@ -152,7 +157,7 @@ def _task_process(tasks, parent_pid): if stop: channel.stop_consuming() logger.debug("Stopping consuming messages from queues.") - conn.add_timeout(SIGNAL_CHECK_FREQUENCY, control_beat) # run this function again soon + conn.add_timeout(MINION_IPC_PERIOD.total_seconds(), control_beat) # run this function again soon try: logger.debug("Opening a connection...")