Skip to content
Snippets Groups Projects
Commit c85511ac authored by Art's avatar Art :lizard:
Browse files

Add Django config support and error handling in task subprocesses.

parent 8781070c
Branches
No related tags found
No related merge requests found
# pikatasks
**pikatasks** is a library that allows you to run remote tasks easily.
**pikatasks** is a minimalistic library that allows you to run remote tasks easily. There's also a Django integration.
## Requirements
......@@ -17,11 +17,17 @@ import pikatasks
##### Configure:
```python
pikatasks.settings.BROKER = "localhost"
pikatasks.settings.VIRTUAL_HOST = "test"
pikatasks.settings.VIRTUAL_HOST = "foo"
pikatasks.settings.USERNAME = "lancelot"
pikatasks.settings.PASSWORD = "swalloWcoc0nut"
```
Or in Django settings:
```python
PIKATASKS_BROKER = "localhost"
PIKATASKS_VIRTUAL_HOST = "foo"
PIKATASKS_USERNAME = "lancelot"
PIKATASKS_PASSWORD = "swalloWcoc0nut"
```
##### Implement a task (server):
```python
@pikatasks.task(name="hello")
......@@ -80,13 +86,13 @@ You are done after creating queues for each of your tasks. Don't need anything e
* For each of the tasks, create a new binding for the exchange `client-out`, with `routing key == queue name == task name`
* e.g. `exchange = client-out`, `routing key = task1`, `queue = task1`
* User permissions:
* Configure: `^$` or an empty string (both mean no permissions)
* Configure: empty string (no config permissions)
* Write: `^client-out$` (replace with the name of your exchange)
* Read: `^$` or an empty string (no read permissions, RPC results/replies will still work)
* Read: empty string (no read permissions, RPC results/replies will still work)
##### Worker:
* User permissions:
* Configure: `^$` or an empty string (no config permissions)
* Write: `.*` or `^amq.default$` (this is still in the TODO)
* Configure: empty string (no config permissions)
* Write: `.*` (everything) or `^amq.default$` (`amq.default` is required to send "direct reply-to")
* Read: `^(task1|task2)$`, replace `taskN` with whatever your task names are
......@@ -25,11 +25,14 @@ def close_db_connections():
logger.debug("No django, no db connections to close.")
def settings_from_django():
def update_settings_from_django_settings():
"""
Configure pikatasks from django.conf.settings, so you can easily keep all your stuff in one place.
"""
if not DJANGO:
return
raise NotImplementedError()
try:
pass
except Exception as e:
logger.warning("Failed to configure pikatasks from django.conf.settings")
"""
If you want to override some settings:
Change settings:
- Django:
PIKATASKS_BROKER = "localhost" # prefix the setting name with "PIKATASKS_"
- Non-Django:
import pikatasks
pikatasks.settings.RPC_TIMEOUT = 5
pikatasks.settings.BROKER = "localhost"
"""
from datetime import timedelta
......@@ -26,3 +29,17 @@ WORKER_GRACEFUL_STOP_TIMEOUT = timedelta(seconds=60)
BLOCKED_CONNECTION_TIMEOUT = timedelta(seconds=20) # weird stuff to avoid deadlocks, see pika documentation
WORKER_CHECK_SUBPROCESSES_PERIOD = timedelta(seconds=2)
# merge these settings with django.conf.settings
try:
from django.conf import settings as django_settings
for k in list(globals().keys()): # list() prevents errors on changes
if k.isupper() and not k.startswith("_"): # looks like a setting
try:
new_value = getattr(django_settings, "PIKATASKS_" + k)
globals()[k] = new_value
except AttributeError:
pass
except ImportError:
pass # no Django
......@@ -16,7 +16,7 @@ MSG_CHECK_FREQ = 2 # seconds
logger = logging.getLogger("pikatasks.worker")
def start(tasks, number_of_processes=settings.WORKER_TASK_PROCESSES):
def start(tasks, number_of_processes=None):
"""
Use this to launch a worker.
:param tasks: list of tasks to process
......@@ -26,11 +26,12 @@ def start(tasks, number_of_processes=settings.WORKER_TASK_PROCESSES):
logger.info("Starting pikatasks worker...")
processes = list()
control_queue = multiprocessing.Queue()
# the main loop (exits with SIGINT) watches the worker processes
assert tasks, "Received empty task list."
# the main loop (exits with SIGINT) watches worker processes
try:
while True:
_remove_ended_processes(processes)
while len(processes) < number_of_processes:
while len(processes) < (number_of_processes or settings.WORKER_TASK_PROCESSES):
processes.append(_create_worker_process(tasks, control_queue))
time.sleep(settings.WORKER_CHECK_SUBPROCESSES_PERIOD.total_seconds())
except KeyboardInterrupt:
......@@ -98,8 +99,8 @@ def _task_process(tasks, control_queue):
_start_ignoring_sigint() # no interruptions in the middle of the task, graceful exit is controlled by the main process
own_pid = os.getpid()
subprocess_logger = logging.getLogger("pikatasks.worker.subprocess_{0}".format(own_pid))
log_prefix = "(task process PID={0}) ".format(own_pid)
assert tasks, "Received empty task list."
log_prefix = "(pikatasks subprocess PID={0}) ".format(own_pid)
assert tasks
conn, channel = None, None
def check_control_queue():
......@@ -115,6 +116,7 @@ def _task_process(tasks, control_queue):
pass
conn.add_timeout(MSG_CHECK_FREQ, check_control_queue) # run this function again soon
try:
subprocess_logger.debug(log_prefix + "Opening a connection...")
with utils.get_connection() as conn:
channel = conn.channel()
......@@ -125,5 +127,11 @@ def _task_process(tasks, control_queue):
check_control_queue()
channel.start_consuming()
channel.close()
except Exception as e:
subprocess_logger.error(log_prefix + "{e.__class__.__name__}: {e}".format(e=e))
finally:
if conn and conn.is_open:
conn.close()
subprocess_logger.debug(log_prefix + "Stopped.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment