Skip to content
Snippets Groups Projects
Commit a6d06976 authored by Art's avatar Art :lizard:
Browse files

Add worker_start_with_autoreload(). Woo-fucking-Hoo!

parent 4ef59600
Branches
Tags
No related merge requests found
import importlib import importlib
import itertools import itertools
from . import utils from . import utils
from . import worker
from .utils import logger from .utils import logger
try: try:
...@@ -89,3 +90,26 @@ def autodiscover_tasks(apps=None, modules=("tasks",)): ...@@ -89,3 +90,26 @@ def autodiscover_tasks(apps=None, modules=("tasks",)):
raise e raise e
return utils.known_tasks return utils.known_tasks
@requires_django
def worker_start_with_autoreload(**worker_kwargs):
"""
Neat django autoreload feature for pikatasks worker.
:return: raises SystemExit (but you don't want to catch it).
"""
from django.utils import autoreload
if not django_conf.settings.DEBUG:
raise RuntimeError("Nope.")
stop_handler = worker.SwitchStopHandler()
try:
autoreload.run_with_reloader(
main_func=worker.start,
worker_stop_handler=stop_handler,
**worker_kwargs
)
except SystemExit as e:
if e.code == 3:
logger.info("Django autoreloader feels an urge to autoreload. Will cooperate.")
stop_handler.stop()
raise
...@@ -3,7 +3,6 @@ import logging ...@@ -3,7 +3,6 @@ import logging
import time import time
import signal import signal
import os import os
import pika
from pika.exceptions import AMQPChannelError, AMQPConnectionError from pika.exceptions import AMQPChannelError, AMQPConnectionError
from datetime import datetime, timedelta from datetime import datetime, timedelta
from . import settings from . import settings
...@@ -14,11 +13,11 @@ from . import django_compat ...@@ -14,11 +13,11 @@ from . import django_compat
IPC_PERIOD = timedelta(seconds=0.2) # how often processes check their signals and other processes IPC_PERIOD = timedelta(seconds=0.2) # how often processes check their signals and other processes
class _SignalHandler: class SignalStopHandler:
""" Instance of this class will intercept KILL_SIGNALS. Use instance.kill_is_requested for checks. """ """ Helps processes to determine when to stop using common signals. """
STOP_SIGNALS = [signal.SIGTERM, signal.SIGINT] SIGNALS = (signal.SIGTERM, signal.SIGINT,)
def __init__(self, logger, this_process_name, signals=STOP_SIGNALS): def __init__(self, logger, this_process_name, signals=SIGNALS):
self.stop_is_requested = False # use this for checks self.stop_is_requested = False # use this for checks
def signal_callback(signal_num, *args, **kwargs): def signal_callback(signal_num, *args, **kwargs):
...@@ -35,11 +34,22 @@ class _SignalHandler: ...@@ -35,11 +34,22 @@ class _SignalHandler:
signal.signal(s, signal_callback) signal.signal(s, signal_callback)
def start(tasks="all", number_of_processes=None): class SwitchStopHandler:
""" Tells to stop after its stop() method has been called. """
def __init__(self):
self.stop_is_requested = False
def stop(self, *args, **kwargs):
self.stop_is_requested = True
def start(tasks="all", number_of_processes=None, worker_stop_handler=None):
""" """
Use this to launch a worker. Use this to launch a worker.
:param tasks: list of tasks to process (or "all" for all registered + auto-discovered) :param tasks: list of tasks to process (or "all" for all registered + auto-discovered)
:param number_of_processes: number of worker processes :param number_of_processes: number of worker processes
:param stop_signal_handler: this signal handler will be used for the main process
:return: :return:
""" """
...@@ -122,9 +132,9 @@ def start(tasks="all", number_of_processes=None): ...@@ -122,9 +132,9 @@ def start(tasks="all", number_of_processes=None):
logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks]))) logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks])))
if not tasks: if not tasks:
raise ValueError("Empty task list.") raise ValueError("Empty task list.")
# the main loop (exits with SIGINT) watches worker processes if not worker_stop_handler:
signal_handler = _SignalHandler(logger=logger, this_process_name="master") worker_stop_handler= SignalStopHandler(logger=logger, this_process_name="master")
while not signal_handler.stop_is_requested: while not worker_stop_handler.stop_is_requested:
remove_ended_processes(expect_exited_processes=False) remove_ended_processes(expect_exited_processes=False)
while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES): while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES):
processes.append(create_minion(tasks)) processes.append(create_minion(tasks))
...@@ -143,7 +153,7 @@ def start(tasks="all", number_of_processes=None): ...@@ -143,7 +153,7 @@ def start(tasks="all", number_of_processes=None):
def _minion_process(tasks, parent_pid): def _minion_process(tasks, parent_pid):
""" This is a single process, that performs tasks. """ """ This is a single process, that performs tasks. """
logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid())) logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid()))
signal_handler = _SignalHandler(logger=logger, this_process_name="minion") minion_stop_handler = SignalStopHandler(logger=logger, this_process_name="minion")
if not tasks: if not tasks:
raise RuntimeError("Got empty list of tasks") raise RuntimeError("Got empty list of tasks")
conn, channel = None, None conn, channel = None, None
...@@ -155,7 +165,7 @@ def _minion_process(tasks, parent_pid): ...@@ -155,7 +165,7 @@ def _minion_process(tasks, parent_pid):
if os.getppid() != parent_pid: # got adopted, new owner is probably init if os.getppid() != parent_pid: # got adopted, new owner is probably init
logger.error("Master (PID={0}) has disappeared :( Stopping.".format(parent_pid)) logger.error("Master (PID={0}) has disappeared :( Stopping.".format(parent_pid))
stop = True stop = True
if signal_handler.stop_is_requested: if minion_stop_handler.stop_is_requested:
logger.info("Minion (PID={0}) is requested to stop.".format(os.getpid())) logger.info("Minion (PID={0}) is requested to stop.".format(os.getpid()))
stop = True stop = True
if stop: if stop:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment