Skip to content
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7bc6fb4
Working version.
Xiao-zhen-Liu Jan 20, 2025
9f5427c
Merge branch 'refs/heads/master' into xiaozhen-add-python-storage
Xiao-zhen-Liu Jan 20, 2025
8d2493a
Refactoring of serde.
Xiao-zhen-Liu Jan 20, 2025
bcf42ea
Merge branch 'refs/heads/master' into xiaozhen-add-python-storage
Xiao-zhen-Liu Jan 20, 2025
1f0e7dc
Refactoring.
Xiao-zhen-Liu Jan 20, 2025
2bd1e95
Add VFS URI Factory; modify retry config.
Xiao-zhen-Liu Jan 22, 2025
b4f7bb7
Merge branch 'refs/heads/master' into xiaozhen-add-python-storage
Xiao-zhen-Liu Jan 22, 2025
230a658
Add document factory.
Xiao-zhen-Liu Jan 23, 2025
b3f2533
Add rest catalog.
Xiao-zhen-Liu Jan 23, 2025
b1c31af
Add rest catalog.
Xiao-zhen-Liu Jan 23, 2025
1e5a493
Merge branch 'master' into xiaozhen-add-python-storage
bobbai00 Jan 23, 2025
879a98f
Merge remote-tracking branch 'origin/xiaozhen-add-python-storage' int…
Xiao-zhen-Liu Jan 23, 2025
07f3bb2
Temporary fix.
Xiao-zhen-Liu Jan 23, 2025
8bc6050
fix port uri.
Xiao-zhen-Liu Jan 24, 2025
16ed509
fix read from java; use sql catalog.
Xiao-zhen-Liu Jan 25, 2025
b040eaa
Merge branch 'refs/heads/master' into xiaozhen-add-python-storage
Xiao-zhen-Liu Jan 27, 2025
0e77b85
Fix file_sequence_number retrieval logic.
Xiao-zhen-Liu Jan 27, 2025
45cf549
Merge branch 'refs/heads/master' into xiaozhen-add-python-storage
Xiao-zhen-Liu Jan 27, 2025
edbb20e
Improve retry.
Xiao-zhen-Liu Jan 28, 2025
00500ec
env setup.
Xiao-zhen-Liu Jan 29, 2025
41759f7
env setup.
Xiao-zhen-Liu Jan 29, 2025
d1db92a
fix lint and fmt; fix path.
Xiao-zhen-Liu Jan 29, 2025
d3312eb
fix requirements.txt
Xiao-zhen-Liu Jan 29, 2025
42622e2
fix postgres config for test.
Xiao-zhen-Liu Jan 29, 2025
433a545
fix postgres config for test.
Xiao-zhen-Liu Jan 29, 2025
45f92d3
create namespace if not exists.
Xiao-zhen-Liu Jan 29, 2025
27a515e
fix test.
Xiao-zhen-Liu Jan 29, 2025
d7a70a2
add psql script.
Xiao-zhen-Liu Jan 29, 2025
7164476
fix fmt.
Xiao-zhen-Liu Jan 29, 2025
4240577
fix fmt.
Xiao-zhen-Liu Jan 29, 2025
3077263
add storage config.
Xiao-zhen-Liu Jan 29, 2025
c77c238
Merge branch 'master' into xiaozhen-add-python-storage
Xiao-zhen-Liu Jan 29, 2025
3717520
refactoring.
Xiao-zhen-Liu Jan 29, 2025
2231b9e
refactoring.
Xiao-zhen-Liu Jan 29, 2025
f795ce3
use wait random exponential for concurrent writers.
Xiao-zhen-Liu Jan 29, 2025
d768921
refactorings and documentation.
Xiao-zhen-Liu Jan 30, 2025
c3b2fa3
minor update.
Xiao-zhen-Liu Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/github-action-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ jobs:
if [ -f core/amber/requirements.txt ]; then pip install -r core/amber/requirements.txt; fi
if [ -f core/amber/r-requirements.txt ]; then pip install -r core/amber/r-requirements.txt; fi
if [ -f core/amber/operator-requirements.txt ]; then pip install -r core/amber/operator-requirements.txt; fi
- name: Install PostgreSQL
run: sudo apt-get update && sudo apt-get install -y postgresql
- name: Start PostgreSQL Service
run: sudo systemctl start postgresql
- name: Create Database and User
run: |
cd core/scripts/sql && sudo -u postgres psql -f iceberg_postgres_catalog.sql
- name: Lint with flake8 and black
run: |
cd core/amber/src/main/python && flake8 && black . --check
Expand Down
7 changes: 6 additions & 1 deletion core/amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ bidict==0.22.0
cached_property==1.5.2
psutil==5.9.0
transformers==4.44.2
tzlocal==2.1
tzlocal==2.1
pyiceberg==0.8.1
readerwriterlock==1.0.9
tenacity==8.5.0
SQLAlchemy==2.0.37
psycopg2==2.9.10
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,18 @@ class AttributeType(Enum):
AttributeType.DOUBLE: pa.float64(),
AttributeType.BOOL: pa.bool_(),
AttributeType.BINARY: pa.binary(),
AttributeType.TIMESTAMP: pa.timestamp("ms", tz="UTC"),
AttributeType.TIMESTAMP: pa.timestamp("us"),
}

