diff --git a/pikatasks/django_compat.py b/pikatasks/django_compat.py index 3c4d3c14d5adc59b4e79b2b7edf2371260e9d808..f321edaead4b716b418c1042547f15d0ff7f2a10 100644 --- a/pikatasks/django_compat.py +++ b/pikatasks/django_compat.py @@ -1,6 +1,7 @@ import importlib import itertools from . import utils +from . import worker from .utils import logger try: @@ -89,3 +90,26 @@ def autodiscover_tasks(apps=None, modules=("tasks",)): raise e return utils.known_tasks + +@requires_django +def worker_start_with_autoreload(**worker_kwargs): + """ + Neat django autoreload feature for pikatasks worker. + :return: raises SystemExit (but you don't want to catch it). + """ + from django.utils import autoreload + if not django_conf.settings.DEBUG: + raise RuntimeError("Nope.") + stop_handler = worker.SwitchStopHandler() + try: + autoreload.run_with_reloader( + main_func=worker.start, + worker_stop_handler=stop_handler, + **worker_kwargs + ) + except SystemExit as e: + if e.code == 3: + logger.info("Django autoreloader feels an urge to autoreload. Will cooperate.") + stop_handler.stop() + raise + diff --git a/pikatasks/worker.py b/pikatasks/worker.py index 1ff26f610ac969e64c8a2c606f3c2cdcdd267f95..a2ab9e6f4fd32f0e651b23a1334b985eb8283767 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -3,7 +3,6 @@ import logging import time import signal import os -import pika from pika.exceptions import AMQPChannelError, AMQPConnectionError from datetime import datetime, timedelta from . import settings @@ -14,11 +13,11 @@ from . import django_compat IPC_PERIOD = timedelta(seconds=0.2) # how often processes check their signals and other processes -class _SignalHandler: - """ Instance of this class will intercept KILL_SIGNALS. Use instance.kill_is_requested for checks. """ - STOP_SIGNALS = [signal.SIGTERM, signal.SIGINT] +class SignalStopHandler: + """ Helps processes to determine when to stop using common signals. """ + SIGNALS = (signal.SIGTERM, signal.SIGINT,) - def __init__(self, logger, this_process_name, signals=STOP_SIGNALS): + def __init__(self, logger, this_process_name, signals=SIGNALS): self.stop_is_requested = False # use this for checks def signal_callback(signal_num, *args, **kwargs): @@ -35,11 +34,22 @@ class _SignalHandler: signal.signal(s, signal_callback) -def start(tasks="all", number_of_processes=None): +class SwitchStopHandler: + """ Tells to stop after its stop() method has been called. """ + + def __init__(self): + self.stop_is_requested = False + + def stop(self, *args, **kwargs): + self.stop_is_requested = True + + +def start(tasks="all", number_of_processes=None, worker_stop_handler=None): """ Use this to launch a worker. :param tasks: list of tasks to process (or "all" for all registered + auto-discovered) :param number_of_processes: number of worker processes + :param stop_signal_handler: this signal handler will be used for the main process :return: """ @@ -122,9 +132,9 @@ def start(tasks="all", number_of_processes=None): logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks]))) if not tasks: raise ValueError("Empty task list.") - # the main loop (exits with SIGINT) watches worker processes - signal_handler = _SignalHandler(logger=logger, this_process_name="master") - while not signal_handler.stop_is_requested: + if not worker_stop_handler: + worker_stop_handler= SignalStopHandler(logger=logger, this_process_name="master") + while not worker_stop_handler.stop_is_requested: remove_ended_processes(expect_exited_processes=False) while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES): processes.append(create_minion(tasks)) @@ -143,7 +153,7 @@ def start(tasks="all", number_of_processes=None): def _minion_process(tasks, parent_pid): """ This is a single process, that performs tasks. """ logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid())) - signal_handler = _SignalHandler(logger=logger, this_process_name="minion") + minion_stop_handler = SignalStopHandler(logger=logger, this_process_name="minion") if not tasks: raise RuntimeError("Got empty list of tasks") conn, channel = None, None @@ -155,7 +165,7 @@ def _minion_process(tasks, parent_pid): if os.getppid() != parent_pid: # got adopted, new owner is probably init logger.error("Master (PID={0}) has disappeared :( Stopping.".format(parent_pid)) stop = True - if signal_handler.stop_is_requested: + if minion_stop_handler.stop_is_requested: logger.info("Minion (PID={0}) is requested to stop.".format(os.getpid())) stop = True if stop: