Skip to content
Snippets Groups Projects
Commit 6a259c9e authored by julian's avatar julian
Browse files

added signal handling so python processes no longer linger for 10 seconds before being killed

parent 2a3b220e
No related branches found
No related tags found
No related merge requests found
from signal import SIGINT, SIGTERM, signal
from typing import Sequence, Tuple, Any, TypeAlias, Deque
from time import strftime, perf_counter, sleep
from os import environ, path, makedirs
......@@ -61,7 +62,18 @@ def perf_job() -> None:
info(f"Averaging {fmean(perf_list):.2f} vertices inserted per second.")
interrupted = False
def exit_gracefully(signum, frame):
global interrupted
interrupted = True
if __name__ == "__main__":
signal(SIGTERM, exit_gracefully)
signal(SIGINT, exit_gracefully)
interval = int(environ["QUERY_INTERVAL"])
log_directory = "log"
log_path = f"{log_directory}/{strftime('%Y-%m-%d_%H-%M-%S')}_query_log"
......@@ -100,7 +112,7 @@ if __name__ == "__main__":
end_time = timedelta(hours=2)
# schedule.every(interval).minutes.until(end_time).do(job, queries, csv_name).run()
schedule.every(5).seconds.until(end_time).do(perf_job)
while (sleep_time := schedule.idle_seconds()) is not None:
while (sleep_time := schedule.idle_seconds()) is not None and not interrupted:
sleep(max(sleep_time, 0))
schedule.run_pending()
info("done")
from signal import SIGINT, SIGTERM, signal
from typing import Iterator, List, Iterable, Sequence, IO
from paho.mqtt.client import Client, CallbackAPIVersion, ReasonCode
from zipfile import ZipFile, ZIP_LZMA
......@@ -46,7 +47,18 @@ def publish_lines(client: Client, topic: str, zip: Iterator[Sequence[str]]):
client.publish(topic, batch, 2)
interrupted = False
def exit_gracefully(signum, frame):
global interrupted
interrupted = True
if __name__ == "__main__":
signal(SIGTERM, exit_gracefully)
signal(SIGINT, exit_gracefully)
basicConfig(level=INFO)
broker_conf: dict = {"host": environ["MOS_HOST"]}
topic = environ["MOS_TOPIC"]
......@@ -59,7 +71,7 @@ if __name__ == "__main__":
client.loop_start()
batch = (batch_lines(lines, batch_size) for lines in read_zip(file, num_lines))
schedule.every().second.do(publish_lines, client, topic, batch)
while (sleep_time := schedule.idle_seconds()) is not None:
while (sleep_time := schedule.idle_seconds()) is not None and not interrupted:
sleep(max(sleep_time, 0))
try:
schedule.run_pending()
......
from signal import SIGINT, SIGTERM, signal
import sys
from paho.mqtt.client import Client, CallbackAPIVersion, MQTTMessage, ReasonCode
from typing import Tuple, Any, Iterable, Sequence, TypeAlias, Final
from os import environ
......@@ -36,6 +38,8 @@ class Postgres_inserter:
self.client = Client(**client_conf)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
signal(SIGINT, self.exit_gracefully)
signal(SIGTERM, self.exit_gracefully)
def start(self):
self.client.connect(**self.broker_conf)
......@@ -67,6 +71,9 @@ class Postgres_inserter:
if hasattr(self, "db_conn"):
self.db_conn.close()
def exit_gracefully(self, signum, frame):
sys.exit(0)
if __name__ == "__main__":
basicConfig(level=DEBUG)
......@@ -79,6 +86,5 @@ if __name__ == "__main__":
"topic": environ["MOS_TOPIC"],
"broker_conf": {"host": environ["MOS_HOST"], "keepalive": 3600 * 2},
}
with Postgres_inserter(**configuration) as subscriber:
subscriber.start()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment