This utility converts Parquet files to DeepLake format with embedding support, automatic schema parsing, and data validation.
- Automatic Schema Detection: Parses Spark-style schema files to configure column types
- Embedding Validation: Automatically filters out rows with invalid/missing embeddings
- Efficient Processing: Batch processing with memory optimization
- Flexible Input: Support for both folder-based and file-list-based ingestion
- Index Creation: Automatic index creation for vector search and inverted text search
Install the dependencies:

```bash
pip install deeplake pyarrow numpy tqdm
```

Process all `.parquet` files in a directory:
```bash
python main.py \
  --schema schema.txt \
  --source /path/to/parquet/folder \
  --output s3://bucket/dataset \
  --org-id=<org_id> --ds-name=<ds_name>
```

Process `.parquet` files from an S3 source path:

```bash
python main.py \
  --schema schema.txt \
  --source s3://<bucket>/path \
  --output s3://bucket/dataset \
  --org-id=<org_id> --ds-name=<ds_name>
```

Process a specific list of parquet files (space-separated):
```bash
python main.py \
  --schema schema.txt \
  --files /path/file1.parquet /path/file2.parquet /path/file3.parquet \
  --output s3://bucket/dataset \
  --org-id=<org_id> --ds-name=<ds_name>
```

Important: Files must be space-separated, not comma-separated.
Specify custom embedding, inverted-index, and unique ID columns:

```bash
python main.py \
  --schema schema.txt \
  --source /path/to/parquet/folder \
  --output s3://bucket/dataset \
  --embedding title_emb \
  --embedding-size 768 \
  --inverted-column title1 \
  --unique-id-column video_id \
  --org-id=<org_id> --ds-name=<ds_name>
```

| Argument | Required | Default | Description |
|---|---|---|---|
| `--schema` | Yes | - | Path to schema configuration file (Spark format) |
| `--source` | Conditional* | - | Source folder containing parquet files |
| `--files` | Conditional* | - | List of parquet file paths (space-separated) |
| `--output` | Yes | - | Output DeepLake dataset path (local or S3) |
| `--embedding` | No | `title_emb` | Name of the embedding column |
| `--embedding-size` | No | `768` | Dimension size of embedding vectors |
| `--inverted-column` | No | `title1` | String column to index with inverted index |
| `--unique-id-column` | No | `video_id` | Column to use as unique identifier |

\* Either `--source` or `--files` must be provided (not both).
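The "either `--source` or `--files`, never both" rule maps naturally onto an argparse mutually exclusive group. The sketch below is illustrative only; it is not the project's actual `main.py`, and it omits flags such as `--org-id`, `--ds-name`, `--num-workers`, and `--upsert` that appear in the examples.

```python
# Illustrative only -- not the project's actual main.py. Shows how the
# "--source XOR --files" rule can be enforced with argparse.
import argparse

parser = argparse.ArgumentParser(description="Parquet -> DeepLake converter (sketch)")
parser.add_argument("--schema", required=True, help="Spark-format schema file")
parser.add_argument("--output", required=True, help="Output DeepLake dataset path")

# Exactly one of --source / --files must be given.
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--source", help="Folder containing parquet files")
group.add_argument("--files", nargs="+", help="Space-separated parquet file paths")

parser.add_argument("--embedding", default="title_emb")
parser.add_argument("--embedding-size", type=int, default=768)
parser.add_argument("--inverted-column", default="title1")
parser.add_argument("--unique-id-column", default="video_id")

args = parser.parse_args()
print(args)
```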
The schema file should follow Spark's printSchema() output format:
```
root
 |-- ucid: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- id: long (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- title1: string (nullable = true)
 |-- views: long (nullable = true)
 |-- made_for_kids: boolean (nullable = true)
 |-- cluster_index: integer (nullable = true)
 |-- v7_average: float (nullable = true)
 |-- performance_index: float (nullable = true)
 |-- title_emb: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- power_keywords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
```
Supported column types:

- String: `string`
- Integer: `integer` (32-bit)
- Long: `long` (64-bit)
- Float: `float` (32-bit)
- Double: `double` (64-bit)
- Boolean: `boolean`
- Timestamp: `timestamp`
- Array: `array` (simple arrays like embeddings)
- Struct/Dict: `array` with nested `struct` (complex nested data)
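As a rough illustration of how such a schema dump can be turned into a column-to-type map, here is a minimal sketch. The real `parse_schema_file` goes further (it builds a `SchemaConfig` with embedding, inverted-index, and unique-ID settings); the regex and function name here are assumptions made for illustration.

```python
# Minimal sketch: turn a Spark printSchema() dump into a {column: type} map.
import re

# Matches top-level lines like " |-- ucid: string (nullable = true)".
# Nested " |    |-- element: ..." lines deliberately do not match.
LINE_RE = re.compile(r"^\s*\|--\s+(\w+):\s+(\w+)\s+\(nullable = (?:true|false)\)")

def parse_spark_schema(path):
    columns = {}
    with open(path) as f:
        for line in f:
            m = LINE_RE.match(line)
            if m:
                name, dtype = m.group(1), m.group(2)
                columns[name] = dtype  # e.g. "string", "long", "array"
    return columns

# parse_spark_schema("schema.txt") -> {"ucid": "string", ..., "title_emb": "array"}
```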
The converter automatically:
- Validates Embeddings: Rows with `None`, empty, or invalid embeddings are filtered out
- Maintains Alignment: All columns remain aligned after filtering
- Logs Filtering: Prints the number of rows filtered per batch
- Skips Empty Batches: If all rows in a batch have invalid embeddings, the batch is skipped
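A minimal sketch of that filtering idea, assuming each batch is held as a dict of column lists (names and helpers are illustrative, not the converter's internals): build a boolean mask from the embedding column, then apply the same mask to every other column so rows stay aligned.

```python
# Illustrative sketch of the filtering idea (not the converter's internals).
import numpy as np

def valid_embedding_mask(embeddings, expected_size):
    mask = [
        emb is not None
        and len(emb) == expected_size
        and not any(e is None for e in emb)
        for emb in embeddings
    ]
    return np.array(mask, dtype=bool)

def filter_batch(batch_columns, embedding_column, expected_size):
    """batch_columns: dict mapping column name -> list of values for one batch."""
    mask = valid_embedding_mask(batch_columns[embedding_column], expected_size)
    dropped = int((~mask).sum())
    if dropped:
        print(f"Filtered {dropped} rows with invalid embeddings")
    # Applying the same mask to every column keeps all columns aligned.
    return {
        name: [value for value, keep in zip(values, mask) if keep]
        for name, values in batch_columns.items()
    }
```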
The conversion pipeline runs in the following steps:
- Schema Parsing: Reads and parses the schema file
- Dataset Creation: Creates DeepLake dataset with proper column types
- Data Ingestion:
  - Processes embedding column first
  - Identifies invalid rows
  - Filters all columns to maintain alignment
  - Appends valid data in batches
- Index Creation: Enables indexing mode and commits (separate step)
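The ingestion loop can be pictured roughly as follows, using PyArrow's streaming reader and the `filter_batch` sketch above. `append_to_dataset` is a placeholder: the real append/commit step goes through the DeepLake SDK and is not shown here.

```python
# Rough picture of the per-file loop; append_to_dataset is a placeholder.
import pyarrow.parquet as pq

BATCH_SIZE = 200_000  # default batch size noted in the performance section

def process_file(path, embedding_column="title_emb", expected_size=768):
    parquet_file = pq.ParquetFile(path)
    for record_batch in parquet_file.iter_batches(batch_size=BATCH_SIZE):
        columns = {
            name: record_batch.column(i).to_pylist()
            for i, name in enumerate(record_batch.schema.names)
        }
        columns = filter_batch(columns, embedding_column, expected_size)
        if not columns[embedding_column]:
            continue  # every row in this batch had an invalid embedding
        append_to_dataset(columns)  # placeholder for the DeepLake append + commit
```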
Convert to a local dataset:

```bash
python main.py \
  --schema schema.txt \
  --source /data/parquet_files \
  --output ./local_dataset
```

Convert to S3 with custom embedding and index settings:

```bash
python main.py \
  --schema schema.txt \
  --source /data/parquet_files \
  --output s3://my-bucket/my-dataset \
  --embedding video_embedding \
  --embedding-size 1024 \
  --inverted-column description \
  --unique-id-column content_id
```

Convert a specific list of files:

```bash
python main.py \
  --schema schema.txt \
  --files \
    /data/batch1/part-00001.parquet \
    /data/batch1/part-00002.parquet \
    /data/batch2/part-00003.parquet \
  --output s3://my-bucket/incremental-dataset
```

The converter will display:
- Schema Summary: Column counts by type
- Processing Progress: File-by-file progress with tqdm
- Filtering Stats: Number of rows filtered due to invalid embeddings
- Batch Information: Size of each batch being appended
- Index Creation: Status of index building
Example output:
```
Parsed Schema Configuration:
  String columns: 15
  Integer columns: 2
  Long columns: 5
  Float columns: 3
  Double columns: 1
  Boolean columns: 1
  Timestamp columns: 2
  Array columns (simple): 1
  Dict columns (complex): 1
  Embedding column: title_emb
  Embedding size: 768
  Total columns: 31

Processing 3 parquet files
Converting files: 100%|████████████| 3/3 [01:23<00:00, 27.8s/it]
Indexing dataset...
Conversion completed successfully!
```
- Batch Size: Default 200,000 rows per batch (configurable in `SchemaConfig`)
- Memory Management: Automatic garbage collection after each batch
- Contiguous Arrays: Ensures numpy arrays are memory-contiguous for optimal performance
- Zero-Copy: Uses PyArrow zero-copy conversion when possible
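The last two points correspond to standard NumPy/PyArrow calls; a small illustrative snippet (not taken from the converter):

```python
# Zero-copy Arrow -> NumPy conversion where possible, then force contiguity.
import numpy as np
import pyarrow as pa

arr = pa.array([1.0, 2.0, 3.0], type=pa.float32())

# zero_copy_only=True raises ArrowInvalid when a copy would be needed
# (e.g. the column contains nulls), so fall back to a copying conversion.
try:
    np_arr = arr.to_numpy(zero_copy_only=True)
except pa.ArrowInvalid:
    np_arr = arr.to_numpy(zero_copy_only=False)

# No-op if the buffer is already contiguous; otherwise makes a contiguous copy.
np_arr = np.ascontiguousarray(np_arr)
```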
You can also use the converter programmatically:
```python
from parquet_to_deeplake import ParquetToDeepLakeConverter
from schema_parser import parse_schema_file

# Parse schema
config = parse_schema_file(
    "schema.txt",
    embedding_column="title_emb",
    embedding_size=768,
    inverted_column="title1",
    unique_id_column="video_id"
)

# Create converter
converter = ParquetToDeepLakeConverter("s3://bucket/dataset", config)

# Option 1: Convert folder
converter.convert_folder("/path/to/folder")

# Option 2: Convert specific files
file_list = [
    "/path/file1.parquet",
    "/path/file2.parquet"
]
converter.convert_files(file_list)
```

Solution: You must specify either `--source` (for a folder) or `--files` (for a file list), but not both.
Solution: Ensure files are space-separated, not comma-separated:
```bash
# Wrong
--files file1.parquet,file2.parquet

# Correct
--files file1.parquet file2.parquet
```

Solution: Ensure the `--embedding` argument matches the column name in your schema file.
Solution: Check that your embedding column contains valid non-null vectors. Rows with None, empty, or invalid embeddings are automatically filtered.
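As a quick pre-flight check (illustrative, not part of the tool), you can count null or wrong-length embeddings in a parquet file before ingesting it:

```python
# Count null or wrong-length embeddings in a parquet file.
import pyarrow.parquet as pq

def count_bad_embeddings(path, column="title_emb", expected_size=768):
    table = pq.read_table(path, columns=[column])
    bad = 0
    for emb in table.column(column).to_pylist():
        if emb is None or len(emb) != expected_size:
            bad += 1
    return bad

print(count_bad_embeddings("/path/file1.parquet"))
```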
- The converter commits after every batch for incremental progress tracking
- Indexing is enabled in a separate commit after all data is ingested
- Invalid embeddings (None, empty, or wrong type) cause the entire row to be filtered
- All numeric arrays are converted to contiguous memory layout for performance
- The unique ID column is indexed with exact matching for fast lookups
- The inverted column supports full-text search capabilities
```
main.py
 ├─ ParquetToDeepLakeConverter
 │   ├─ DeepLakeManager (dataset creation/schema)
 │   └─ ParquetProcessor (data processing)
 │       ├─ _convert_arrow_array (type conversion + filtering)
 │       ├─ _transform_batch_data (batch transformation)
 │       └─ process_file (file iteration)
 └─ parse_schema_file (schema parsing)
```
Creates a new DeepLake dataset from a folder of parquet files:
```bash
python3 main.py \
  --schema=schema.txt \
  --source=s3://path/to/short \
  --output=s3://path/to/scripts/ds \
  --org-id=scripts \
  --ds-name=ds \
  --num-workers=6
```

When to use: First-time data load into a new dataset.
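The create-mode example passes `--num-workers=6`. Purely as an assumed illustration of what such an option could map to (the actual `main.py` may parallelize differently, or not at this layer), a process pool over the input files might look like this; coordinating writes to a single dataset is deliberately out of scope for the sketch.

```python
# Assumed illustration only: fan per-file work out over a process pool.
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

def convert_one(path):
    # placeholder for the per-file read/filter work (see process_file above)
    return path

if __name__ == "__main__":
    files = sorted(Path("/data/parquet_files").glob("*.parquet"))
    with ProcessPoolExecutor(max_workers=6) as pool:  # 6 mirrors --num-workers=6
        for finished in pool.map(convert_one, files):
            print(f"finished {finished}")
```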
Adds additional data to an existing dataset without modifying existing records:
```bash
python3 main.py \
  --schema=schema.txt \
  --output=s3://path/to/scripts/ds \
  --org-id=scripts \
  --ds-name=ds \
  --files \
    /path/to/new/part-00001.parquet \
    /path/to/new/part-00002.parquet \
    /path/to/new/part-00003.parquet
```

When to use: Adding new records to an existing dataset (no duplicates expected). Does not check for existing records.
Updates existing records or inserts new ones based on the unique ID column:
```bash
python3 main.py \
  --schema=schema.txt \
  --output=s3://path/to/scripts/ds \
  --org-id=scripts \
  --ds-name=ds \
  --files \
    /path/to/upsert_data/part-6fb66e36-5e67-42bc-89d4-7147c139a20d-1993-1-c000.snappy.parquet \
    /path/to/upsert_data/part-6fb66e36-5e67-42bc-89d4-7147c139a20d-2003-1-c000.snappy.parquet \
    /path/to/upsert_data/part-6fb66e36-5e67-42bc-89d4-7147c139a20d-2007-1-c000.snappy.parquet \
  --upsert
```

Upsert directly from parquet files on S3:

```bash
python3 main.py \
  --schema=schema.txt \
  --output=s3://path/to/scripts/ds \
  --org-id=scripts \
  --ds-name=ds \
  --files \
    s3://data/short/part-00000-6fb66e36-5e67-42bc-89d4-7147c139a20d-1375-1-c000.snappy.parquet \
    s3://data/short/part-00004-6fb66e36-5e67-42bc-89d4-7147c139a20d-1399-1-c000.snappy.parquet \
  --upsert
```

When to use: Updating existing records or handling incremental updates with potential duplicates.
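Conceptually, upsert mode partitions incoming rows by the unique ID column into updates (ID already in the dataset) and inserts (new ID). The sketch below shows only that partitioning step, with hypothetical names; how the update itself is applied is handled inside the converter.

```python
# Conceptual sketch of upsert partitioning on the unique ID column.
def split_upsert(incoming_rows, existing_ids, id_column="video_id"):
    updates, inserts = [], []
    for row in incoming_rows:
        (updates if row[id_column] in existing_ids else inserts).append(row)
    return updates, inserts

existing_ids = {"vid_001", "vid_002"}            # IDs already in the dataset
rows = [
    {"video_id": "vid_002", "views": 10},        # existing ID -> update
    {"video_id": "vid_999", "views": 3},         # new ID      -> insert
]
updates, inserts = split_upsert(rows, existing_ids)
print(len(updates), "updates,", len(inserts), "inserts")
```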