Skip to content
Snippets Groups Projects
Commit 2d13d6b0 authored by Art's avatar Art :lizard:
Browse files

Improve handling of exceptions and other nonroutine situations.

parent 72ae0ef4
No related branches found
No related tags found
No related merge requests found
......@@ -2,15 +2,29 @@ import pika
import traceback
from . import utils
from . import settings
from . import django_compat
from . import worker # keep it here for pikatasks.worker.start() being possible
from .utils import logger
import datetime
class RPCError(Exception):
    """Base class for every RPC-related failure raised by this package."""


class RPCRemoteError(RPCError):
    """Raised when the remote worker reported an error while executing the task."""


class RPCTimedOut(RPCError):
    """Raised when an RPC call times out; the remote task may or may not have run."""


class RPCMessageQueueError(RPCError):
    """Raised for transport-level trouble: broken connection, AMQ authentication, etc."""
......@@ -24,10 +38,18 @@ def run(name, **kwargs):
:param kwargs: kwargs for the task
:return: None
"""
try:
with utils.get_connection() as conn:
channel = conn.channel()
channel.confirm_delivery()
channel.basic_publish(exchange=settings.CLIENT_EXCHANGE_NAME, routing_key=name, body=utils.serialize(kwargs), properties=pika.BasicProperties())
channel.basic_publish(
exchange=settings.CLIENT_EXCHANGE_NAME,
routing_key=name,
body=utils.serialize(kwargs),
properties=pika.BasicProperties()
)
except Exception as e:
raise RPCMessageQueueError(e)
def rpc(name, **kwargs):
......@@ -48,7 +70,7 @@ def rpc(name, **kwargs):
reply = utils.deserialize(body)
reply_result, reply_error = reply.get("result"), reply.get("error")
if reply_error:
exception = RPCError(reply_error)
exception = RPCRemoteError(reply_error)
else:
result = reply_result
except Exception as e:
......@@ -58,7 +80,7 @@ def rpc(name, **kwargs):
""" run by pika simple timer """
nonlocal exception
channel.stop_consuming()
exception = RPCTimedOut("RPC timed out. Channel: {0}".format(channel))
exception = RPCTimedOut("RPC timed out. Task: {t}. Channel: {c}".format(t=name, c=channel))
try:
with utils.get_connection() as conn:
......@@ -81,7 +103,7 @@ def rpc(name, **kwargs):
channel.start_consuming()
channel.close() # after stopped consuming by either of the callbacks
except Exception as e:
exception = RPCError("Failed to run task {name}: {e.__class__.__name__}: {e}".format(**locals()))
exception = RPCMessageQueueError(e)
# done, return the result or indicate a problem
if exception:
raise exception
......@@ -104,9 +126,12 @@ def task(name):
def as_callback(channel, method, properties, body):
""" Creates a callback to be used by pika. """
nonlocal name, func
task_started_time = datetime.datetime.utcnow()
func_result, func_error = None, None
channel.basic_ack(delivery_tag=method.delivery_tag)
logger.debug("Received task {name}".format(**locals())) # don't log the body, private data
if django_compat.DJANGO:
django_compat.check_fix_db_connection()
try:
task_kwargs = utils.deserialize(body)
assert isinstance(task_kwargs, dict), "Bad task kwargs."
......@@ -128,7 +153,8 @@ def task(name):
logger.error("Could not reply to the {properties.reply_to}. {e.__class__.__name__}: {e}".format(**locals()))
else:
if func_result:
logger.warning("Task {name} returned a result but the client doesn't want to receive it.".format(name=name))
logger.warning("Task {name} returned a result but the client doesn't want to receive it.".format(**locals()))
logger.info("Finished task {name} in {t}.".format(name=name, t=datetime.datetime.utcnow() - task_started_time))
func.as_callback = as_callback
func.task_name = name
......
......@@ -15,8 +15,13 @@ def close_db_connections():
"""
Closes all Django db connections.
This must be done before forking task processes.
See:
- https://code.djangoproject.com/ticket/20562
- https://code.djangoproject.com/ticket/15802
"""
if DJANGO:
logger.debug("Closing django db connections.")
check_worker_db_settings()
try:
django_db.connections.close_all()
except Exception as e:
......@@ -25,14 +30,37 @@ def close_db_connections():
logger.debug("No django, no db connections to close.")
def update_settings_from_django_settings():
def check_worker_db_settings():
    """
    Sanity-check Django's CONN_MAX_AGE for worker processes.

    Logs an error (never raises to the caller) when CONN_MAX_AGE is missing,
    disabled (0/falsy), or unreasonably high (> 20 minutes), because workers
    rely on Django recycling stale DB connections between tasks.

    NOTE: the value checks use explicit raises instead of ``assert`` so the
    validation still runs under ``python -O`` (asserts are stripped there).
    """
    assert DJANGO  # internal precondition: caller only invokes this when Django is present
    try:
        t = int(django_conf.settings.CONN_MAX_AGE)
        if not t:
            raise ValueError("disabled: {t}".format(t=t))
        if t > 20 * 60:  # more than 20 minutes keeps stale connections around far too long
            raise ValueError("way too high value: {t}".format(t=t))
    except Exception as e:
        logger.error("When using django on the worker side, CONN_MAX_AGE is totally absolutely necessary. {e.__class__.__name__}: {e}".format(e=e))
def check_fix_db_connection():
"""
Configure pikatasks from django.conf.settings, so you can easily keep all your stuff in one place.
I leave multiple options here to help solve possible future issues.
This should fix OperationalError when nothing helps (starting with CONN_MAX_AGE).
This function has to be run *before* executing *each* task.
"""
if not DJANGO:
return
try:
assert DJANGO
# Option 1:
django_db.close_old_connections()
# # Option 2 (If Option 1 does not help):
# for name in django_db.connections:
# conn = django_db.connections[name]
# try:
# cursor = conn.cursor() # test
# except django_db.OperationalError: # probably closed
# conn.close() # let django reopen it if needed
# # Option 3 (If Option 2 does not help):
# django_db.connections.close_all()
pass
except Exception as e:
logger.warning("Failed to configure pikatasks from django.conf.settings")
......@@ -6,7 +6,7 @@ import logging
import time
import signal
import os
from . import django as django_support
from . import django_compat
from datetime import datetime
......@@ -58,7 +58,7 @@ def _remove_ended_processes(processes, expect_exited_processes=False):
def _create_worker_process(tasks, control_queue):
django_support.close_db_connections()
django_compat.close_db_connections()
p = multiprocessing.Process(
target=_task_process,
kwargs=dict(
......@@ -121,11 +121,14 @@ def _task_process(tasks, control_queue):
with utils.get_connection() as conn:
channel = conn.channel()
for task in tasks:
try:
callback = getattr(task, "as_callback", None)
assert callback and callable(callback), "Not a valid task: {0}".format(task)
subprocess_logger.debug(log_prefix + "Registering task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
channel.basic_consume(callback, queue=task.task_queue)
check_control_queue()
subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
except Exception as e:
logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e))
check_control_queue() # initial
channel.start_consuming()
channel.close()
except Exception as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment