Skip to content
Snippets Groups Projects
Commit 16c3200c authored by Art's avatar Art :lizard:
Browse files

Task subprocesses now stop gracefully when parent process disappears.

parent ada432a9
No related branches found
No related tags found
No related merge requests found
...@@ -10,7 +10,7 @@ from . import django_compat ...@@ -10,7 +10,7 @@ from . import django_compat
from datetime import datetime from datetime import datetime
MSG_SIGINT = "MSG_SIGINT" MSG_STOP = "MSG_STOP"
MSG_CHECK_FREQ = 2 # seconds MSG_CHECK_FREQ = 2 # seconds
logger = logging.getLogger("pikatasks.worker") logger = logging.getLogger("pikatasks.worker")
...@@ -64,6 +64,7 @@ def _create_worker_process(tasks, control_queue): ...@@ -64,6 +64,7 @@ def _create_worker_process(tasks, control_queue):
kwargs=dict( kwargs=dict(
tasks=tasks, tasks=tasks,
control_queue=control_queue, control_queue=control_queue,
parent_pid=os.getpid(),
) )
) )
p.start() p.start()
...@@ -75,7 +76,7 @@ def _stop_worker_processes(processes, control_queue): ...@@ -75,7 +76,7 @@ def _stop_worker_processes(processes, control_queue):
deadline = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT deadline = datetime.now() + settings.WORKER_GRACEFUL_STOP_TIMEOUT
while datetime.now() < deadline: while datetime.now() < deadline:
while control_queue.qsize() < len(processes) * 2: while control_queue.qsize() < len(processes) * 2:
control_queue.put(MSG_SIGINT) control_queue.put(MSG_STOP)
time.sleep(1) time.sleep(1)
_remove_ended_processes(processes, expect_exited_processes=True) _remove_ended_processes(processes, expect_exited_processes=True)
if processes: if processes:
...@@ -94,7 +95,7 @@ def _start_ignoring_sigint(): ...@@ -94,7 +95,7 @@ def _start_ignoring_sigint():
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
def _task_process(tasks, control_queue): def _task_process(tasks, control_queue, parent_pid):
""" This is a single process, that performs tasks. """ """ This is a single process, that performs tasks. """
_start_ignoring_sigint() # no interruptions in the middle of the task, graceful exit is controlled by the main process _start_ignoring_sigint() # no interruptions in the middle of the task, graceful exit is controlled by the main process
own_pid = os.getpid() own_pid = os.getpid()
...@@ -103,18 +104,26 @@ def _task_process(tasks, control_queue): ...@@ -103,18 +104,26 @@ def _task_process(tasks, control_queue):
assert tasks assert tasks
conn, channel = None, None conn, channel = None, None
def check_control_queue(): def control_beat():
# this function registers itself to be called again # this function registers itself to be called again
stop = False
# check whether parent process is alive
if os.getppid() != parent_pid: # got adopted, new owner is probably init
logger.error("Parent process disappeared :( Stopping.")
stop = True
try: try:
# check whether graceful stop is requested
msg = control_queue.get_nowait() msg = control_queue.get_nowait()
if msg == MSG_SIGINT: if msg == MSG_STOP:
subprocess_logger.debug(log_prefix + "Stopping consuming messages from queues.") stop = True
channel.stop_consuming()
else: else:
subprocess_logger.error(log_prefix + "Don't know what to do with the control message: {msg}".format(msg=msg)) subprocess_logger.error(log_prefix + "Don't know what to do with the control message: {msg}".format(msg=msg))
except Empty: except Empty:
pass pass
conn.add_timeout(MSG_CHECK_FREQ, check_control_queue) # run this function again soon if stop:
channel.stop_consuming()
subprocess_logger.debug(log_prefix + "Stopping consuming messages from queues.")
conn.add_timeout(MSG_CHECK_FREQ, control_beat) # run this function again soon
try: try:
subprocess_logger.debug(log_prefix + "Opening a connection...") subprocess_logger.debug(log_prefix + "Opening a connection...")
...@@ -128,7 +137,7 @@ def _task_process(tasks, control_queue): ...@@ -128,7 +137,7 @@ def _task_process(tasks, control_queue):
subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
except Exception as e: except Exception as e:
logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e)) logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e))
check_control_queue() # initial control_beat() # initial
channel.start_consuming() channel.start_consuming()
channel.close() channel.close()
except Exception as e: except Exception as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment