Commit c9622caa authored by Art

Initial commit.

*.pyc
*~
.idea/
build/
*.egg-info/
dist/
Art Lukyanchyk <artiom.lukyanchyk@hs-hannover.de>
LICENSE 0 → 100644
Copyright (c) Hochschule Hannover and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Hochschule Hannover nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pikatasks
**pikatasks** is a library that allows you to run remote tasks easily.
## Requirements
- `pip install pika`
- RabbitMQ as message broker
## How-to
##### Import:
```python
import pikatasks
```
##### Configure:
```python
pikatasks.settings.BROKER = "localhost"
pikatasks.settings.VIRTUAL_HOST = "test"
pikatasks.settings.USERNAME = "lancelot"
pikatasks.settings.PASSWORD = "swalloWcoc0nut"
```
##### Implement a task (server):
```python
@pikatasks.task(name="hello")
def hello(something):
    msg = "Hello, " + something + "!"
    print(msg)
    return msg
```
Note: you will need a queue with exactly the same name as the task. See section: Queues and Permissions.
##### Start a server:
```python
pikatasks.worker.start(tasks=[hello])
```
##### Run a task (client):
To simply run a task:
```python
pikatasks.run("hello", something="World")
```
Run a task and get its result:
```python
result = pikatasks.rpc("hello", something="World")
print(result)
# >>> "Hello, World!"
```
##### Catch exceptions:
If a task raises an exception on the server, the `pikatasks.rpc()` call on the client also raises an exception. The full exception message is not sent, for security/isolation reasons.
```python
try:
    # rpc() waits for the reply, so it is the call that can raise RPCError
    pikatasks.rpc("hello", something=42)
except pikatasks.RPCError as e:
    print(e)
    # >>> Task hello raised TypeError (see worker log for details).
    # Note: the TypeError was raised on the server while evaluating: "Hello, " + 42 + "!"
```
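The reply that travels back to the client can be pictured as a small JSON envelope holding either a result or an anonymized error string. The following is a minimal sketch of that idea in plain Python, without a broker; `build_reply` and `unpack_reply` are hypothetical helper names used only for illustration and are not part of the pikatasks API, and `RPCError` is redefined locally so the snippet is self-contained.

```python
import json


class RPCError(Exception):
    """Local stand-in for pikatasks.RPCError, so the sketch runs on its own."""


def build_reply(task_name, func, **kwargs):
    """Hypothetical helper: run func and wrap its outcome in a reply envelope."""
    try:
        reply = {"result": func(**kwargs)}
    except Exception as e:
        # Only the exception class name goes over the wire, never the message.
        reply = {"error": "Task {0} raised {1} (see worker log for details).".format(
            task_name, e.__class__.__name__)}
    return json.dumps(reply).encode("utf-8")


def unpack_reply(body):
    """Hypothetical helper: return the result, or raise RPCError on the client side."""
    reply = json.loads(body.decode("utf-8"))
    if reply.get("error"):
        raise RPCError(reply["error"])
    return reply.get("result")


def hello(something):
    return "Hello, " + something + "!"


print(unpack_reply(build_reply("hello", hello, something="World")))
try:
    unpack_reply(build_reply("hello", hello, something=42))
except RPCError as e:
    print(e)
```

Because the envelope is JSON, task arguments and return values need to be JSON-serializable.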
## Queues and Permissions
##### Queues and exchanges:
With AMQP, messages first arrive at `exchanges`; the broker then routes them to `queues` using `routing keys`. If you are not sure what this is all about, read [this tutorial](https://www.rabbitmq.com/tutorials/tutorial-four-python.html) first, and consult further RabbitMQ documentation if needed.
##### Queues and Tasks:
* ***pikatasks*** requires a separate queue for each task.
* `queue name == task name`
* You need to create these queues by yourself.
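One way to create the queues is with pika itself. The sketch below assumes a pika channel opened by a user that has configure permissions (for example an administrative account); `declare_task_queues` is a hypothetical helper, not part of pikatasks, and making the queues durable is an assumption rather than a requirement stated here.

```python
def declare_task_queues(channel, task_names, durable=True):
    """Declare one queue per task; the queue name equals the task name.

    `channel` is expected to behave like a pika channel (it only needs
    a queue_declare() method). `durable=True` is an assumption.
    """
    names = list(task_names)
    for name in names:
        channel.queue_declare(queue=name, durable=durable)
    return names


# Usage against a real broker (assumes pika is installed and RabbitMQ is reachable):
# import pika
# conn = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
# declare_task_queues(conn.channel(), ["hello"])
# conn.close()
```

The same queues can also be created through the RabbitMQ management UI or command-line tools if you prefer not to script it.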
##### Development setup:
For development, you are done once you have created a queue for each of your tasks; nothing else is needed. Note: the exchange `amq.default` will be used.
##### Client:
* Create a new exchange for your client. Let's call it `client-out`; its type should be `direct`. This exchange will be used for sending tasks.
* Decide which tasks the client should use. Let's say these are `task1` and `task2` (you should have the corresponding queues already).
* For each of the tasks, create a new binding for the exchange `client-out`, with `routing key == queue name == task name`
    * e.g. `exchange = client-out`, `routing key = task1`, `queue = task1`
* User permissions:
    * Configure: `^$` or an empty string (both mean no permissions)
    * Write: `^client-out$` (replace with the name of your exchange)
    * Read: `^$` or an empty string (no read permissions, RPC results/replies will still work)
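The exchange and bindings described in this section could be set up with pika roughly as follows. This is a sketch under assumptions: `bind_client_exchange` is a hypothetical helper, the channel must belong to a user with configure permissions, and the task queues are assumed to exist already.

```python
def bind_client_exchange(channel, exchange, task_names):
    """Declare a direct exchange and bind each task queue to it.

    For every task: routing key == queue name == task name.
    `channel` is expected to behave like a pika channel.
    """
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    for name in task_names:
        channel.queue_bind(queue=name, exchange=exchange, routing_key=name)


# Usage against a real broker (assumes pika is installed and RabbitMQ is reachable):
# import pika
# conn = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
# bind_client_exchange(conn.channel(), "client-out", ["task1", "task2"])
# conn.close()
#
# The client then has to publish to that exchange:
# pikatasks.settings.CLIENT_EXCHANGE_NAME = "client-out"
```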
##### Worker:
* User permissions:
    * Configure: `^$` or an empty string (no config permissions)
    * Write: `.*` or `^amq.default$` (this is still a TODO)
    * Read: `^(task1|task2)$`, replace `taskN` with whatever your task names are
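RabbitMQ permissions are regular expressions matched against resource names, so it can help to try a pattern before applying it. The sketch below uses Python's `re` module as an approximation of the broker's matching; `allowed` is a hypothetical helper, and treating an empty pattern as `^$` follows the convention described in this section.

```python
import re


def allowed(pattern, resource):
    """Approximate check: does the permission regex match the resource name?"""
    if pattern == "":
        pattern = "^$"  # an empty permission string means "no permissions"
    return re.search(pattern, resource) is not None


# Client write permission: only the client exchange.
print(allowed("^client-out$", "client-out"))      # exact name matches
print(allowed("^client-out$", "client-out-2"))    # anchors reject longer names

# Worker read permission: only the listed task queues.
print(allowed("^(task1|task2)$", "task2"))
print(allowed("^(task1|task2)$", "task3"))

# "No permissions" patterns never match a named resource.
print(allowed("^$", "task1"))
print(allowed("", "task1"))

# An unescaped dot matches any character; escape it for an exact match.
print(allowed("^amq.default$", "amqXdefault"))
print(allowed(r"^amq\.default$", "amqXdefault"))
```

The anchors `^` and `$` matter: without them, a pattern such as `task1` would also match `task10`.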
import pika
import traceback
from . import utils
from . import settings
from . import worker # keep it here for pikatasks.worker.start() being possible
from .utils import logger
class RPCError(Exception):
    pass
class RPCTimedOut(RPCError):
    pass
def run(name, **kwargs):
    """
    Runs a task remotely.
    :param name: name of the task to run
    :param kwargs: kwargs for the task
    :return: None
    """
    with utils.get_connection() as conn:
        channel = conn.channel()
        channel.confirm_delivery()
        channel.basic_publish(exchange=settings.CLIENT_EXCHANGE_NAME, routing_key=name, body=utils.serialize(kwargs), properties=pika.BasicProperties())
def rpc(name, **kwargs):
    """
    Runs a task remotely and returns its result.
    Raises RPCError if there were problems.
    :param name: name of the task to run
    :param kwargs: kwargs for the task
    :return: whatever the task returned
    """
    result, exception = None, None # results of the RPC call, shared with the callback functions
    def callback_result(channel, method, properties, body):
        """ run by pika when a result has been received """
        nonlocal result, exception
        channel.stop_consuming()
        try:
            reply = utils.deserialize(body)
            reply_result, reply_error = reply.get("result"), reply.get("error")
            if reply_error:
                exception = RPCError(reply_error)
            else:
                result = reply_result
        except Exception as e:
            exception = RPCError("RPC got bad result: {0}".format(repr(body)))
    def callback_timeout():
        """ run by pika simple timer """
        nonlocal exception
        channel.stop_consuming()
        exception = RPCTimedOut("RPC timed out. Channel: {0}".format(channel))
    try:
        with utils.get_connection() as conn:
            channel = conn.channel()
            channel.confirm_delivery()
            # start waiting for RPC response (required before sending a request with reply-to)
            channel.basic_consume(callback_result, queue="amq.rabbitmq.reply-to", no_ack=True)
            # send a request
            channel.basic_publish(exchange=settings.CLIENT_EXCHANGE_NAME, routing_key=name, body=utils.serialize(kwargs),
                                  properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
            # ask to run the timeout function soon
            conn.add_timeout(settings.RPC_TIMEOUT, callback_timeout)
            # consume, which will be stopped by either callback
            channel.start_consuming()
            channel.close()
    except Exception as e:
        exception = RPCError("Failed to run task {name}: {e.__class__.__name__}: {e}".format(**locals()))
    # done, return the result or indicate a problem
    if exception:
        raise exception
    else:
        return result
def task(name):
    """
    Use this to decorate your tasks.
    :param name: task name, also used as the name of the queue to consume from
    :return: decorator producing a callable object that can be used by various pika consumption methods
    """
    assert isinstance(name, str)
    def decorator(func):
        def wrapper(channel, method, properties, body):
            nonlocal name, func
            func_result, func_error = None, None
            channel.basic_ack(delivery_tag=method.delivery_tag)
            logger.debug("Received task {name}".format(**locals())) # don't log the body, private data
            try:
                task_kwargs = utils.deserialize(body)
                assert isinstance(task_kwargs, dict), "Bad task kwargs."
                func_result = func(**task_kwargs)
            except Exception as e:
                ec = e.__class__.__name__
                logger.error(traceback.format_exc())
                logger.error("Task {name} function raised {ec}: {e}".format(**locals()))
                func_error = "Task {name} raised {ec} (see worker log for details).".format(**locals()) # sort of anonymized
            if properties.reply_to:
                try:
                    logger.debug("Sending the result of {name} to {properties.reply_to}.".format(**locals()))
                    reply = {"error": func_error} if func_error else {"result": func_result}
                    channel.basic_publish(exchange=settings.WORKER_REPLYTO_EXCHANGE, routing_key=properties.reply_to, body=utils.serialize(reply))
                except Exception as e:
                    logger.error("Could not reply to the {properties.reply_to}. {e.__class__.__name__}: {e}".format(**locals()))
            else:
                if func_result:
                    logger.warning("Task {name} returned a result but the client doesn't want to receive it.".format(name=name))
        wrapper.is_pikatasks_task = True
        wrapper.task_name = name
        wrapper.task_queue = name
        return wrapper
    return decorator
import logging
try:
    from django import db as django_db
    from django import conf as django_conf
    DJANGO = True
except ImportError:
    DJANGO = False
logger = logging.getLogger("pika-tasks")
def close_db_connections():
    """
    Closes all Django db connections.
    This must be done before forking task processes.
    """
    if DJANGO:
        try:
            django_db.connections.close_all()
        except Exception as e:
            logger.warning("Failed to close django db connections: {e.__class__.__name__}: {e}".format(e=e))
    else:
        logger.debug("No django, no db connections to close.")
def settings_from_django():
    """
    Configure pikatasks from django.conf.settings, so you can easily keep all your stuff in one place.
    """
    if not DJANGO:
        return
    raise NotImplementedError()
"""
If you want to override some settings:
import pikatasks
pikatasks.settings.RPC_TIMEOUT = 5
"""
# stuff you want to change:
BROKER = "localhost"
VIRTUAL_HOST = "vhost"
USERNAME = "user"
PASSWORD = "password"
CLIENT_EXCHANGE_NAME = "" # empty string -> amq.default exchange
WORKER_TASK_PROCESSES = 10 # this many processes will be executing tasks
# stuff you might want to change sometimes:
RPC_TIMEOUT = 10 # seconds, how long an rpc call will wait for a reply
WORKER_GRACEFUL_STOP_TIMEOUT = 60 # seconds
WORKER_REPLYTO_EXCHANGE = ""
# stuff you probably don't want to touch:
BLOCKED_CONNECTION_TIMEOUT = 20 # seconds, weird stuff to avoid deadlocks, see pika documentation
WORKER_CHECK_SUBPROCESSES_PERIOD = 2 # seconds
import json
import pika
import logging
from . import settings
logger = logging.getLogger("pika-tasks")
def serialize(stuff):
    return json.dumps(stuff).encode("utf-8")
def deserialize(binary):
    return json.loads(binary.decode("utf-8"))
def get_pika_connection_parameters():
    return pika.ConnectionParameters(
        host=settings.BROKER,
        virtual_host=settings.VIRTUAL_HOST,
        credentials=pika.PlainCredentials(username=settings.USERNAME, password=settings.PASSWORD),
        blocked_connection_timeout=settings.BLOCKED_CONNECTION_TIMEOUT,
    )
def get_connection():
    return pika.BlockingConnection(get_pika_connection_parameters())
import multiprocessing
from queue import Empty
from . import settings
from . import utils
import logging
import time
import signal
import os
from . import django as django_support
MSG_SIGINT = "MSG_SIGINT"
MSG_CHECK_FREQ = 2 # seconds
logger = logging.getLogger("pikatasks.worker")
def start(tasks, number_of_processes=settings.WORKER_TASK_PROCESSES):
    """
    Use this to launch a worker.
    :param tasks: list of tasks to process
    :param number_of_processes: number of worker processes
    :return: None
    """
    logger.info("Starting pikatasks worker...")
    processes = list()
    control_queue = multiprocessing.Queue()
    # the main loop (exits with SIGINT) watches the worker processes
    try:
        while True:
            _remove_ended_processes(processes)
            while len(processes) < number_of_processes:
                processes.append(_create_worker_process(tasks, control_queue))
            time.sleep(settings.WORKER_CHECK_SUBPROCESSES_PERIOD)
    except KeyboardInterrupt:
        _start_ignoring_sigint() # in case user gets impatient and continues slamming ctrl+c
        logger.info("Received SIGINT")
    # stopping
    logger.info("Stopping worker processes...")
    _stop_worker_processes(processes, control_queue)
    if processes:
        logger.error("{n} worker processes failed to stop gracefully.".format(n=len(processes)))
        _terminate_worker_processes(processes)
    else:
        logger.info("All worker processes have stopped gracefully.")
    logger.info("Stopped pikatasks worker.")
def _remove_ended_processes(processes, expect_exited_processes=False):
    alive = [p for p in processes if p.is_alive()]
    exited = set(processes) - set(alive)
    for p in exited:
        if not expect_exited_processes:
            logger.error("Process {0} exited unexpectedly.".format(p.pid))
        processes.remove(p)
def _create_worker_process(tasks, control_queue):
    django_support.close_db_connections()
    p = multiprocessing.Process(
        target=_task_process,
        kwargs=dict(
            tasks=tasks,
            control_queue=control_queue,
        )
    )
    p.start()
    logger.info("Started new worker process (PID={0}).".format(p.pid))
    return p
def _stop_worker_processes(processes, control_queue):
    for countdown in reversed(range(settings.WORKER_GRACEFUL_STOP_TIMEOUT)):
        while control_queue.qsize() < len(processes) * 2:
            control_queue.put(MSG_SIGINT)
        time.sleep(1)
        _remove_ended_processes(processes, expect_exited_processes=True)
        if processes:
            logger.info("Stopping worker processes ({c}). Processes still running: {n}".format(c=countdown, n=len(processes)))
        else:
            break
def _terminate_worker_processes(processes):
    for p in processes:
        logger.warning("Killing worker process {pid}".format(pid=p.pid))
        p.terminate()
def _start_ignoring_sigint():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
def _task_process(tasks, control_queue):
    """ This is a single process, that performs tasks. """
    _start_ignoring_sigint() # no interruptions in the middle of the task, graceful exit is controlled by the main process
    own_pid = os.getpid()
    subprocess_logger = logging.getLogger("pikatasks.worker.subprocess_{0}".format(own_pid))
    log_prefix = "(task process PID={0}) ".format(own_pid)
    assert tasks, "Received empty task list."
    conn, channel = None, None
    def check_control_queue():
        # this function registers itself to be called again
        try:
            msg = control_queue.get_nowait()
            if msg == MSG_SIGINT:
                subprocess_logger.debug(log_prefix + "Stopping consuming messages from queues.")
                channel.stop_consuming()
            else:
                subprocess_logger.error(log_prefix + "Don't know what to do with the control message: {msg}".format(msg=msg))
        except Empty:
            pass
        conn.add_timeout(MSG_CHECK_FREQ, check_control_queue) # run this function again soon
    subprocess_logger.debug(log_prefix + "Opening a connection...")
    with utils.get_connection() as conn:
        channel = conn.channel()
        for task in tasks:
            assert getattr(task, "is_pikatasks_task", False) and callable(task), "Not a task: {0}".format(task)
            subprocess_logger.debug(log_prefix + "Registering task {t} on queue {q}".format(t=task.task_name, q=task.task_queue))
            channel.basic_consume(task, queue=task.task_queue)
        check_control_queue()
        channel.start_consuming()
        channel.close()
    subprocess_logger.debug(log_prefix + "Stopped.")