From 16c3200cda44cdf779dfdffc3523fd4a99375d9e Mon Sep 17 00:00:00 2001 From: Art Lukyanchyk <artiom.lukyanchyk@hs-hannover.de> Date: Thu, 26 Oct 2017 19:50:07 +0200 Subject: [PATCH] Task subprocesses now stop gracefully when parent process disappears. --- pikatasks/worker.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pikatasks/worker.py b/pikatasks/worker.py index b8a6188..44d8a55 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -10,7 +10,7 @@ from . import django_compat from datetime import datetime -MSG_SIGINT = "MSG_SIGINT" +MSG_STOP = "MSG_STOP" MSG_CHECK_FREQ = 2 # seconds logger = logging.getLogger("pikatasks.worker") @@ -64,6 +64,7 @@ def _create_worker_process(tasks, control_queue): kwargs=dict( tasks=tasks, control_queue=control_queue, + parent_pid=os.getpid(), ) ) p.start() @@ -75,7 +76,7 @@ def _stop_worker_processes(processes, control_queue): deadline = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT while datetime.now() < deadline: while control_queue.qsize() < len(processes) * 2: - control_queue.put(MSG_SIGINT) + control_queue.put(MSG_STOP) time.sleep(1) _remove_ended_processes(processes, expect_exited_processes=True) if processes: @@ -94,7 +95,7 @@ def _start_ignoring_sigint(): signal.signal(signal.SIGINT, signal.SIG_IGN) -def _task_process(tasks, control_queue): +def _task_process(tasks, control_queue, parent_pid): """ This is a single process, that performs tasks. """ _start_ignoring_sigint() # no interruptions in the middle of the task, graceful exit is controlled by the main process own_pid = os.getpid() @@ -103,18 +104,26 @@ def _task_process(tasks, control_queue): assert tasks conn, channel = None, None - def check_control_queue(): + def control_beat(): # this function registers itself to be called again + stop = False + # check whether parent process is alive + if os.getppid() != parent_pid: # got adopted, new owner is probably init + logger.error("Parent process disappeared :( Stopping.") + stop = True try: + # check whether graceful stop is requested msg = control_queue.get_nowait() - if msg == MSG_SIGINT: - subprocess_logger.debug(log_prefix + "Stopping consuming messages from queues.") - channel.stop_consuming() + if msg == MSG_STOP: + stop = True else: subprocess_logger.error(log_prefix + "Don't know what to do with the control message: {msg}".format(msg=msg)) except Empty: pass - conn.add_timeout(MSG_CHECK_FREQ, check_control_queue) # run this function again soon + if stop: + channel.stop_consuming() + subprocess_logger.debug(log_prefix + "Stopping consuming messages from queues.") + conn.add_timeout(MSG_CHECK_FREQ, control_beat) # run this function again soon try: subprocess_logger.debug(log_prefix + "Opening a connection...") @@ -128,7 +137,7 @@ def _task_process(tasks, control_queue): subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) except Exception as e: logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e)) - check_control_queue() # initial + control_beat() # initial channel.start_consuming() channel.close() except Exception as e: -- GitLab