Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f494b52
feature:read_many_api first iteration
dibahlfi Jul 13, 2025
1d29d85
read_many_items - adding logic for chunking/semaphores
dibahlfi Jul 13, 2025
b73001f
read_many_items - code refactor
dibahlfi Jul 14, 2025
00672ca
read_many_items - created a new helper class for chunking/concurrency
dibahlfi Jul 15, 2025
2e8f35b
fix: adding test cases
dibahlfi Jul 21, 2025
91529c2
read_many_items - refactoring
dibahlfi Jul 22, 2025
c7d79e3
read_many_items - refactoring
dibahlfi Jul 22, 2025
9be4c7d
read_many_items - clean up
dibahlfi Jul 22, 2025
3f90167
Update sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
dibahlfi Jul 22, 2025
f9367d7
Update sdk/cosmos/azure-cosmos/tests/test_read_many_items_partition_s…
dibahlfi Jul 22, 2025
685084e
read_many_items - adding code for the sync flow.
dibahlfi Jul 25, 2025
d8005e3
Merge branch 'users/dibahl/readmanyitems_api' of https://github.com/A…
dibahlfi Jul 25, 2025
b6fe1bc
resolving conflicts
dibahlfi Jul 25, 2025
488d8b7
fix: addressing comments
dibahlfi Jul 28, 2025
d4b66e8
fix: add support for aggregated request charges in the header
dibahlfi Jul 29, 2025
03e0ff8
fix: fixing typos
dibahlfi Jul 29, 2025
1cebd0a
fix: fixing split tests
dibahlfi Jul 30, 2025
d682db1
fix: fixing linting issues
dibahlfi Jul 30, 2025
3ac03b5
fix: addressing comments
dibahlfi Jul 31, 2025
f750525
fix: linting errors
dibahlfi Jul 31, 2025
3e780f8
fix: linting errors
dibahlfi Jul 31, 2025
a9c8d5f
fix: adding order
dibahlfi Aug 1, 2025
285cbf2
fix: cleaning up
dibahlfi Aug 1, 2025
0b704d6
fix: cleaning up
dibahlfi Aug 2, 2025
f59a12b
fix: cleaning up
dibahlfi Aug 2, 2025
c13d8eb
fix: bug fixing in the chunking logic
dibahlfi Aug 5, 2025
396100b
fix: addressing comments
dibahlfi Aug 11, 2025
4c763ae
fixing pylint comments
dibahlfi Aug 14, 2025
dd3da39
resolving conflict in CHANGELOG.md
dibahlfi Aug 14, 2025
7ac6bb1
fixing pylint errors
dibahlfi Aug 15, 2025
750b0e6
fix: adding samples
dibahlfi Aug 17, 2025
7afdadf
fix: fixing tests
dibahlfi Aug 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.14.0b3 (Unreleased)

