Skip to content
Snippets Groups Projects
Commit 5d66db92 authored by Art's avatar Art :lizard:
Browse files

Add pika 1.0.0b1 compatibility

parent 73c09624
No related branches found
No related tags found
No related merge requests found
...@@ -88,7 +88,12 @@ def rpc(_task_name, **kwargs): ...@@ -88,7 +88,12 @@ def rpc(_task_name, **kwargs):
channel = conn.channel() channel = conn.channel()
channel.confirm_delivery() channel.confirm_delivery()
# start waiting for RPC response (required before sending a request with reply-to) # start waiting for RPC response (required before sending a request with reply-to)
channel.basic_consume(callback_result, queue="amq.rabbitmq.reply-to", no_ack=True) if pika.__version__ >= "1.0":
# multiple breaking changes in version 1.0.0b1
# if this is broken again, also fix the other basic_consume in this project
channel.basic_consume(queue="amq.rabbitmq.reply-to", on_message_callback=callback_result, auto_ack=True)
else:
channel.basic_consume(consumer_callback=callback_result, queue="amq.rabbitmq.reply-to", no_ack=True)
# send a request # send a request
channel.basic_publish( channel.basic_publish(
exchange=settings.CLIENT_EXCHANGE_NAME, exchange=settings.CLIENT_EXCHANGE_NAME,
......
import multiprocessing import multiprocessing
from queue import Empty
from . import settings
from . import utils
import logging import logging
import time import time
import signal import signal
import os import os
from . import django_compat import pika
from pika.exceptions import AMQPChannelError, AMQPConnectionError
from queue import Empty
from datetime import datetime from datetime import datetime
from . import settings
from . import utils
from . import django_compat
MSG_STOP = "MSG_STOP" MSG_STOP = "MSG_STOP"
...@@ -133,8 +135,13 @@ def _task_process(tasks, control_queue, parent_pid): ...@@ -133,8 +135,13 @@ def _task_process(tasks, control_queue, parent_pid):
try: try:
callback = getattr(task, "as_callback", None) callback = getattr(task, "as_callback", None)
assert callback and callable(callback), "Not a valid task: {0}".format(task) assert callback and callable(callback), "Not a valid task: {0}".format(task)
channel.basic_consume(callback, queue=task.task_queue) if pika.__version__ >= "1.0":
subprocess_logger.debug(log_prefix + "Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue)) # multiple breaking changes in version 1.0.0b1
# if this is broken again, also fix the other basic_consume in this project
channel.basic_consume(queue=task.task_queue, on_message_callback=callback)
else:
channel.basic_consume(consumer_callback=callback, queue=task.task_queue)
logger.debug("Registered task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
except Exception as e: except Exception as e:
logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e)) logger.error("Could not register task {t}. {e.__class__.__name__}: {e}".format(t=task, e=e))
control_beat() # initial control_beat() # initial
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment