diff --git a/pikatasks/__init__.py b/pikatasks/__init__.py index f2c0112ca976cc14e2965128dfd1923042f76f2f..0e9e13816767943bcd361a12034a06c4e6c4d1df 100644 --- a/pikatasks/__init__.py +++ b/pikatasks/__init__.py @@ -88,7 +88,12 @@ def rpc(_task_name, **kwargs): channel = conn.channel() channel.confirm_delivery() # start waiting for RPC response (required before sending a request with reply-to) - channel.basic_consume(callback_result, queue="amq.rabbitmq.reply-to", no_ack=True) + if pika.__version__ >= "1.0": + # multiple breaking changes in version 1.0.0b1 + # if this is broken again, also fix the other basic_consume in this project + channel.basic_consume(queue="amq.rabbitmq.reply-to", on_message_callback=callback_result, auto_ack=True) + else: + channel.basic_consume(consumer_callback=callback_result, queue="amq.rabbitmq.reply-to", no_ack=True) # send a request channel.basic_publish( exchange=settings.CLIENT_EXCHANGE_NAME, diff --git a/pikatasks/worker.py b/pikatasks/worker.py index 44d8a555a722fde3cab611dc71755df76d4c59ce..1aca29eb9c017da25990b0012bbe85ffb45e4446 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -1,13 +1,15 @@ import multiprocessing -from queue import Empty -from . import settings -from . import utils import logging import time import signal import os -from . import django_compat +import pika +from pika.exceptions import AMQPChannelError, AMQPConnectionError +from queue import Empty from datetime import datetime +from . import settings +from . import utils +from . import django_compat MSG_STOP = "MSG_STOP" @@ -133,8 +135,13 @@ def _task_process(tasks, control_queue, parent_pid): try: callback = getattr(task, "as_callback", None) assert callback and callable(callback), "Not a valid task: {0}".format(task) - channel.basic_consume(callback, queue=task.task_queue) - subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) + if pika.__version__ >= "1.0": + # multiple breaking changes in version 1.0.0b1 + # if this is broken again, also fix the other basic_consume in this project + channel.basic_consume(queue=task.task_queue, on_message_callback=callback) + else: + channel.basic_consume(consumer_callback=callback, queue=task.task_queue) + logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) except Exception as e: logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e)) control_beat() # initial