diff --git a/code/eval/queries/cypher/memgraph/memgraph_queries.py b/code/eval/queries/cypher/memgraph/memgraph_queries.py
index 51868ec5e3f058f98e84249b2ba374c88e06e649..0cd61a931e0941576e243abefa4fc9688ff0bc51 100644
--- a/code/eval/queries/cypher/memgraph/memgraph_queries.py
+++ b/code/eval/queries/cypher/memgraph/memgraph_queries.py
@@ -69,7 +69,9 @@ def execute_and_log_query(query_key, query):
result = session.run(query)
# end_time = time.time()
row_count = 0
- if query_key == '2-hop':
+ if query_key == 'count': # added to count inside the database instead of sending all nodes to the client
+ row_count = result.single()[0] # count(n) returns a single row with a single column
+ elif query_key == '2-hop':
for record in result:
nodes = record.get('nodes')
if nodes is not None: # Check if 'nodes' is not None
diff --git a/code/infrastructure/docker-compose_memgraph.yml b/code/infrastructure/docker-compose_memgraph.yml
index 288945ea0b253516dbf9bbb1239287d901335c10..611e5a9ccc2e1d55ed37fe3753717744eff5999d 100644
--- a/code/infrastructure/docker-compose_memgraph.yml
+++ b/code/infrastructure/docker-compose_memgraph.yml
@@ -49,12 +49,13 @@ services:
- "8444:7444"
- "3001:3000"
- "9091:9091"
+ # - "9115:9115" only for mem_exporter
environment:
- - MEMGRAPH="--query-modules-directory=/usr/lib/memgraph/query_modules --monitoring-address=0.0.0.0 --monitoring-port=7444 --log-file=/var/log/memgraph/memgraph.log --log-level=TRACE "
- volumes:
+ - MEMGRAPH="--query-modules-directory=/usr/lib/memgraph/query_modules --monitoring-address=0.0.0.0 --monitoring-port=7444 --log-file=/var/log/memgraph/memgraph.log --log-level=TRACE --config=/etc/memgraph/memgraph.conf --memory-limit=5120 --query-execution-timeout-sec=6000 --storage-transaction-timeout-sec=600 --cartesian-product-enabled=false"
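+ # note: --memory-limit is given in MiB (5120 ≈ 5 GiB); the timeout flags abort runaway queries and transactions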
+ volumes: # maybe raise --memory-limit above to 6144 (6 GiB)
- ./memgraph/mg_lib:/var/lib/memgraph
- ./memgraph/mg_log:/var/log/memgraph:rw
- - ./memgraph/mg_etc:/etc/memgraph
+ - ./memgraph/mg_etc:/etc/memgraph:rw
entrypoint: ["/usr/bin/supervisord"]
healthcheck:
test: ["CMD", "mgconsole"]
@@ -62,6 +63,17 @@ services:
timeout: 10s
retries: 5
start_period: 30s
+ deploy:
+ resources:
+ limits:
+ memory: 12G
+ cpus: '4.0' # Limiting to 4 CPU cores
+ reservations:
+ memory: 12G
+ cpus: '2.0' # Reserving 2 CPU cores
+ mem_limit: 12G # hard memory limit for the container
+ memswap_limit: 12G # memory+swap ceiling; equal to mem_limit, so the container cannot swap RAM to disk
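+ # deploy.resources applies under Swarm (and newer Compose); mem_limit/memswap_limit cover classic docker-compose runs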
+
#mem_exporter:
# image: mem_exporter
@@ -82,38 +94,40 @@ services:
- ./streaming/broker/log/:/mosquitto/log/
- ./streaming/broker/data/:/mosquitto/data/
- sub_mem:
- container_name: sub_mem
- image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/sub_mem
- environment:
- - abort_time_limit=999999
- - mem_host=bolt://memgraph:7687
- - mos_host=mos1
- - mos_port=1883
- depends_on:
- memgraph:
- condition: service_healthy
+# sub_mem: # don't start sub_mem
+# container_name: sub_mem
+# image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/sub_mem
+# environment:
+# - abort_time_limit=999999
+# - mem_host=bolt://memgraph:7687
+# - mos_host=mos1
+# - mos_port=1883
+# depends_on:
+# memgraph:
+# condition: service_healthy
#restart: true
pub:
container_name: pub_cdm
image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/pub_cdm
environment:
- - lines_per_window=${WINDOW_SIZE}
+ - lines_per_window=45 #${WINDOW_SIZE}
+ # ${WINDOW_SIZE} is normally injected by the Python start script
- path_to_firstK=/var/lib/import/first1k.json
- path_data=/var/lib/import/
- - sleep_time=1
+ - sleeptime=0.04
+ - original=false
volumes:
- ./streaming/clients/pub/data:/var/lib/import
- depends_on:
- sub_mem:
- condition: service_started
+# depends_on: #don't start sub
+# sub_mem:
+# condition: service_started
query_memgraph:
container_name: query_memgraph
image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/memgraph_queries
environment:
- - interval=5
+ - interval=2
- db_host=memgraph
- db_port=7687
- query_file_path=/app/queries/queries.txt
@@ -121,7 +135,8 @@ services:
- ./eval/query/memgraph/log:/app/log
- ./eval/query/memgraph/queries:/app/queries
depends_on:
- - pub
+ #- pub
+ - memgraph # added: wait for memgraph instead of pub
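+ # note: list-form depends_on only waits for container start; condition: service_healthy would wait for the healthcheck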
volumes:
prometheus-data:
@@ -129,6 +144,10 @@ volumes:
mg_lib:
mg_log:
mg_etc:
+ memgraph_data:
+ driver: local
+ driver_opts:
+ size: 10GB # limit the volume size to 10GB
networks:
postgres:
diff --git a/code/infrastructure/eval/query/memgraph/queries/queries copy.txt b/code/infrastructure/eval/query/memgraph/queries/queries copy.txt
new file mode 100644
index 0000000000000000000000000000000000000000..7b455c1d5fcbe1169e4e577af1b51b1d138e4df9
--- /dev/null
+++ b/code/infrastructure/eval/query/memgraph/queries/queries copy.txt
@@ -0,0 +1,24 @@
+count|MATCH (n) return n;
+anc|MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, size(path) AS distance_upstream ORDER BY distance_upstream;
+desc|MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, size(path) AS distance_downstream ORDER BY distance_downstream;
+path|MATCH path = ((a {uuid: 'A6A7C956-0132-5506-96D1-2A7DE97CB400'})-[*ALLSHORTEST(r,n|1)]->(b {uuid:'8DA367BF-36C2-11E8-BF66-D9AA8AFF4A69'})) RETURN path;
+2-hop|MATCH (a) where a.uuid='9FF334BB-9072-D756-B290-556656D73728' CALL neighbors.by_hop(a, [""], 2) YIELD nodes RETURN nodes;
+
+Own query: 2-hop via UNION instead of neighbors.by_hop
+2-hop|MATCH (a)-[*1]-(b) WHERE a.uuid='9FF334BB-9072-D756-B290-556656D73728' WITH COLLECT(DISTINCT b) as nodes_at_hop_1 RETURN nodes_at_hop_1 AS nodes UNION MATCH (a)-[*2]-(b) WHERE a.uuid='9FF334BB-9072-D756-B290-556656D73728' WITH COLLECT(DISTINCT b) as nodes_at_hop_2 RETURN nodes_at_hop_2 AS nodes;
+
+Own desc variant:
+MATCH (node:Node {nodeType: 'Host'})<-[r*]-(descendant)
+RETURN descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream
+ORDER BY ID(descendant);
+
+DISTINCT variant:
+desc|MATCH (node:Node {nodeType: 'Host'})<-[r*]-(descendant) RETURN DISTINCT descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream ORDER BY distance_downstream;
+
+MATCH (node:Node {nodeType: 'Host'})<-[:residesOn]-(k)<-[r*]-(descendant) RETURN DISTINCT descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream ORDER BY distance_downstream;
+
+SHOW STORAGE INFO
+
+MATCH (node:Host)<-[r*]-(descendant) RETURN descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream ORDER BY ID(descendant);
+
+MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, size(path) AS distance_downstream ORDER BY distance_downstream;
\ No newline at end of file
diff --git a/code/infrastructure/eval/query/memgraph/queries/queries.txt b/code/infrastructure/eval/query/memgraph/queries/queries.txt
index b07ff75145b9fe19988c5a1da33b04d7db8899b6..e6ce4402c2dbbac240c0f56c3a0665db58abd3c7 100644
--- a/code/infrastructure/eval/query/memgraph/queries/queries.txt
+++ b/code/infrastructure/eval/query/memgraph/queries/queries.txt
@@ -1,5 +1 @@
-count|MATCH (n) return n;
-anc|MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, size(path) AS distance_upstream ORDER BY distance_upstream;
-desc|MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, size(path) AS distance_downstream ORDER BY distance_downstream;
-path|MATCH path = ((a {_uuid: 'A6A7C956-0132-5506-96D1-2A7DE97CB400'})-[*ALLSHORTEST(r,n|1)]->(b {_uuid:'8DA367BF-36C2-11E8-BF66-D9AA8AFF4A69'})) RETURN path;
-2-hop|MATCH (a) where a._uuid='9FF334BB-9072-D756-B290-556656D73728' CALL neighbors.by_hop(a, [""], 2) YIELD nodes RETURN nodes;
\ No newline at end of file
+count|MATCH (n) return count(n);
\ No newline at end of file
diff --git a/code/infrastructure/streaming/broker/mosquitto.conf b/code/infrastructure/streaming/broker/mosquitto.conf
index cb4082b7e060d744dc59a8e2dad8c8f1c277a59b..3a37e15909bab9a3e6dbdb20106285340c4170e4 100644
--- a/code/infrastructure/streaming/broker/mosquitto.conf
+++ b/code/infrastructure/streaming/broker/mosquitto.conf
@@ -2,4 +2,4 @@
log_dest file /mosquitto/log/mosquitto.log
allow_anonymous true
listener 1883 0.0.0.0
-max_queued_messages 1000000000
\ No newline at end of file
+max_queued_messages 2000000
\ No newline at end of file
diff --git a/code/infrastructure/streaming/clients/pub/pub_cdm.py b/code/infrastructure/streaming/clients/pub/pub_cdm.py
index 85aef2f11045c34f1cdf53ce34f6c2fcf2c9cb7d..5486a0aeaa67e941cb33b266f3500aa3a06dfdca 100755
--- a/code/infrastructure/streaming/clients/pub/pub_cdm.py
+++ b/code/infrastructure/streaming/clients/pub/pub_cdm.py
@@ -4,29 +4,35 @@ import json
import random
import os
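+# 'original' env toggles the old per-line publishing; otherwise each window is sent as one batch message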
+Original_version = os.getenv('original', 'False').lower() in ['true', '1', 't', 'y', 'yes']
broker_hostname = str(os.getenv('mos_host',default="localhost"))
port = int(os.getenv('mos_port',default="1883"))
lines_per_window = int(os.getenv('lines_per_window',default=1000))
path = str(os.getenv('path_data',default='C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\pub\\data\\'))
-sleep_time = int(os.getenv('sleeptime',default=1))
+sleep_time = float(os.getenv('sleeptime',default=1))
files = [
- 'ta1-cadets-e3-official_0.json',
- 'ta1-cadets-e3-official_1.json',
- 'ta1-cadets-e3-official_2.json',
- 'ta1-cadets-e3-official-1_0.json',
- 'ta1-cadets-e3-official-1_1.json',
- 'ta1-cadets-e3-official-1_2.json',
- 'ta1-cadets-e3-official-1_3.json',
- 'ta1-cadets-e3-official-1_4.json',
- 'ta1-cadets-e3-official-2_0.json',
- 'ta1-cadets-e3-official-2_1.json'
+ 'ta1-cadets-e3-official_0.json',
+ 'ta1-cadets-e3-official_1.json',
+ 'ta1-cadets-e3-official_2.json',
+ 'ta1-cadets-e3-official-1_0.json',
+ 'ta1-cadets-e3-official-1_1.json',
+ 'ta1-cadets-e3-official-1_2.json',
+ 'ta1-cadets-e3-official-1_3.json',
+ 'ta1-cadets-e3-official-1_4.json',
+ 'ta1-cadets-e3-official-2_0.json',
+ 'ta1-cadets-e3-official-2_1.json'
]
+#files = ['ta1-cadets-e3-official_0.json'] # uncomment to limit the number of nodes sent
+#line_count = [100000] # -> stops after 100000 + lines_per_window lines
line_count = [4999999,4999999,3911712,4999999,4999999,4999999,4999999,4999999,2059063,4999999,3433561] # line_count corresponding to each file
+
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client1")
topic = "neo4j"
def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
+ #file_path="/home/sven/Schreibtisch/ma_code/code/infrastructure/streaming/clients/pub/data/ta1-cadets-e3-official_0.json" # for local testing
+
with open(file_path, 'r') as file:
i = 0
index = 0
@@ -41,6 +47,8 @@ def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
if i>=line_count:
break
# Read the next lines_per_window lines
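+ # the final window may be short: clamp it so we never read past line_count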
+ if i + lines_per_window >= line_count:
+ lines_per_window = line_count - i
window_data = [next(file) for _ in range(lines_per_window)]
# If no more data is left, break the loop
@@ -56,27 +64,47 @@ def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
def process_window_lines(window_data, window_nr):
print(f"Processing batch {window_nr}:")
values =[]
- for line in window_data:
- json_obj = json.loads(line.strip())
- json_string = json.dumps(json_obj)
- values.append(json_string)
- send_message(values)
-
+ if Original_version:
+ for line in window_data:
+ json_obj = json.loads(line.strip())
+ json_string = json.dumps(json_obj)
+ values.append(json_string)
+ else:
+ # batch mode: parse each line into a JSON object
+ values = [json.loads(line) for line in window_data]
+ send_message(values)
+
+
def send_message(messages):
msg_count = 1
time.sleep(sleep_time)
- for m in messages:
- #print(m)
- result = client.publish(topic,m)
+ if Original_version:
+ for m in messages:
+ #print(m)
+ result = client.publish(topic,m)
+ status = result[0]
+ # if status == 0:
+ # #print(f'Message {str(msg_count)} from {lines_per_window} lines published')
+ # else:
+ # print("Failed to send message to topic " + topic)
+ # if not client.is_connected():
+ # print("Client not connected, exiting...")
+ msg_count +=1
+ else: # batch mode: publish the whole window as one MQTT message
+ #json_string = json.dumps(messages)
+ #print(messages)
+ messages = json.dumps(messages)
+
+ result = client.publish(topic, messages)
status = result[0]
- # if status == 0:
- # #print(f'Message {str(msg_count)} from {lines_per_window} lines published')
- # else:
- # print("Failed to send message to topic " + topic)
- # if not client.is_connected():
- # print("Client not connected, exiting...")
- msg_count +=1
+ if status == 0: # comment out again later?
+ print(f'Message {str(msg_count)} from {lines_per_window} lines published')
+ else:
+ print("Failed to send message to topic " + topic)
+ if not client.is_connected():
+ print("Client not connected, exiting...")
+
def on_connect(client, userdata, flags, return_code):
if return_code == 0: