Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions chromadb/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,3 +861,25 @@ def detach_function(
bool: True if successful
"""
pass

@abstractmethod
def get_attached_function(
    self,
    attached_function_name: str,
    tenant: str = DEFAULT_TENANT,
    database: str = DEFAULT_DATABASE,
) -> "AttachedFunction":
    """Look up one attached function by name and return its metadata.

    Concrete API implementations must override this method.

    Args:
        attached_function_name: Name identifying the attached function.
        tenant: Tenant that owns the attached function.
        database: Database that owns the attached function.

    Returns:
        AttachedFunction: Handle carrying the attached function's metadata.

    Raises:
        ValueError: If no attached function with that name exists.
    """
    pass
97 changes: 92 additions & 5 deletions chromadb/api/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""FastAPI client implementation for Chroma API."""

import json
import orjson
import logging
from typing import Any, Dict, Optional, cast, Tuple, List
Expand Down Expand Up @@ -774,16 +777,43 @@ def attach_function(
},
)

# The response now contains a nested attached_function object
attached_func_data = resp_json["attached_function"]

# Parse timestamps from the response (Unix timestamps as strings)
from datetime import datetime

last_run = None
if attached_func_data.get("last_run"):
try:
# Convert Unix timestamp string to datetime
last_run = datetime.fromtimestamp(float(attached_func_data["last_run"]))
except (ValueError, TypeError):
last_run = None

next_run = None
if attached_func_data.get("next_run"):
try:
# Convert Unix timestamp string to datetime
next_run = datetime.fromtimestamp(float(attached_func_data["next_run"]))
except (ValueError, TypeError):
next_run = None

return AttachedFunction(
client=self,
id=UUID(resp_json["attached_function"]["id"]),
name=resp_json["attached_function"]["name"],
function_id=resp_json["attached_function"]["function_id"],
id=UUID(attached_func_data["id"]),
name=attached_func_data["name"],
function_id=attached_func_data[
"function_name"
], # Using function_name from the nested response
input_collection_id=input_collection_id,
output_collection=output_collection,
params=params,
output_collection=attached_func_data["output_collection"],
params=attached_func_data.get("params"),
tenant=tenant,
database=database,
last_run=last_run,
next_run=next_run,
global_function_parent=attached_func_data.get("global_function_parent"),
)

@trace_method("FastAPI.detach_function", OpenTelemetryGranularity.ALL)
Expand All @@ -804,3 +834,60 @@ def detach_function(
},
)
return cast(bool, resp_json["success"])

@trace_method("FastAPI.get_attached_function", OpenTelemetryGranularity.ALL)
@override
def get_attached_function(
    self,
    attached_function_name: str,
    tenant: str = DEFAULT_TENANT,
    database: str = DEFAULT_DATABASE,
) -> "AttachedFunction":
    """Fetch an attached function from the server and wrap it in an AttachedFunction.

    Args:
        attached_function_name: Name of the attached function to retrieve.
        tenant: The tenant name.
        database: The database name.

    Returns:
        AttachedFunction: Built from the ``attached_function`` object nested in
        the response. ``function_id`` is filled from the response's
        ``function_name`` field, and ``input_collection_id`` is resolved by
        looking the input collection up by name. ``last_run`` / ``next_run``
        are naive local-time datetimes, or None when the server value is
        missing, empty, or cannot be converted.
    """
    # Local import keeps this block self-contained; the module header does not
    # import datetime.
    from datetime import datetime

    def _parse_unix_timestamp(raw: Any) -> Optional[datetime]:
        """Best-effort conversion of a Unix timestamp (string or number)."""
        if not raw:
            return None
        try:
            return datetime.fromtimestamp(float(raw))
        except (ValueError, TypeError, OverflowError, OSError):
            # fromtimestamp raises OverflowError for out-of-range values
            # (e.g. "inf") and OSError on platform localtime() failure; treat
            # those like any other unparseable value instead of crashing.
            return None

    resp_json = self._make_request(
        "get",
        f"/tenants/{tenant}/databases/{database}/attached_functions/{attached_function_name}",
    )

    # The response contains a nested attached_function object
    attached_func_data = resp_json["attached_function"]

    # Timestamps arrive as Unix timestamps encoded as strings
    last_run = _parse_unix_timestamp(attached_func_data.get("last_run"))
    next_run = _parse_unix_timestamp(attached_func_data.get("next_run"))

    # The response names the input collection; AttachedFunction needs its ID
    input_collection_name = attached_func_data["input_collection"]
    input_collection = self.get_collection(input_collection_name, tenant, database)

    return AttachedFunction(
        client=self,
        id=UUID(attached_func_data["id"]),
        name=attached_func_data["name"],
        # The nested response exposes the human-readable function name
        function_id=attached_func_data["function_name"],
        input_collection_id=input_collection.id,
        output_collection=attached_func_data["output_collection"],
        params=attached_func_data.get("params"),
        tenant=tenant,
        database=database,
        last_run=last_run,
        next_run=next_run,
        global_function_parent=attached_func_data.get("global_function_parent"),
    )
