Skip to content
Snippets Groups Projects
Commit b2203ae4 authored by Art's avatar Art :lizard:
Browse files

Make more sense of timings, plus some little refactoring

That's everything that is left from my attempt to add autoreload :(
parent 5bcf3c33
Branches
No related tags found
No related merge requests found
...@@ -35,7 +35,6 @@ WORKER_GRACEFUL_STOP_TIMEOUT = timedelta(seconds=60) ...@@ -35,7 +35,6 @@ WORKER_GRACEFUL_STOP_TIMEOUT = timedelta(seconds=60)
# stuff you probably don't want to touch: # stuff you probably don't want to touch:
BLOCKED_CONNECTION_TIMEOUT = timedelta(seconds=20) # weird stuff to avoid deadlocks, see pika documentation BLOCKED_CONNECTION_TIMEOUT = timedelta(seconds=20) # weird stuff to avoid deadlocks, see pika documentation
WORKER_CHECK_SUBPROCESSES_PERIOD = timedelta(seconds=2)
# merge these settings with django.conf.settings # merge these settings with django.conf.settings
......
...@@ -6,13 +6,14 @@ import os ...@@ -6,13 +6,14 @@ import os
import pika import pika
from pika.exceptions import AMQPChannelError, AMQPConnectionError from pika.exceptions import AMQPChannelError, AMQPConnectionError
from queue import Empty from queue import Empty
from datetime import datetime from datetime import datetime, timedelta
from . import settings from . import settings
from . import utils from . import utils
from . import django_compat from . import django_compat
SIGNAL_CHECK_FREQUENCY = 2 # seconds MASTER_IPC_PERIOD = timedelta(seconds=0.2) # how often master checks own signals and minion processes
MINION_IPC_PERIOD = timedelta(seconds=0.2) # how often minions check their signals
class _SignalHandler: class _SignalHandler:
...@@ -45,8 +46,10 @@ def start(tasks=utils.all_tasks, number_of_processes=None): ...@@ -45,8 +46,10 @@ def start(tasks=utils.all_tasks, number_of_processes=None):
""" """
logger = logging.getLogger("pikatasks.worker.master") logger = logging.getLogger("pikatasks.worker.master")
processes = list()
def remove_ended_processes(processes, expect_exited_processes): def remove_ended_processes(expect_exited_processes):
nonlocal processes
alive = [p for p in processes if p.is_alive()] alive = [p for p in processes if p.is_alive()]
exited = set(processes) - set(alive) exited = set(processes) - set(alive)
for p in exited: for p in exited:
...@@ -68,22 +71,25 @@ def start(tasks=utils.all_tasks, number_of_processes=None): ...@@ -68,22 +71,25 @@ def start(tasks=utils.all_tasks, number_of_processes=None):
logger.info("Started new minion (PID={0}).".format(p.pid)) logger.info("Started new minion (PID={0}).".format(p.pid))
return p return p
def stop_minion(processes): def stop_minions():
deadline = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT nonlocal processes
while datetime.now() < deadline: last_reminder_dt = datetime.now()
deadline_dt = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT
while processes and datetime.now() < deadline_dt:
for p in processes: for p in processes:
os.kill(p.pid, signal.SIGTERM) os.kill(p.pid, signal.SIGTERM) # SIGTERM = ask nicely
time.sleep(SIGNAL_CHECK_FREQUENCY) time.sleep((MINION_IPC_PERIOD / 2).total_seconds())
remove_ended_processes(processes, expect_exited_processes=True) remove_ended_processes(expect_exited_processes=True)
if processes: if datetime.now() > last_reminder_dt + timedelta(seconds=5):
logger.info("Stopping... Minions still running: {n}. Deadline in: {d}.".format(d=deadline - datetime.now(), n=len(processes))) last_reminder_dt = datetime.now()
else: logger.info("Stopping... Minions still running: {n}. Deadline in: {d}.".format(d=deadline_dt - datetime.now(), n=len(processes)))
break
def force_kill_minions():
def force_kill_minion(processes): nonlocal processes
for p in processes: for p in processes:
logger.warning("Killing minion (PID={pid})".format(pid=p.pid)) logger.warning("Killing minion (PID={pid})".format(pid=p.pid))
os.kill(p.pid, signal.SIGKILL) os.kill(p.pid, signal.SIGKILL)
processes.remove(p)
def queue_exists(queue_name): def queue_exists(queue_name):
conn = None conn = None
...@@ -111,22 +117,21 @@ def start(tasks=utils.all_tasks, number_of_processes=None): ...@@ -111,22 +117,21 @@ def start(tasks=utils.all_tasks, number_of_processes=None):
logger.info("Starting pikatasks worker...") logger.info("Starting pikatasks worker...")
tasks = filter_tasks(tasks=tasks) tasks = filter_tasks(tasks=tasks)
logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks]))) logger.info("Tasks: {0}".format(repr([t.task_name for t in tasks])))
processes = list()
if not tasks: if not tasks:
raise ValueError("Empty task list.") raise ValueError("Empty task list.")
# the main loop (exits with SIGINT) watches worker processes # the main loop (exits with SIGINT) watches worker processes
signal_handler = _SignalHandler(logger=logger, this_process_name="master") signal_handler = _SignalHandler(logger=logger, this_process_name="master")
while not signal_handler.stop_is_requested: while not signal_handler.stop_is_requested:
remove_ended_processes(processes, expect_exited_processes=False) remove_ended_processes(expect_exited_processes=False)
while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES): while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES):
processes.append(create_minion(tasks)) processes.append(create_minion(tasks))
time.sleep(settings.WORKER_CHECK_SUBPROCESSES_PERIOD.total_seconds()) time.sleep(MASTER_IPC_PERIOD.total_seconds())
# stopping # stopping
logger.info("Stopping minions...") logger.info("Stopping minions...")
stop_minion(processes) stop_minions()
if processes: if processes:
logger.error("{n} minions failed to stop gracefully.".format(n=len(processes))) logger.error("{n} minions failed to stop gracefully.".format(n=len(processes)))
force_kill_minion(processes) force_kill_minions()
else: else:
logger.info("All minions have stopped gracefully.") logger.info("All minions have stopped gracefully.")
logger.info("Stopped pikatasks worker.") logger.info("Stopped pikatasks worker.")
...@@ -152,7 +157,7 @@ def _task_process(tasks, parent_pid): ...@@ -152,7 +157,7 @@ def _task_process(tasks, parent_pid):
if stop: if stop:
channel.stop_consuming() channel.stop_consuming()
logger.debug("Stopping consuming messages from queues.") logger.debug("Stopping consuming messages from queues.")
conn.add_timeout(SIGNAL_CHECK_FREQUENCY, control_beat) # run this function again soon conn.add_timeout(MINION_IPC_PERIOD.total_seconds(), control_beat) # run this function again soon
try: try:
logger.debug("Opening a connection...") logger.debug("Opening a connection...")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment