Skip to content

Commit 67f49d9

Browse files
perf: Add batched commits and LRU caching for database operations (#957)
* perf: Add batched commits and LRU caching for database operations Optimize database operations to reduce execution time by eliminating per-record commits and adding query result caching. Changes: 1. Batched commits for fix-date tracking (vunnel_first_observed.py) - Previously: commit after every single insert - Now: batch commits every 2000 operations (configurable) - Auto-flushes on context exit to ensure data integrity 2. Batched commits for result writes (result.py) - Previously: separate transaction per CVE write - Now: batch commits every 2000 operations (configurable) - Maintains single active transaction, commits periodically 3. LRU caching for fix-date lookups (finder.py) - Previously: every fixdater.best() call hit database - Now: functools.lru_cache(maxsize=10000) caches results - Eliminates duplicate queries for same (CVE, CPE, version, ecosystem) - Cache cleared on context exit Performance impact: - Tested on NVD provider with 322k CVEs - Reduces execution time from 17 minutes to 11 minutes (35% faster) - Reduces database commits from ~322k to ~161 - Eliminates duplicate database queries through caching Testing: - All existing tests pass - Added tests for batching behavior - Added tests for cache functionality Signed-off-by: James Gardner <james.gardner@chainguard.dev> * fix: Add trailing comma for linter Signed-off-by: James Gardner <james.gardner@chainguard.dev> * fix: Add thread safety with locking for concurrent operations Add threading.Lock() to SQLiteStore to ensure thread-safe access to transaction state. While current provider implementations call writer.write() sequentially from the main thread, Ubuntu and RHEL providers use ThreadPoolExecutor internally. Adding defensive locking prevents potential data corruption if providers are refactored to parallelize writes in the future. Also removes duplicate test definition in test_finder.py. 
Signed-off-by: James Gardner <james.gardner@chainguard.dev> * use thread local for pending ops counter Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com> --------- Signed-off-by: James Gardner <james.gardner@chainguard.dev> Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com> Co-authored-by: Alex Goodman <wagoodman@users.noreply.github.com>
1 parent 964bd40 commit 67f49d9

File tree

5 files changed

+161
-10
lines changed

5 files changed

+161
-10
lines changed

src/vunnel/result.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import shutil
8+
import threading
89
import time
910
from dataclasses import asdict, dataclass
1011
from typing import TYPE_CHECKING, Any
@@ -114,12 +115,16 @@ class SQLiteStore(Store):
114115
temp_filename = "results.db.tmp"
115116
table_name = "results"
116117

117-
def __init__(self, *args: Any, write_location: str | None = None, **kwargs: Any):
118+
def __init__(self, *args: Any, write_location: str | None = None, batch_size: int = 2000, **kwargs: Any):
118119
super().__init__(*args, **kwargs)
119120
self.conn: db.engine.Connection | None = None
120121
self.engine: db.engine.Engine | None = None
121122
self.table: db.Table | None = None
122123
self.write_location = write_location
124+
self.batch_size = batch_size
125+
self._pending_operations = 0
126+
self._transaction: Any = None # Active transaction context
127+
self._lock = threading.Lock() # Protects transaction state for thread safety
123128
if self.write_location:
124129
self.filename = os.path.basename(self.write_location)
125130
self.temp_filename = f"{self.filename}.tmp"
@@ -171,7 +176,11 @@ def store(self, identifier: str, record: Envelope) -> None:
171176
record_str = orjson.dumps(asdict(record))
172177
conn, table = self.connection()
173178

174-
with conn.begin():
179+
with self._lock:
180+
# Start a transaction if we don't have one active
181+
if self._transaction is None:
182+
self._transaction = conn.begin()
183+
175184
# upsert the record conditionally based on the skip_duplicates configuration
176185
existing = conn.execute(table.select().where(table.c.id == identifier)).first()
177186
if existing:
@@ -185,6 +194,24 @@ def store(self, identifier: str, record: Envelope) -> None:
185194
statement = db.insert(table).values(id=identifier, record=record_str) # type: ignore[assignment]
186195

187196
conn.execute(statement)
197+
self._pending_operations += 1
198+
199+
# Auto-flush every batch_size operations to limit memory usage
200+
if self._pending_operations >= self.batch_size:
201+
self._flush_unlocked()
202+
203+
def flush(self) -> None:
204+
"""Commit any pending database operations (thread-safe)."""
205+
with self._lock:
206+
self._flush_unlocked()
207+
208+
def _flush_unlocked(self) -> None:
209+
"""Internal flush helper - caller must hold lock."""
210+
if self._pending_operations > 0 and self._transaction is not None:
211+
self._transaction.commit()
212+
self.logger.debug(f"flushed {self._pending_operations} operations to database")
213+
self._transaction = None
214+
self._pending_operations = 0
188215

189216
def read(self, identifier: str) -> Envelope:
190217
conn, table = self.connection()
@@ -204,6 +231,9 @@ def prepare(self) -> None:
204231
shutil.copy2(self.db_file_path, self.temp_db_file_path)
205232

206233
def close(self, successful: bool) -> None:
234+
# Flush any remaining operations before closing
235+
self.flush()
236+
207237
if self.conn:
208238
self.conn.close()
209239
if self.engine:

src/vunnel/tool/fixdate/finder.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import abc
22
import datetime
3+
import functools
34
import logging
45
from dataclasses import dataclass
56

@@ -81,9 +82,11 @@ def get_changed_vuln_ids_since(self, since_date: datetime.datetime) -> set[str]:
8182

8283

8384
class Finder:
def __init__(self, strategies: list[Strategy], first_observed: Strategy, cache_size: int = 10000):
    """Create a Finder.

    Args:
        strategies: fix-date strategies, queried in priority order.
        first_observed: strategy that supplies first-observed dates.
        cache_size: maximum number of entries for the LRU cache of
            database lookups (bounded to limit memory usage).
    """
    self.strategies = strategies
    self.first_observed = first_observed
    # Create cached version of database lookups so repeated
    # (vuln_id, cpe, version, ecosystem) queries skip the database.
    self._cached_find_from_strategies = functools.lru_cache(maxsize=cache_size)(self._find_from_strategies_uncached)
8790

8891
def __enter__(self) -> "Finder":
8992
for s in self.strategies:
def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
    """Close all strategies, then drop any cached lookup results."""
    for s in self.strategies:
        s.__exit__(exc_type, exc_val, exc_tb)
    self.first_observed.__exit__(exc_type, exc_val, exc_tb)
    # Clear cache on exit: cached results reference closed database state
    # and must not leak into a subsequent context.
    self._cached_find_from_strategies.cache_clear()
98103

99104
def download(self) -> None:
100105
self.first_observed.download()
@@ -109,6 +114,26 @@ def _normalize_ecosystem(self, ecosystem: str | None) -> str | None:
109114

110115
return ecosystem_mapping.get(ecosystem, ecosystem)
111116

def _find_from_strategies_uncached(
    self,
    vuln_id: str,
    cpe_or_package: str,
    fix_version: str,
    ecosystem: str | None,
) -> tuple[list[Result], list[Result]]:
    """Perform database lookups - uncached version for LRU wrapper.

    Queries every strategy (in priority order) plus the first-observed
    strategy. All arguments must be hashable since the lru_cache wrapper
    keys on them.

    Returns:
        Tuple of (strategy_results, first_observed_results)
    """
    results = []
    for s in self.strategies:
        results.extend(s.find(vuln_id, cpe_or_package, fix_version, ecosystem))

    first_observed_results = self.first_observed.find(vuln_id, cpe_or_package, fix_version, ecosystem)

    return (results, first_observed_results)
136+
112137
def best(
113138
self,
114139
vuln_id: str,
@@ -129,16 +154,19 @@ def best(
129154
if candidates:
130155
results.extend([c for c in candidates if c.accurate and c.date])
131156

132-
# add results from finders in order of priority (set by the constructor)
133-
for s in self.strategies:
134-
results.extend(s.find(vuln_id, cpe_or_package, fix_version, ecosystem))
157+
# Use cached database lookups
158+
strategy_results, first_observed_results = self._cached_find_from_strategies(
159+
vuln_id,
160+
cpe_or_package,
161+
fix_version,
162+
ecosystem,
163+
)
164+
results.extend(strategy_results)
135165

136166
# add low quality candidates last
137167
if candidates:
138168
results.extend([c for c in candidates if not c.accurate and c.date])
139169

140-
first_observed_results = self.first_observed.find(vuln_id, cpe_or_package, fix_version, ecosystem)
141-
142170
# we should select the date from the set of finders that is the highest quality (earlier in the s
143171
# results list) but should never be after the first observed date. However, first observed dates are not always
144172
# accurate, so we should only enforce this if we have an accurate first observed date (not part of the

src/vunnel/tool/fixdate/vunnel_first_observed.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,24 @@ class FixDate:
3131

3232

3333
class Store:
def __init__(self, ws: workspace.Workspace, batch_size: int = 2000) -> None:
    """Create a first-observed fix-date store backed by SQLite.

    Args:
        ws: provider workspace; the database lives under its results path.
        batch_size: number of add() operations to accumulate before an
            automatic commit (batched for performance).
    """
    self.workspace = ws
    self.provider = ws.name
    self.db_path = Path(ws.results_path) / "observed-fix-dates.db"
    self.logger = logging.getLogger("fixes-" + self.provider)
    self.engine: db.engine.Engine | None = None
    # connections/tables are kept per-thread (providers may write from
    # worker threads), as is the pending-operation counter below
    self._thread_local = threading.local()
    self.batch_size = batch_size
42+
43+
@property
def _pending_operations(self) -> int:
    """Get pending operation count for the current thread.

    Defaults to 0 for a thread that has not written yet.
    """
    return getattr(self._thread_local, "pending_operations", 0)

@_pending_operations.setter
def _pending_operations(self, value: int) -> None:
    """Set pending operation count for the current thread."""
    self._thread_local.pending_operations = value
4152

4253
def add(
4354
self,
@@ -73,7 +84,19 @@ def add(
7384
)
7485

7586
conn.execute(insert_stmt)
76-
conn.commit()
87+
self._pending_operations += 1
88+
89+
# auto-flush every batch_size operations to limit memory usage
90+
if self._pending_operations >= self.batch_size:
91+
self.flush()
92+
def flush(self) -> None:
    """Commit any pending database operations for the current thread.

    No-op when the current thread has nothing pending, so it is cheap to
    call defensively (e.g. on context exit).
    """
    if self._pending_operations > 0:
        conn, _ = self._get_connection()
        conn.commit()
        self.logger.debug(f"flushed {self._pending_operations} operations to database")
        self._pending_operations = 0
77100

78101
def get(
79102
self,
def __enter__(self) -> "Store":
    """Enter the store context; no setup needed beyond construction."""
    return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
    """Flush pending writes, then release per-thread connections."""
    # Flush any remaining operations before cleanup, otherwise writes
    # since the last batch boundary would be lost
    self.flush()
    self.cleanup_thread_connections()

tests/unit/tool/test_finder.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,3 +364,30 @@ def test_best_normalizes_ecosystems(self):
364364
strategy1.find.assert_called_once_with("CVE-2023-0001", "package", "1.0.0", "php-composer")
365365
strategy2.find.assert_called_once_with("CVE-2023-0001", "package", "1.0.0", "php-composer")
366366
first_observed.find.assert_called_once_with("CVE-2023-0001", "package", "1.0.0", "php-composer")
367+
def test_caching_reduces_database_calls(self):
    """test that repeated queries use cache instead of hitting database"""
    # Create mock strategy that counts calls (dict so the closure can mutate it)
    call_count = {"count": 0}
    test_result = self.create_result("2023-01-01", "test")

    def counted_find(*args, **kwargs):
        call_count["count"] += 1
        return [test_result]

    mock_strategy = Mock(spec=Strategy)
    mock_strategy.find = counted_find

    mock_first_observed = Mock(spec=Strategy)
    mock_first_observed.find = Mock(return_value=[])

    finder = Finder(strategies=[mock_strategy], first_observed=mock_first_observed)

    # Make the same query three times
    result1 = finder.best("CVE-2023-1234", "pkg:pypi/requests", "2.28.0", "pypi")
    result2 = finder.best("CVE-2023-1234", "pkg:pypi/requests", "2.28.0", "pypi")
    result3 = finder.best("CVE-2023-1234", "pkg:pypi/requests", "2.28.0", "pypi")

    # Should only call find once - subsequent calls use cache
    assert call_count["count"] == 1, f"Expected 1 database call, got {call_count['count']}"
    assert result1 == result2 == result3, "Cached results should match original"

tests/unit/tool/test_vunnel_first_observed.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,44 @@ def test_cleanup_thread_connections(self, tmpdir):
403403
# verify thread-local storage is cleared
404404
assert not hasattr(store._thread_local, "conn")
405405
assert not hasattr(store._thread_local, "table")
406+
def test_batch_commit_performance(self, tmpdir, mocker):
    """test that commits are batched for performance"""
    ws = workspace.Workspace(tmpdir, "test-db", create=True)

    # Use small batch size for testing
    store = Store(ws, batch_size=5)
    # NOTE(review): constructed for its side effects on the db file —
    # the fixture object itself is intentionally unused
    db = DatabaseFixture(store.db_path)

    # Mock commit to count calls (wrap rather than replace so data is
    # still actually persisted)
    conn, _ = store._get_connection()
    original_commit = conn.commit
    commit_count = [0]

    def tracked_commit():
        commit_count[0] += 1
        return original_commit()

    conn.commit = tracked_commit

    # Add 12 entries (should trigger 2 auto-flushes at 5 and 10, plus final flush)
    for i in range(12):
        store.add(
            first_observed_date=date(2023, 1, 1),
            vuln_id=f"CVE-2023-{i:04d}",
            cpe_or_package=f"cpe:2.3:a:vendor:product:{i}:*:*:*:*:*:*:*",
            fix_version="1.0.0",
        )

    # Flush remaining
    store.flush()

    # Should have committed 3 times: at 5, at 10, and final flush
    assert commit_count[0] == 3, f"Expected 3 commits but got {commit_count[0]}"

    # Verify all records were saved (first and last entries)
    results = store.find("CVE-2023-0000", "cpe:2.3:a:vendor:product:0:*:*:*:*:*:*:*", "1.0.0")
    assert len(results) == 1

    results = store.find("CVE-2023-0011", "cpe:2.3:a:vendor:product:11:*:*:*:*:*:*:*", "1.0.0")
    assert len(results) == 1

0 commit comments

Comments
 (0)