diff --git a/pikatasks/worker.py b/pikatasks/worker.py index c872000c46a03d432a3b7e9162303330fd013d86..f370054a7afc1d3204d4efafb220be475ae569df 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -11,6 +11,7 @@ from . import django_compat IPC_PERIOD = timedelta(seconds=0.2) # how often processes check their signals and other processes +RESPAWN_DEAD_MINIONS_DELAY = timedelta(seconds=10) # avoid converting CPU time directly into gigabytes of logs when minions die on start (network failures etc...) class SignalStopHandler: @@ -54,6 +55,7 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None): """ logger = logging.getLogger("pikatasks.worker.master") + number_of_processes = number_of_processes or settings.WORKER_TASK_PROCESSES processes = list() def remove_ended_processes(): @@ -135,11 +137,14 @@ def start(tasks="all", number_of_processes=None, worker_stop_handler=None): if not tasks: raise ValueError("Empty task list.") if not worker_stop_handler: - worker_stop_handler= SignalStopHandler(logger=logger, this_process_name="master") + worker_stop_handler = SignalStopHandler(logger=logger, this_process_name="master") + minions_respawned_at = datetime.utcnow() - RESPAWN_DEAD_MINIONS_DELAY * 2 while not worker_stop_handler.stop_is_requested: remove_ended_processes() - while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES): - processes.append(create_minion(tasks)) + if datetime.utcnow() > minions_respawned_at + RESPAWN_DEAD_MINIONS_DELAY: + while len(processes) < number_of_processes: + processes.append(create_minion(tasks)) + minions_respawned_at = datetime.utcnow() time.sleep(IPC_PERIOD.total_seconds()) # stopping logger.info("Stopping minions...")