From a6d069766825c4a1cf89884f0dfc9df06c5d38f7 Mon Sep 17 00:00:00 2001
From: Art Lukyanchyk <artiom.lukyanchyk@hs-hannover.de>
Date: Tue, 21 May 2019 18:45:46 +0200
Subject: [PATCH] Add worker_start_with_autoreload(). Woo-fucking-Hoo!

---
 pikatasks/django_compat.py | 24 ++++++++++++++++++++++++
 pikatasks/worker.py        | 32 +++++++++++++++++++++-----------
 2 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/pikatasks/django_compat.py b/pikatasks/django_compat.py
index 3c4d3c1..f321eda 100644
--- a/pikatasks/django_compat.py
+++ b/pikatasks/django_compat.py
@@ -1,6 +1,7 @@
 import importlib
 import itertools
 from . import utils
+from . import worker
 from .utils import logger
 
 try:
@@ -89,3 +90,26 @@ def autodiscover_tasks(apps=None, modules=("tasks",)):
                 raise e
     return utils.known_tasks
 
+
+@requires_django
+def worker_start_with_autoreload(**worker_kwargs):
+    """
+    Neat django autoreload feature for pikatasks worker.
+    :return: raises SystemExit (but you don't want to catch it).
+    """
+    from django.utils import autoreload
+    if not django_conf.settings.DEBUG:
+        raise RuntimeError("Nope.")
+    stop_handler = worker.SwitchStopHandler()
+    try:
+        autoreload.run_with_reloader(
+            main_func=worker.start,
+            worker_stop_handler=stop_handler,
+            **worker_kwargs
+        )
+    except SystemExit as e:
+        if e.code == 3:
+            logger.info("Django autoreloader feels an urge to autoreload. Will cooperate.")
+            stop_handler.stop()
+        raise
+
diff --git a/pikatasks/worker.py b/pikatasks/worker.py
index 1ff26f6..a2ab9e6 100644
--- a/pikatasks/worker.py
+++ b/pikatasks/worker.py
@@ -3,7 +3,6 @@ import logging
 import time
 import signal
 import os
-import pika
 from pika.exceptions import AMQPChannelError, AMQPConnectionError
 from datetime import datetime, timedelta
 from . import settings
@@ -14,11 +13,11 @@ from . import django_compat
 IPC_PERIOD = timedelta(seconds=0.2)  # how often processes check their signals and other processes
 
 
-class _SignalHandler:
-    """ Instance of this class will intercept KILL_SIGNALS. Use instance.kill_is_requested for checks. """
-    STOP_SIGNALS = [signal.SIGTERM, signal.SIGINT]
+class SignalStopHandler:
+    """ Helps processes to determine when to stop using common signals. """
+    SIGNALS = (signal.SIGTERM, signal.SIGINT,)
 
-    def __init__(self, logger, this_process_name, signals=STOP_SIGNALS):
+    def __init__(self, logger, this_process_name, signals=SIGNALS):
         self.stop_is_requested = False  # use this for checks
 
         def signal_callback(signal_num, *args, **kwargs):
@@ -35,11 +34,22 @@ class _SignalHandler:
             signal.signal(s, signal_callback)
 
 
-def start(tasks="all", number_of_processes=None):
+class SwitchStopHandler:
+    """ Tells to stop after its stop() method has been called. """
+
+    def __init__(self):
+        self.stop_is_requested = False
+
+    def stop(self, *args, **kwargs):
+        self.stop_is_requested = True
+
+
+def start(tasks="all", number_of_processes=None, worker_stop_handler=None):
     """
     Use this to launch a worker.
     :param tasks: list of tasks to process (or "all" for all registered + auto-discovered)
     :param number_of_processes: number of worker processes
+    :param stop_signal_handler: this signal handler will be used for the main process
     :return: 
     """
 
@@ -122,9 +132,9 @@ def start(tasks="all", number_of_processes=None):
     logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks])))
     if not tasks:
         raise ValueError("Empty task list.")
-    # the main loop (exits with SIGINT) watches worker processes
-    signal_handler = _SignalHandler(logger=logger, this_process_name="master")
-    while not signal_handler.stop_is_requested:
+    if not worker_stop_handler:
+        worker_stop_handler= SignalStopHandler(logger=logger, this_process_name="master")
+    while not worker_stop_handler.stop_is_requested:
         remove_ended_processes(expect_exited_processes=False)
         while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES):
             processes.append(create_minion(tasks))
@@ -143,7 +153,7 @@ def start(tasks="all", number_of_processes=None):
 def _minion_process(tasks, parent_pid):
     """ This is a single process, that performs tasks. """
     logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid()))
-    signal_handler = _SignalHandler(logger=logger, this_process_name="minion")
+    minion_stop_handler = SignalStopHandler(logger=logger, this_process_name="minion")
     if not tasks:
         raise RuntimeError("Got empty list of tasks")
     conn, channel = None, None
@@ -155,7 +165,7 @@ def _minion_process(tasks, parent_pid):
         if os.getppid() != parent_pid:  # got adopted, new owner is probably init
             logger.error("Master (PID={0}) has disappeared :( Stopping.".format(parent_pid))
             stop = True
-        if signal_handler.stop_is_requested:
+        if minion_stop_handler.stop_is_requested:
             logger.info("Minion (PID={0}) is requested to stop.".format(os.getpid()))
             stop = True
         if stop:
-- 
GitLab