Skip to content
Snippets Groups Projects
Commit 5b6ba1c6 authored by Art's avatar Art :lizard:
Browse files

Minor tweaks

parent a6d06976
No related branches found
No related tags found
No related merge requests found
......@@ -163,10 +163,10 @@ def task(name):
raise TypeError("Bad kwargs type: {0}".format(task_kwargs.__class__.__qualname__))
func_result = func(**task_kwargs)
except Exception as e:
ec = e.__class__.__qualname__
logger.error(traceback.format_exc())
logger.error("Task {task_name} function raised {ec}: {e}".format(**locals()))
func_error = "Task {task_name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized
e_class = e.__class__.__qualname__
e_details = traceback.format_exc().replace("\n", " -- ")
logger.error("Task {task_name} function raised {e_class}: {e}. {e_details}".format(**locals()))
func_error = "Task {task_name} raised {e_class} (see worker log for details).".format(**locals()) # sort of anonymized
if properties.reply_to:
try:
logger.debug("Sending the result of {task_name} to {properties.reply_to}.".format(**locals()))
......
......@@ -97,9 +97,9 @@ def worker_start_with_autoreload(**worker_kwargs):
Neat django autoreload feature for pikatasks worker.
:return: raises SystemExit (but you don't want to catch it).
"""
from django.utils import autoreload
if not django_conf.settings.DEBUG:
raise RuntimeError("Nope.")
from django.utils import autoreload
stop_handler = worker.SwitchStopHandler()
try:
autoreload.run_with_reloader(
......
......@@ -17,7 +17,7 @@ class SignalStopHandler:
""" Helps processes to determine when to stop using common signals. """
SIGNALS = (signal.SIGTERM, signal.SIGINT,)
def __init__(self, logger, this_process_name, signals=SIGNALS):
def __init__(self, logger, this_process_name, stop_on_signals=SIGNALS):
self.stop_is_requested = False # use this for checks
def signal_callback(signal_num, *args, **kwargs):
......@@ -30,7 +30,7 @@ class SignalStopHandler:
logger.debug("Requested to stop {this_process_name} (PID={pid}) using {signal_name}".format(**locals()))
self.stop_is_requested = True
for s in signals:
for s in stop_on_signals:
signal.signal(s, signal_callback)
......@@ -56,13 +56,15 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None):
logger = logging.getLogger("pikatasks.worker.master")
processes = list()
def remove_ended_processes(expect_exited_processes):
def remove_ended_processes():
nonlocal processes
alive = [p for p in processes if p.is_alive()]
exited = set(processes) - set(alive)
for p in exited:
if not expect_exited_processes:
logger.error("Minion (PID={0}) disappeared unexpectedly.".format(p.pid))
if p.exitcode == 0:
logger.info("Minion (PID={0}) stopped.".format(p.pid))
if p.exitcode != 0:
logger.error("Minion (PID={0}) disappeared with exit code {1}.".format(p.pid, p.exitcode))
processes.remove(p)
def create_minion(tasks):
......@@ -87,7 +89,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None):
for p in processes:
os.kill(p.pid, signal.SIGTERM) # SIGTERM = ask minions nicely to stop
time.sleep(IPC_PERIOD.total_seconds())
remove_ended_processes(expect_exited_processes=True)
remove_ended_processes()
if datetime.now() > last_reminder_dt + timedelta(seconds=5): # log reminder every 5 seconds
last_reminder_dt = datetime.now()
logger.info("Stopping... Minions still running: {n}. Deadline in: {d}.".format(d=deadline_dt - datetime.now(), n=len(processes)))
......@@ -107,7 +109,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None):
channel.queue_declare(queue=queue_name, passive=True)
exists = True
except AMQPChannelError as e:
logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals()))
logger.debug("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals()))
exists = False
finally:
if conn and conn.is_open:
......@@ -135,7 +137,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None):
if not worker_stop_handler:
worker_stop_handler= SignalStopHandler(logger=logger, this_process_name="master")
while not worker_stop_handler.stop_is_requested:
remove_ended_processes(expect_exited_processes=False)
remove_ended_processes()
while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES):
processes.append(create_minion(tasks))
time.sleep(IPC_PERIOD.total_seconds())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment