From 0f3f32348c0c047d30f01342f643e5fc9fd04573 Mon Sep 17 00:00:00 2001 From: Art Lukyanchyk <artiom.lukyanchyk@hs-hannover.de> Date: Thu, 12 Jul 2018 13:14:52 +0200 Subject: [PATCH] Many small improvements, that should not affect functionality - replace most assertions with raising exceptions - replace most cls.__name__ with cls.__qualname__ - fix several minor bugs - improve some error texts and comments --- pikatasks/__init__.py | 15 +++++++++------ pikatasks/django_compat.py | 16 +++++++--------- pikatasks/utils.py | 5 ++--- pikatasks/worker.py | 13 +++++++------ setup.py | 8 +++----- 5 files changed, 28 insertions(+), 29 deletions(-) diff --git a/pikatasks/__init__.py b/pikatasks/__init__.py index 99f7153..b4548ce 100644 --- a/pikatasks/__init__.py +++ b/pikatasks/__init__.py @@ -51,7 +51,7 @@ def run(_task_name, **kwargs): ) except Exception as e: logger.debug(traceback.format_exc()) - logger.error("{}: {}".format(e.__class__.__name__, str(e))) + logger.error("{}: {}".format(e.__class__.__qualname__, str(e))) raise RPCMessageQueueError(e) @@ -112,7 +112,7 @@ def rpc(_task_name, **kwargs): channel.close() # after stopped consuming by either of the callbacks except Exception as e: logger.debug(traceback.format_exc()) - logger.error("{}: {}".format(e.__class__.__name__, str(e))) + logger.error("{}: {}".format(e.__class__.__qualname__, str(e))) exception = RPCMessageQueueError(e) # done, return the result or indicate a problem if exception: @@ -128,20 +128,22 @@ def task(name): Property .as_callback is a callable ready to be consumed by pika's functions like Channel.basic_consume """ if isinstance(name, str): + # used with an argument: @task("task_name") func = None task_name = name elif isinstance(name, types.FunctionType): + # used without an argument: @task func = name task_name = func.__name__ else: - raise AssertionError("Bad arguments for the @task decorator") + raise TypeError("Cannot decorate this: {0}".format(repr(name))) def decorator(func): """ Creates an actual decorator. """ def as_callback(channel, method, properties, body): """ - Creates a callback to be used by pika. + Creates a callback that will be invoked by pika. More info: http://pika.readthedocs.io/en/0.10.0/modules/channel.html#pika.channel.Channel.basic_consume """ nonlocal task_name, func @@ -153,10 +155,11 @@ def task(name): django_compat.check_fix_db_connection() try: task_kwargs = utils.deserialize(body) - assert isinstance(task_kwargs, dict), "Bad task kwargs." + if not isinstance(task_kwargs, dict): + raise TypeError("Bad kwargs type: {0}".format(task_kwargs.__class__.__qualname__)) func_result = func(**task_kwargs) except Exception as e: - ec = e.__class__.__name__ + ec = e.__class__.__qualname__ logger.error(traceback.format_exc()) logger.error("Task {task_name} function raised {ec}: {e}".format(**locals())) func_error = "Task {task_name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized diff --git a/pikatasks/django_compat.py b/pikatasks/django_compat.py index c5dcdce..521a657 100644 --- a/pikatasks/django_compat.py +++ b/pikatasks/django_compat.py @@ -3,11 +3,12 @@ import importlib from . import utils try: + import django from django import db as django_db from django import conf as django_conf - DJANGO = True + DJANGO = django.VERSION except ImportError: - DJANGO = False + DJANGO = None logger = logging.getLogger("pika-tasks") @@ -26,17 +27,14 @@ def close_db_connections(): try: django_db.connections.close_all() except Exception as e: - logger.warning("Failed to close django db connections: {e.__class__.__name__}: {e}".format(e=e)) + logger.warning("Failed to close django db connections: {e.__class__.__qualname__}: {e}".format(e=e)) def check_worker_db_settings(): assert DJANGO - try: - t = int(django_conf.settings.CONN_MAX_AGE) - assert t, "disabled: {t}".format(t=t) - assert not (t > 20 * 60), "way too high value: {t}".format(t=t) - except Exception as e: - logger.error("When using django on the worker side, CONN_MAX_AGE is totally absolutely necessary. {e.__class__.__name__}: {e}".format(e=e)) + t = int(django_conf.settings.CONN_MAX_AGE) + if not t or t > 20 * 60: + raise ValueError("When using django, CONN_MAX_AGE must be set to a sane value. The current value: {t} seconds.".format(t=t)) def check_fix_db_connection(): diff --git a/pikatasks/utils.py b/pikatasks/utils.py index 33cc9df..7f978da 100644 --- a/pikatasks/utils.py +++ b/pikatasks/utils.py @@ -24,11 +24,11 @@ def deserialize(binary): def serialize_datetime(dt): - assert isinstance(dt, datetime) return datetime.strftime(dt, DATETIME_FORMAT) def deserialize_datetime(text): + """ Throws ValueError when fails to parse the text """ return datetime.strptime(text, DATETIME_FORMAT) @@ -50,8 +50,7 @@ def get_pika_connection_parameters(): username=settings.USERNAME, password=settings.PASSWORD ), - # TODO: causes a warning when closing connections - blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT.total_seconds(), + blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT.total_seconds(), # sometimes causes warnings with duplicate listeners?... ssl_options=get_ssl_options(settings) if settings.SSL_ENABLED else None, ) diff --git a/pikatasks/worker.py b/pikatasks/worker.py index 0f7e28c..f595f1b 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -5,7 +5,6 @@ import signal import os import pika from pika.exceptions import AMQPChannelError, AMQPConnectionError -from queue import Empty from datetime import datetime, timedelta from . import settings from . import utils @@ -99,7 +98,7 @@ def start(tasks=utils.all_tasks, number_of_processes=None): channel.queue_declare(queue=queue_name, passive=True) exists = True except AMQPChannelError as e: - logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__name__}: {e}".format(**locals())) + logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals())) exists = False finally: if conn and conn.is_open: @@ -141,7 +140,8 @@ def _task_process(tasks, parent_pid): """ This is a single process, that performs tasks. """ logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid())) signal_handler = _SignalHandler(logger=logger, this_process_name="minion") - assert tasks + if not tasks: + raise RuntimeError("Got empty list of tasks") conn, channel = None, None def control_beat(): @@ -166,7 +166,8 @@ def _task_process(tasks, parent_pid): for task in tasks: try: callback = getattr(task, "as_callback", None) - assert callback and callable(callback), "Not a valid task: {0}".format(task) + if not callback or not callable(callback): + raise ValueError("Not a valid task: {0}".format(task)) if pika.__version__ >= "1.0": # multiple breaking changes in version 1.0.0b1 # if this is broken again, also fix the other basic_consume in this project @@ -175,13 +176,13 @@ def _task_process(tasks, parent_pid): channel.basic_consume(consumer_callback=callback, queue=task.task_queue) logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) except Exception as e: - logger.error("Could not register task \"{t}\". {e.__class__.__name__}: {e}".format(t=task.task_name, e=e)) + logger.error("Could not register task \"{t}\". {e.__class__.__qualname__}: {e}".format(t=task.task_name, e=e)) if isinstance(e, (AMQPChannelError, AMQPConnectionError,)): raise e # does not make sense to try registering other tasks control_beat() # initial channel.start_consuming() except Exception as e: - logger.error("{e.__class__.__name__}: {e}".format(e=e)) + logger.error("{e.__class__.__qualname__}: {e}".format(e=e)) finally: if channel and channel.is_open: logger.info("Closing the channel: {0}".format(channel)) diff --git a/setup.py b/setup.py index a95a261..cf46bf5 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( packages=find_packages(), include_package_data=True, license='BSD', - description='Minimal RPC with pika. Skookum as frig.', + description='Minimal RPC with pika, 100.02% skookum.', long_description=README, url='https://lab.it.hs-hannover.de/django/pikatasks', author='Art Lukyanchyk', @@ -28,11 +28,9 @@ setup( 'Intended Audience :: Developers', 'License :: OSI Approved :: BSD License', 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3.4', - 'Topic :: System :: Networking', + 'Programming Language :: Python :: 3', + 'Topic :: System :: Networking', 'Topic :: Software Development :: Libraries', ], ) - -- GitLab