123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
import os

import neo4j
import psycopg2
from neo4j import GraphDatabase
# Connection settings. Each value can be overridden with an environment
# variable of the same name; the literals are only local-development
# fallbacks, so real credentials never have to be committed to source.

# Neo4j connection details
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "p@ssw0rd")

# PostgreSQL connection details
POSTGRESQL_HOST = os.environ.get("POSTGRESQL_HOST", "127.0.0.1")
POSTGRESQL_DATABASE = os.environ.get("POSTGRESQL_DATABASE", "postgres")
POSTGRESQL_USER = os.environ.get("POSTGRESQL_USER", "postgres")
POSTGRESQL_PASSWORD = os.environ.get("POSTGRESQL_PASSWORD", "p@ssw0rd")
def connect_to_neo4j():
    """Create and return a Neo4j driver for NEO4J_URI using basic auth."""
    credentials = (NEO4J_USER, NEO4J_PASSWORD)
    return GraphDatabase.driver(NEO4J_URI, auth=credentials)
def connect_to_postgresql():
    """Open and return a new psycopg2 connection built from the POSTGRESQL_* settings."""
    conn_kwargs = {
        "host": POSTGRESQL_HOST,
        "dbname": POSTGRESQL_DATABASE,
        "user": POSTGRESQL_USER,
        "password": POSTGRESQL_PASSWORD,
    }
    return psycopg2.connect(**conn_kwargs)
# Values written to kg_props.category to mark who owns a property row.
NODE_PROP_CATEGORY = 1
EDGE_PROP_CATEGORY = 2

# Keys of the node/edge dicts that are stored as columns (or are not stored
# at all) and therefore must not be copied into kg_props.
_NODE_RESERVED_KEYS = frozenset({"id", "name", "label"})
_EDGE_RESERVED_KEYS = frozenset({"id", "type", "src_id", "dest_id", "name"})


def _upsert_props(pg_cursor, category, ref_id, props, reserved_keys):
    """Insert or update every non-reserved entry of *props* as a kg_props row."""
    for key, value in props.items():
        if key in reserved_keys:
            continue
        # NOTE(review): the conflict target (ref_id, prop_name) does not
        # include ``category``. kg_nodes and kg_edges are separate tables, so
        # a node id and an edge id can presumably coincide; a node and an edge
        # with the same id and property name would then overwrite each other's
        # row. The target must match a unique constraint on kg_props; if the
        # schema allows, consider (category, ref_id, prop_name) instead.
        pg_cursor.execute(
            """
            INSERT INTO public.kg_props (category, ref_id, prop_name, prop_value)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (ref_id, prop_name) DO UPDATE SET
            prop_value = EXCLUDED.prop_value;
            """,
            (category, ref_id, key, value),
        )


def upsert_node(pg_cursor, node):
    """Insert or update one node in kg_nodes and store its extra properties.

    Args:
        pg_cursor: An open DB-API cursor (psycopg2 in this script).
        node: Mapping with at least ``'name'`` and ``'label'``; ``'label'`` is
            written to ``kg_nodes.category``. Every other key except ``'id'``
            is stored in kg_props with category NODE_PROP_CATEGORY.

    Returns:
        The ``kg_nodes.id`` of the inserted or already-existing row.
    """
    # DO UPDATE (rather than DO NOTHING) makes RETURNING yield the existing
    # row's id on conflict; DO NOTHING would return no row at all.
    pg_cursor.execute(
        """
        INSERT INTO public.kg_nodes (id, name, category)
        VALUES (DEFAULT, %s, %s)
        ON CONFLICT (name, category) DO UPDATE SET
        name = EXCLUDED.name,
        category = EXCLUDED.category
        RETURNING id;
        """,
        (node["name"], node["label"]),
    )
    node_id = pg_cursor.fetchone()[0]
    _upsert_props(pg_cursor, NODE_PROP_CATEGORY, node_id, node, _NODE_RESERVED_KEYS)
    return node_id


