Skip to content
Snippets Groups Projects
Commit 72ae0ef4 authored by Art's avatar Art :lizard:
Browse files

Now @task can be called as if they were regular functions.

parent c85511ac
No related branches found
No related tags found
No related merge requests found
...@@ -14,6 +14,9 @@ class RPCTimedOut(RPCError): ...@@ -14,6 +14,9 @@ class RPCTimedOut(RPCError):
pass pass
all_tasks = set() # each registered task will show up here
def run(name, **kwargs): def run(name, **kwargs):
""" """
Runs a task remotely. Runs a task remotely.
...@@ -89,14 +92,17 @@ def rpc(name, **kwargs): ...@@ -89,14 +92,17 @@ def rpc(name, **kwargs):
def task(name): def task(name):
""" """
Use this to decorate your tasks. Use this to decorate your tasks.
:param name: name of the task and its queue It doesn't replace the function with a wrapper. Instead, it adds additional properties to the function.
:return: callable object, ready to be consumed by pika's functions like Channel.basic_consume Property .as_callback is a callable ready to be consumed by pika's functions like Channel.basic_consume
:param name: name of the task == name of the queue
""" """
assert isinstance(name, str) assert isinstance(name, str)
def decorator(func): def decorator(func):
""" Creates an actual decorator. """
def wrapper(channel, method, properties, body): def as_callback(channel, method, properties, body):
""" Creates a callback to be used by pika. """
nonlocal name, func nonlocal name, func
func_result, func_error = None, None func_result, func_error = None, None
channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_ack(delivery_tag=method.delivery_tag)
...@@ -124,10 +130,12 @@ def task(name): ...@@ -124,10 +130,12 @@ def task(name):
if func_result: if func_result:
logger.warning("Task {name} returned a result but the client doesn't want to receive it.".format(name=name)) logger.warning("Task {name} returned a result but the client doesn't want to receive it.".format(name=name))
wrapper.is_pikatasks_task = True func.as_callback = as_callback
wrapper.task_name = name func.task_name = name
wrapper.task_queue = name func.task_queue = name
return wrapper global all_tasks
all_tasks.add(func)
return func
return decorator return decorator
...@@ -33,13 +33,16 @@ WORKER_CHECK_SUBPROCESSES_PERIOD = timedelta(seconds=2) ...@@ -33,13 +33,16 @@ WORKER_CHECK_SUBPROCESSES_PERIOD = timedelta(seconds=2)
# merge these settings with django.conf.settings # merge these settings with django.conf.settings
try: try:
from django.conf import settings as django_settings from django.conf import settings as django_settings
from django.core.exceptions import ImproperlyConfigured
for k in list(globals().keys()): # list() prevents errors on changes for k in list(globals().keys()): # list() prevents errors on changes
if k.isupper() and not k.startswith("_"): # looks like a setting if k.isupper() and not k.startswith("_"): # looks like a setting
try: try:
new_value = getattr(django_settings, "PIKATASKS_" + k) new_value = getattr(django_settings, "PIKATASKS_" + k)
globals()[k] = new_value globals()[k] = new_value
except ImproperlyConfigured:
pass # django is installed but not used
except AttributeError: except AttributeError:
pass pass # django is installed and used, but the setting is not present
except ImportError: except ImportError:
pass # no Django pass # no django
...@@ -121,9 +121,10 @@ def _task_process(tasks, control_queue): ...@@ -121,9 +121,10 @@ def _task_process(tasks, control_queue):
with utils.get_connection() as conn: with utils.get_connection() as conn:
channel = conn.channel() channel = conn.channel()
for task in tasks: for task in tasks:
assert getattr(task, "is_pikatasks_task", False) and callable(task), "Not a task: {0}".format(task) callback = getattr(task, "as_callback", None)
assert callback and callable(callback), "Not a valid task: {0}".format(task)
subprocess_logger.debug(log_prefix + "Registering task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) subprocess_logger.debug(log_prefix + "Registering task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
channel.basic_consume(task, queue=task.task_queue) channel.basic_consume(callback, queue=task.task_queue)
check_control_queue() check_control_queue()
channel.start_consuming() channel.start_consuming()
channel.close() channel.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment