"""Copy Disease/Symptom/Drug/... nodes and their relationships from Neo4j
into the PostgreSQL tables public.kg_nodes, public.kg_edges and public.kg_props.
"""
import logging
import os

import neo4j
import psycopg2
from neo4j import GraphDatabase

logger = logging.getLogger(__name__)

# Neo4j connection details. Each value may be overridden by an environment
# variable of the same name so credentials need not live in source control;
# the defaults keep the previous hard-coded behavior.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "p@ssw0rd")

# PostgreSQL connection details (same override scheme as above).
POSTGRESQL_HOST = os.environ.get("POSTGRESQL_HOST", "127.0.0.1")
POSTGRESQL_DATABASE = os.environ.get("POSTGRESQL_DATABASE", "postgres")
POSTGRESQL_USER = os.environ.get("POSTGRESQL_USER", "postgres")
POSTGRESQL_PASSWORD = os.environ.get("POSTGRESQL_PASSWORD", "p@ssw0rd")

# Neo4j labels that are migrated. The first of these found on a node (in the
# order Neo4j's labels() returns them) becomes its kg_nodes.category.
KNOWN_LABELS = ("Disease", "Symptom", "Drug", "Check", "Department", "Food", "Producer")

# kg_props.category values that tag a property row as belonging to a node or an edge.
NODE_PROP_CATEGORY = 1
EDGE_PROP_CATEGORY = 2

_LABEL_PATTERN = "|".join(KNOWN_LABELS)

# NOTE(review): only nodes that take part in at least one relationship between
# two known-label nodes are returned, so isolated nodes are never migrated.
_FETCH_QUERY = f"""
    MATCH (n:{_LABEL_PATTERN})-[r]->(m:{_LABEL_PATTERN})
    RETURN n, labels(n) AS label, r, type(r) AS type, m, labels(m) AS label_m
"""


def connect_to_neo4j():
    """Return a Neo4j driver for NEO4J_URI authenticated as NEO4J_USER."""
    return GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))


def connect_to_postgresql():
    """Return a new psycopg2 connection using the POSTGRESQL_* settings."""
    return psycopg2.connect(
        host=POSTGRESQL_HOST,
        dbname=POSTGRESQL_DATABASE,
        user=POSTGRESQL_USER,
        password=POSTGRESQL_PASSWORD,
    )


def _upsert_props(pg_cursor, category, ref_id, props, excluded):
    """Upsert each (key, value) of *props* whose key is not in *excluded* into kg_props.

    NOTE(review): the conflict target is (ref_id, prop_name) without
    ``category``; if the unique constraint really is that narrow, a node and
    an edge that share an id overwrite each other's same-named properties.
    Confirm against the kg_props schema.
    """
    for key, value in props.items():
        if key in excluded:
            continue
        pg_cursor.execute(
            """
            INSERT INTO public.kg_props (category, ref_id, prop_name, prop_value)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (ref_id, prop_name) DO UPDATE SET prop_value = EXCLUDED.prop_value;
            """,
            (category, ref_id, key, value),
        )


def upsert_node(pg_cursor, node):
    """Insert or update one node in public.kg_nodes and upsert its properties.

    Args:
        pg_cursor: an open psycopg2 cursor.
        node: mapping with at least ``'name'`` and ``'label'`` (stored as
            kg_nodes.category). Every other key except ``'id'`` is written to
            kg_props with category NODE_PROP_CATEGORY.

    Returns:
        The kg_nodes.id of the inserted or already-existing row.
    """
    pg_cursor.execute(
        """
        INSERT INTO public.kg_nodes (id, name, category)
        VALUES (DEFAULT, %s, %s)
        ON CONFLICT (name, category) DO UPDATE
            SET name = EXCLUDED.name, category = EXCLUDED.category
        RETURNING id;
        """,
        (node["name"], node["label"]),
    )
    # DO UPDATE (rather than DO NOTHING) is what makes RETURNING yield the
    # existing row's id on conflict.
    node_id = pg_cursor.fetchone()[0]
    _upsert_props(pg_cursor, NODE_PROP_CATEGORY, node_id, node, ("id", "name", "label"))
    return node_id


