mirge.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
import os

import neo4j
import psycopg2
from neo4j import GraphDatabase
  4. # Neo4j connection details
  5. NEO4J_URI = "bolt://localhost:7687"
  6. NEO4J_USER = "neo4j"
  7. NEO4J_PASSWORD = "p@ssw0rd"
  8. # PostgreSQL connection details
  9. POSTGRESQL_HOST = "127.0.0.1"
  10. POSTGRESQL_DATABASE = "postgres"
  11. POSTGRESQL_USER = "postgres"
  12. POSTGRESQL_PASSWORD = "p@ssw0rd"
  13. def connect_to_neo4j():
  14. return GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
  15. def connect_to_postgresql():
  16. return psycopg2.connect(
  17. host=POSTGRESQL_HOST,
  18. dbname=POSTGRESQL_DATABASE,
  19. user=POSTGRESQL_USER,
  20. password=POSTGRESQL_PASSWORD
  21. )
  22. def upsert_node(pg_cursor, node):
  23. # Insert or update a node in the kg_nodes table
  24. pg_cursor.execute("""
  25. INSERT INTO public.kg_nodes (id, name, category)
  26. VALUES (DEFAULT, %s, %s)
  27. ON CONFLICT (name, category) DO UPDATE SET
  28. name = EXCLUDED.name,
  29. category = EXCLUDED.category
  30. RETURNING id;
  31. """, (node['name'], node['label']))
  32. node_id = pg_cursor.fetchone()[0]
  33. # Upsert properties into kg_props table
  34. for key, value in node.items():
  35. if key not in ['id', 'name', 'label']:
  36. pg_cursor.execute("""
  37. INSERT INTO public.kg_props (category, ref_id, prop_name, prop_value)
  38. VALUES (%s, %s, %s, %s)
  39. ON CONFLICT (ref_id, prop_name) DO UPDATE SET
  40. prop_value = EXCLUDED.prop_value;
  41. """, (1, node_id, key, value))
  42. return node_id
  43. def upsert_edge(pg_cursor, edge, src_id, dest_id):
  44. # Insert or update an edge in the kg_edges table
  45. pg_cursor.execute("""
  46. INSERT INTO public.kg_edges (id, category, src_id, dest_id, name)
  47. VALUES (DEFAULT, %s, %s, %s, %s)
  48. ON CONFLICT (src_id, dest_id, name) DO UPDATE SET
  49. name = EXCLUDED.name,
  50. category = EXCLUDED.category
  51. RETURNING id;
  52. """, (edge['type'], src_id, dest_id, edge['name']))
  53. edge_id = pg_cursor.fetchone()[0]
  54. # Upsert properties into kg_props table
  55. for key, value in edge.items():
  56. if key not in ['id', 'type', 'src_id', 'dest_id', 'name']:
  57. pg_cursor.execute("""
  58. INSERT INTO public.kg_props (category, ref_id, prop_name, prop_value)
  59. VALUES (%s, %s, %s, %s)
  60. ON CONFLICT (ref_id, prop_name) DO UPDATE SET
  61. prop_value = EXCLUDED.prop_value;
  62. """, (2, edge_id, key, value))
  63. def migrate_data():
  64. neo4j_driver = connect_to_neo4j()
  65. pg_conn = connect_to_postgresql()
  66. with pg_conn.cursor() as pg_cursor:
  67. with neo4j_driver.session() as session:
  68. # Fetch nodes and edges from Neo4j
  69. print(">>> data fetching...")
  70. result = session.run("""
  71. MATCH (n:Disease|Symptom|Drug|Check|Department|Food|Producer)-[r]->(m:Disease|Symptom|Drug|Check|Department|Food|Producer)
  72. RETURN n, labels(n) AS label, r, type(r) AS type, m, labels(m) AS label_m
  73. """)
  74. print(">>> data fetched")
  75. # Process each record
  76. nodes_dict = {}
  77. for record in result:
  78. print(">>> process record")
  79. # Process source node
  80. src_node_key = (record['n']['name'], tuple(sorted(record['label'])))
  81. print(">>> process record", src_node_key)
  82. if src_node_key not in nodes_dict:
  83. nodes_dict[src_node_key] = upsert_node(pg_cursor, {
  84. 'name': record['n']['name'],
  85. 'label': next((lbl for lbl in record['label'] if lbl in ['Disease', 'Symptom', 'Drug', 'Check', 'Department', 'Food', 'Producer']), ''),
  86. **record['n']._properties
  87. })
  88. # Process destination node
  89. dest_node_key = (record['m']['name'], tuple(sorted(record['label_m'])))
  90. print(">>> process record", dest_node_key)
  91. if dest_node_key not in nodes_dict:
  92. nodes_dict[dest_node_key] = upsert_node(pg_cursor, {
  93. 'name': record['m']['name'],
  94. 'label': next((lbl for lbl in record['label_m'] if lbl in ['Disease', 'Symptom', 'Drug', 'Check', 'Department', 'Food', 'Producer']), ''),
  95. **record['m']._properties
  96. })
  97. # Process edge
  98. upsert_edge(pg_cursor, {
  99. 'type': record['type'],
  100. 'name': record['r']['name'] if 'name' in record['r']._properties else record['type'],
  101. **record['r']._properties
  102. }, nodes_dict[src_node_key], nodes_dict[dest_node_key])
  103. pg_conn.commit()
  104. neo4j_driver.close()
  105. pg_conn.close()
  106. if __name__ == "__main__":
  107. migrate_data()