Skip to content
Snippets Groups Projects
Commit 63c497e2 authored by Sven-Ove Hänsel's avatar Sven-Ove Hänsel
Browse files

added stopping after last index but currently in test mode

parent 11278d62
No related branches found
No related tags found
No related merge requests found
......@@ -12,26 +12,32 @@ sleep_time = int(os.getenv('sleeptime',default=1))
files = [
'ta1-cadets-e3-official_0.json',
'ta1-cadets-e3-official_1.json',
'ta1-cadets-e3-official_2.json',
'ta1-cadets-e3-official-1_0.json',
'ta1-cadets-e3-official-1_1.json',
'ta1-cadets-e3-official-1_2.json',
'ta1-cadets-e3-official-1_3.json',
'ta1-cadets-e3-official-1_4.json',
'ta1-cadets-e3-official-2_0.json',
'ta1-cadets-e3-official-2_1.json'
# 'ta1-cadets-e3-official_1.json',
# 'ta1-cadets-e3-official_2.json',
# 'ta1-cadets-e3-official-1_0.json',
# 'ta1-cadets-e3-official-1_1.json',
# 'ta1-cadets-e3-official-1_2.json',
# 'ta1-cadets-e3-official-1_3.json',
# 'ta1-cadets-e3-official-1_4.json',
# 'ta1-cadets-e3-official-2_0.json',
# 'ta1-cadets-e3-official-2_1.json'
]
line_count = [4999999,4999999,3911712,4999999,4999999,4999999,4999999,4999999,2059063,4999999,3433561] # line_count corresponding to each file
line_count = [100000]#[4999999,4999999,3911712,4999999,4999999,4999999,4999999,4999999,2059063,4999999,3433561] # line_count corresponding to each file
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client1")
topic = "neo4j"
def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
with open(file_path, 'r') as file:
i = 0
index = 0
while True:
print("index: ",index)
print("i: ",i)
if index> stop_index:
break
try:
i += 1
index += (lines_per_window)
if i>=line_count:
break
# Read the next lines_per_window lines
......@@ -48,7 +54,7 @@ def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
# Process a window and send a message
def process_window_lines(window_data, window_nr):
print(f"Processing window {window_nr}:")
print(f"Processing batch {window_nr}:")
values =[]
for line in window_data:
json_obj = json.loads(line.strip())
......@@ -61,15 +67,15 @@ def send_message(messages):
msg_count = 1
time.sleep(sleep_time)
for m in messages:
print(m)
#print(m)
result = client.publish(topic,m)
status = result[0]
if status == 0:
print(f'Message {str(msg_count)} from {lines_per_window} lines published')
else:
print("Failed to send message to topic " + topic)
if not client.is_connected():
print("Client not connected, exiting...")
# if status == 0:
# #print(f'Message {str(msg_count)} from {lines_per_window} lines published')
# else:
# print("Failed to send message to topic " + topic)
# if not client.is_connected():
# print("Client not connected, exiting...")
msg_count +=1
def on_connect(client, userdata, flags, return_code):
......@@ -91,6 +97,7 @@ try:
stop_index = line_count[i]
file_path = path+file
print("Reading file: ", file)
print("Max number of batches: ",line_count[i]/lines_per_window)
read_moving_window_and_send_data(file_path, lines_per_window,stop_index)
i += 1
finally:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment