Skip to content
Snippets Groups Projects
Commit 0f3f3234 authored by Art's avatar Art :lizard:
Browse files

Many small improvements, that should not affect functionality

  - replace most assertions with raising exceptions
  - replace most cls.__name__ with cls.__qualname__
  - fix several minor bugs
  - improve some error texts and comments
parent b2203ae4
No related branches found
No related tags found
No related merge requests found
...@@ -51,7 +51,7 @@ def run(_task_name, **kwargs): ...@@ -51,7 +51,7 @@ def run(_task_name, **kwargs):
) )
except Exception as e: except Exception as e:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error("{}: {}".format(e.__class__.__name__, str(e))) logger.error("{}: {}".format(e.__class__.__qualname__, str(e)))
raise RPCMessageQueueError(e) raise RPCMessageQueueError(e)
...@@ -112,7 +112,7 @@ def rpc(_task_name, **kwargs): ...@@ -112,7 +112,7 @@ def rpc(_task_name, **kwargs):
channel.close() # after stopped consuming by either of the callbacks channel.close() # after stopped consuming by either of the callbacks
except Exception as e: except Exception as e:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error("{}: {}".format(e.__class__.__name__, str(e))) logger.error("{}: {}".format(e.__class__.__qualname__, str(e)))
exception = RPCMessageQueueError(e) exception = RPCMessageQueueError(e)
# done, return the result or indicate a problem # done, return the result or indicate a problem
if exception: if exception:
...@@ -128,20 +128,22 @@ def task(name): ...@@ -128,20 +128,22 @@ def task(name):
Property .as_callback is a callable ready to be consumed by pika's functions like Channel.basic_consume Property .as_callback is a callable ready to be consumed by pika's functions like Channel.basic_consume
""" """
if isinstance(name, str): if isinstance(name, str):
# used with an argument: @task("task_name")
func = None func = None
task_name = name task_name = name
elif isinstance(name, types.FunctionType): elif isinstance(name, types.FunctionType):
# used without an argument: @task
func = name func = name
task_name = func.__name__ task_name = func.__name__
else: else:
raise AssertionError("Bad arguments for the @task decorator") raise TypeError("Cannot decorate this: {0}".format(repr(name)))
def decorator(func): def decorator(func):
""" Creates an actual decorator. """ """ Creates an actual decorator. """
def as_callback(channel, method, properties, body): def as_callback(channel, method, properties, body):
""" """
Creates a callback to be used by pika. Creates a callback that will be invoked by pika.
More info: http://pika.readthedocs.io/en/0.10.0/modules/channel.html#pika.channel.Channel.basic_consume More info: http://pika.readthedocs.io/en/0.10.0/modules/channel.html#pika.channel.Channel.basic_consume
""" """
nonlocal task_name, func nonlocal task_name, func
...@@ -153,10 +155,11 @@ def task(name): ...@@ -153,10 +155,11 @@ def task(name):
django_compat.check_fix_db_connection() django_compat.check_fix_db_connection()
try: try:
task_kwargs = utils.deserialize(body) task_kwargs = utils.deserialize(body)
assert isinstance(task_kwargs, dict), "Bad task kwargs." if not isinstance(task_kwargs, dict):
raise TypeError("Bad kwargs type: {0}".format(task_kwargs.__class__.__qualname__))
func_result = func(**task_kwargs) func_result = func(**task_kwargs)
except Exception as e: except Exception as e:
ec = e.__class__.__name__ ec = e.__class__.__qualname__
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
logger.error("Task {task_name} function raised {ec}: {e}".format(**locals())) logger.error("Task {task_name} function raised {ec}: {e}".format(**locals()))
func_error = "Task {task_name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized func_error = "Task {task_name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized
......
...@@ -3,11 +3,12 @@ import importlib ...@@ -3,11 +3,12 @@ import importlib
from . import utils from . import utils
try: try:
import django
from django import db as django_db from django import db as django_db
from django import conf as django_conf from django import conf as django_conf
DJANGO = True DJANGO = django.VERSION
except ImportError: except ImportError:
DJANGO = False DJANGO = None
logger = logging.getLogger("pika-tasks") logger = logging.getLogger("pika-tasks")
...@@ -26,17 +27,14 @@ def close_db_connections(): ...@@ -26,17 +27,14 @@ def close_db_connections():
try: try:
django_db.connections.close_all() django_db.connections.close_all()
except Exception as e: except Exception as e:
logger.warning("Failed to close django db connections: {e.__class__.__name__}: {e}".format(e=e)) logger.warning("Failed to close django db connections: {e.__class__.__qualname__}: {e}".format(e=e))
def check_worker_db_settings(): def check_worker_db_settings():
assert DJANGO assert DJANGO
try:
t = int(django_conf.settings.CONN_MAX_AGE) t = int(django_conf.settings.CONN_MAX_AGE)
assert t, "disabled: {t}".format(t=t) if not t or t > 20 * 60:
assert not (t > 20 * 60), "way too high value: {t}".format(t=t) raise ValueError("When using django, CONN_MAX_AGE must be set to a sane value. The current value: {t} seconds.".format(t=t))
except Exception as e:
logger.error("When using django on the worker side, CONN_MAX_AGE is totally absolutely necessary. {e.__class__.__name__}: {e}".format(e=e))
def check_fix_db_connection(): def check_fix_db_connection():
......
...@@ -24,11 +24,11 @@ def deserialize(binary): ...@@ -24,11 +24,11 @@ def deserialize(binary):
def serialize_datetime(dt): def serialize_datetime(dt):
assert isinstance(dt, datetime)
return datetime.strftime(dt, DATETIME_FORMAT) return datetime.strftime(dt, DATETIME_FORMAT)
def deserialize_datetime(text): def deserialize_datetime(text):
""" Throws ValueError when fails to parse the text """
return datetime.strptime(text, DATETIME_FORMAT) return datetime.strptime(text, DATETIME_FORMAT)
...@@ -50,8 +50,7 @@ def get_pika_connection_parameters(): ...@@ -50,8 +50,7 @@ def get_pika_connection_parameters():
username=settings.USERNAME, username=settings.USERNAME,
password=settings.PASSWORD password=settings.PASSWORD
), ),
# TODO: causes a warning when closing connections blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT.total_seconds(), # sometimes causes warnings with duplicate listeners?...
blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT.total_seconds(),
ssl_options=get_ssl_options(settings) if settings.SSL_ENABLED else None, ssl_options=get_ssl_options(settings) if settings.SSL_ENABLED else None,
) )
......
...@@ -5,7 +5,6 @@ import signal ...@@ -5,7 +5,6 @@ import signal
import os import os
import pika import pika
from pika.exceptions import AMQPChannelError, AMQPConnectionError from pika.exceptions import AMQPChannelError, AMQPConnectionError
from queue import Empty
from datetime import datetime, timedelta from datetime import datetime, timedelta
from . import settings from . import settings
from . import utils from . import utils
...@@ -99,7 +98,7 @@ def start(tasks=utils.all_tasks, number_of_processes=None): ...@@ -99,7 +98,7 @@ def start(tasks=utils.all_tasks, number_of_processes=None):
channel.queue_declare(queue=queue_name, passive=True) channel.queue_declare(queue=queue_name, passive=True)
exists = True exists = True
except AMQPChannelError as e: except AMQPChannelError as e:
logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__name__}: {e}".format(**locals())) logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals()))
exists = False exists = False
finally: finally:
if conn and conn.is_open: if conn and conn.is_open:
...@@ -141,7 +140,8 @@ def _task_process(tasks, parent_pid): ...@@ -141,7 +140,8 @@ def _task_process(tasks, parent_pid):
""" This is a single process, that performs tasks. """ """ This is a single process, that performs tasks. """
logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid())) logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid()))
signal_handler = _SignalHandler(logger=logger, this_process_name="minion") signal_handler = _SignalHandler(logger=logger, this_process_name="minion")
assert tasks if not tasks:
raise RuntimeError("Got empty list of tasks")
conn, channel = None, None conn, channel = None, None
def control_beat(): def control_beat():
...@@ -166,7 +166,8 @@ def _task_process(tasks, parent_pid): ...@@ -166,7 +166,8 @@ def _task_process(tasks, parent_pid):
for task in tasks: for task in tasks:
try: try:
callback = getattr(task, "as_callback", None) callback = getattr(task, "as_callback", None)
assert callback and callable(callback), "Not a valid task: {0}".format(task) if not callback or not callable(callback):
raise ValueError("Not a valid task: {0}".format(task))
if pika.__version__ >= "1.0": if pika.__version__ >= "1.0":
# multiple breaking changes in version 1.0.0b1 # multiple breaking changes in version 1.0.0b1
# if this is broken again, also fix the other basic_consume in this project # if this is broken again, also fix the other basic_consume in this project
...@@ -175,13 +176,13 @@ def _task_process(tasks, parent_pid): ...@@ -175,13 +176,13 @@ def _task_process(tasks, parent_pid):
channel.basic_consume(consumer_callback=callback, queue=task.task_queue) channel.basic_consume(consumer_callback=callback, queue=task.task_queue)
logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
except Exception as e: except Exception as e:
logger.error("Could not register task \"{t}\". {e.__class__.__name__}: {e}".format(t=task.task_name, e=e)) logger.error("Could not register task \"{t}\". {e.__class__.__qualname__}: {e}".format(t=task.task_name, e=e))
if isinstance(e, (AMQPChannelError, AMQPConnectionError,)): if isinstance(e, (AMQPChannelError, AMQPConnectionError,)):
raise e # does not make sense to try registering other tasks raise e # does not make sense to try registering other tasks
control_beat() # initial control_beat() # initial
channel.start_consuming() channel.start_consuming()
except Exception as e: except Exception as e:
logger.error("{e.__class__.__name__}: {e}".format(e=e)) logger.error("{e.__class__.__qualname__}: {e}".format(e=e))
finally: finally:
if channel and channel.is_open: if channel and channel.is_open:
logger.info("Closing the channel: {0}".format(channel)) logger.info("Closing the channel: {0}".format(channel))
......
...@@ -13,7 +13,7 @@ setup( ...@@ -13,7 +13,7 @@ setup(
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
license='BSD', license='BSD',
description='Minimal RPC with pika. Skookum as frig.', description='Minimal RPC with pika, 100.02% skookum.',
long_description=README, long_description=README,
url='https://lab.it.hs-hannover.de/django/pikatasks', url='https://lab.it.hs-hannover.de/django/pikatasks',
author='Art Lukyanchyk', author='Art Lukyanchyk',
...@@ -28,11 +28,9 @@ setup( ...@@ -28,11 +28,9 @@ setup(
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License', 'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent', 'Operating System :: OS Independent',
'Programming Language :: Python', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Topic :: System :: Networking', 'Topic :: System :: Networking',
'Topic :: Software Development :: Libraries', 'Topic :: Software Development :: Libraries',
], ],
) )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment