Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions blazegraph-migration/enrich_triples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ LOG_FILE="enrich_parallel_$(date +%Y%m%d_%H%M%S).log"
CURL_TIMEOUT=300
MAX_RETRIES=3
CHUNK_SIZE=50000
FAIL_LIMIT_PERCENT=1.0

log_message() {
local level="$1"
Expand Down Expand Up @@ -35,16 +34,33 @@ generate_enriched_chunks() {

log_message "INFO" "Streaming and enriching $input_file into chunked enriched files"

local total_input_triples
total_input_triples=$(gunzip -c "$input_file" | wc -l)
log_message "INFO" "Total triples to process: $total_input_triples"

local chunk_count=0
local line_count=0
local processed_input_triples=0
local enriched_chunk_file="$chunk_dir/enriched_chunk_$(printf "%05d" $chunk_count).nq"
local bar_width=50

gunzip -c "$input_file" | while IFS= read -r line || [[ -n "$line" ]]; do
graph_iri=$(echo "$line" | awk '{print $4}' | sed 's/[<>]//g')
graph_iri=$(awk '
{
in_string = 0
graph = ""
for (i = 1; i <= NF; i++) {
if ($i ~ /^"/) in_string = 1
if (in_string && $i ~ /"$/ && substr($i, length($i), 1) != "\\") in_string = 0
if (!in_string && $i ~ /^<.*>$/) graph = $i
}
gsub(/[<>]/, "", graph)
print graph
}' <<< "$line")

IFS='/' read -ra parts <<< "$graph_iri"
if [[ ${#parts[@]} -ge 5 ]]; then
network_prefix="${parts[0]}" # did:dkg:otp:2043
network_prefix="${parts[0]}"
contract="${parts[1]}"
collection_id="${parts[2]}"
asset_id="${parts[3]}"
Expand All @@ -61,17 +77,34 @@ generate_enriched_chunks() {
((line_count+=3))
fi

((processed_input_triples++))

if (( line_count >= CHUNK_SIZE * 3 )); then
((chunk_count++))
enriched_chunk_file="$chunk_dir/enriched_chunk_$(printf "%05d" $chunk_count).nq"
line_count=0
fi


if (( processed_input_triples % 1000 == 0 )); then
local progress=$(( (processed_input_triples * bar_width) / total_input_triples ))
local percent=$(( (processed_input_triples * 100) / total_input_triples ))
local progress_bar=$(printf "%-${bar_width}s" "#" | tr ' ' '#')
printf "\r[%-${bar_width}.${progress}s] %d%% (%d/%d triples processed)\n" "$progress_bar" "$percent" "$processed_input_triples" "$total_input_triples"
fi
done

echo "$((chunk_count + 1))" > "$input_dir/.total_chunks_created"
log_message "INFO" "Enriched chunks generated: $((chunk_count + 1))"
echo

enriched_chunk_files=("$chunk_dir"/enriched_chunk_*.nq)
total_enriched_chunks=${#enriched_chunk_files[@]}

echo "$total_enriched_chunks" > "$input_dir/.total_chunks_created"
log_message "INFO" "Enriched chunks generated: $total_enriched_chunks"
}



process_chunk() {
local chunk_file="$1"
local chunk_num="$2"
Expand Down Expand Up @@ -170,6 +203,22 @@ for folder in "${target_folders[@]}"; do
log_message "INFO" "Processing folder: $folder"

done_marker=".enrichment_migration_done_$(basename "$folder")"
chunk_dir="$folder/chunks"

retry_mode=false
if [ -f "$done_marker" ]; then
marker_val=$(cat "$done_marker")
if [ "$marker_val" == "0" ]; then
shopt -s nullglob
remaining_chunks=("$chunk_dir"/enriched_chunk_*.nq)
shopt -u nullglob
if [ ${#remaining_chunks[@]} -gt 0 ]; then
retry_mode=true
log_message "INFO" "Retry mode enabled — retrying failed chunks"
process_chunks "$folder"
fi
fi
fi

if [ ! -f "$folder/$folder/data.nq.gz" ]; then
log_message "ERROR" "Missing file: $folder/$folder/data.nq.gz"
Expand All @@ -178,37 +227,25 @@ for folder in "${target_folders[@]}"; do
continue
fi

generate_enriched_chunks "$folder"
process_chunks "$folder"

total_chunks=$(cat "$folder/.total_chunks_created" 2>/dev/null)
rm -f "$folder/.total_chunks_created"
if [ "$retry_mode" = false ]; then
generate_enriched_chunks "$folder"
process_chunks "$folder"
fi

shopt -s nullglob
failed_chunks=("$folder/chunks"/enriched_chunk_*.nq)
remaining_failed_chunks=("$chunk_dir"/enriched_chunk_*.nq)
shopt -u nullglob
failed_count=${#failed_chunks[@]}

if (( total_chunks == 0 )); then
log_message "WARN" "No enriched chunks were generated for $folder"
echo "0" > "$done_marker"
overall_success=0
continue
fi

fail_ratio=$(awk "BEGIN { printf \"%.2f\", ($failed_count / $total_chunks) * 100 }")
remaining_failed_count=${#remaining_failed_chunks[@]}

if (( $(awk "BEGIN { print ($fail_ratio > $FAIL_LIMIT_PERCENT) }") )); then
log_message "ERROR" "$failed_count/$total_chunks chunks failed (${fail_ratio}%) — above threshold"
if (( remaining_failed_count > 0 )); then
log_message "ERROR" "$remaining_failed_count chunks still remain after retries"
echo "0" > "$done_marker"
overall_success=0
else
if (( failed_count > 0 )); then
log_message "WARN" "$failed_count/$total_chunks chunks failed (${fail_ratio}%), but within allowed $FAIL_LIMIT_PERCENT%"
fi
echo "1" > "$done_marker"
log_message "INFO" "Migration success marker written to $done_marker"
fi

done

job_pool_shutdown
Expand Down
13 changes: 12 additions & 1 deletion blazegraph-migration/paranet_migration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,18 @@ generate_enriched_chunks() {
gunzip -c "$input_file" | while IFS= read -r line || [[ -n "$line" ]]; do
echo "$line" >> "$chunk_file"

named_graph=$(echo "$line" | awk '{print $4}' | sed 's/[<>]//g')
named_graph=$(awk '
{
in_string = 0
graph = ""
for (i = 1; i <= NF; i++) {
if ($i ~ /^"/) in_string = 1
if (in_string && $i ~ /"$/ && substr($i, length($i), 1) != "\\") in_string = 0
if (!in_string && $i ~ /^<.*>$/) graph = $i
}
gsub(/[<>]/, "", graph)
print graph
}' <<< "$line")

echo "<${ual}> <https://ontology.origintrail.io/dkg/1.0#hasNamedGraph> <${named_graph}> <${ual}> ." >> "$chunk_file"

Expand Down
Loading