diff --git a/code/infrastructure/streaming/clients/spielwiese.ipynb b/code/infrastructure/streaming/clients/spielwiese.ipynb index ec2e38ac15cb7d8aa09e7756dd8b5f283338da07..96327cc8dd86ecdfe3f38d7dc9f516846d60fe9b 100755 --- a/code/infrastructure/streaming/clients/spielwiese.ipynb +++ b/code/infrastructure/streaming/clients/spielwiese.ipynb @@ -1,5 +1,106 @@ { "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Mögliche Optimierung der Neo4j Subscriber..." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import paho.mqtt.client as mqtt\n", + "import time\n", + "import json\n", + "from neo4j import GraphDatabase\n", + "import os\n", + "from threading import Thread\n", + "from queue import Queue\n", + "\n", + "# Existing setup for MQTT and Neo4j connection\n", + "broker_hostname = str(os.getenv('mos_host', default=\"localhost\"))\n", + "broker_port = int(os.getenv('mos_port', default=1883))\n", + "client = mqtt.Client(\"Client4\")\n", + "db_uri = str(os.getenv('mem_host', default=\"bolt://localhost:7687\"))\n", + "neo4j_auth = (\"\", \"\")\n", + "abort_time_limit = int(os.getenv('abort_time_limit', default=99999))\n", + "\n", + "# Initialize a Queue for incoming MQTT messages\n", + "message_queue = Queue()\n", + "\n", + "def flatten_obj(key, val, target):\n", + " # Your existing flatten_obj function\n", + " pass\n", + "\n", + "def parse_json_to_cypher(input):\n", + " # Your existing parse_json_to_cypher function\n", + " pass\n", + "\n", + "def create_cypher_query_from_cdm(json):\n", + " # Your existing create_cypher_query_from_cdm function\n", + " pass\n", + "\n", + "def on_message(client, userdata, message):\n", + " data = json.loads(message.payload.decode(\"utf-8\"))\n", + " # Instead of processing immediately, put the message into the queue\n", + " message_queue.put(data)\n", + "\n", + "def on_connect(client, userdata, flags, return_code):\n", + " # Your existing on_connect function\n", + " pass\n", + "\n", + "def connect_to_db(uri, auth):\n", + " # Establish db connection to neo4j\n", + " driver = GraphDatabase.driver(uri, auth=auth)\n", + " # Consider moving session cleanup and connection verification outside of this function\n", + " return driver\n", + "\n", + "def execute_batch_queries(batch):\n", + " # New function to handle batch processing of messages\n", + " with driver.session() as session:\n", + " for data in batch:\n", + " q, attr = create_cypher_query_from_cdm(data)\n", + " session.run(q, attributes=attr)\n", + "\n", + "def process_message_batch():\n", + " while True:\n", + " batch = []\n", + " while not message_queue.empty():\n", + " batch.append(message_queue.get())\n", + "\n", + " if batch:\n", + " execute_batch_queries(batch)\n", + "\n", + " for _ in batch:\n", + " message_queue.task_done()\n", + "\n", + "# Start processing thread for handling MQTT messages in batches\n", + "processing_thread = Thread(target=process_message_batch, daemon=True)\n", + "processing_thread.start()\n", + "\n", + "driver = connect_to_db(db_uri, neo4j_auth)\n", + "\n", + "client.on_connect = on_connect\n", + "client.on_message = on_message\n", + "client.connect(broker_hostname, broker_port)\n", + "client.loop_start()\n", + "\n", + "# Your MQTT client loop and cleanup logic with try-finally\n", + "try:\n", + " i = 0\n", + " while i < abort_time_limit and not client.failed_connect:\n", + " time.sleep(1)\n", + " i += 1\n", + "finally:\n", + " client.disconnect()\n", + " client.loop_stop()\n", + " driver.close()\n" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -7,6 +108,13 @@ "CDM Data" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null,