def upsert_edge(pg_cursor, edge, src_id, dest_id):
    """Insert or update one edge in kg_edges and store its extra properties.

    Args:
        pg_cursor: An open DB-API cursor (psycopg2 in this script).
        edge: Mapping with at least ``'type'`` (written to
            ``kg_edges.category``) and ``'name'``. Every other key except
            ``'id'``, ``'src_id'`` and ``'dest_id'`` is stored in kg_props
            with category EDGE_PROP_CATEGORY.
        src_id: ``kg_nodes.id`` of the edge's source node.
        dest_id: ``kg_nodes.id`` of the edge's destination node.

    Returns:
        The ``kg_edges.id`` of the inserted or already-existing row. (Earlier
        versions returned ``None``; callers that ignored the result are
        unaffected.)
    """
    # DO UPDATE so RETURNING also yields the id when the edge already exists.
    pg_cursor.execute(
        """
        INSERT INTO public.kg_edges (id, category, src_id, dest_id, name)
        VALUES (DEFAULT, %s, %s, %s, %s)
        ON CONFLICT (src_id, dest_id, name) DO UPDATE SET
        name = EXCLUDED.name,
        category = EXCLUDED.category
        RETURNING id;
        """,
        (edge["type"], src_id, dest_id, edge["name"]),
    )
    edge_id = pg_cursor.fetchone()[0]
    _upsert_props(pg_cursor, EDGE_PROP_CATEGORY, edge_id, edge, _EDGE_RESERVED_KEYS)
    return edge_id
# Neo4j labels that are migrated. For each node, the first of its labels that
# appears here becomes kg_nodes.category.
KNOWN_LABELS = ("Disease", "Symptom", "Drug", "Check", "Department", "Food", "Producer")

_LABEL_PATTERN = "|".join(KNOWN_LABELS)
_FETCH_QUERY = (
    f"MATCH (n:{_LABEL_PATTERN})-[r]->(m:{_LABEL_PATTERN})\n"
    "RETURN n, labels(n) AS label, r, type(r) AS type, m, labels(m) AS label_m\n"
)


def _get_node_id(pg_cursor, nodes_dict, node, labels):
    """Return the kg_nodes id for *node*, upserting it the first time it is seen."""
    key = (node["name"], tuple(sorted(labels)))
    print(">>> process record", key)
    if key not in nodes_dict:
        category = next((lbl for lbl in labels if lbl in KNOWN_LABELS), "")
        # Spread the node's own properties first so a property that happens to
        # be called "name" or "label" cannot override the values set here.
        nodes_dict[key] = upsert_node(pg_cursor, {
            **node._properties,
            "name": node["name"],
            "label": category,
        })
    return nodes_dict[key]


def migrate_data():
    """Copy labelled nodes and the relationships between them from Neo4j into PostgreSQL.

    Every ``(n)-[r]->(m)`` match whose two endpoints both carry one of
    KNOWN_LABELS is written through upsert_node / upsert_edge, and the whole
    run is committed as one PostgreSQL transaction at the end. Both
    connections are closed even if the migration raises. In that case the
    transaction is never committed, and closing the connection discards it.

    NOTE(review): nodes without a qualifying relationship are not matched by
    the query and are therefore not migrated. Confirm that this is intended.
    """
    neo4j_driver = connect_to_neo4j()
    try:
        pg_conn = connect_to_postgresql()
        try:
            with pg_conn.cursor() as pg_cursor, neo4j_driver.session() as session:
                print(">>> data fetching...")
                result = session.run(_FETCH_QUERY)
                print(">>> data fetched")

                # Cache of (name, sorted labels) -> kg_nodes.id, so each Neo4j
                # node is upserted at most once per run.
                nodes_dict = {}
                for record in result:
                    print(">>> process record")
                    src_id = _get_node_id(pg_cursor, nodes_dict, record["n"], record["label"])
                    dest_id = _get_node_id(pg_cursor, nodes_dict, record["m"], record["label_m"])

                    rel_props = record["r"]._properties
                    # Properties first, so a relationship property named "type"
                    # cannot replace the real relationship type.
                    upsert_edge(pg_cursor, {
                        **rel_props,
                        "type": record["type"],
                        "name": rel_props.get("name", record["type"]),
                    }, src_id, dest_id)
            pg_conn.commit()
        finally:
            pg_conn.close()
    finally:
        neo4j_driver.close()
if __name__ == "__main__":
    # Run the migration only when this file is executed as a script, not on import.
    migrate_data()
|