def upsert_edge(pg_cursor, edge, src_id, dest_id):
    """Insert or update one edge in public.kg_edges and upsert its properties.

    Args:
        pg_cursor: an open psycopg2 cursor.
        edge: mapping with at least ``'type'`` (stored as kg_edges.category)
            and ``'name'``. Every other key except ``'id'``, ``'src_id'`` and
            ``'dest_id'`` is written to kg_props with category EDGE_PROP_CATEGORY.
        src_id: kg_nodes.id of the source node.
        dest_id: kg_nodes.id of the destination node.

    Returns:
        The kg_edges.id of the inserted or already-existing row.
    """
    pg_cursor.execute(
        """
        INSERT INTO public.kg_edges (id, category, src_id, dest_id, name)
        VALUES (DEFAULT, %s, %s, %s, %s)
        ON CONFLICT (src_id, dest_id, name) DO UPDATE
            SET name = EXCLUDED.name, category = EXCLUDED.category
        RETURNING id;
        """,
        (edge["type"], src_id, dest_id, edge["name"]),
    )
    edge_id = pg_cursor.fetchone()[0]
    _upsert_props(
        pg_cursor, EDGE_PROP_CATEGORY, edge_id, edge,
        ("id", "type", "src_id", "dest_id", "name"),
    )
    return edge_id


def _pick_category(labels):
    """Return the first of *labels* that is in KNOWN_LABELS, or '' if none is."""
    return next((lbl for lbl in labels if lbl in KNOWN_LABELS), "")


def _node_record(node, labels):
    """Build the mapping passed to upsert_node for a Neo4j node."""
    # Spread the stored properties FIRST so a Neo4j property that happens to be
    # called 'label' cannot override the category computed from the labels.
    return {**dict(node.items()), "name": node["name"], "label": _pick_category(labels)}


def _edge_record(rel, rel_type):
    """Build the mapping passed to upsert_edge for a Neo4j relationship."""
    props = dict(rel.items())
    # Computed keys go last so a stored 'type' property cannot replace the
    # relationship type; the edge name falls back to the type when absent.
    return {**props, "type": rel_type, "name": props.get("name", rel_type)}


def _node_id(pg_cursor, cache, node, labels):
    """Return the kg_nodes.id for *node*, upserting it only on first sight.

    *cache* maps (name, sorted labels) -> id so each Neo4j node is written once
    per run even though it appears in many relationship records.
    """
    key = (node["name"], tuple(sorted(labels)))
    if key not in cache:
        cache[key] = upsert_node(pg_cursor, _node_record(node, labels))
    logger.debug(">>> process record %s", key)
    return cache[key]


def migrate_data():
    """Copy every matching (node)-[rel]->(node) triple from Neo4j into PostgreSQL.

    All writes happen in a single PostgreSQL transaction that is committed only
    after every record succeeds. On any error the connection is closed without
    committing, which psycopg2 treats as a rollback. Both the Neo4j driver and
    the PostgreSQL connection are always closed.
    """
    neo4j_driver = connect_to_neo4j()
    try:
        pg_conn = connect_to_postgresql()
        try:
            with pg_conn.cursor() as pg_cursor, neo4j_driver.session() as session:
                logger.info(">>> data fetching...")
                result = session.run(_FETCH_QUERY)
                logger.info(">>> data fetched")

                node_ids = {}
                edge_count = 0
                for record in result:
                    src_id = _node_id(pg_cursor, node_ids, record["n"], record["label"])
                    dest_id = _node_id(pg_cursor, node_ids, record["m"], record["label_m"])
                    upsert_edge(
                        pg_cursor, _edge_record(record["r"], record["type"]), src_id, dest_id
                    )
                    edge_count += 1
            pg_conn.commit()
            logger.info(">>> migrated %d edges, %d distinct nodes", edge_count, len(node_ids))
        finally:
            # Closing without a prior commit discards pending changes.
            pg_conn.close()
    finally:
        neo4j_driver.close()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    migrate_data()