25 changes: 25 additions & 0 deletions chromadb/api/models/AttachedFunction.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Optional, Dict, Any
from uuid import UUID
from datetime import datetime

if TYPE_CHECKING:
from chromadb.api import ServerAPI # noqa: F401
Expand All @@ -19,6 +20,9 @@ def __init__(
params: Optional[Dict[str, Any]],
tenant: str,
database: str,
last_run: Optional[datetime] = None,
next_run: Optional[datetime] = None,
global_function_parent: Optional[str] = None,
):
"""Initialize an AttachedFunction.

Expand All @@ -32,6 +36,9 @@ def __init__(
params: Function-specific parameters
tenant: The tenant name
database: The database name
last_run: Optional datetime of when the function last ran
next_run: Optional datetime of when the function is scheduled to run next
global_function_parent: Optional global function parent ID
"""
self._client = client
self._id = id
Expand All @@ -42,6 +49,9 @@ def __init__(
self._params = params
self._tenant = tenant
self._database = database
self._last_run = last_run
self._next_run = next_run
self._global_function_parent = global_function_parent

@property
def id(self) -> UUID:
Expand Down Expand Up @@ -73,6 +83,21 @@ def params(self) -> Optional[Dict[str, Any]]:
"""The function parameters."""
return self._params

@property
def last_run(self) -> Optional[datetime]:
    """Time of the most recent run, or None if no value was supplied."""
    return self._last_run

@property
def next_run(self) -> Optional[datetime]:
    """Time of the next scheduled run, or None if no value was supplied."""
    return self._next_run

@property
def global_function_parent(self) -> Optional[str]:
    """ID of the global function parent, or None when there is none."""
    return self._global_function_parent

def detach(self, delete_output_collection: bool = False) -> bool:
"""Detach this function and prevent any further runs.

Expand Down
20 changes: 20 additions & 0 deletions chromadb/api/models/Collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,23 @@ def attach_function(
tenant=self.tenant,
database=self.database,
)

def get_attached_function(self, name: str) -> "AttachedFunction":
    """Retrieve an attached function by name, scoped to this collection's tenant and database.

    The lookup is delegated to the underlying client.

    Args:
        name: Name of the attached function to look up.

    Returns:
        AttachedFunction: Whatever the client returns for that name.

    Example:
        >>> attached_fn = collection.get_attached_function("mycoll_stats_fn")
        >>> print(f"Function ID: {attached_fn.function_id}")
        >>> print(f"Last run: {attached_fn.last_run}")
    """
    api = self._client
    found = api.get_attached_function(
        attached_function_name=name,
        tenant=self.tenant,
        database=self.database,
    )
    return found
13 changes: 13 additions & 0 deletions chromadb/api/rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,19 @@ def detach_function(
"The Rust bindings (embedded mode) do not support attached function operations."
)

@override
def get_attached_function(
    self,
    attached_function_name: str,
    tenant: str = DEFAULT_TENANT,
    database: str = DEFAULT_DATABASE,
) -> "AttachedFunction":
    """Unsupported in the Rust bindings (local embedded mode); always raises.

    Raises:
        NotImplementedError: On every call, regardless of arguments.
    """
    unsupported_reason = (
        "Attached functions are only supported when connecting to a Chroma server via HttpClient. "
        "The Rust bindings (embedded mode) do not support attached function operations."
    )
    raise NotImplementedError(unsupported_reason)

# TODO: Remove this if it's not planned to be used
@override
def get_user_identity(self) -> UserIdentity:
Expand Down
13 changes: 13 additions & 0 deletions chromadb/api/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,19 @@ def detach_function(
"The Segment API (embedded mode) does not support attached function operations."
)

@override
def get_attached_function(
    self,
    attached_function_name: str,
    tenant: str = DEFAULT_TENANT,
    database: str = DEFAULT_DATABASE,
) -> "AttachedFunction":
    """Unsupported in the Segment API (local embedded mode); always raises.

    Raises:
        NotImplementedError: On every call, regardless of arguments.
    """
    unsupported_reason = (
        "Attached functions are only supported when connecting to a Chroma server via HttpClient. "
        "The Segment API (embedded mode) does not support attached function operations."
    )
    raise NotImplementedError(unsupported_reason)

# TODO: This could potentially cause race conditions in a distributed version of the
# system, since the cache is only local.
# TODO: promote collection -> topic to a base class method so that it can be
Expand Down
60 changes: 60 additions & 0 deletions chromadb/test/distributed/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,63 @@ def test_function_remove_nonexistent(basic_http_client: System) -> None:
# Trying to detach this function again should raise NotFoundError
with pytest.raises(NotFoundError, match="does not exist"):
attached_fn.detach(delete_output_collection=True)


def test_attach_get_function_equality(basic_http_client: System) -> None:
    """Check that attach_function and get_attached_function describe the same attached function."""
    client = ClientCreator.from_system(basic_http_client)
    client.reset()

    # Seed an input collection with two documents
    source = client.get_or_create_collection(name="test_equality_collection")
    source.add(
        ids=["doc1", "doc2"], documents=["Test document 1", "Test document 2"]
    )

    # Attach a function, then fetch the same function back by name
    attached = source.attach_function(
        name="equality_test_fn",
        function_id="record_counter",
        output_collection="equality_output",
        params={"threshold": 100, "mode": "count"},
    )
    fetched = source.get_attached_function("equality_test_fn")

    # Identity and naming must agree between the two handles
    assert attached.id == fetched.id
    assert attached.name == fetched.name
    assert attached.name == "equality_test_fn"

    # function_id carries the human-readable function name ("record_counter"),
    # not a UUID
    assert attached.function_id == fetched.function_id
    assert attached.function_id == "record_counter"

    # Input/output wiring and parameters must agree as well
    assert attached.input_collection_id == fetched.input_collection_id
    assert attached.input_collection_id == source.id
    assert attached.output_collection == fetched.output_collection
    assert attached.output_collection == "equality_output"
    assert attached.params == fetched.params
    assert attached.global_function_parent == fetched.global_function_parent

    # Both handles expose the timing attributes (their values may differ)
    assert hasattr(attached, "last_run")
    assert hasattr(fetched, "last_run")
    assert hasattr(attached, "next_run")
    assert hasattr(fetched, "next_run")

    # A freshly attached function has not run yet
    assert attached.last_run is None
    # ...but a next run must be reported by both attach and get
    assert attached.next_run is not None
    assert fetched.next_run is not None

    # Clean up
    attached.detach(delete_output_collection=True)
Loading
Loading