Prefect flow that indexes documents from a PostgreSQL database into Elasticsearch for hetarchief.be.
This flow runs the following tasks:
- Retrieve a list of all indexes from the PostgreSQL view
- If it is a full sync:
  - delete all indexes that are no longer present in the view
  - retrieve the indexes ordered by size
  - for each index (smallest first):
    - create a new index
    - stream documents to the Elasticsearch API
    - replace the old index with the new index by switching the alias and deleting the old index
- If it is not a full sync, stream documents to the Elasticsearch API to be deleted or indexed
If an error occurs during streaming, the created indexes are rolled back.
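The full-sync steps above can be sketched in plain Python. This is a minimal illustration, not the flow's actual code: the helper names (`order_smallest_first`, `new_index_name`, `alias_swap_body`) and the timestamp-based naming scheme are assumptions made for this example. The returned dictionary follows the request body of the Elasticsearch `_aliases` API, which applies its `remove` and `add` actions atomically, so readers of the alias never see a missing index.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def order_smallest_first(index_sizes: Dict[str, int]) -> List[str]:
    """Return the index aliases ordered by document count, smallest first."""
    return sorted(index_sizes, key=lambda alias: index_sizes[alias])


def new_index_name(alias: str, now: datetime) -> str:
    """Hypothetical naming scheme: the alias plus a timestamp suffix."""
    return f"{alias}_{now.strftime('%Y%m%d%H%M%S')}"


def alias_swap_body(alias: str, old_index: Optional[str], new_index: str) -> dict:
    """Build an `_aliases` request body that points `alias` at `new_index`.

    If an old index exists, the alias is removed from it in the same request.
    Deleting the old index is a separate call that follows a successful swap.
    """
    actions = []
    if old_index is not None:
        actions.append({"remove": {"index": old_index, "alias": alias}})
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}


# Example with made-up aliases and sizes.
sizes = {"or-aaa": 30000, "or-bbb": 500, "or-ccc": 12000}
for alias in order_smallest_first(sizes):
    target = new_index_name(alias, datetime.now(timezone.utc))
    body = alias_swap_body(alias, old_index=None, new_index=target)
```

Because the alias only moves after streaming has finished, a rollback in this sketch amounts to deleting the newly created index and leaving the alias untouched.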
The Flow's diagram:
The following Prefect Blocks need to be configured:
- location and credentials of the PostgreSQL database (type: prefect_sqlalchemy.DatabaseCredentials)
- location and credentials of the Elasticsearch cluster (type: prefect_meemoo.credentials.ElasticsearchCredentials)
The Prefect Flow requires setting the following parameters:
- db_block_name: name of the database block
- db_table: name of the table or view where the index documents are stored
- es_block_name: name of the Elasticsearch block
- db_column_es_id: name of the column that contains the document identifiers (default: "id")
- db_column_es_index: name of the column that contains the index alias (default: "index")
- or_ids_to_run: list of indexes that need to be included. Set to None for all indexes. (default: None)
- full_sync: set the sync to a full reload (default: False)
- db_batch_size: size of the database cursor (default: 1000)
- es_chunk_size: Elasticsearch chunk size (default: 500)
- es_request_timeout: Elasticsearch request timeout (default: 30)
- es_max_retries: number of Elasticsearch retries when a document fails to index (default: 10)
- es_retry_on_timeout: retry indexing a document after a timeout (default: True)
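As an illustration, the parameters could be assembled as a plain dictionary before being passed to a flow run. This is a sketch under assumptions: the helper `build_parameters` is hypothetical, the block and table names in the usage example are placeholders, and the three parameters without a documented default are treated as required.

```python
# Defaults as documented in the parameter list.
DEFAULTS = {
    "db_column_es_id": "id",
    "db_column_es_index": "index",
    "or_ids_to_run": None,
    "full_sync": False,
    "db_batch_size": 1000,
    "es_chunk_size": 500,
    "es_request_timeout": 30,
    "es_max_retries": 10,
    "es_retry_on_timeout": True,
}

# Assumption: parameters without a documented default must be supplied.
REQUIRED = ("db_block_name", "db_table", "es_block_name")


def build_parameters(**overrides):
    """Merge overrides into the defaults and reject unknown or missing names."""
    unknown = sorted(set(overrides) - set(DEFAULTS) - set(REQUIRED))
    if unknown:
        raise ValueError(f"unknown parameters: {unknown}")
    missing = [name for name in REQUIRED if name not in overrides]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    return {**DEFAULTS, **overrides}


# Placeholder block and table names, for illustration only.
params = build_parameters(
    db_block_name="example-db-block",
    db_table="example_index_view",
    es_block_name="example-es-block",
    full_sync=True,
)
```

A dictionary like `params` matches the shape Prefect expects for the parameters of a flow run or deployment.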