FROM_ARROW_MAPPING = {
lib.Type_INT32: AttributeType.INT,
lib.Type_INT64: AttributeType.LONG,
lib.Type_STRING: AttributeType.STRING,
lib.Type_LARGE_STRING: AttributeType.STRING,
lib.Type_DOUBLE: AttributeType.DOUBLE,
lib.Type_BOOL: AttributeType.BOOL,
lib.Type_BINARY: AttributeType.BINARY,
lib.Type_LARGE_BINARY: AttributeType.BINARY,
lib.Type_TIMESTAMP: AttributeType.TIMESTAMP,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def arrow_schema(self):
pa.field("field-3", pa.int64()),
pa.field("field-4", pa.float64()),
pa.field("field-5", pa.bool_()),
pa.field("field-6", pa.timestamp("ms", tz="UTC")),
pa.field("field-6", pa.timestamp("us")),
pa.field("field-7", pa.binary()),
]
)
Expand Down
Empty file.
103 changes: 103 additions & 0 deletions core/amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from urllib.parse import urlparse

from typing import Optional

from core.models import Schema, Tuple
from core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance
from core.storage.iceberg.iceberg_document import IcebergDocument
from core.storage.iceberg.iceberg_utils import (
create_table,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
load_table_metadata,
)
from core.storage.model.virtual_document import VirtualDocument
from core.storage.storage_config import StorageConfig
from core.storage.vfs_uri_factory import VFSURIFactory, VFSResourceType


class DocumentFactory:
"""
Factory class to create and open documents.
Currently only iceberg documents are supported.
"""

ICEBERG = "iceberg"

@staticmethod
def sanitize_uri_path(uri):
return uri.path.lstrip("/").replace("/", "_")

@staticmethod
def create_document(uri: str, schema: Schema) -> VirtualDocument:
parsed_uri = urlparse(uri)
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {
VFSResourceType.RESULT,
VFSResourceType.MATERIALIZED_RESULT,
}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

iceberg_schema = Schema.as_arrow_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_NAMESPACE,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
StorageConfig.ICEBERG_TABLE_NAMESPACE,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
else:
raise ValueError(f"Resource type {resource_type} is not supported")
else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
)

@staticmethod
def open_document(uri: str) -> (VirtualDocument, Optional[Schema]):
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "vfs":
_, _, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {
VFSResourceType.RESULT,
VFSResourceType.MATERIALIZED_RESULT,
}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_NAMESPACE,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
StorageConfig.ICEBERG_TABLE_NAMESPACE,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema
else:
raise ValueError(f"Resource type {resource_type} is not supported")
else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from pyiceberg.catalog import Catalog
from typing import Optional

from core.storage.iceberg.iceberg_utils import create_postgres_catalog
from core.storage.storage_config import StorageConfig


class IcebergCatalogInstance:
"""
IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
Currently only postgres SQL catalog is supported.
- Provides a single shared catalog for all Iceberg table-related operations.
- Lazily initializes the catalog on first access.
- Supports replacing the catalog instance for testing or reconfiguration.
"""

_instance: Optional[Catalog] = None

@classmethod
def get_instance(cls):
"""
Retrieves the singleton Iceberg catalog instance.
- If the catalog is not initialized, it is lazily created using the configured
properties.
:return: the Iceberg catalog instance.
"""
if cls._instance is None:
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
return cls._instance

@classmethod
def replace_instance(cls, catalog: Catalog):
"""
Replaces the existing Iceberg catalog instance.
- This method is useful for testing or dynamically updating the catalog.
:param catalog: the new Iceberg catalog instance to replace the current one.
"""
cls._instance = catalog
Loading
Loading