Joydeep2005Banik/data-ingestion
AI-Powered Global Ontology Engine: Ingestion Layer

This repository implements only the ingestion system and its Kafka foundation, following this workflow:

External Sources -> API Collectors / Scrapers -> Raw Data Acquisition -> JSON Normalization -> Message Serialization -> Kafka Streaming Topic
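The stages after collection can be sketched as a few small functions. This is a minimal illustration, not the repo's actual API: the function names and the `pipeline_version` value are assumptions, though the envelope fields match the ingestion metadata documented below.

```python
import json
import uuid
from datetime import datetime, timezone

def acquire(collector):
    """Raw data acquisition: wrap each collected record in an
    ingestion envelope carrying the documented metadata fields."""
    for record in collector():
        yield {
            "ingest_id": str(uuid.uuid4()),
            "pipeline_version": "1.0",  # illustrative value
            "collector_module": collector.__module__,
            "ingested_at": datetime.now(timezone.utc).isoformat(),
            "payload": record,
        }

def normalize(envelope):
    """JSON normalization: canonical (sorted) key order, UTF-8 preserved."""
    return json.loads(json.dumps(envelope, sort_keys=True, ensure_ascii=False))

def serialize(doc):
    """Message serialization: compact UTF-8 bytes ready for Kafka."""
    return json.dumps(doc, separators=(",", ":")).encode("utf-8")
```

Each stage is a pure transformation, so the same record can be replayed through normalization and serialization deterministically.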

What Is Implemented

  • External source connectors across domains:
    • collector-modules/economy.py (World Bank)
    • collector-modules/geopolitics.py (GDELT)
    • collector-modules/climate.py (Open-Meteo)
    • collector-modules/technology.py (HN Algolia)
    • collector-modules/society_scraper.py (UN RSS)
  • Raw data acquisition envelope with ingestion metadata (ingest_id, pipeline_version, collector_module, ingested_at)
  • Canonical JSON normalization in normalization/normalise.py
  • Serialization stage in serialization.py
  • Kafka streaming client in kafka_stream.py
  • Orchestrated end-to-end pipeline in ingestion_pipeline.py
  • Kafka container stack in docker-compose.yml

Kafka Topics

  • Raw replay/debug topic: ontology.intelligence.raw.v1
  • Normalized ingestion topic: ontology.intelligence.ingestion.v1
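The dual-topic layout means every record is published twice: the raw envelope for replay/debugging and the normalized document for downstream consumers. A sketch of how that fan-out might look (the `publish` function is an illustrative name, not necessarily what kafka_stream.py exposes; `producer` is assumed to follow kafka-python's `KafkaProducer` interface):

```python
RAW_TOPIC = "ontology.intelligence.raw.v1"
INGESTION_TOPIC = "ontology.intelligence.ingestion.v1"

def publish(producer, raw_bytes, normalized_bytes):
    """Fan out one record: raw envelope to the replay/debug topic,
    normalized document to the main ingestion topic."""
    producer.send(RAW_TOPIC, raw_bytes)
    producer.send(INGESTION_TOPIC, normalized_bytes)
    producer.flush()  # block until both messages are handed to the broker
```

Keeping the raw copy on a separate topic lets you re-run normalization against historical data without re-hitting the external sources.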

Run Locally

  1. Start Kafka:

     docker compose up -d kafka kafdrop

  2. Install Python dependencies:

     pip install -r requirements.txt

  3. Run one ingestion cycle:

     python ingestion_pipeline.py

Environment Variables

  • KAFKA_BOOTSTRAP_SERVERS (default: localhost:9092)
  • KAFKA_RAW_TOPIC (default: ontology.intelligence.raw.v1)
  • KAFKA_TOPIC (default: ontology.intelligence.ingestion.v1)
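Resolving these settings with their documented defaults is a one-liner per variable. A minimal sketch (the `kafka_config` helper is illustrative; the variable names and defaults are the ones listed above):

```python
import os

def kafka_config(env=os.environ):
    """Resolve Kafka settings, falling back to the documented defaults."""
    return {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "raw_topic": env.get("KAFKA_RAW_TOPIC", "ontology.intelligence.raw.v1"),
        "topic": env.get("KAFKA_TOPIC", "ontology.intelligence.ingestion.v1"),
    }
```

Passing the environment mapping as a parameter keeps the function testable without mutating the real process environment.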

Notes

  • Collectors are modular: any file in collector-modules/ exposing collect() is auto-discovered.
  • The pipeline currently runs in batch mode (run_once). You can schedule it for near-real-time operation (cron, Airflow, or a streaming service) in future iterations.
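Because the directory name collector-modules contains a hyphen, it cannot be imported as a regular Python package; auto-discovery presumably loads modules from file paths instead. A sketch of how that could work (`discover_collectors` is an illustrative name, not the repo's confirmed implementation):

```python
import importlib.util
from pathlib import Path

def discover_collectors(directory="collector-modules"):
    """Load every .py file in the directory and keep the modules
    that expose a callable collect() entry point."""
    collectors = []
    for path in sorted(Path(directory).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        if callable(getattr(module, "collect", None)):
            collectors.append(module.collect)
    return collectors
```

Dropping a new source into the directory with a `collect()` function is then enough to include it in the next pipeline run.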

About

Data ingestion for ontology engine
