Skip to content
Snippets Groups Projects
Select Git revision
  • 8f8ea03584a83e90247eb2ce542bc2234e718aec
  • main default protected
  • idacs_experiments
  • postgres_idacs
  • memgraph_fabian
  • postgres_julian
  • dev_ma
  • test_results
  • FZ_changed_code
  • FZ_Memgraph
10 results

sub_mem.py

Blame
  • sub_mem.py 10.19 KiB
    import paho.mqtt.client as mqtt
    import time
    import json
    from neo4j import GraphDatabase
    import os
    
    # --- Runtime configuration, read once at import time from environment variables ---

    # MQTT broker host ('mos_host', falls back to "localhost").
    broker_hostname=str(os.getenv('mos_host',default="localhost"))
    # MQTT broker TCP port ('mos_port', falls back to 1883).
    broker_port = int(os.getenv('mos_port',default=1883)) 
    # MQTT client using the VERSION1 callback API; "memgraph" is passed as the
    # second positional argument (presumably the client id -- confirm against paho docs).
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"memgraph")
    # Bolt URI of the graph database ('mem_host', falls back to a local instance).
    db_uri =str(os.getenv('mem_host',default="bolt://localhost:7687"))
    # Empty user name / password pair handed to the driver as auth.
    neo4j_auth=("","")
    # Upper bound for the main wait loop at the bottom of the file; that loop
    # sleeps one second per iteration, so this is roughly a run time in seconds.
    abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
    # True when 'create_indices' is set to one of the accepted truthy spellings
    # (case-insensitive); controls index creation in connect_to_db().
    create_indices = os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
    
    def flatten_obj(key, val, target):
        """Flatten a nested JSON-like value into ``target`` as scalar entries.

        Nested names are joined with ``_``: dict members append their key, list
        items append their index, and objects that carry an instance
        ``__dict__`` append their attribute names. ``None`` values are skipped,
        so they never appear in ``target``.

        Args:
            key: Prefix accumulated so far. When called with ``""`` for the
                root, top-level fields end up as ``_<field>`` (e.g. ``_uuid``).
            val: Value to flatten (dict, list, plain object, scalar or None).
            target: Dict receiving the flattened ``name -> scalar`` pairs; it is
                mutated in place.

        Returns:
            None. The result is written into ``target``.
        """
        if val is None:
            return None
        if isinstance(val, dict):
            for k, v in val.items():
                flatten_obj(key + "_" + k, v, target)
        elif isinstance(val, list):
            for i, v in enumerate(val):
                flatten_obj(key + "_" + str(i), v, target)
        elif hasattr(val, "__dict__"):
            # The previous check (`type(val) is object`) only matched bare
            # object() instances, which have no __dict__, so this branch could
            # never flatten anything. Test for the attribute dict itself.
            for k, v in vars(val).items():
                flatten_obj(key + "_" + k, v, target)
        else:
            target[key] = val
    
    # JSON key under which the CDM18 records wrap a UUID reference.
    _CDM_UUID_KEY = "com.bbn.tc.schema.avro.cdm18.UUID"

    # (relationship name, label filter of the node to link to); "" = any label.
    # The tuple order is the order in which link clauses are appended to the
    # query. "isPartOf" and "hasTag" are not listed because no record type
    # handled below ever provides a value for them.
    _RELATION_TARGETS = (
        ("runsOn", ":Host"),
        ("isGeneratedBy", ":Subject"),
        ("affects", ""),
        ("affects2", ""),
        ("residesOn", ":Host"),
        ("hasOwningPrincipal", ":Principal"),
        ("hasParent", ""),
        ("hasAccountOn", ""),
        ("hasLocalPrincipal", ""),
    )


    def _cypher_string_literal(val):
        """Return ``val`` as a double-quoted Cypher string literal with ``\\`` and ``"`` escaped."""
        text = str(val).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{text}"'


    def _link_clause(rel, label, target_uuid):
        """Return the Cypher fragment that links ``new`` to the node whose ``_uuid`` matches."""
        return f'''
                WITH new
                MATCH (existing{label}) where existing._uuid = {_cypher_string_literal(target_uuid)}
                CREATE (new) -[:{rel}]-> (existing)
                '''


    def _collect_relations(node_type, value, relations):
        """Fill ``relations`` (name -> referenced uuid) from the CDM fields of one record."""
        if node_type == 'Subject':
            if value['parentSubject'] is not None:
                relations['hasParent'] = value['parentSubject'][_CDM_UUID_KEY]
            if value['hostId'] is not None:
                relations['runsOn'] = value['hostId']
            if value['localPrincipal'] is not None:
                relations['hasLocalPrincipal'] = value['localPrincipal']
            # Subject -[affects]-> Event is not emitted; it is presumably
            # implied by Event -[isGeneratedBy]-> Subject.
        elif node_type == 'FileObject':
            if value['baseObject'] is not None:
                relations['residesOn'] = value['baseObject']['hostId']
            if value['localPrincipal']:
                relations['hasOwningPrincipal'] = value['localPrincipal']
        elif node_type == 'Event':
            if value['subject'] is not None:
                relations['isGeneratedBy'] = value['subject'][_CDM_UUID_KEY]
            if value['predicateObject'] is not None:
                relations['affects'] = value['predicateObject'][_CDM_UUID_KEY]
            if value['predicateObject2'] is not None:
                relations['affects2'] = value['predicateObject2'][_CDM_UUID_KEY]
        elif node_type == 'Principal':
            if value['hostId'] is not None:
                relations['hasAccountOn'] = value['hostId']
        elif node_type == 'UnnamedPipeObject':
            if value['sourceUUID'] is not None:
                relations['affects'] = value['sourceUUID'][_CDM_UUID_KEY]
            if value['sinkUUID'] is not None:
                relations['affects2'] = value['sinkUUID'][_CDM_UUID_KEY]
            if value['baseObject'] is not None:
                relations['residesOn'] = value['baseObject']['hostId']
        elif node_type in ('NetFlowObject', 'SrcSinkObject'):
            if value['baseObject'] is not None:
                relations['residesOn'] = value['baseObject']['hostId']


    def parse_json_to_cypher(input):
        """Translate one CDM record into a Cypher CREATE query plus its node attributes.

        The record is expected as ``{"datum": {"<qualified type name>": {...}}}``.
        The last dot-separated component of the type name becomes the node
        label, the record body is flattened into the node's properties, and for
        every reference field that is set a ``MATCH ... CREATE`` clause is
        appended that links the new node to an existing node by ``_uuid``.

        Args:
            input: Decoded message dict (name kept for interface compatibility
                even though it shadows the builtin).

        Returns:
            ``(query, attributes)``: the Cypher text, which expects a single
            ``$attributes`` parameter, and the flattened property dict.

        Raises:
            KeyError, IndexError: ``datum`` is missing or empty.
            ValueError: the derived node type is not a plain identifier and so
                cannot safely be used as a label.
            Exception: any error while mapping the record is re-raised after the
                offending input has been printed. Previously such errors were
                swallowed and ``None`` was returned, which only moved the
                failure to the caller's tuple unpacking.
        """
        jsonType = list(input['datum'].keys())[0]
        # Short type string; [-1] also accepts names without a namespace prefix.
        nodeType = jsonType.rsplit(".", 1)[-1]
        # Labels cannot be passed as query parameters, so refuse anything that
        # could break out of the label position in the query text.
        if not nodeType.isidentifier():
            raise ValueError(f"unsupported node type: {nodeType!r}")
        value = input["datum"][jsonType]

        relations = {}
        try:
            _collect_relations(nodeType, value, relations)
            # NOTE(review): clauses are chained, so a MATCH that finds no node
            # presumably yields no rows for the clauses after it -- confirm
            # whether later relationships should still be created in that case.
            q_rel = "".join(
                _link_clause(rel, label, relations[rel])
                for rel, label in _RELATION_TARGETS
                if relations.get(rel, "") != ""
            )
            value_flat = {}
            flatten_obj("", value, value_flat)
        except Exception:
            print('input: ', input)
            print('relations: ', relations)
            raise

        query = f"""
            CREATE (new:{nodeType} $attributes)
            {q_rel}
            RETURN new
            """
        return query, value_flat
    
    
    def create_cypher_query_from_cdm(json):
        """Build the Cypher query and its attribute map for one publisher message.

        Thin wrapper around ``parse_json_to_cypher``: the decoded message is
        handed over unchanged and the resulting pair is returned as a tuple.
        """
        cypher_text, node_attributes = parse_json_to_cypher(json)
        return (cypher_text, node_attributes)
    
    def on_message(client, userdata, message):
        """MQTT message callback: decode the payload and insert it into the graph DB.

        The payload is decoded as UTF-8 JSON, turned into a Cypher query via
        ``create_cypher_query_from_cdm`` and executed with ``execute_query``.

        Any failure while handling a single message (undecodable payload,
        unexpected record layout, database error) is reported on stdout and the
        message is skipped. Letting the exception escape the callback would
        propagate into the MQTT network loop and stop the processing of all
        further messages.

        Args:
            client: MQTT client that received the message (unused).
            userdata: User data registered on the client (unused).
            message: Received message; ``payload`` and ``topic`` are read.
        """
        try:
            data = json.loads(message.payload.decode("utf-8"))
            print("Received message from: ",message.topic)
            q, attr = create_cypher_query_from_cdm(data)
            execute_query(q, attr)
        except Exception as err:
            print("Failed to process message from: ", message.topic, "error:", repr(err))
    
    def on_connect(client, userdata, flags, return_code):
        """MQTT connect callback: subscribe on success, flag the client on failure.

        A return code of 0 means the broker accepted the connection; the
        client then subscribes to the "neo4j" topic with QoS 1. Any other code
        is printed and recorded as ``client.failed_connect = True``.
        """
        if return_code != 0:
            print("could not connect, return code:", return_code)
            client.failed_connect = True
            return

        print("connected")
        client.subscribe("neo4j", qos=1)
    
    def connect_to_db(uri,auth):
        """Open a driver for the graph database, wipe it and optionally index it.

        All existing nodes and relationships are deleted, a trivial query is run
        as a connectivity check, and -- when the module-level ``create_indices``
        flag is set -- an index on ``_uuid`` is created for every node label
        this script writes.

        Returns:
            The driver created by ``GraphDatabase.driver``; the caller is
            responsible for closing it.
        """
        indexed_labels = (
            "Subject",
            "Event",
            "Host",
            "FileObject",
            "NetFlowObject",
            "SrcSinkObject",
            "UnnamedPipeObject",
            "Principal",
        )

        driver = GraphDatabase.driver(uri, auth=auth)
        with driver.session() as db_session:
            print("Cleanup existing data...")
            db_session.run("MATCH (n) detach delete n")
            db_session.run("RETURN 1 as result")
            print("Successfully connected to DB...")

            if create_indices:
                for label in indexed_labels:
                    db_session.run(f"CREATE INDEX ON :{label}(_uuid);")

        return driver
        
       
    def execute_query(query:str, attributes):
        """Run one Cypher query on the module-level ``driver`` and return its records.

        Args:
            query: Cypher text; it may reference the ``$attributes`` parameter.
            attributes: Value bound to the ``attributes`` query parameter.

        Returns:
            Whatever ``.data()`` of the query result yields.
        """
        with driver.session() as db_session:
            return db_session.run(query, attributes=attributes).data()
    
    # Module-level driver; execute_query() reads this name.
    driver = connect_to_db(db_uri,neo4j_auth)

    # client.username_pw_set(username="user_name", password="password") # uncomment if you use password auth
    client.on_connect = on_connect
    client.on_message = on_message
    # Set to True by on_connect() when the broker rejects the connection.
    client.failed_connect = False

    client.connect(broker_hostname, broker_port,keepalive=3600*4)
    # Network traffic and callbacks are handled on the client's background loop.
    client.loop_start()

    # The try/finally ensures the MQTT client and the DB driver are shut down
    # even when the program is interrupted early (e.g. Ctrl+C).
    try:
        # Wait one second per iteration, for at most abort_time_limit iterations.
        for _ in range(abort_time_limit):
            time.sleep(1)
            if client.failed_connect:
                print('Connection failed, exiting...')
                # Previously the loop kept running and repeated the message
                # every second until the time limit; actually leave it.
                break

    finally:
        client.disconnect()
        client.loop_stop()
        driver.close()