diff --git a/pikatasks/__init__.py b/pikatasks/__init__.py index 10477abda1fcc29d1a2ed91df1b9606bd66a0fbc..b4f5d00a4e9ae277dba5fa288cc36a775d8604d0 100644 --- a/pikatasks/__init__.py +++ b/pikatasks/__init__.py @@ -163,10 +163,10 @@ def task(name): raise TypeError("Bad kwargs type: {0}".format(task_kwargs.__class__.__qualname__)) func_result = func(**task_kwargs) except Exception as e: - ec = e.__class__.__qualname__ - logger.error(traceback.format_exc()) - logger.error("Task {task_name} function raised {ec}: {e}".format(**locals())) - func_error = "Task {task_name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized + e_class = e.__class__.__qualname__ + e_details = traceback.format_exc().replace("\n", " -- ") + logger.error("Task {task_name} function raised {e_class}: {e}. {e_details}".format(**locals())) + func_error = "Task {task_name} raised {e_class} (see worker log for details).".format(**locals()) # sort of anonymized if properties.reply_to: try: logger.debug("Sending the result of {task_name} to {properties.reply_to}.".format(**locals())) diff --git a/pikatasks/django_compat.py b/pikatasks/django_compat.py index f321edaead4b716b418c1042547f15d0ff7f2a10..86e54c23ffcd5fe308cd3e49d30dac5ed3698a97 100644 --- a/pikatasks/django_compat.py +++ b/pikatasks/django_compat.py @@ -97,9 +97,9 @@ def worker_start_with_autoreload(**worker_kwargs): Neat django autoreload feature for pikatasks worker. :return: raises SystemExit (but you don't want to catch it). """ - from django.utils import autoreload if not django_conf.settings.DEBUG: raise RuntimeError("Nope.") + from django.utils import autoreload stop_handler = worker.SwitchStopHandler() try: autoreload.run_with_reloader( diff --git a/pikatasks/worker.py b/pikatasks/worker.py index a2ab9e6f4fd32f0e651b23a1334b985eb8283767..c872000c46a03d432a3b7e9162303330fd013d86 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -17,7 +17,7 @@ class SignalStopHandler: """ Helps processes to determine when to stop using common signals. """ SIGNALS = (signal.SIGTERM, signal.SIGINT,) - def __init__(self, logger, this_process_name, signals=SIGNALS): + def __init__(self, logger, this_process_name, stop_on_signals=SIGNALS): self.stop_is_requested = False # use this for checks def signal_callback(signal_num, *args, **kwargs): @@ -30,7 +30,7 @@ class SignalStopHandler: logger.debug("Requested to stop {this_process_name} (PID={pid}) using {signal_name}".format(**locals())) self.stop_is_requested = True - for s in signals: + for s in stop_on_signals: signal.signal(s, signal_callback) @@ -50,19 +50,21 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None): :param tasks: list of tasks to process (or "all" for all registered + auto-discovered) :param number_of_processes: number of worker processes :param stop_signal_handler: this signal handler will be used for the main process - :return: + :return: """ logger = logging.getLogger("pikatasks.worker.master") processes = list() - def remove_ended_processes(expect_exited_processes): + def remove_ended_processes(): nonlocal processes alive = [p for p in processes if p.is_alive()] exited = set(processes) - set(alive) for p in exited: - if not expect_exited_processes: - logger.error("Minion (PID={0}) disappeared unexpectedly.".format(p.pid)) + if p.exitcode == 0: + logger.info("Minion (PID={0}) stopped.".format(p.pid)) + if p.exitcode != 0: + logger.error("Minion (PID={0}) disappeared with exit code {1}.".format(p.pid, p.exitcode)) processes.remove(p) def create_minion(tasks): @@ -87,7 +89,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None): for p in processes: os.kill(p.pid, signal.SIGTERM) # SIGTERM = ask minions nicely to stop time.sleep(IPC_PERIOD.total_seconds()) - remove_ended_processes(expect_exited_processes=True) + remove_ended_processes() if datetime.now() > last_reminder_dt + timedelta(seconds=5): # log reminder every 5 seconds last_reminder_dt = datetime.now() logger.info("Stopping... Minions still running: {n}. Deadline in: {d}.".format(d=deadline_dt - datetime.now(), n=len(processes))) @@ -107,7 +109,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None): channel.queue_declare(queue=queue_name, passive=True) exists = True except AMQPChannelError as e: - logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals())) + logger.debug("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals())) exists = False finally: if conn and conn.is_open: @@ -135,7 +137,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None): if not worker_stop_handler: worker_stop_handler= SignalStopHandler(logger=logger, this_process_name="master") while not worker_stop_handler.stop_is_requested: - remove_ended_processes(expect_exited_processes=False) + remove_ended_processes() while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES): processes.append(create_minion(tasks)) time.sleep(IPC_PERIOD.total_seconds())