55from datetime import datetime
66from pathlib import Path
77from unittest .mock import Mock , patch
8- from typing import List , Tuple , Optional
98
109import pytest
10+ import zstandard
1111
1212from vunnel import workspace
13- from vunnel .tool .fixdate .grype_db_first_observed import Store , normalize_package_name
1413from vunnel .tool .fixdate .finder import Result
14+ from vunnel .tool .fixdate .grype_db_first_observed import Store
1515
1616
1717class DatabaseFixture :
@@ -165,7 +165,7 @@ def insert_runs_data(db_path: Path) -> None:
165165 )
166166
167167 @staticmethod
168- def insert_custom_data (db_path : Path , data : List [ Tuple ], vulnerability_count : Optional [ int ] = None ) -> None :
168+ def insert_custom_data (db_path : Path , data : list [ tuple ], vulnerability_count : int | None = None ) -> None :
169169 """Insert custom fixdate data
170170
171171 Args:
@@ -177,7 +177,7 @@ def insert_custom_data(db_path: Path, data: List[Tuple], vulnerability_count: Op
177177 if vulnerability_count is not None :
178178 conn .execute (
179179 "UPDATE databases SET vulnerability_count = ? WHERE id = 1" ,
180- (vulnerability_count ,)
180+ (vulnerability_count ,),
181181 )
182182
183183 conn .executemany (
@@ -235,25 +235,35 @@ def test_download_success(self, mock_oras_client_class, tmpdir):
235235 mock_client = Mock ()
236236 mock_oras_client_class .return_value = mock_client
237237
238- # create the expected download file
239- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
240- download_path .parent .mkdir (parents = True , exist_ok = True )
241- download_path .write_text ("dummy db content" )
238+ # create the expected zstd-compressed download file
239+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
240+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
241+ # compress "dummy db content" with zstd
242+ cctx = zstandard .ZstdCompressor ()
243+ download_zst_path .write_bytes (cctx .compress (b"dummy db content" ))
242244
243245 # run download
244246 store .download ()
245247
246248 # verify ORAS client was called correctly
247249 mock_oras_client_class .assert_called_once ()
248- mock_client .pull .assert_called_once_with (
249- target = "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest" ,
250- outdir = str (download_path .parent ),
251- )
250+ mock_client .pull .assert_called_once ()
251+ # verify pull was called with one of the expected tags (fallback may occur)
252+ call_kwargs = mock_client .pull .call_args [1 ]
253+ assert call_kwargs ["target" ] in [
254+ "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest-zstd" ,
255+ "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest" ,
256+ ]
257+ assert call_kwargs ["outdir" ] == str (download_zst_path .parent )
252258
253259 # verify directory was created
254260 assert store .db_path .parent .exists ()
255261 # verify the file was moved to the correct location
256262 assert store .db_path .exists ()
263+ # verify the decompressed content is correct
264+ assert store .db_path .read_bytes () == b"dummy db content"
265+ # verify the zstd file was removed
266+ assert not download_zst_path .exists ()
257267
258268 @patch ("vunnel.tool.fixdate.grype_db_first_observed._ProgressLoggingOrasClient" )
259269 def test_download_failure (self , mock_oras_client_class , tmpdir ):
@@ -304,11 +314,12 @@ def test_download_creates_directories(self, tmpdir):
304314 mock_client = Mock ()
305315 mock_oras_client_class .return_value = mock_client
306316
307- # create the expected download file after pull is called
317+ # create the expected zstd-compressed download file after pull is called
308318 def side_effect (* args , ** kwargs ):
309- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
310- download_path .parent .mkdir (parents = True , exist_ok = True )
311- download_path .write_text ("dummy db content" )
319+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
320+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
321+ cctx = zstandard .ZstdCompressor ()
322+ download_zst_path .write_bytes (cctx .compress (b"dummy db content" ))
312323
313324 mock_client .pull .side_effect = side_effect
314325
@@ -393,7 +404,7 @@ def test_get_without_download_raises_error(self, tmpdir):
393404 fix_version = "1.0.0" ,
394405 )
395406
396- @patch .dict (' os.environ' , {' GITHUB_TOKEN' : ' test-token' })
407+ @patch .dict (" os.environ" , {" GITHUB_TOKEN" : " test-token" })
397408 @patch ("vunnel.tool.fixdate.grype_db_first_observed._ProgressLoggingOrasClient" )
398409 def test_download_with_github_token (self , mock_oras_client_class , tmpdir ):
399410 # create workspace and store
@@ -404,10 +415,11 @@ def test_download_with_github_token(self, mock_oras_client_class, tmpdir):
404415 mock_client = Mock ()
405416 mock_oras_client_class .return_value = mock_client
406417
407- # create the expected download file
408- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
409- download_path .parent .mkdir (parents = True , exist_ok = True )
410- download_path .write_text ("dummy db content" )
418+ # create the expected zstd-compressed download file
419+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
420+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
421+ cctx = zstandard .ZstdCompressor ()
422+ download_zst_path .write_bytes (cctx .compress (b"dummy db content" ))
411423
412424 # run download
413425 store .download ()
@@ -419,11 +431,14 @@ def test_download_with_github_token(self, mock_oras_client_class, tmpdir):
419431 password = "test-token" ,
420432 )
421433
422- # verify pull was still called
423- mock_client .pull .assert_called_once_with (
424- target = "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest" ,
425- outdir = str (download_path .parent ),
426- )
434+ # verify pull was called (tag may vary due to fallback logic)
435+ mock_client .pull .assert_called_once ()
436+ call_kwargs = mock_client .pull .call_args [1 ]
437+ assert call_kwargs ["target" ] in [
438+ "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest-zstd" ,
439+ "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest" ,
440+ ]
441+ assert call_kwargs ["outdir" ] == str (download_zst_path .parent )
427442
428443 def test_get_by_cpe (self , tmpdir , helpers ):
429444 # create workspace and store
@@ -825,7 +840,7 @@ def mock_exists(self):
825840 mock_subprocess_run .assert_called_once ()
826841 call_args = mock_subprocess_run .call_args [0 ][0 ]
827842 assert call_args [0 ].endswith ("/.tool/oras" )
828- assert call_args [1 :] == ["resolve" , "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest" ]
843+ assert call_args [1 :] == ["resolve" , "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest-zstd " ]
829844
830845 # verify oras pull was NOT called (download skipped)
831846 mock_client .pull .assert_not_called ()
@@ -857,10 +872,11 @@ def test_download_with_digest_caching_downloads_when_changed(self, mock_oras_cli
857872 mock_client = Mock ()
858873 mock_oras_client_class .return_value = mock_client
859874
860- # create the expected download file
861- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
862- download_path .parent .mkdir (parents = True , exist_ok = True )
863- download_path .write_text ("new db content" )
875+ # create the expected zstd-compressed download file
876+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
877+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
878+ cctx = zstandard .ZstdCompressor ()
879+ download_zst_path .write_bytes (cctx .compress (b"new db content" ))
864880
865881 # mock oras binary exists check
866882 original_exists = Path .exists
@@ -881,7 +897,7 @@ def mock_exists(self):
881897
882898 # verify new digest was saved
883899 assert store .digest_path .read_text ().strip () == new_digest
884- assert store .db_path .read_text () == "new db content"
900+ assert store .db_path .read_bytes () == b "new db content"
885901
886902 @patch ("subprocess.run" )
887903 @patch ("vunnel.tool.fixdate.grype_db_first_observed._ProgressLoggingOrasClient" )
@@ -894,10 +910,11 @@ def test_download_without_oras_cli_proceeds_normally(self, mock_oras_client_clas
894910 mock_client = Mock ()
895911 mock_oras_client_class .return_value = mock_client
896912
897- # create the expected download file
898- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
899- download_path .parent .mkdir (parents = True , exist_ok = True )
900- download_path .write_text ("db content" )
913+ # create the expected zstd-compressed download file
914+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
915+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
916+ cctx = zstandard .ZstdCompressor ()
917+ download_zst_path .write_bytes (cctx .compress (b"db content" ))
901918
902919 # mock Path.exists to return False for oras binary (oras not found)
903920 original_exists = Path .exists
@@ -917,7 +934,7 @@ def mock_exists(self):
917934 mock_client .pull .assert_called_once ()
918935
919936 # verify database file exists
920- assert store .db_path .read_text () == "db content"
937+ assert store .db_path .read_bytes () == b "db content"
921938
922939 @patch ("subprocess.run" )
923940 @patch ("vunnel.tool.fixdate.grype_db_first_observed._ProgressLoggingOrasClient" )
@@ -941,10 +958,11 @@ def test_download_with_missing_digest_file_downloads(self, mock_oras_client_clas
941958 mock_client = Mock ()
942959 mock_oras_client_class .return_value = mock_client
943960
944- # create the expected download file
945- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
946- download_path .parent .mkdir (parents = True , exist_ok = True )
947- download_path .write_text ("new db content" )
961+ # create the expected zstd-compressed download file
962+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
963+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
964+ cctx = zstandard .ZstdCompressor ()
965+ download_zst_path .write_bytes (cctx .compress (b"new db content" ))
948966
949967 # mock oras binary exists check
950968 original_exists = Path .exists
@@ -977,10 +995,11 @@ def test_download_with_oras_resolve_failure_downloads(self, mock_oras_client_cla
977995 mock_client = Mock ()
978996 mock_oras_client_class .return_value = mock_client
979997
980- # create the expected download file
981- download_path = Path (ws .input_path ) / "fix-dates" / "test-db.db"
982- download_path .parent .mkdir (parents = True , exist_ok = True )
983- download_path .write_text ("db content" )
998+ # create the expected zstd-compressed download file
999+ download_zst_path = Path (ws .input_path ) / "fix-dates" / "test-db.db.zst"
1000+ download_zst_path .parent .mkdir (parents = True , exist_ok = True )
1001+ cctx = zstandard .ZstdCompressor ()
1002+ download_zst_path .write_bytes (cctx .compress (b"db content" ))
9841003
9851004 # run download
9861005 store .download ()
@@ -990,3 +1009,69 @@ def test_download_with_oras_resolve_failure_downloads(self, mock_oras_client_cla
9901009
9911010 # verify database file exists
9921011 assert store .db_path .exists ()
1012+
1013+ @patch ("vunnel.tool.fixdate.grype_db_first_observed._ProgressLoggingOrasClient" )
1014+ def test_download_uncompressed_db_file (self , mock_oras_client_class , tmpdir ):
1015+ """test that download handles uncompressed .db file (no .zst)"""
1016+ ws = workspace .Workspace (tmpdir , "test-db" , create = True )
1017+ store = Store (ws )
1018+
1019+ # mock the ORAS client
1020+ mock_client = Mock ()
1021+ mock_oras_client_class .return_value = mock_client
1022+
1023+ # create uncompressed db file (no .zst)
1024+ download_dir = Path (ws .input_path ) / "fix-dates"
1025+ download_dir .mkdir (parents = True , exist_ok = True )
1026+ (download_dir / "test-db.db" ).write_text ("uncompressed db content" )
1027+
1028+ # run download
1029+ store .download ()
1030+
1031+ # verify ORAS client was called
1032+ mock_oras_client_class .assert_called_once ()
1033+ mock_client .pull .assert_called_once ()
1034+
1035+ # verify database file exists with correct content
1036+ assert store .db_path .exists ()
1037+ assert store .db_path .read_text () == "uncompressed db content"
1038+
1039+ @patch ("subprocess.run" )
1040+ @patch ("vunnel.tool.fixdate.grype_db_first_observed._ProgressLoggingOrasClient" )
1041+ def test_resolve_image_ref_fallback (self , mock_oras_client_class , mock_subprocess_run , tmpdir ):
1042+ """test that _resolve_image_ref falls back from latest-zstd to latest"""
1043+ ws = workspace .Workspace (tmpdir , "test-db" , create = True )
1044+ store = Store (ws )
1045+
1046+ # mock oras binary exists
1047+ original_exists = Path .exists
1048+
1049+ def mock_exists (self ):
1050+ if str (self ).endswith ("/.tool/oras" ):
1051+ return True
1052+ return original_exists (self )
1053+
1054+ # first call (latest-zstd) fails, second call (latest) succeeds
1055+ mock_result_fail = Mock ()
1056+ mock_result_fail .returncode = 1
1057+ mock_result_fail .stderr = "not found"
1058+
1059+ mock_result_success = Mock ()
1060+ mock_result_success .returncode = 0
1061+ mock_result_success .stdout = "sha256:latest123\n "
1062+
1063+ mock_subprocess_run .side_effect = [
1064+ subprocess .CalledProcessError (1 , "oras" , stderr = "not found" ),
1065+ mock_result_success ,
1066+ ]
1067+
1068+ # run _resolve_image_ref
1069+ with patch .object (Path , "exists" , mock_exists ):
1070+ image_ref , digest = store ._resolve_image_ref ("ghcr.io/anchore/grype-db-observed-fix-date/test-db" )
1071+
1072+ # verify it returned the latest tag
1073+ assert image_ref == "ghcr.io/anchore/grype-db-observed-fix-date/test-db:latest"
1074+ assert digest == "sha256:latest123"
1075+
1076+ # verify both tags were tried
1077+ assert mock_subprocess_run .call_count == 2
0 commit comments