@@ -85,16 +85,21 @@ def hash_edge(edge: EdgeDict) -> int:
8585 )
8686
8787
88+ def hash_result (result : ResultDict ) -> int :
89+ """Get a hash of a ResultDict instance."""
90+ return hash (
91+ tuple (
92+ (qnode_id , * (hash_node_binding (binding ) for binding in bindings ))
93+ for qnode_id , bindings in result ["node_bindings" ].items ()
94+ )
95+ )
96+
97+
8898@tracer .start_as_current_span ("merge_results" )
8999def merge_results (current : dict [int , ResultDict ], new : list [ResultDict ]) -> None :
90100 """Merge ResultDicts in a dict of results by hash."""
91101 for result in new :
92- key = hash (
93- tuple (
94- (qnode_id , * (hash_node_binding (binding ) for binding in bindings ))
95- for qnode_id , bindings in result ["node_bindings" ].items ()
96- )
97- )
102+ key = hash_result (result )
98103 if key not in current :
99104 current [key ] = result
100105 continue
@@ -151,9 +156,9 @@ def update_edge(edge: EdgeDict, new: EdgeDict) -> None:
151156 # Roll in upstream_resource_ids from new sources that overlap
152157 for source_id , source in old_sources .items ():
153158 if new_source := new_sources .get (source_id ):
154- update_retrieval_source ( source , new_source )
155- # Grab from new first to preserve merged upstream_resource_ids
156- new ["sources" ] = list ({** new_sources , ** old_sources }.values ())
159+ # Update new source so it overwrites the old source
160+ update_retrieval_source ( new_source , source )
161+ edge ["sources" ] = list ({** old_sources , ** new_sources }.values ())
157162
158163
159164def update_kgraph (kgraph : KnowledgeGraphDict , new : KnowledgeGraphDict ) -> None :
0 commit comments