diff --git a/pikatasks/__init__.py b/pikatasks/__init__.py index 99f7153d6b83a6e0d864c046a25511dd9ad0b8a9..b4548cee0e719631dfd24aeec9c5b05d2d1855ae 100644 --- a/pikatasks/__init__.py +++ b/pikatasks/__init__.py @@ -51,7 +51,7 @@ def run(_task_name, **kwargs): ) except Exception as e: logger.debug(traceback.format_exc()) - logger.error("{}: {}".format(e.__class__.__name__, str(e))) + logger.error("{}: {}".format(e.__class__.__qualname__, str(e))) raise RPCMessageQueueError(e) @@ -112,7 +112,7 @@ def rpc(_task_name, **kwargs): channel.close() # after stopped consuming by either of the callbacks except Exception as e: logger.debug(traceback.format_exc()) - logger.error("{}: {}".format(e.__class__.__name__, str(e))) + logger.error("{}: {}".format(e.__class__.__qualname__, str(e))) exception = RPCMessageQueueError(e) # done, return the result or indicate a problem if exception: @@ -128,20 +128,22 @@ def task(name): Property .as_callback is a callable ready to be consumed by pika's functions like Channel.basic_consume """ if isinstance(name, str): + # used with an argument: @task("task_name") func = None task_name = name elif isinstance(name, types.FunctionType): + # used without an argument: @task func = name task_name = func.__name__ else: - raise AssertionError("Bad arguments for the @task decorator") + raise TypeError("Cannot decorate this: {0}".format(repr(name))) def decorator(func): """ Creates an actual decorator. """ def as_callback(channel, method, properties, body): """ - Creates a callback to be used by pika. + Creates a callback that will be invoked by pika. More info: http://pika.readthedocs.io/en/0.10.0/modules/channel.html#pika.channel.Channel.basic_consume """ nonlocal task_name, func @@ -153,10 +155,11 @@ def task(name): django_compat.check_fix_db_connection() try: task_kwargs = utils.deserialize(body) - assert isinstance(task_kwargs, dict), "Bad task kwargs." + if not isinstance(task_kwargs, dict): + raise TypeError("Bad kwargs type: {0}".format(task_kwargs.__class__.__qualname__)) func_result = func(**task_kwargs) except Exception as e: - ec = e.__class__.__name__ + ec = e.__class__.__qualname__ logger.error(traceback.format_exc()) logger.error("Task {task_name} function raised {ec}: {e}".format(**locals())) func_error = "Task {task_name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized diff --git a/pikatasks/django_compat.py b/pikatasks/django_compat.py index c5dcdcedc01c085e1909d3d8a570ada4ff16d6dd..521a657862396fcb6853b113e2b002f6d11041c3 100644 --- a/pikatasks/django_compat.py +++ b/pikatasks/django_compat.py @@ -3,11 +3,12 @@ import importlib from . import utils try: + import django from django import db as django_db from django import conf as django_conf - DJANGO = True + DJANGO = django.VERSION except ImportError: - DJANGO = False + DJANGO = None logger = logging.getLogger("pika-tasks") @@ -26,17 +27,14 @@ def close_db_connections(): try: django_db.connections.close_all() except Exception as e: - logger.warning("Failed to close django db connections: {e.__class__.__name__}: {e}".format(e=e)) + logger.warning("Failed to close django db connections: {e.__class__.__qualname__}: {e}".format(e=e)) def check_worker_db_settings(): assert DJANGO - try: - t = int(django_conf.settings.CONN_MAX_AGE) - assert t, "disabled: {t}".format(t=t) - assert not (t > 20 * 60), "way too high value: {t}".format(t=t) - except Exception as e: - logger.error("When using django on the worker side, CONN_MAX_AGE is totally absolutely necessary. {e.__class__.__name__}: {e}".format(e=e)) + t = int(django_conf.settings.CONN_MAX_AGE) + if not t or t > 20 * 60: + raise ValueError("When using django, CONN_MAX_AGE must be set to a sane value. The current value: {t} seconds.".format(t=t)) def check_fix_db_connection(): diff --git a/pikatasks/utils.py b/pikatasks/utils.py index 33cc9df8a494e4732b0cd6dd97388d70d062a6bf..7f978da994f7c823f7fce374c2fcce85bdbdece5 100644 --- a/pikatasks/utils.py +++ b/pikatasks/utils.py @@ -24,11 +24,11 @@ def deserialize(binary): def serialize_datetime(dt): - assert isinstance(dt, datetime) return datetime.strftime(dt, DATETIME_FORMAT) def deserialize_datetime(text): + """ Throws ValueError when fails to parse the text """ return datetime.strptime(text, DATETIME_FORMAT) @@ -50,8 +50,7 @@ def get_pika_connection_parameters(): username=settings.USERNAME, password=settings.PASSWORD ), - # TODO: causes a warning when closing connections - blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT.total_seconds(), + blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT.total_seconds(), # sometimes causes warnings with duplicate listeners?... ssl_options=get_ssl_options(settings) if settings.SSL_ENABLED else None, ) diff --git a/pikatasks/worker.py b/pikatasks/worker.py index 0f7e28c0c352fe365ca6a93761330a27362d5686..f595f1bfcd3613cd6dd471cbea51c0e9713a097e 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -5,7 +5,6 @@ import signal import os import pika from pika.exceptions import AMQPChannelError, AMQPConnectionError -from queue import Empty from datetime import datetime, timedelta from . import settings from . import utils @@ -99,7 +98,7 @@ def start(tasks=utils.all_tasks, number_of_processes=None): channel.queue_declare(queue=queue_name, passive=True) exists = True except AMQPChannelError as e: - logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__name__}: {e}".format(**locals())) + logger.warning("Cannot access queue \"{queue_name}\". {e.__class__.__qualname__}: {e}".format(**locals())) exists = False finally: if conn and conn.is_open: @@ -141,7 +140,8 @@ def _task_process(tasks, parent_pid): """ This is a single process, that performs tasks. """ logger = logging.getLogger("pikatasks.worker.minion.pid{0}".format(os.getpid())) signal_handler = _SignalHandler(logger=logger, this_process_name="minion") - assert tasks + if not tasks: + raise RuntimeError("Got empty list of tasks") conn, channel = None, None def control_beat(): @@ -166,7 +166,8 @@ def _task_process(tasks, parent_pid): for task in tasks: try: callback = getattr(task, "as_callback", None) - assert callback and callable(callback), "Not a valid task: {0}".format(task) + if not callback or not callable(callback): + raise ValueError("Not a valid task: {0}".format(task)) if pika.__version__ >= "1.0": # multiple breaking changes in version 1.0.0b1 # if this is broken again, also fix the other basic_consume in this project @@ -175,13 +176,13 @@ def _task_process(tasks, parent_pid): channel.basic_consume(consumer_callback=callback, queue=task.task_queue) logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) except Exception as e: - logger.error("Could not register task \"{t}\". {e.__class__.__name__}: {e}".format(t=task.task_name, e=e)) + logger.error("Could not register task \"{t}\". {e.__class__.__qualname__}: {e}".format(t=task.task_name, e=e)) if isinstance(e, (AMQPChannelError, AMQPConnectionError,)): raise e # does not make sense to try registering other tasks control_beat() # initial channel.start_consuming() except Exception as e: - logger.error("{e.__class__.__name__}: {e}".format(e=e)) + logger.error("{e.__class__.__qualname__}: {e}".format(e=e)) finally: if channel and channel.is_open: logger.info("Closing the channel: {0}".format(channel)) diff --git a/setup.py b/setup.py index a95a261ab9684168f9a2738544db1f59a64a714f..cf46bf555e150e7115bc73f1e9eb1d370ee5115f 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( packages=find_packages(), include_package_data=True, license='BSD', - description='Minimal RPC with pika. Skookum as frig.', + description='Minimal RPC with pika, 100.02% skookum.', long_description=README, url='https://lab.it.hs-hannover.de/django/pikatasks', author='Art Lukyanchyk', @@ -28,11 +28,9 @@ setup( 'Intended Audience :: Developers', 'License :: OSI Approved :: BSD License', 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3.4', - 'Topic :: System :: Networking', + 'Programming Language :: Python :: 3', + 'Topic :: System :: Networking', 'Topic :: Software Development :: Libraries', ], ) -