From 5d66db928584f77a67096bd8cf1bdd5d50d72f5c Mon Sep 17 00:00:00 2001 From: Art Lukyanchyk <artiom.lukyanchyk@hs-hannover.de> Date: Tue, 20 Feb 2018 14:46:46 +0100 Subject: [PATCH] Add pika 1.0.0b1 compatibility --- pikatasks/__init__.py | 7 ++++++- pikatasks/worker.py | 19 +++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pikatasks/__init__.py b/pikatasks/__init__.py index f2c0112..0e9e138 100644 --- a/pikatasks/__init__.py +++ b/pikatasks/__init__.py @@ -88,7 +88,12 @@ def rpc(_task_name, **kwargs): channel = conn.channel() channel.confirm_delivery() # start waiting for RPC response (required before sending a request with reply-to) - channel.basic_consume(callback_result, queue="amq.rabbitmq.reply-to", no_ack=True) + if pika.__version__ >= "1.0": + # multiple breaking changes in version 1.0.0b1 + # if this is broken again, also fix the other basic_consume in this project + channel.basic_consume(queue="amq.rabbitmq.reply-to", on_message_callback=callback_result, auto_ack=True) + else: + channel.basic_consume(consumer_callback=callback_result, queue="amq.rabbitmq.reply-to", no_ack=True) # send a request channel.basic_publish( exchange=settings.CLIENT_EXCHANGE_NAME, diff --git a/pikatasks/worker.py b/pikatasks/worker.py index 44d8a55..1aca29e 100644 --- a/pikatasks/worker.py +++ b/pikatasks/worker.py @@ -1,13 +1,15 @@ import multiprocessing -from queue import Empty -from . import settings -from . import utils import logging import time import signal import os -from . import django_compat +import pika +from pika.exceptions import AMQPChannelError, AMQPConnectionError +from queue import Empty from datetime import datetime +from . import settings +from . import utils +from . import django_compat MSG_STOP = "MSG_STOP" @@ -133,8 +135,13 @@ def _task_process(tasks, control_queue, parent_pid): try: callback = getattr(task, "as_callback", None) assert callback and callable(callback), "Not a valid task: {0}".format(task) - channel.basic_consume(callback, queue=task.task_queue) - subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) + if pika.__version__ >= "1.0": + # multiple breaking changes in version 1.0.0b1 + # if this is broken again, also fix the other basic_consume in this project + channel.basic_consume(queue=task.task_queue, on_message_callback=callback) + else: + channel.basic_consume(consumer_callback=callback, queue=task.task_queue) + logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) except Exception as e: logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e)) control_beat() # initial -- GitLab