From 3adf6c6fe312d6b3e09de7a6647aa72f2a7e5a47 Mon Sep 17 00:00:00 2001
From: cwy-p8d-u1 <sven-ove.haensel@stud.hs-hannover.de>
Date: Wed, 14 Feb 2024 13:28:38 +0100
Subject: [PATCH] add possible solution for faster inserts

---
 .../streaming/clients/spielwiese.ipynb        | 108 ++++++++++++++++++
 1 file changed, 108 insertions(+)

diff --git a/code/infrastructure/streaming/clients/spielwiese.ipynb b/code/infrastructure/streaming/clients/spielwiese.ipynb
index ec2e38a..96327cc 100755
--- a/code/infrastructure/streaming/clients/spielwiese.ipynb
+++ b/code/infrastructure/streaming/clients/spielwiese.ipynb
@@ -1,5 +1,106 @@
 {
  "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Mögliche Optimierung der Neo4j Subscriber..."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import paho.mqtt.client as mqtt\n",
+    "import time\n",
+    "import json\n",
+    "from neo4j import GraphDatabase\n",
+    "import os\n",
+    "from threading import Thread\n",
+    "from queue import Queue\n",
+    "\n",
+    "# Existing setup for MQTT and Neo4j connection\n",
+    "broker_hostname = str(os.getenv('mos_host', default=\"localhost\"))\n",
+    "broker_port = int(os.getenv('mos_port', default=1883))\n",
+    "client = mqtt.Client(\"Client4\")\n",
+    "db_uri = str(os.getenv('mem_host', default=\"bolt://localhost:7687\"))\n",
+    "neo4j_auth = (\"\", \"\")\n",
+    "abort_time_limit = int(os.getenv('abort_time_limit', default=99999))\n",
+    "\n",
+    "# Initialize a Queue for incoming MQTT messages\n",
+    "message_queue = Queue()\n",
+    "\n",
+    "def flatten_obj(key, val, target):\n",
+    "    # Your existing flatten_obj function\n",
+    "    pass\n",
+    "\n",
+    "def parse_json_to_cypher(input):\n",
+    "    # Your existing parse_json_to_cypher function\n",
+    "    pass\n",
+    "\n",
+    "def create_cypher_query_from_cdm(json):\n",
+    "    # Your existing create_cypher_query_from_cdm function\n",
+    "    pass\n",
+    "\n",
+    "def on_message(client, userdata, message):\n",
+    "    data = json.loads(message.payload.decode(\"utf-8\"))\n",
+    "    # Instead of processing immediately, put the message into the queue\n",
+    "    message_queue.put(data)\n",
+    "\n",
+    "def on_connect(client, userdata, flags, return_code):\n",
+    "    # Your existing on_connect function\n",
+    "    pass\n",
+    "\n",
+    "def connect_to_db(uri, auth):\n",
+    "    # Establish db connection to neo4j\n",
+    "    driver = GraphDatabase.driver(uri, auth=auth)\n",
+    "    # Consider moving session cleanup and connection verification outside of this function\n",
+    "    return driver\n",
+    "\n",
+    "def execute_batch_queries(batch):\n",
+    "    # New function to handle batch processing of messages\n",
+    "    with driver.session() as session:\n",
+    "        for data in batch:\n",
+    "            q, attr = create_cypher_query_from_cdm(data)\n",
+    "            session.run(q, attributes=attr)\n",
+    "\n",
+    "def process_message_batch():\n",
+    "    while True:\n",
+    "        batch = []\n",
+    "        while not message_queue.empty():\n",
+    "            batch.append(message_queue.get())\n",
+    "\n",
+    "        if batch:\n",
+    "            execute_batch_queries(batch)\n",
+    "\n",
+    "            for _ in batch:\n",
+    "                message_queue.task_done()\n",
+    "\n",
+    "# Start processing thread for handling MQTT messages in batches\n",
+    "processing_thread = Thread(target=process_message_batch, daemon=True)\n",
+    "processing_thread.start()\n",
+    "\n",
+    "driver = connect_to_db(db_uri, neo4j_auth)\n",
+    "\n",
+    "client.on_connect = on_connect\n",
+    "client.on_message = on_message\n",
+    "client.connect(broker_hostname, broker_port)\n",
+    "client.loop_start()\n",
+    "\n",
+    "# Your MQTT client loop and cleanup logic with try-finally\n",
+    "try:\n",
+    "    i = 0\n",
+    "    while i < abort_time_limit and not client.failed_connect:\n",
+    "        time.sleep(1)\n",
+    "        i += 1\n",
+    "finally:\n",
+    "    client.disconnect()\n",
+    "    client.loop_stop()\n",
+    "    driver.close()\n"
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},
@@ -7,6 +108,13 @@
     "CDM Data"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
   {
    "cell_type": "code",
    "execution_count": null,
-- 
GitLab