
Commit 68d6ef8

feat: implements a fixture writer utility (#65)
Introduce a utility for reading and writing fixture files, supporting various formats including compressed JSON files.
1 parent 5a1e46e commit 68d6ef8

File tree

3 files changed (+715, -49 lines)


sqlspec/core/parameters.py

Lines changed: 1 addition & 1 deletion
@@ -1176,7 +1176,7 @@ def coerce_value(value: Any) -> Any:
         return value
 
     if isinstance(value, TypedParameter):
-        wrapped_value = value.value
+        wrapped_value: Any = value.value
         # Skip coercion for None values even when wrapped
         if wrapped_value is None:
             return wrapped_value

sqlspec/utils/fixtures.py

Lines changed: 235 additions & 31 deletions
@@ -1,63 +1,267 @@
 """Fixture loading utilities for SQLSpec.
 
-Provides functions for loading and parsing JSON fixture files
+Provides functions for writing, loading and parsing JSON fixture files
 used in testing and development. Supports both sync and async operations.
 """
 
+import gzip
+import zipfile
 from pathlib import Path
-from typing import Any
+from typing import TYPE_CHECKING, Any, Union
 
-from sqlspec._serialization import decode_json
-from sqlspec.exceptions import MissingDependencyError
+from sqlspec.storage import storage_registry
+from sqlspec.utils.serializers import from_json as decode_json
+from sqlspec.utils.serializers import to_json as encode_json
+from sqlspec.utils.sync_tools import async_
+from sqlspec.utils.type_guards import schema_dump
 
-__all__ = ("open_fixture", "open_fixture_async")
+if TYPE_CHECKING:
+    from sqlspec.typing import ModelDictList, SupportedSchemaModel
+
+__all__ = ("open_fixture", "open_fixture_async", "write_fixture", "write_fixture_async")
+
+
+def _read_compressed_file(file_path: Path) -> str:
+    """Read and decompress a file based on its extension.
+
+    Args:
+        file_path: Path to the file to read
+
+    Returns:
+        The decompressed file content as a string
+
+    Raises:
+        ValueError: If the file format is not supported
+    """
+    if file_path.suffix == ".gz":
+        with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
+            return f.read()
+    elif file_path.suffix == ".zip":
+        with zipfile.ZipFile(file_path, "r") as zf:
+            # Assume the JSON file inside has the same name without .zip
+            # (e.g. "users.json.zip" contains "users.json")
+            json_name = file_path.stem
+            if json_name in zf.namelist():
+                with zf.open(json_name) as f:
+                    return f.read().decode("utf-8")
+            # If not found, try the first JSON file in the archive
+            json_files = [name for name in zf.namelist() if name.endswith(".json")]
+            if json_files:
+                with zf.open(json_files[0]) as f:
+                    return f.read().decode("utf-8")
+            msg = f"No JSON file found in ZIP archive: {file_path}"
+            raise ValueError(msg)
+    else:
+        msg = f"Unsupported compression format: {file_path.suffix}"
+        raise ValueError(msg)
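The two decompression branches can be exercised with the standard library alone; a minimal sketch (illustrative paths and payload, not part of the commit):

import gzip
import json
import zipfile
from pathlib import Path

fixtures = Path("fixtures")
fixtures.mkdir(exist_ok=True)
payload = [{"id": 1, "name": "alice"}]

# .gz branch: gzip.open in text mode decompresses and decodes in one step.
with gzip.open(fixtures / "users.json.gz", mode="wt", encoding="utf-8") as f:
    json.dump(payload, f)
with gzip.open(fixtures / "users.json.gz", mode="rt", encoding="utf-8") as f:
    assert json.loads(f.read()) == payload

# .zip branch: the helper looks for a member named after the archive
# ("users.json" inside "users.json.zip"), then falls back to the first *.json member.
with zipfile.ZipFile(fixtures / "users.json.zip", "w") as zf:
    zf.writestr("users.json", json.dumps(payload))
with zipfile.ZipFile(fixtures / "users.json.zip", "r") as zf:
    assert json.loads(zf.read("users.json").decode("utf-8")) == payload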
+
+
+def _find_fixture_file(fixtures_path: Any, fixture_name: str) -> Path:
+    """Find a fixture file with various extensions.
+
+    Args:
+        fixtures_path: The path to look for fixtures
+        fixture_name: The fixture name to load
+
+    Returns:
+        Path to the found fixture file
+
+    Raises:
+        FileNotFoundError: If no fixture file is found
+    """
+    base_path = Path(fixtures_path)
+
+    # Try different file extensions in order of preference
+    for extension in [".json", ".json.gz", ".json.zip"]:
+        fixture_path = base_path / f"{fixture_name}{extension}"
+        if fixture_path.exists():
+            return fixture_path
+
+    # If no file found, raise error
+    msg = f"Could not find the {fixture_name} fixture"
+    raise FileNotFoundError(msg)
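Because extensions are tried in a fixed order, an uncompressed fixture shadows a compressed one of the same name; a small illustration of that preference (hypothetical files):

from pathlib import Path

fixtures = Path("fixtures")
# With both users.json and users.json.gz present, ".json" wins,
# so the loader would pick fixtures / "users.json".
for extension in [".json", ".json.gz", ".json.zip"]:
    candidate = fixtures / f"users{extension}"
    if candidate.exists():
        print(f"would load: {candidate}")
        break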
 
 
 def open_fixture(fixtures_path: Any, fixture_name: str) -> Any:
-    """Load and parse a JSON fixture file.
+    """Load and parse a JSON fixture file with compression support.
+
+    Supports reading from:
+    - Regular JSON files (.json)
+    - Gzipped JSON files (.json.gz)
+    - Zipped JSON files (.json.zip)
 
     Args:
-        fixtures_path: The path to look for fixtures (pathlib.Path or anyio.Path)
+        fixtures_path: The path to look for fixtures (pathlib.Path)
         fixture_name: The fixture name to load.
 
-    Raises:
-        FileNotFoundError: Fixtures not found.
 
     Returns:
         The parsed JSON data
     """
-    fixture = Path(fixtures_path / f"{fixture_name}.json")
-    if fixture.exists():
-        with fixture.open(mode="r", encoding="utf-8") as f:
+    fixture_path = _find_fixture_file(fixtures_path, fixture_name)
+
+    if fixture_path.suffix in {".gz", ".zip"}:
+        f_data = _read_compressed_file(fixture_path)
+    else:
+        # Regular JSON file
+        with fixture_path.open(mode="r", encoding="utf-8") as f:
             f_data = f.read()
-        return decode_json(f_data)
-    msg = f"Could not find the {fixture_name} fixture"
-    raise FileNotFoundError(msg)
+
+    return decode_json(f_data)
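A round trip through the sync reader, assuming sqlspec is installed (fixture name and path are illustrative):

import json
from pathlib import Path

from sqlspec.utils.fixtures import open_fixture

fixtures = Path("fixtures")
fixtures.mkdir(exist_ok=True)
(fixtures / "users.json").write_text(json.dumps([{"id": 1}]), encoding="utf-8")

# Resolves users.json (or users.json.gz / users.json.zip) and parses it.
data = open_fixture(fixtures, "users")
assert data == [{"id": 1}]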
 
 
 async def open_fixture_async(fixtures_path: Any, fixture_name: str) -> Any:
-    """Load and parse a JSON fixture file asynchronously.
+    """Load and parse a JSON fixture file asynchronously with compression support.
+
+    Supports reading from:
+    - Regular JSON files (.json)
+    - Gzipped JSON files (.json.gz)
+    - Zipped JSON files (.json.zip)
+
+    For compressed files, uses sync reading in a thread pool since gzip and zipfile
+    don't have native async equivalents.
 
     Args:
-        fixtures_path: The path to look for fixtures (pathlib.Path or anyio.Path)
+        fixtures_path: The path to look for fixtures (pathlib.Path)
         fixture_name: The fixture name to load.
 
-    Raises:
-        FileNotFoundError: Fixtures not found.
-        MissingDependencyError: The `anyio` library is required to use this function.
 
     Returns:
         The parsed JSON data
     """
+    # Use sync path finding since it's fast
+    fixture_path = _find_fixture_file(fixtures_path, fixture_name)
+
+    if fixture_path.suffix in {".gz", ".zip"}:
+        # For compressed files, run in thread pool since they don't have async equivalents
+        read_func = async_(_read_compressed_file)
+        f_data = await read_func(fixture_path)
+    else:
+        # For regular JSON files, use async file reading
+        async_read = async_(lambda p: p.read_text(encoding="utf-8"))
+        f_data = await async_read(fixture_path)
+
+    return decode_json(f_data)
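The async variant drives the same helpers through async_, which per the comments above runs the sync callables in a thread pool; a usage sketch (assumes sqlspec is installed):

import asyncio
import json
from pathlib import Path

from sqlspec.utils.fixtures import open_fixture_async

async def main() -> None:
    fixtures = Path("fixtures")
    fixtures.mkdir(exist_ok=True)
    (fixtures / "users.json").write_text(json.dumps([{"id": 1}]), encoding="utf-8")
    data = await open_fixture_async(fixtures, "users")
    assert data == [{"id": 1}]

asyncio.run(main())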
+
+
+def _serialize_data(data: Any) -> str:
+    """Serialize data to JSON string, handling different input types.
+
+    Args:
+        data: Data to serialize. Can be dict, list, or SQLSpec model types
+
+    Returns:
+        JSON string representation of the data
+    """
+    if isinstance(data, (list, tuple)):
+        # List of models or dicts - convert each item, handling primitives
+        serialized_items: list[Any] = []
+        for item in data:
+            # Use schema_dump for structured data, pass primitives through
+            if isinstance(item, (str, int, float, bool, type(None))):
+                serialized_items.append(item)
+            else:
+                serialized_items.append(schema_dump(item))
+        return encode_json(serialized_items)
+    # Single model, dict, or other type - try schema_dump first, fallback for primitives
+    if isinstance(data, (str, int, float, bool, type(None))):
+        return encode_json(data)
+    return encode_json(schema_dump(data))
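The serializer branches on shape: sequence items are dumped one by one, primitives pass straight through, and everything else goes to schema_dump. A stdlib-only mirror of that branching, with a dataclass standing in for a SQLSpec model (schema_dump is assumed to return a plain dict):

import json
from dataclasses import asdict, dataclass

@dataclass
class User:
    id: int
    name: str

def dump(item: object) -> object:
    # Stand-in for schema_dump: dataclasses become dicts, dicts pass through.
    return asdict(item) if hasattr(item, "__dataclass_fields__") else item

mixed = [User(1, "alice"), {"id": 2, "name": "bob"}, None]
primitives = (str, int, float, bool, type(None))
print(json.dumps([i if isinstance(i, primitives) else dump(i) for i in mixed]))
# -> [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, null]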
+
+
+def write_fixture(
+    fixtures_path: str,
+    table_name: str,
+    data: "Union[ModelDictList, list[dict[str, Any]], SupportedSchemaModel]",
+    storage_backend: str = "local",
+    compress: bool = False,
+    **storage_kwargs: Any,
+) -> None:
+    """Write fixture data to storage using SQLSpec storage backend.
+
+    Args:
+        fixtures_path: Base path where fixtures should be stored
+        table_name: Name of the table/fixture (used as filename)
+        data: Data to write - can be list of dicts, models, or single model
+        storage_backend: Storage backend to use (default: "local")
+        compress: Whether to gzip compress the output
+        **storage_kwargs: Additional arguments for the storage backend
+
+    Raises:
+        ValueError: If storage backend is not found
+    """
+    # Get the storage backend using URI-based registration
+    # For "local" backend, use file:// URI with base_path parameter
+    if storage_backend == "local":
+        uri = "file://"
+        storage_kwargs["base_path"] = str(Path(fixtures_path).resolve())
+    else:
+        uri = storage_backend
+
     try:
-        from anyio import Path as AsyncPath
-    except ImportError as exc:
-        raise MissingDependencyError(package="anyio") from exc
-
-    fixture = AsyncPath(fixtures_path / f"{fixture_name}.json")
-    if await fixture.exists():
-        async with await fixture.open(mode="r", encoding="utf-8") as f:
-            f_data = await f.read()
-        return decode_json(f_data)
-    msg = f"Could not find the {fixture_name} fixture"
-    raise FileNotFoundError(msg)
+        storage = storage_registry.get(uri, **storage_kwargs)
+    except Exception as exc:
+        msg = f"Failed to get storage backend for '{storage_backend}': {exc}"
+        raise ValueError(msg) from exc
+
+    # Serialize the data
+    json_content = _serialize_data(data)
+
+    # Determine file path and content - use relative path from the base path
+    if compress:
+        file_path = f"{table_name}.json.gz"
+        content = gzip.compress(json_content.encode("utf-8"))
+        storage.write_bytes(file_path, content)
+    else:
+        file_path = f"{table_name}.json"
+        storage.write_text(file_path, json_content)
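A write/read round trip through the default local backend, assuming sqlspec is installed (path and rows are illustrative):

from sqlspec.utils.fixtures import open_fixture, write_fixture

rows = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]

# Writes fixtures/users.json via the file:// storage backend.
write_fixture("fixtures", "users", rows)
assert open_fixture("fixtures", "users") == rows

# With compress=True the output lands as fixtures/users_gz.json.gz,
# which open_fixture also resolves.
write_fixture("fixtures", "users_gz", rows, compress=True)
assert open_fixture("fixtures", "users_gz") == rows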
+
+
+async def write_fixture_async(
+    fixtures_path: str,
+    table_name: str,
+    data: "Union[ModelDictList, list[dict[str, Any]], SupportedSchemaModel]",
+    storage_backend: str = "local",
+    compress: bool = False,
+    **storage_kwargs: Any,
+) -> None:
+    """Write fixture data to storage using SQLSpec storage backend asynchronously.
+
+    Args:
+        fixtures_path: Base path where fixtures should be stored
+        table_name: Name of the table/fixture (used as filename)
+        data: Data to write - can be list of dicts, models, or single model
+        storage_backend: Storage backend to use (default: "local")
+        compress: Whether to gzip compress the output
+        **storage_kwargs: Additional arguments for the storage backend
+
+    Raises:
+        ValueError: If storage backend is not found
+    """
+    # Get the storage backend using URI-based registration
+    # For "local" backend, use file:// URI with base_path parameter
+    if storage_backend == "local":
+        uri = "file://"
+        storage_kwargs["base_path"] = str(Path(fixtures_path).resolve())
+    else:
+        uri = storage_backend
+
+    try:
+        storage = storage_registry.get(uri, **storage_kwargs)
+    except Exception as exc:
+        msg = f"Failed to get storage backend for '{storage_backend}': {exc}"
+        raise ValueError(msg) from exc
+
+    # Serialize the data in a thread pool since it might be CPU intensive
+    serialize_func = async_(_serialize_data)
+    json_content = await serialize_func(data)
+
+    # Determine file path and content
+    if compress:
+        file_path = f"{table_name}.json.gz"
+        # Compress in thread pool since gzip is CPU intensive
+        compress_func = async_(lambda content: gzip.compress(content.encode("utf-8")))
+        content = await compress_func(json_content)
+        await storage.write_bytes_async(file_path, content)
+    else:
+        file_path = f"{table_name}.json"
+        await storage.write_text_async(file_path, json_content)
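And the async counterpart, mirroring the sync round trip (assumes sqlspec is installed; names are illustrative):

import asyncio

from sqlspec.utils.fixtures import open_fixture_async, write_fixture_async

async def main() -> None:
    rows = [{"id": 1, "name": "alice"}]
    # Serialization and gzip run in a worker thread; storage writes are awaited.
    await write_fixture_async("fixtures", "users_async", rows, compress=True)
    assert await open_fixture_async("fixtures", "users_async") == rows

asyncio.run(main())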
