Commit 7bf1cc7e authored by Sven-Ove Hänsel

changes at pub and query

parent fde5f859
@@ -69,7 +69,9 @@ def execute_and_log_query(query_key, query):
     result = session.run(query)
     # end_time = time.time()
     row_count = 0
-    if query_key == '2-hop':
+    if query_key == 'count':  # added: count inside the database instead of sending all nodes
+        row_count = result.single()[0]
+    elif query_key == '2-hop':
         for record in result:
             nodes = record.get('nodes')
             if nodes is not None:  # Check if 'nodes' is not None
...
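For context, a minimal sketch of how the amended branch sits inside the query runner; the driver setup, timing, and the fallback branch are assumptions, only the count/2-hop handling comes from the diff. The point of the new 'count' key is that the previous count query (MATCH (n) RETURN n) streamed every node to the client just to count rows, whereas count(n) returns a single scalar.

import time
from neo4j import GraphDatabase  # Memgraph accepts Bolt connections

driver = GraphDatabase.driver("bolt://memgraph:7687")  # host/port assumed from the compose file

def execute_and_log_query(query_key, query):
    with driver.session() as session:
        start = time.time()
        result = session.run(query)
        row_count = 0
        if query_key == 'count':
            # count(n) is computed inside the database, so a single
            # scalar row crosses the wire instead of every node
            row_count = result.single()[0]
        elif query_key == '2-hop':
            for record in result:
                nodes = record.get('nodes')
                if nodes is not None:  # CALL may yield rows without 'nodes'
                    row_count += len(nodes)
        else:
            row_count = sum(1 for _ in result)  # assumed default: count returned rows
        print(f"{query_key}: {row_count} rows in {time.time() - start:.2f}s")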
@@ -49,12 +49,13 @@ services:
       - "8444:7444"
       - "3001:3000"
       - "9091:9091"
+      # - "9115:9115"  # only for mem_exporter
     environment:
-      - MEMGRAPH="--query-modules-directory=/usr/lib/memgraph/query_modules --monitoring-address=0.0.0.0 --monitoring-port=7444 --log-file=/var/log/memgraph/memgraph.log --log-level=TRACE "
+      - MEMGRAPH="--query-modules-directory=/usr/lib/memgraph/query_modules --monitoring-address=0.0.0.0 --monitoring-port=7444 --log-file=/var/log/memgraph/memgraph.log --log-level=TRACE --config=/etc/memgraph/memgraph.conf --memory-limit=5120 --query-execution-timeout-sec=6000 --storage-transaction-timeout-sec=600 --cartesian-product-enabled=false"
-    volumes:
+    volumes:  # maybe raise --memory-limit to 6G in the MEMGRAPH flags above
       - ./memgraph/mg_lib:/var/lib/memgraph
       - ./memgraph/mg_log:/var/log/memgraph:rw
-      - ./memgraph/mg_etc:/etc/memgraph
+      - ./memgraph/mg_etc:/etc/memgraph:rw
     entrypoint: ["/usr/bin/supervisord"]
     healthcheck:
       test: ["CMD", "mgconsole"]
@@ -62,6 +63,17 @@ services:
       timeout: 10s
       retries: 5
       start_period: 30s
+    deploy:                 # resources here are only honored in swarm mode
+      resources:
+        limits:
+          memory: 12G
+          cpus: '4.0'       # limit to 4 CPU cores
+        reservations:
+          memory: 12G
+          cpus: '2.0'       # reserve 2 CPU cores
+    mem_limit: 12G          # hard memory limit (non-swarm equivalent of the above)
+    memswap_limit: 0G       # note: Docker treats 0 as unset; set equal to mem_limit to actually disable swapping RAM to disk
     #mem_exporter:
     #  image: mem_exporter
@@ -82,38 +94,40 @@ services:
       - ./streaming/broker/log/:/mosquitto/log/
       - ./streaming/broker/data/:/mosquitto/data/

-  sub_mem:
-    container_name: sub_mem
-    image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/sub_mem
-    environment:
-      - abort_time_limit=999999
-      - mem_host=bolt://memgraph:7687
-      - mos_host=mos1
-      - mos_port=1883
-    depends_on:
-      memgraph:
-        condition: service_healthy
+  # sub_mem:  # don't start the subscriber
+  #   container_name: sub_mem
+  #   image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/sub_mem
+  #   environment:
+  #     - abort_time_limit=999999
+  #     - mem_host=bolt://memgraph:7687
+  #     - mos_host=mos1
+  #     - mos_port=1883
+  #   depends_on:
+  #     memgraph:
+  #       condition: service_healthy
     #restart: true

   pub:
     container_name: pub_cdm
     image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/pub_cdm
     environment:
-      - lines_per_window=${WINDOW_SIZE}
+      - lines_per_window=45  # ${WINDOW_SIZE} normally comes from the Python start script
       - path_to_firstK=/var/lib/import/first1k.json
       - path_data=/var/lib/import/
-      - sleep_time=1
+      - sleeptime=0.04
+      - original=false
     volumes:
       - ./streaming/clients/pub/data:/var/lib/import
-    depends_on:
-      sub_mem:
-        condition: service_started
+    # depends_on:  # don't wait for the subscriber; it is not started
+    #   sub_mem:
+    #     condition: service_started

   query_memgraph:
     container_name: query_memgraph
     image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/memgraph_queries
     environment:
-      - interval=5
+      - interval=2
       - db_host=memgraph
       - db_port=7687
       - query_file_path=/app/queries/queries.txt
@@ -121,7 +135,8 @@ services:
       - ./eval/query/memgraph/log:/app/log
       - ./eval/query/memgraph/queries:/app/queries
     depends_on:
-      - pub
+      #- pub
+      - memgraph  # added so the query client waits for the database

 volumes:
   prometheus-data:
@@ -129,6 +144,10 @@ volumes:
   mg_lib:
   mg_log:
   mg_etc:
+  memgraph_data:
+    driver: local
+    driver_opts:
+      size: 10GB  # limit the volume size to 10GB

 networks:
   postgres:
...
count|MATCH (n) return n;
anc|MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, size(path) AS distance_upstream ORDER BY distance_upstream;
desc|MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, size(path) AS distance_downstream ORDER BY distance_downstream;
path|MATCH path = ((a {uuid: 'A6A7C956-0132-5506-96D1-2A7DE97CB400'})-[*ALLSHORTEST(r,n|1)]->(b {uuid:'8DA367BF-36C2-11E8-BF66-D9AA8AFF4A69'})) RETURN path;
2-hop|MATCH (a) where a.uuid='9FF334BB-9072-D756-B290-556656D73728' CALL neighbors.by_hop(a, [""], 2) YIELD nodes RETURN nodes;

Own query:
2-hop|MATCH (a)-[*1]-(b) WHERE a.uuid='9FF334BB-9072-D756-B290-556656D73728' WITH COLLECT(DISTINCT b) as nodes_at_hop_1 RETURN nodes_at_hop_1 AS nodes UNION MATCH (a)-[*2]-(b) WHERE a.uuid='9FF334BB-9072-D756-B290-556656D73728' WITH COLLECT(DISTINCT b) as nodes_at_hop_2 RETURN nodes_at_hop_2 AS nodes;

Own desc:
MATCH (node:Node {nodeType: 'Host'})<-[r*]-(descendant)
RETURN descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream
ORDER BY ID(descendant);

With DISTINCT:
desc|MATCH (node:Node {nodeType: 'Host'})<-[r*]-(descendant) RETURN DISTINCT descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream ORDER BY distance_downstream;
MATCH (node:Node {nodeType: 'Host'})<-[:residesOn]-(k)<-[r*]-(descendant) RETURN DISTINCT descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream ORDER BY distance_downstream;
MATCH (a) where a.uuid='9FF334BB-9072-D756-B290-556656D73728' CALL neighbors.by_hop(a, [""], 2) YIELD nodes RETURN nodes;
SHOW STORAGE INFO;
MATCH (node:Host)<-[r*]-(descendant) RETURN descendant, reduce(distance_downstream = 0, rel IN r | distance_downstream + 1) AS distance_downstream ORDER BY ID(descendant);
MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, size(path) AS distance_downstream ORDER BY distance_downstream;
\ No newline at end of file
-count|MATCH (n) return n;
+count|MATCH (n) return count(n);
 anc|MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, size(path) AS distance_upstream ORDER BY distance_upstream;
-desc|MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, size(path) AS distance_downstream ORDER BY distance_downstream;
-path|MATCH path = ((a {_uuid: 'A6A7C956-0132-5506-96D1-2A7DE97CB400'})-[*ALLSHORTEST(r,n|1)]->(b {_uuid:'8DA367BF-36C2-11E8-BF66-D9AA8AFF4A69'})) RETURN path;
-2-hop|MATCH (a) where a._uuid='9FF334BB-9072-D756-B290-556656D73728' CALL neighbors.by_hop(a, [""], 2) YIELD nodes RETURN nodes;
\ No newline at end of file
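The key|cypher layout of queries.txt pairs each query with the query_key dispatched in execute_and_log_query. A minimal loader sketch, assuming lines without a '|' separator (free-form notes, SHOW STORAGE INFO) are skipped; the path comes from the compose environment:

def load_queries(path='/app/queries/queries.txt'):
    # Parse key|cypher lines into a dict, ignoring notes and blank lines.
    queries = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or '|' not in line:
                continue  # skip blank lines and scratch notes
            key, cypher = line.split('|', 1)  # split on the first '|' only
            queries[key] = cypher
    return queries

Splitting only on the first '|' matters here: several of the queries themselves contain '|', e.g. inside reduce(... | ...) and *ALLSHORTEST(r,n|1).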
@@ -2,4 +2,4 @@
 log_dest file /mosquitto/log/mosquitto.log
 allow_anonymous true
 listener 1883 0.0.0.0
-max_queued_messages 1000000000
+max_queued_messages 2000000
\ No newline at end of file
@@ -4,11 +4,12 @@ import json
 import random
 import os

+Original_version = os.getenv('original', 'False').lower() in ['true', '1', 't', 'y', 'yes']
 broker_hostname = str(os.getenv('mos_host', default="localhost"))
 port = int(os.getenv('mos_port', default="1883"))
 lines_per_window = int(os.getenv('lines_per_window', default=1000))
 path = str(os.getenv('path_data', default='C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\pub\\data\\'))
-sleep_time = int(os.getenv('sleeptime', default=1))
+sleep_time = float(os.getenv('sleeptime', default=1))  # float, so sub-second sleeps like 0.04 work

 files = [
     'ta1-cadets-e3-official_0.json',
@@ -22,11 +23,16 @@ files = [
     'ta1-cadets-e3-official-2_0.json',
     'ta1-cadets-e3-official-2_1.json'
 ]
+# files = ['ta1-cadets-e3-official_0.json']  # values to limit the number of nodes
+# line_count = [100000]  # -> 100000 + lines_per_window
 line_count = [4999999,4999999,3911712,4999999,4999999,4999999,4999999,4999999,2059063,4999999,3433561]  # line_count corresponding to each file

 client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "Client1")
 topic = "neo4j"

 def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
+    # file_path = "/home/sven/Schreibtisch/ma_code/code/infrastructure/streaming/clients/pub/data/ta1-cadets-e3-official_0.json"  # for local runs
     with open(file_path, 'r') as file:
         i = 0
         index = 0
@@ -41,6 +47,8 @@ def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
             if i >= line_count:
                 break
             # Read the next lines_per_window lines
+            if i + lines_per_window >= line_count:
+                lines_per_window = lines_per_window - (i + lines_per_window - line_count)  # clamp the final window to the remaining lines
             window_data = [next(file) for _ in range(lines_per_window)]

             # If no more data is left, break the loop
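The new clamp is easiest to verify with concrete numbers; a small worked example with hypothetical values:

# Hypothetical: 10 relevant lines in the file, window size 4, 8 lines already read.
line_count = 10
lines_per_window = 4
i = 8
if i + lines_per_window >= line_count:
    # 4 - (8 + 4 - 10) = 2: only the remaining lines are read,
    # so next(file) is never called past the configured limit
    lines_per_window = lines_per_window - (i + lines_per_window - line_count)
assert lines_per_window == 2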
@@ -56,16 +64,22 @@ def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
 def process_window_lines(window_data, window_nr):
     print(f"Processing batch {window_nr}:")
     values = []
-    for line in window_data:
-        json_obj = json.loads(line.strip())
-        json_string = json.dumps(json_obj)
-        values.append(json_string)
-    send_message(values)
+    if Original_version:
+        for line in window_data:
+            json_obj = json.loads(line.strip())
+            json_string = json.dumps(json_obj)
+            values.append(json_string)
+        send_message(values)
+    else:
+        # load each line into a JSON object when Original_version is False
+        values = [json.loads(line) for line in window_data]
+        send_message(window_data)

 def send_message(messages):
     msg_count = 1
     time.sleep(sleep_time)
-    for m in messages:
-        # print(m)
-        result = client.publish(topic, m)
+    if Original_version:
+        for m in messages:
+            # print(m)
+            result = client.publish(topic, m)
@@ -77,6 +91,20 @@ def send_message(messages):
             # if not client.is_connected():
             #     print("Client not connected, exiting...")
             msg_count += 1
+    else:
+        # send the whole batch as one message
+        # json_string = json.dumps(messages)
+        # print(messages)
+        messages = json.dumps(messages)
+        result = client.publish(topic, messages)
+        status = result[0]
+        if status == 0:  # comment this out again?
+            print(f'Message {str(msg_count)} from {lines_per_window} lines published')
+        else:
+            print("Failed to send message to topic " + topic)
+            if not client.is_connected():
+                print("Client not connected, exiting...")

 def on_connect(client, userdata, flags, return_code):
     if return_code == 0:
...
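Because batch mode now publishes one JSON array per window while the original mode publishes one JSON object per line, a consumer must handle both payload shapes. A minimal subscriber sketch; broker host, port, and topic are taken from the compose file and the publisher, the shape detection and client id are assumptions:

import json
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload)
    # batch mode -> JSON array of records; original mode -> single JSON object
    records = payload if isinstance(payload, list) else [payload]
    print(f"received {len(records)} record(s) on topic {msg.topic}")

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "sub_sketch")  # hypothetical client id
client.on_message = on_message
client.connect("mos1", 1883)  # broker service name and port from the compose file
client.subscribe("neo4j")
client.loop_forever()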