#### Features Added
* Added read_items API to provide an efficient method for retrieving multiple items in a single request. See [PR 42167](https://github.com/Azure/azure-sdk-for-python/pull/42167).

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import os
import urllib.parse
import uuid
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast

from typing_extensions import TypedDict
from urllib3.util.retry import Retry

Expand Down Expand Up @@ -65,6 +67,7 @@
from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy
from ._cosmos_responses import CosmosDict, CosmosList
from ._range_partition_resolver import RangePartitionResolver
from ._read_items_helper import ReadItemsHelperSync
from ._request_object import RequestObject
from ._retry_utility import ConnectionRetryPolicy
from ._routing import routing_map_provider, routing_range
Expand Down Expand Up @@ -1039,6 +1042,49 @@ def DeletePermission(
self.DeleteResource(path, http_constants.ResourceType.Permission, permission_id, None, options,
**kwargs)

def read_items(
    self,
    collection_link: str,
    items: Sequence[Tuple[str, _PartitionKeyType]],
    options: Optional[Mapping[str, Any]] = None,
    *,
    executor: Optional[ThreadPoolExecutor] = None,
    **kwargs: Any
) -> CosmosList:
    """Read several items from one collection, identified by (id, partition key) pairs.

    The actual work is delegated to ``ReadItemsHelperSync``; this method only
    validates the inputs and resolves the collection's partition key definition.

    :param str collection_link: The link to the document collection.
    :param items: The items to read, each given as a tuple of item ID and partition key.
    :type items: Sequence[Tuple[str, _PartitionKeyType]]
    :param dict options: The request options for the request. Defaults to an empty dict.
    :keyword executor: Optional ThreadPoolExecutor, forwarded to the helper for thread management.
    :paramtype executor: Optional[ThreadPoolExecutor]
    :keyword int max_concurrency: Forwarded to the helper as ``max_concurrency``; defaults to 10.
    :return: The list of read items. Empty (with empty response headers) when `items` is empty.
    :rtype: CosmosList
    :raises ValueError: If no partition key definition can be found for the collection.
    """
    # Nothing was requested: answer locally without touching the service.
    if not items:
        return CosmosList([], response_headers=CaseInsensitiveDict())

    request_options = {} if options is None else options

    pk_definition = self._get_partition_key_definition(collection_link, request_options)
    if not pk_definition:
        raise ValueError("Could not find partition key definition for collection.")

    # Pull max_concurrency out of kwargs so it is handed to the helper exactly once,
    # as an explicit argument, instead of also travelling inside **kwargs.
    concurrency = kwargs.pop('max_concurrency', 10)

    return ReadItemsHelperSync(
        client=self,
        collection_link=collection_link,
        items=items,
        options=request_options,
        partition_key_definition=pk_definition,
        executor=executor,
        max_concurrency=concurrency,
        **kwargs).read_items()

def ReadItems(
self,
collection_link: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ def _fetch_items_helper_no_retries(self, fetch_function):
fetched_items = []
new_options = copy.deepcopy(self._options)
while self._continuation or not self._has_started:
if not self._has_started:
self._has_started = True
new_options["continuation"] = self._continuation

response_headers = {}
(fetched_items, response_headers) = fetch_function(new_options)
if not self._has_started:
self._has_started = True

continuation_key = http_constants.HttpHeaders.Continuation
self._continuation = response_headers.get(continuation_key)
Expand Down
181 changes: 181 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_query_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal query builder for multi-item operations."""

from typing import Dict, Tuple, Any, TYPE_CHECKING, Sequence

from azure.cosmos.partition_key import _Undefined, _Empty, NonePartitionKeyValue
if TYPE_CHECKING:
from azure.cosmos._cosmos_client_connection import _PartitionKeyType


class _QueryBuilder:
    """Internal class for building optimized queries for multi-item operations.

    Every member is a static method; the class is used purely as a namespace.
    All builders return a query spec of the form
    ``{"query": <SQL text>, "parameters": [{"name": ..., "value": ...}, ...]}``.
    """

    @staticmethod
    def _get_field_expression(path: str) -> str:
        """Converts a path string into a query field expression.

        Leading slashes are stripped. A path that still contains "/" is treated as
        nested and rendered with bracket notation for every segment; a single
        segment is rendered with dot notation when it is a valid identifier and
        with bracket notation otherwise.

        :param str path: The path string to convert, e.g. "/pk" or "/a/b".
        :return: The query field expression, rooted at the alias ``c``.
        :rtype: str
        """
        field_name = path.lstrip("/")
        if "/" in field_name:
            # Handle nested paths like "a/b" -> c["a"]["b"]
            field_parts = field_name.split("/")
            return "c" + "".join(f'["{part}"]' for part in field_parts)
        # Handle simple paths like "pk" -> c.pk or c["non-identifier-pk"]
        return f"c.{field_name}" if field_name.isidentifier() else f'c["{field_name}"]'

    @staticmethod
    def is_id_partition_key_query(
        items: Sequence[Tuple[str, "_PartitionKeyType"]],
        partition_key_definition: Dict[str, Any]
    ) -> bool:
        """Check if we can use the optimized ID IN query.

        This is the case when the container's only partition key path is "/id"
        and every item's partition key value equals its ID.

        :param Sequence[tuple[str, any]] items: The list of items to check.
        :param dict[str, any] partition_key_definition: The partition key definition of the container.
        :return: True if the optimized ID IN query can be used, False otherwise.
        :rtype: bool
        """
        partition_key_paths = partition_key_definition.get("paths", [])
        if len(partition_key_paths) != 1 or partition_key_paths[0] != "/id":
            return False

        for item_id, partition_key_value in items:
            # A list-wrapped value is compared by its first component. An empty
            # list has no first component; it is compared as-is (and therefore
            # never equals the ID) instead of raising IndexError.
            if isinstance(partition_key_value, list) and partition_key_value:
                pk_val = partition_key_value[0]
            else:
                pk_val = partition_key_value
            if pk_val != item_id:
                return False
        return True

    @staticmethod
    def is_single_logical_partition_query(
        items: Sequence[Tuple[str, "_PartitionKeyType"]]
    ) -> bool:
        """Check if all items in a chunk belong to the same logical partition.

        This is used to determine if an optimized query with an IN clause can be used.
        Chunks with fewer than two items never qualify.

        :param Sequence[tuple[str, any]] items: The list of items to check.
        :return: True if there are at least two items and they all share the same
            partition key value, False otherwise.
        :rtype: bool
        """
        if not items or len(items) <= 1:
            return False
        first_pk = items[0][1]
        return all(item[1] == first_pk for item in items)

    @staticmethod
    def build_pk_and_id_in_query(
        items: Sequence[Tuple[str, "_PartitionKeyType"]],
        partition_key_definition: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build a query for items in a single logical partition using an IN clause for IDs.

        e.g., SELECT * FROM c WHERE c.pk = @pk AND c.id IN (@id0, @id1)

        Only the first partition key path of the definition is used, and the
        partition key value of the first item is bound to ``@pk`` unchanged.

        :param Sequence[tuple[str, any]] items: The list of items to build the query for.
            Must be non-empty; all items are expected to share one partition key value.
        :param dict[str, any] partition_key_definition: The partition key definition of the container.
        :return: A dictionary containing the query text and parameters.
        :rtype: dict[str, any]
        """
        # Use the shared path-to-expression conversion so nested paths ("/a/b")
        # and non-identifier names ("/my-pk") produce valid field accessors,
        # exactly as in build_parameterized_query_for_items.
        partition_key_field = _QueryBuilder._get_field_expression(partition_key_definition['paths'][0])
        partition_key_value = items[0][1]

        id_params = {f"@id{i}": item[0] for i, item in enumerate(items)}
        id_param_names = ", ".join(id_params)

        query_text = f"SELECT * FROM c WHERE {partition_key_field} = @pk AND c.id IN ({id_param_names})"

        parameters = [{"name": "@pk", "value": partition_key_value}]
        parameters.extend({"name": name, "value": value} for name, value in id_params.items())

        return {"query": query_text, "parameters": parameters}

    @staticmethod
    def build_id_in_query(items: Sequence[Tuple[str, "_PartitionKeyType"]]) -> Dict[str, Any]:
        """Build optimized query using ID IN clause when ID equals partition key.

        The partition key half of each tuple is ignored; only the IDs are bound.

        :param Sequence[tuple[str, any]] items: The list of items to build the query for.
        :return: A dictionary containing the query text and parameters.
        :rtype: dict[str, any]
        """
        id_params = {f"@param_id{i}": item_id for i, (item_id, _) in enumerate(items)}
        param_names = ", ".join(id_params)
        parameters = [{"name": name, "value": value} for name, value in id_params.items()]

        query_string = f"SELECT * FROM c WHERE c.id IN ( {param_names} )"

        return {"query": query_string, "parameters": parameters}

    @staticmethod
    def build_parameterized_query_for_items(
        items_by_partition: Dict[str, Sequence[Tuple[str, "_PartitionKeyType"]]],
        partition_key_definition: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Builds a parameterized SQL query for reading multiple items.

        Each item contributes one parenthesized group that matches its ID and
        every partition key path; the groups are joined with OR.

        :param dict[str, Sequence[tuple[str, any]]] items_by_partition: A dictionary of items grouped by partition key.
            Only the values are used; the grouping keys are ignored.
        :param dict[str, any] partition_key_definition: The partition key definition of the container.
        :return: A dictionary containing the query text and parameters.
        :rtype: dict[str, any]
        :raises ValueError: If an item supplies partition key components whose count
            differs from the number of paths in the definition.
        """
        all_items = [item for partition_items in items_by_partition.values() for item in partition_items]

        if not all_items:
            # No items at all: return a query that matches nothing.
            return {"query": "SELECT * FROM c WHERE false", "parameters": []}

        partition_key_paths = partition_key_definition.get("paths", [])
        query_parts = []
        parameters = []

        for i, (item_id, partition_key_value) in enumerate(all_items):
            id_param_name = f"@param_id{i}"
            parameters.append({"name": id_param_name, "value": item_id})
            condition_parts = [f"c.id = {id_param_name}"]

            # A value that is None, or an instance of NonePartitionKeyValue's own
            # type, supplies no components: pk_values stays empty and every path
            # below is matched with IS_DEFINED(...) = false.
            pk_values = []
            if partition_key_value is not None and not isinstance(partition_key_value, type(NonePartitionKeyValue)):
                pk_values = partition_key_value if isinstance(partition_key_value, list) else [partition_key_value]
                if len(pk_values) != len(partition_key_paths):
                    raise ValueError(
                        f"Number of components in partition key value ({len(pk_values)}) "
                        f"does not match definition ({len(partition_key_paths)})"
                    )

            for j, path in enumerate(partition_key_paths):
                field_expr = _QueryBuilder._get_field_expression(path)
                pk_value = pk_values[j] if j < len(pk_values) else None

                if pk_value is None or isinstance(pk_value, (_Undefined, _Empty)):
                    # Missing component: require the field to be absent from the document.
                    condition_parts.append(f"IS_DEFINED({field_expr}) = false")
                else:
                    pk_param_name = f"@param_pk{i}{j}"
                    parameters.append({"name": pk_param_name, "value": pk_value})
                    condition_parts.append(f"{field_expr} = {pk_param_name}")

            query_parts.append(f"( {' AND '.join(condition_parts)} )")

        query_string = f"SELECT * FROM c WHERE ( {' OR '.join(query_parts)} )"
        return {"query": query_string, "parameters": parameters}
Loading
Loading