File patterns in read_storage: wildcard, globstar & braces #1309
Reviewer's Guide
This pull request enhances read_storage to support glob patterns (wildcard, globstar, question mark) and brace expansion by introducing URI parsing and filtering utilities, and updating the listing logic to split URIs into base paths and patterns, expand braces, and apply post-listing glob filters within DataChain pipelines.
Sequence diagram for read_storage with glob and brace pattern expansion
sequenceDiagram
participant User
participant Storage
participant StoragePatternUtils
participant DataChain
User->>Storage: read_storage(uri)
Storage->>StoragePatternUtils: expand_uri_braces(uri)
StoragePatternUtils-->>Storage: expanded_uris
loop for each expanded_uri
Storage->>StoragePatternUtils: split_uri_pattern(expanded_uri)
StoragePatternUtils-->>Storage: base_uri, glob_pattern
Storage->>DataChain: ls(base_uri)
alt glob_pattern exists
Storage->>StoragePatternUtils: should_use_recursion(glob_pattern, recursive)
StoragePatternUtils-->>Storage: use_recursive
Storage->>StoragePatternUtils: convert_globstar_to_sqlite(glob_pattern)
StoragePatternUtils-->>Storage: sqlite_pattern
Storage->>DataChain: filter(sqlite_pattern)
else no pattern
Storage->>DataChain: ls(base_uri)
end
end
Storage->>User: return storage_chain
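The split step in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual `split_uri_pattern`: the name `split_uri_pattern_sketch`, the `GLOB_CHARS` set, and the rule "cut at the first path segment containing a glob character" are assumptions made for the example.

```python
from __future__ import annotations

# Characters treated as glob syntax in this sketch (assumption).
GLOB_CHARS = set("*?[{}")


def split_uri_pattern_sketch(uri: str) -> tuple[str, str | None]:
    """Split a URI into (base, pattern); pattern is None when no glob is present."""
    if not any(ch in GLOB_CHARS for ch in uri):
        return uri, None
    scheme, sep, rest = uri.partition("://")
    if not sep:
        # No scheme: treat the whole string as a path.
        scheme, rest = "", uri
    parts = rest.split("/")
    # Index of the first path segment that contains a glob character.
    first_glob = next(i for i, part in enumerate(parts) if GLOB_CHARS & set(part))
    base = "/".join(parts[:first_glob])
    pattern = "/".join(parts[first_glob:])
    return f"{scheme}{sep}{base}", pattern


print(split_uri_pattern_sketch("s3://bucket/dir"))
print(split_uri_pattern_sketch("s3://bucket/dir/*.mp3"))
```

The base part is what would be listed; the pattern part is what would be applied as a filter afterwards.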
Class diagram for new and updated storage pattern utilities
classDiagram
class StoragePatternUtils {
+split_uri_pattern(uri: str) tuple[str, str|None]
+should_use_recursion(pattern: str, user_recursive: bool) bool
+expand_brace_pattern(pattern: str) list[str]
+expand_uri_braces(uri: str) list[str]
+convert_globstar_to_sqlite(filter_pattern: str) str
}
class DataChain {
+filter()
+union()
}
class Storage {
+read_storage(uri, ...)
+_apply_glob_filter(dc, patterns, list_path, use_recursive, column)
}
StoragePatternUtils <.. Storage : uses
Storage o-- DataChain : pipeline
Class diagram for _apply_glob_filter function
classDiagram
class Storage {
+_apply_glob_filter(dc: DataChain, patterns: list[str], list_path: str, use_recursive: bool, column: str): DataChain
}
class DataChain {
+filter()
+ls()
}
Storage o-- DataChain : applies filter
Class diagram for brace and glob pattern expansion functions
classDiagram
class StoragePatternUtils {
+expand_brace_pattern(pattern: str): list[str]
+expand_uri_braces(uri: str): list[str]
+_expand_single_braces(pattern: str): list[str]
}
StoragePatternUtils <.. StoragePatternUtils : uses internally
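As an illustration of what brace expansion does, here is a small recursive sketch. It is not the PR's `expand_brace_pattern`; the name `expand_braces_sketch` is hypothetical, and the behavior is simplified (for example, a single-option group such as `{a}` is expanded rather than kept literal as a Unix shell would).

```python
from __future__ import annotations

import re

# Matches an innermost brace group, i.e. one with no nested braces inside.
_BRACE = re.compile(r"\{([^{}]*)\}")


def expand_braces_sketch(pattern: str) -> list[str]:
    """Expand {a,b,c} groups into every combination, preserving order."""
    match = _BRACE.search(pattern)
    if match is None:
        return [pattern]
    head, tail = pattern[: match.start()], pattern[match.end():]
    results: list[str] = []
    for option in match.group(1).split(","):
        # Substitute one option, then expand any remaining groups.
        results.extend(expand_braces_sketch(head + option + tail))
    # Nested groups can produce repeats; drop them while keeping order.
    return list(dict.fromkeys(results))


for uri in expand_braces_sketch("s3://mybkt/**/{march,april,may}/**/*.{png,jpg}"):
    print(uri)
```

Each expanded URI contains only wildcard syntax, so it can be split and filtered independently of the others.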
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/datachain/lib/dc/storage.py:310` </location>
<code_context>
+ # If a glob pattern was detected, use it for filtering
</code_context>
<issue_to_address>
Filtering is applied after listing all files, which may be inefficient for large datasets.
Listing all files before filtering can cause excessive data transfer and slow performance on remote storage. If supported, pass the glob pattern directly to the listing function to filter files at the source.
Suggested implementation:
```python
# If a glob pattern was detected, pass it to get_listing for source-side filtering
if glob_pattern:
    # Handle brace expansion patterns
    patterns = expand_brace_pattern(glob_pattern)
    # If only one pattern, pass it directly
    if len(patterns) == 1:
        list_ds_name, list_uri, list_path, list_ds_exists = get_listing(
            list_uri_to_use, session, update=update, glob_pattern=patterns[0]
        )
    else:
        # For multiple patterns, aggregate results from multiple listings
        all_listings = []
        for pat in patterns:
            ds_name, uri, path, ds_exists = get_listing(
                list_uri_to_use, session, update=update, glob_pattern=pat
            )
            if path:
                all_listings.extend(path)
        # Reuse the dataset name, URI, and existence flag from the last listing
        list_ds_name, list_uri, list_path, list_ds_exists = (
            ds_name, uri, all_listings, ds_exists
        )
else:
    list_ds_name, list_uri, list_path, list_ds_exists = get_listing(
        list_uri_to_use, session, update=update
    )
# list_ds_name is None if object is a file, we don't want to use cache
lambda ds_name=list_ds_name, lst_uri=list_uri: lst_fn(ds_name, lst_uri)
)
# Filtering is now done at the source if glob_pattern is provided
# If further filtering is needed (e.g., for complex patterns not supported by the source), apply here
# Otherwise, use the original list_path from get_listing
from datachain.query.schema import Column
chain = dc
```
- Ensure that the `get_listing` function supports a `glob_pattern` argument and applies filtering at the source. You may need to update its implementation.
- Remove or refactor any redundant post-listing filtering logic that is now handled by the listing function.
- If the listing backend does not support glob patterns, fallback to post-listing filtering as before.
</issue_to_address>
### Comment 2
<location> `src/datachain/lib/dc/storage.py:329` </location>
<code_context>
+ filter_expr = None
+ for pattern in patterns:
+     pattern_filter = Column(f"{column}.path").glob(pattern)
+     filter_expr = pattern_filter if filter_expr is None else filter_expr | pattern_filter
+ chain = chain.filter(filter_expr)
+ chains.append(chain)
</code_context>
<issue_to_address>
Operator precedence in filter expression construction may be ambiguous.
Verify that the filter objects handle the bitwise OR operator as expected, and use parentheses if needed to clarify precedence.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
for pattern in patterns:
    pattern_filter = Column(f"{column}.path").glob(pattern)
    filter_expr = pattern_filter if filter_expr is None else filter_expr | pattern_filter
=======
for pattern in patterns:
    pattern_filter = Column(f"{column}.path").glob(pattern)
    filter_expr = pattern_filter if filter_expr is None else (filter_expr | pattern_filter)
>>>>>>> REPLACE
</suggested_fix>
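For context on the precedence question, Python's conditional expression binds more loosely than `|`, so the two forms parse identically and the parentheses only help readability. The sketch below checks this with the standard `ast` module; the variable names are placeholders, and the `reduce` variant uses plain sets as a stand-in for filter objects (an assumption, since the real filter type is not shown here).

```python
import ast
import operator
from functools import reduce

plain = "expr = new if expr is None else expr | new"
wrapped = "expr = new if expr is None else (expr | new)"

# Parentheses are not recorded in the AST, so equal dumps mean equal parsing.
same_tree = ast.dump(ast.parse(plain)) == ast.dump(ast.parse(wrapped))
print(same_tree)  # True

# An alternative way to OR-combine a non-empty list without the None sentinel.
matches = [{"a.png"}, {"b.jpg"}, {"c.png"}]
combined = reduce(operator.or_, matches)
print(sorted(combined))  # ['a.png', 'b.jpg', 'c.png']
```

Whether `reduce` is preferable to the explicit loop is a style choice; both build the same left-to-right chain of `|` operations.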
### Comment 3
<location> `tests/unit/lib/test_read_storage_glob.py:209` </location>
<code_context>
+ def test_multiple_patterns(self, mock_read_dataset, mock_ls, mock_get_listing, mock_session, mock_listing):
</code_context>
<issue_to_address>
Test for empty URI list and non-string URI types.
Add tests for an empty URI list and for cases where URIs include os.PathLike objects or a mix of string and PathLike, to verify read_storage handles these inputs correctly.
</issue_to_address>
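The input cases named in this comment could be exercised along the following lines. The helper `normalize_uris` is hypothetical and only stands in for whatever normalization `read_storage` performs internally; the point of the sketch is the set of inputs (empty list, a single `os.PathLike`, and a mixed list).

```python
from __future__ import annotations

import os
from pathlib import PurePosixPath


def normalize_uris(uri) -> list[str]:
    """Hypothetical helper: accept a str, an os.PathLike, or an iterable of either."""
    if isinstance(uri, (str, os.PathLike)):
        return [os.fspath(uri)]
    return [os.fspath(item) for item in uri]


# The three input shapes the review comment asks to cover.
print(normalize_uris([]))
print(normalize_uris(PurePosixPath("data/audio")))
print(normalize_uris(["s3://bucket/*.mp3", PurePosixPath("data/docs")]))
```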
### Comment 4
<location> `tests/unit/lib/test_read_storage_glob.py:109` </location>
<code_context>
+ def test_wildcard_pattern(self, mock_read_dataset, mock_ls, mock_get_listing, mock_session, mock_listing):
</code_context>
<issue_to_address>
Consider verifying the actual filter arguments for correctness.
Please add an assertion to check that the filter was called with the correct glob pattern argument.
Suggested implementation:
```python
def test_wildcard_pattern(self, mock_read_dataset, mock_ls, mock_get_listing, mock_session, mock_listing):
    """Test that wildcard patterns are automatically filtered"""
    tmp_dir, files = mock_listing
    # Setup mocks
    mock_get_listing.return_value = ("test_dataset", str(tmp_dir), "audio", True)
    mock_chain = MagicMock()
    mock_query = MagicMock()
    mock_chain._query = mock_query
    mock_chain.signals_schema = MagicMock()
    mock_chain.signals_schema.mutate = MagicMock(return_value=mock_chain.signals_schema)
    # Call the function under test (assuming it's called here, e.g. read_storage)
    # Example: read_storage("test_dataset/*.wav", ...)
    # Assert that the filter was called with the correct glob pattern
    expected_pattern = "test_dataset/*.wav"
    mock_ls.assert_any_call(expected_pattern)
```
- If the actual glob pattern or the function under test differs, adjust `expected_pattern` and the assertion accordingly.
- If the filter is applied via a different mock (not `mock_ls`), replace `mock_ls` with the correct mock object.
- Ensure the function under test is actually called in the test (e.g., `read_storage(...)`), so the assertion is meaningful.
</issue_to_address>
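A stripped-down illustration of asserting on call arguments rather than only on `.called` is shown below. `apply_glob` is a hypothetical stand-in for the code under test, not a DataChain function; only the `unittest.mock` calls are real API.

```python
from unittest.mock import MagicMock


def apply_glob(chain, pattern):
    """Hypothetical stand-in for the code under test."""
    return chain.filter(pattern)


mock_chain = MagicMock()
result = apply_glob(mock_chain, "audio/*.wav")

# Fails with a readable diff if the pattern passed to filter() is wrong.
mock_chain.filter.assert_called_once_with("audio/*.wav")
```

Checking the argument catches a wrong or missing pattern, which a bare `mock_chain.filter.called` check would let through.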
src/datachain/lib/dc/storage.py
Outdated
    "s3://bucket/dir" -> ("s3://bucket/dir", None)
    """
    # Check if URI contains any glob patterns
    if not any(char in uri for char in ['*', '?', '[', '{', '}']):
issue (code-quality): We've found these issues:
- Invert any/all to simplify comparisons (invert-any-all)
- Hoist repeated code outside conditional statement [×2] (hoist-statement-from-if)
src/datachain/lib/dc/storage.py
Outdated
def expand_brace_pattern(pattern: str) -> list[str]:
    """
issue (code-quality): We've found these issues:
- Convert for loop into list comprehension (list-comprehension)
- Inline variable that is immediately returned (inline-immediately-returned-variable)
Deploying datachain-documentation
Latest commit: 962b92e
Status: ✅ Deploy successful!
Preview URL: https://79cda28e.datachain-documentation.pages.dev
Branch Preview URL: https://globstar.datachain-documentation.pages.dev
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1309 +/- ##
==========================================
- Coverage 88.84% 88.78% -0.07%
==========================================
Files 155 156 +1
Lines 14240 14383 +143
Branches 2025 2062 +37
==========================================
+ Hits 12652 12770 +118
- Misses 1124 1134 +10
- Partials 464 479 +15
This reverts commit 94bedbd.
I haven't checked `storage_pattern.py` yet (will check it later), but I have some comments that I believe we should address before merging this PR.
Also a question: is it possible to use a glob pattern in the bucket name? What will be the result of `dc.read_storage(s3://*/**/*)`?
# Check if URI contains glob patterns and split them
base_uri, glob_pattern = split_uri_pattern(single_uri)

# If a pattern is found, use the base_uri for listing
# The pattern will be used for filtering later
list_uri_to_use = base_uri if glob_pattern else single_uri

list_ds_name, list_uri, list_path, list_ds_exists = get_listing(
-    single_uri, session, update=update
+    list_uri_to_use, session, update=update
)
Optional: same, we already have checks in `split_uri_pattern`, so we can simplify this here:
base_uri, glob_pattern = split_uri_pattern(single_uri)
list_ds_name, list_uri, list_path, list_ds_exists = get_listing(
    base_uri, session, update=update
)
-for single_uri in uris:
+for single_uri in expanded_uris:
     # Check if URI contains glob patterns and split them
     base_uri, glob_pattern = split_uri_pattern(single_uri)
If I am not mistaken, `glob_pattern` will be overwritten here when there are several `expanded_uris`. Say the first one has globs and the second one does not: the resulting `glob_pattern` will be `None`, the later check (line 220) will not work, and the globs will not be processed.
We should define `glob_pattern` with a default of `None` outside the `for` loop and set it only if the `glob_pattern` returned by `split_uri_pattern` is not `None`.
There may also be multiple globs, but we store only one (from the last URI) and ignore the rest.
Also, since we store the glob pattern and apply it to all the results, it will affect other URIs: if, say, the first URI was defined without a glob pattern and the second URI has one, the results will be unpredictable.
Could you please elaborate?
My understanding is that `glob_pattern` is defined for each URI independently, so the URIs do not affect each other. Could you please explain the cases where it can affect something?
The only thing I was able to find that changes across loop iterations is the bucket update. The same bucket could be updated multiple times, which we have to avoid. A change is coming.
Hm, you're absolutely right! Sorry for the confusing message :(
I misread the indentation level 😢
Co-authored-by: Vladimir Rudnykh <[email protected]>
minor improvements
Good catch - obviously it's not supported. Added the validation.
Looks good to me! Great improvement 👍
Wild, random, controversial idea: implement an additional `filter_file` chain method to filter the chain by the same globs, using the same functions you've added, without the need for `C("file").glob(...)` (whose syntax I don't remember, so I have to go to the docs every time). The signature could look something like this: `.filter_file(glob: str, file_signal: str = "file")`.
@patch("datachain.lib.dc.storage.ls")
@patch.object(sys.modules["datachain.lib.dc.datasets"], "read_dataset")
def test_read_storage_brace_expansion_pattern(
    mock_read_dataset, mock_ls, mock_get_listing, mock_session, mock_listing
Tests like this look very complicated, but do they really test only that a function was called? Is `mock_chain.filter.called` the whole purpose? To my mind, we don't test much here; just checking that a function is called is not very interesting.
There will also be a fairly heavy maintenance cost for such tests: all these mocks expose internals and will require updates on each refactoring, plus a good understanding of how the internals work.
Yeah, removed. Just a small drop in test coverage since it's covered by func tests pretty well.
Mock tests seem very complicated and not very meaningful - we'll have to maintain them. I would consider improvements there.
    Raises:
        ValueError: If a cloud storage bucket name contains glob patterns
    """
    if not any(uri.startswith(scheme) for scheme in ["s3://", "gs://", "az://"]):
No `hf`? Do we really want to hard-code the list? Is there a better way to handle this?
Good catch.
- I added hf with a test.
- I was not able to find a single check for this in fsspec/client, so I extracted this to `fsspec.py`.
    if not any(uri.startswith(scheme) for scheme in ["s3://", "gs://", "az://"]):
        return

    # Extract bucket name (everything between :// and first /)
Don't we have some helpers for this in Client / fsspec?
nope. But I created one.
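The bucket-name validation discussed in this thread might look roughly like the sketch below. It is a simplified illustration assembled from the snippets quoted in the review, not the code that was merged: the function name, the `CLOUD_SCHEMES` tuple, and the error message are assumptions.

```python
# Schemes mentioned in the thread (s3, gs, az, plus hf added after review).
CLOUD_SCHEMES = ("s3://", "gs://", "az://", "hf://")
GLOB_CHARS = "*?[{}"


def validate_bucket_name_sketch(uri: str) -> None:
    """Raise ValueError if the bucket part of a cloud URI contains glob syntax."""
    if not uri.startswith(CLOUD_SCHEMES):
        return  # Local paths have no bucket to validate.
    # Bucket name: everything between "://" and the first "/".
    bucket = uri.split("://", 1)[1].split("/", 1)[0]
    if any(ch in bucket for ch in GLOB_CHARS):
        raise ValueError(f"Glob patterns are not supported in bucket names: {bucket!r}")


validate_bucket_name_sketch("s3://bucket/**/*.png")  # passes silently
try:
    validate_bucket_name_sketch("s3://*/**/*")
except ValueError as exc:
    print(exc)
```

Passing a tuple to `str.startswith` avoids the explicit `any(...)` loop over schemes.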
Quite a lot of logic in storage pattern... It would be great to check whether some libraries exist to support all of this. It is quite a lot of low-level code to maintain later.
Yeah, I was also hoping that it would be much easier; that's why I took it. However, it's much more involved when it comes down to the details.
Closes #1283
AI generated mostly.
Result: user can do this
without the need for this
But it also applies to dirs and combination of these like
dc.read_storage("s3://mybkt/**/{march,april,may}/**/*.{png,jpg}")
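To make the pattern semantics concrete, the sketch below translates a glob into a regular expression in which `*` and `?` stay within one path segment and `**` may cross directory levels. This is an illustrative stand-in only: the PR converts patterns for SQLite-style filtering, and the function `glob_to_regex_sketch` and its rules are assumptions, not the project's code.

```python
from __future__ import annotations

import re


def glob_to_regex_sketch(pattern: str) -> re.Pattern[str]:
    """Translate a glob into a regex: * and ? stop at '/', ** does not."""
    out: list[str] = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**/", i):
            out.append("(?:.*/)?")  # zero or more whole directories
            i += 3
        elif pattern.startswith("**", i):
            out.append(".*")  # anything, including '/'
            i += 2
        elif pattern[i] == "*":
            out.append("[^/]*")  # anything within one segment
            i += 1
        elif pattern[i] == "?":
            out.append("[^/]")  # exactly one non-separator character
            i += 1
        else:
            out.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(out))


single_level = glob_to_regex_sketch("dir1/*")
print(bool(single_level.fullmatch("dir1/file.txt")))       # True
print(bool(single_level.fullmatch("dir1/dir2/song.mp4")))  # False

any_depth = glob_to_regex_sketch("dir1/**")
print(bool(any_depth.fullmatch("dir1/dir2/song.mp4")))     # True
```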
One side effect: `dir1/*` includes only files directly in dir1 (like `dir1/file.txt`) and won't include nested files (like `dir1/dir2/song.mp4`), as the old wildcard did. This aligns better with the Unix way of expanding file patterns, and it's hard to support both.
Summary by Sourcery
Enable full glob pattern support in read_storage by adding utilities for expanding braces, detecting and splitting patterns, converting globstars to SQL-compatible filters, and integrating pattern-based filtering into the listing process.
New Features:
Enhancements:
Tests: