Commit 6f3605c0 authored by julian

Added feature to send node batch as a single statement

parent a7b1c34a
@@ -8,6 +8,7 @@ services:
       - EDGE_INSERTS
       - UUID_EDGES
       - BATCH_SIZE
+      - SINGLE_STATEMENT_NODES
       - PGPASSFILE=/run/secrets/pgpass
     profiles:
       - experiment
@@ -7,7 +7,9 @@ import json
 import psycopg
 
-Parameter_Query: TypeAlias = Tuple[str, Iterable[Sequence[Any]]]
+QueryList: TypeAlias = Tuple[str, Iterable[Sequence[Any]]]
+Query: TypeAlias = Tuple[str, Sequence[Any]]
+Node: TypeAlias = Tuple[str, str, dict]
 
 HEADER_SCHEMA: Final[bytes] = b"com.bbn.tc.schema.avro.cdm18."
 NODE_TYPES: Final[dict] = {
     # Python trap: Tuple with exactly one item must have trailing comma. Can you spot all 3?
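Quick orientation on the renamed aliases: QueryList pairs one SQL string with an iterable of parameter rows (consumed by cursor.executemany), Query pairs an already-expanded SQL string with a single flat parameter sequence (consumed by cursor.execute), and Node is the (uuid, type, content) triple returned by unpack_node. A toy sketch with made-up values, assuming the aliases above are in scope:

    # Illustration only: toy values for the three aliases defined above.
    ql: QueryList = ("INSERT INTO vertex (type, content) VALUES (%s,%s)",
                     [("Subject", "{}"), ("FileObject", "{}")])   # one parameter row per node -> executemany
    q: Query = ("INSERT INTO vertex (type, content) VALUES (%s,%s),(%s,%s)",
                ["Subject", "{}", "FileObject", "{}"])            # one flat parameter list -> execute
    n: Node = ("uuid-1", "Subject", {"cid": 1})                   # (uuid, type, content) from unpack_node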
@@ -37,7 +39,7 @@ def first_key(obj: dict) -> str:
     return next(iter(obj))
 
-def unpack_node(node_entry: dict) -> Tuple[str, str, dict]:
+def unpack_node(node_entry: dict) -> Node:
     node = node_entry["datum"]
     node_type = first_key(node)
     content = node[node_type]
@@ -45,9 +47,15 @@ def unpack_node(node_entry: dict) -> Tuple[str, str, dict]:
     return (uuid, node_type, content)
 
-def format_node_query(node_list: Sequence[Tuple[str, str, dict]]) -> Parameter_Query:
-    node_json = ((type, json.dumps(node)) for _, type, node in node_list)
-    return ("INSERT INTO vertex (type, content) VALUES(%s, %s)", node_json)
+def format_node_query(node_list: Sequence[Node]) -> QueryList:
+    nodes_json = ((type, json.dumps(node)) for _, type, node in node_list)
+    return ("INSERT INTO vertex (type, content) VALUES (%s,%s)", nodes_json)
+
+def format_nodes_single_statement(node_list: Sequence[Node]) -> Query:
+    # Flatten the (type, json) pairs into one parameter list and build a matching
+    # multi-row VALUES clause so the whole batch is sent as a single INSERT.
+    node_json = [val for _, type, node in node_list for val in (type, json.dumps(node))]
+    placeholders = ",".join(["(%s,%s)"] * len(node_list))
+    query_str = f"INSERT INTO vertex (type, content) VALUES {placeholders}"
+    return (query_str, node_json)
 
 def try_path(obj: dict, keys: Sequence[str]) -> str | None:
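For illustration of what the new single-statement path produces (example data, not part of the commit; assumes the module above is in scope):

    # Two made-up nodes run through the new formatter.
    nodes = [
        ("uuid-1", "Subject", {"cid": 1}),
        ("uuid-2", "FileObject", {"path": "/tmp/x"}),
    ]
    query, params = format_nodes_single_statement(nodes)
    # query  == "INSERT INTO vertex (type, content) VALUES (%s,%s),(%s,%s)"
    # params == ["Subject", '{"cid": 1}', "FileObject", '{"path": "/tmp/x"}']
    # cursor.execute(query, params) sends the whole batch in one round trip,
    # whereas format_node_query + executemany runs the statement once per parameter row.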
@@ -65,13 +73,13 @@ def get_edges(uuid: str, type: str, content: dict) -> Iterable[Tuple[str, str, s
     yield (uuid, dest, edge_type)
 
-def format_edge_uuid(nodes: Sequence[Tuple[str, str, dict]]) -> Parameter_Query:
+def format_edge_uuid(nodes: Sequence[Tuple[str, str, dict]]) -> QueryList:
     edges = (edge for node in nodes for edge in get_edges(*node))
     return ("INSERT INTO edge (source, destination, type) VALUES (%s, %s, %s)", edges)
 
 # currently produces malformed SQL when there are no edges
-def format_edge_id(nodes: Sequence[Tuple[str, str, dict]]) -> Tuple[str, Sequence[str]]:
+def format_edge_id(nodes: Sequence[Tuple[str, str, dict]]) -> Query:
     edges = [edge for node in nodes for edge in get_edges(*node)]
     query_str = (
         f"INSERT INTO edge (source, destination, type) "
@@ -116,23 +124,25 @@ if __name__ == "__main__":
     basicConfig(level=INFO)
     batch_size = max(int(environ["BATCH_SIZE"]), 1)
     file = "/data.zip"
-    try:
+    single_statement_nodes = len(getenv("SINGLE_STATEMENT_NODES", "")) > 0
+    insert_edges = len(getenv("EDGE_INSERTS", "")) > 0
+    uuid_edges = len(getenv("UUID_EDGES", "")) > 0
     with psycopg.connect() as connection:
         for batch in read_zip(file, batch_size):
             if interrupted:
                 break
-            batch_short = (line.replace(HEADER_SCHEMA, b"") for line in batch)
-            json_iter = (json.loads(line) for line in batch_short)
+            batch_iter = (line.replace(HEADER_SCHEMA, b"") for line in batch)
+            json_iter = (json.loads(line) for line in batch_iter)
             nodes = [unpack_node(entry) for entry in json_iter]
-            with connection.cursor() as cursor:
+            with connection.cursor() as cursor, connection.transaction():
+                if single_statement_nodes:
+                    cursor.execute(*format_nodes_single_statement(nodes))
+                else:
                     cursor.executemany(*format_node_query(nodes))
-                if len(getenv("EDGE_INSERTS", "")) > 0:
-                    if len(getenv("UUID_EDGES", "")) > 0:
+                if insert_edges and uuid_edges:
                     cursor.executemany(*format_edge_uuid(nodes))
-                else:
+                elif insert_edges:
                     query, params = format_edge_id(nodes)
                     if len(params) > 0:
                         cursor.execute(query, params)
-            connection.commit()
-    finally:
-        connection.close()
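On the transaction handling (a reading of the new pattern, hedged): in psycopg 3, `with psycopg.connect() as connection:` commits pending work and closes the connection when the block exits, and `connection.transaction()` opens a transaction that is committed when its block exits (or rolled back if an exception escapes), so each batch now commits on its own without the explicit `connection.commit()` / `finally: connection.close()`. A minimal standalone sketch of that pattern:

    import psycopg

    with psycopg.connect() as connection:                  # committed and closed on exit
        for batch in (["a"], ["b"]):                        # stand-in for read_zip(file, batch_size)
            with connection.cursor() as cursor, connection.transaction():
                # everything in this block is committed together when the block exits,
                # or rolled back if an exception propagates out of it
                cursor.execute("SELECT %s", (batch[0],))    # placeholder for the real INSERTs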
@@ -80,6 +80,6 @@ if __name__ == "__main__":
             "env": {"BATCH_SIZE": 5000},
         },
     )
+    # environ["SINGLE_STATEMENT_NODES"] = "SINGLE_STATEMENT_NODES"
     for experiment in experiments:
         run_experiment(**experiment)
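If the commented-out line above is the intended switch for the experiment runner, enabling the new code path would amount to setting the variable before the runs (any non-empty value counts as "on"); a hypothetical variant of those lines:

    # Hypothetical: turn single-statement node inserts on for every following run.
    environ["SINGLE_STATEMENT_NODES"] = "1"
    for experiment in experiments:
        run_experiment(**experiment)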