Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions gsppy/accelerate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import os
from typing import Any, Dict, List, Tuple, Optional, cast

from .utils import split_into_batches, is_subsequence_in_list
from .utils import split_into_batches, is_subsequence_in_list, is_subsequence_non_contiguous

# Optional GPU (CuPy) support
_gpu_available = False
Expand Down Expand Up @@ -111,7 +111,7 @@ def _support_counts_gpu_singletons(
vocab_size: int,
) -> List[Tuple[List[int], int]]:
"""GPU-accelerated support counts for singleton candidates using CuPy.

This computes the number of transactions containing each candidate item ID.
It uniquifies items per transaction on CPU to preserve presence semantics,
then performs a single bincount on GPU.
Expand All @@ -126,8 +126,8 @@ def _support_counts_gpu_singletons(
if not flat:
return []

cp_flat = cp.asarray(flat, dtype=cp.int32) # type: ignore[name-defined]
counts = cp.bincount(cp_flat, minlength=vocab_size) # type: ignore[attr-defined]
cp_flat = cp.asarray(flat, dtype=cp.int32) # type: ignore[name-defined]
counts = cp.bincount(cp_flat, minlength=vocab_size) # type: ignore[name-defined]
counts_host: Any = counts.get() # back to host as a NumPy array
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove my comments?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, sir, I will add them back.


out: List[Tuple[List[int], int]] = []
Expand All @@ -143,20 +143,24 @@ def support_counts_python(
candidates: List[Tuple[str, ...]],
min_support_abs: int,
batch_size: int = 100,
contiguous: bool=False ,
) -> Dict[Tuple[str, ...], int]:
"""Pure-Python fallback for support counting (single-process).

Evaluates each candidate pattern's frequency across all transactions
using the same contiguous-subsequence semantics as the Rust backend.

Note: This implementation is single-process and optimized for simplicity.
Heavy workloads may benefit from the Rust backend.
"""
# Simple non-multiprocessing version to avoid import cycles.

results: Dict[Tuple[str, ...], int] = {}
subsequence_checker = is_subsequence_in_list if contiguous else is_subsequence_non_contiguous

for batch in split_into_batches(candidates, batch_size):
for cand in batch:
freq = sum(1 for t in transactions if is_subsequence_in_list(cand, t))
freq = sum(1 for t in transactions if subsequence_checker(cand, t))
if freq >= min_support_abs:
results[cand] = freq
return results
Expand All @@ -168,37 +172,49 @@ def support_counts(
min_support_abs: int,
batch_size: int = 100,
backend: Optional[str] = None,
contiguous: bool = False,
) -> Dict[Tuple[str, ...], int]:
"""Choose the best available backend for support counting.

Backend selection is controlled by the `backend` argument when provided,
otherwise by the env var GSPPY_BACKEND:
- "rust": require Rust extension (raise if missing)
- "gpu": try GPU path when available (currently singletons optimized),
- "gpu": try GPU path when available (currently singletons optimized),
fall back to CPU for the rest
- "python": force pure-Python fallback
- otherwise: try Rust first and fall back to Python
"""
# Intentionally fallback to Python for non-contiguous queries.
# The acceleration path is currently disabled for non-contiguous cases
# to facilitate testing and validation of the contiguous logic.
if not contiguous:
return support_counts_python(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function immediately returns the Python fallback when contiguous is False, even if the user has chosen the Rust or GPU backend. This design silently discards the acceleration path. If that is intentional, it should be clearly documented; otherwise, the accelerators need updating to handle non‑contiguous subsequences.

transactions, candidates, min_support_abs, batch_size, contiguous
)

backend_sel = (backend or _env_backend()).lower()

if backend_sel == "python":
return support_counts_python(
transactions, candidates, min_support_abs, batch_size, contiguous
)

if backend_sel == "gpu":
if not _gpu_available:
raise RuntimeError("GSPPY_BACKEND=gpu but CuPy GPU is not available")
# Encode once
enc_tx, inv_vocab, vocab = _get_encoded_transactions(transactions)
enc_cands = _encode_candidates(candidates, vocab)

# Partition candidates into singletons and non-singletons
singletons: List[Tuple[int, Tuple[str, ...]]] = []
others: List[Tuple[List[int], Tuple[str, ...]]] = []
# Pair original and encoded candidates; lengths should match
assert len(candidates) == len(enc_cands), "Encoded candidates length mismatch"
for orig, enc in zip(candidates, enc_cands): # noqa: B905 - lengths checked above
for orig, enc in zip(candidates, enc_cands, strict=False): # noqa: B905 - lengths checked above
if len(enc) == 1:
singletons.append((enc[0], orig))
else:
others.append((enc, orig))

out: Dict[Tuple[str, ...], int] = {}

# GPU path for singletons
Expand All @@ -210,14 +226,14 @@ def support_counts(
min_support_abs=min_support_abs,
vocab_size=vocab_size,
)
# Map back to original strings
# Map back to original strings
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment does not seem to be at the right indentation.

cand_by_id: Dict[int, Tuple[str, ...]] = {cid: orig for cid, orig in singletons}
for enc_cand, freq in gpu_res:
cid = enc_cand[0]
out[cand_by_id[cid]] = int(freq)

# Fallback for others (prefer rust when available)
if others:
other_candidates = [orig for _, orig in others]
if _rust_available:
try:
other_enc = [enc for enc, _ in others]
Expand All @@ -228,19 +244,12 @@ def support_counts(
out[tuple(inv_vocab[i] for i in enc_cand)] = int(freq)
except Exception:
# fallback to python
out.update(
support_counts_python(transactions, [orig for _, orig in others], min_support_abs, batch_size)
)
else:
out.update(
support_counts_python(transactions, [orig for _, orig in others], min_support_abs, batch_size)
)

out.update(support_counts_python(transactions, other_candidates, min_support_abs, batch_size, contiguous ))
else:
out.update(support_counts_python(transactions, other_candidates, min_support_abs, batch_size, contiguous ))
return out

if backend_sel == "python":
return support_counts_python(transactions, candidates, min_support_abs, batch_size)

if backend_sel == "rust":
if not _rust_available:
raise RuntimeError("GSPPY_BACKEND=rust but Rust extension _gsppy_rust is not available")
Expand All @@ -252,18 +261,19 @@ def support_counts(
for enc_cand, freq in result:
out_rust[tuple(inv_vocab[i] for i in enc_cand)] = int(freq)
return out_rust

# auto: try rust then fallback
if _rust_available:
enc_tx, inv_vocab, vocab = _get_encoded_transactions(transactions)
enc_cands = _encode_candidates(candidates, vocab)
try:
enc_tx, inv_vocab, vocab = _get_encoded_transactions(transactions)
enc_cands = _encode_candidates(candidates, vocab)
result = cast(List[Tuple[List[int], int]], _compute_supports_rust(enc_tx, enc_cands, int(min_support_abs)))
out2: Dict[Tuple[str, ...], int] = {}
out_auto: Dict[Tuple[str, ...], int] = {}
for enc_cand, freq in result:
out2[tuple(inv_vocab[i] for i in enc_cand)] = int(freq)
return out2
out_auto[tuple(inv_vocab[i] for i in enc_cand)] = int(freq)
return out_auto
except Exception:
pass

return support_counts_python(transactions, candidates, min_support_abs, batch_size)
return support_counts_python(
transactions, candidates, min_support_abs, batch_size, contiguous
)
10 changes: 8 additions & 2 deletions gsppy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,14 @@ def detect_and_read_file(file_path: str) -> List[List[str]]:
show_default=True,
help="Backend to use for support counting.",
)
@click.option(
"--contiguous",
is_flag=True,
default=False,
help="Enable to find only contiguous patterns (e.g., 'a' then 'b'). Default is non-contiguous.",
)
@click.option("--verbose", is_flag=True, help="Enable verbose output for debugging purposes.")
def main(file_path: str, min_support: float, backend: str, verbose: bool) -> None:
def main(file_path: str, min_support: float, backend: str,contiguous: bool, verbose: bool) -> None:
"""
Run the GSP algorithm on transactional data from a file.
"""
Expand All @@ -189,7 +195,7 @@ def main(file_path: str, min_support: float, backend: str, verbose: bool) -> Non
# Initialize and run GSP algorithm
try:
gsp = GSP(transactions)
patterns: List[Dict[Tuple[str, ...], int]] = gsp.search(min_support=min_support)
patterns: List[Dict[Tuple[str, ...], int]] = gsp.search(min_support=min_support,contiguous=contiguous)
logger.info("Frequent Patterns Found:")
for i, level in enumerate(patterns, start=1):
logger.info(f"\n{i}-Sequence Patterns:")
Expand Down
36 changes: 25 additions & 11 deletions gsppy/gsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@
from itertools import chain
from collections import Counter

from gsppy.utils import split_into_batches, is_subsequence_in_list, generate_candidates_from_previous
from gsppy.utils import (
split_into_batches,
is_subsequence_in_list,
is_subsequence_non_contiguous,
generate_candidates_from_previous,
)
from gsppy.accelerate import support_counts as support_counts_accel

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -180,7 +185,7 @@

@staticmethod
def _worker_batch(
batch: List[Tuple[str, ...]], transactions: List[Tuple[str, ...]], min_support: int
batch: List[Tuple[str, ...]], transactions: List[Tuple[str, ...]], min_support: int, contiguous: bool,
) -> List[Tuple[Tuple[str, ...], int]]:
"""
Evaluate a batch of candidate sequences to compute their support.
Expand All @@ -200,14 +205,16 @@
- The candidate's support count.
"""
results: List[Tuple[Tuple[str, ...], int]] = []
subsequence_checker = is_subsequence_in_list if contiguous else is_subsequence_non_contiguous

for item in batch:
frequency = sum(1 for t in transactions if is_subsequence_in_list(item, t))
frequency = sum(1 for t in transactions if subsequence_checker(item, t))
if frequency >= min_support:
results.append((item, frequency))
return results

def _support_python(
self, items: List[Tuple[str, ...]], min_support: int = 0, batch_size: int = 100
self, items: List[Tuple[str, ...]], min_support: int = 0, batch_size: int = 100,contiguous: bool = False,
) -> Dict[Tuple[str, ...], int]:
"""
Calculate support counts for candidate sequences using Python multiprocessing.
Expand All @@ -223,12 +230,13 @@
"""
# Split candidates into batches
batches = list(split_into_batches(items, batch_size))
#subsequence_checker = is_subsequence_in_list if contiguous else is_subsequence_non_contiguous

Check warning on line 233 in gsppy/gsp.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this commented out code.

See more on https://sonarcloud.io/project/issues?id=jacksonpradolima_gsp-py&issues=AZq1MAQqA9NlUWe5x8NW&open=AZq1MAQqA9NlUWe5x8NW&pullRequest=116

# Use multiprocessing pool to calculate frequency in parallel, batch-wise
with mp.Pool(processes=mp.cpu_count()) as pool:
batch_results = pool.starmap(
self._worker_batch, # Process a batch at a time
[(batch, self.transactions, min_support) for batch in batches],
[(batch, self.transactions, min_support,contiguous) for batch in batches],
)

# Flatten the list of results and convert to a dictionary
Expand All @@ -240,17 +248,18 @@
min_support: int = 0,
batch_size: int = 100,
backend: Optional[str] = None,
contiguous: bool = False,
) -> Dict[Tuple[str, ...], int]:
"""
Calculate support counts for candidate sequences using the fastest available backend.
This will try the Rust extension if available (and configured), otherwise fall back to
the Python multiprocessing implementation.
"""
try:
return support_counts_accel(self.transactions, items, min_support, batch_size, backend=backend)
return support_counts_accel(self.transactions, items, min_support, batch_size, backend=backend, contiguous=contiguous)
except Exception:
# Fallback to Python implementation on any acceleration failure
return self._support_python(items, min_support, batch_size)
return self._support_python(items, min_support, batch_size, contiguous=contiguous)

def _print_status(self, run: int, candidates: List[Tuple[str, ...]]) -> None:
"""
Expand All @@ -270,6 +279,8 @@
min_support: float = 0.2,
max_k: Optional[int] = None,
backend: Optional[str] = None,
batch_size: int = 100,

Check warning on line 282 in gsppy/gsp.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused function parameter "batch_size".

See more on https://sonarcloud.io/project/issues?id=jacksonpradolima_gsp-py&issues=AZq1MAQqA9NlUWe5x8NX&open=AZq1MAQqA9NlUWe5x8NX&pullRequest=116
contiguous: bool = False,
) -> List[Dict[Tuple[str, ...], int]]:
"""
Execute the Generalized Sequential Pattern (GSP) mining algorithm.
Expand All @@ -282,6 +293,8 @@
min_support (float): Minimum support threshold as a fraction of total transactions.
For example, `0.3` means that a sequence is frequent if it
appears in at least 30% of all transactions.
contiguous (bool): If True, finds only contiguous patterns (e.g., ['a', 'b'] in ['a', 'b', 'c']).
If False (default), finds non-contiguous patterns (e.g., ['a', 'c'] in ['a', 'b', 'c']).

Returns:
List[Dict[Tuple[str, ...], int]]: A list of dictionaries containing frequent patterns
Expand All @@ -295,10 +308,11 @@
- Information about the algorithm's start, intermediate progress (candidates filtered),
and completion.
- Status updates for each iteration until the algorithm terminates.

"""
if not 0.0 < min_support <= 1.0:
raise ValueError("Minimum support must be in the range (0.0, 1.0]")

self.freq_patterns=[]
logger.info(f"Starting GSP algorithm with min_support={min_support}...")

# Convert fractional support to absolute count (ceil to preserve threshold semantics)
Expand All @@ -311,7 +325,7 @@

# scan transactions to collect support count for each candidate
# sequence & filter
self.freq_patterns.append(self._support(candidates, abs_min_support, backend=backend))
self.freq_patterns.append(self._support(candidates, abs_min_support, backend=backend,contiguous=contiguous))

# (k-itemsets/k-sequence = 1)
k_items = 1
Expand All @@ -332,8 +346,8 @@

# candidate pruning - eliminates candidates who are not potentially
# frequent (using support as threshold)
self.freq_patterns.append(self._support(candidates, abs_min_support, backend=backend))
self.freq_patterns.append(self._support(candidates, abs_min_support, backend=backend, contiguous=contiguous))

self._print_status(k_items, candidates)
logger.info("GSP algorithm completed.")
return self.freq_patterns[:-1]
return [level for level in self.freq_patterns if level]
9 changes: 9 additions & 0 deletions gsppy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ def is_subsequence_in_list(subsequence: Tuple[str, ...], sequence: Tuple[str, ..
# Use any to check if any slice matches the sequence
return any(sequence[i : i + len_sub] == subsequence for i in range(len_seq - len_sub + 1))

@lru_cache(maxsize=32768)
def is_subsequence_non_contiguous(subsequence: Tuple[str, ...], sequence: Tuple[str, ...]) -> bool:
    """
    Tell whether `subsequence` occurs in `sequence` in order, with gaps allowed.

    Every element of `subsequence` must be found in `sequence` in the same
    relative order, but the matched elements do not have to be adjacent.
    An empty `subsequence` is considered present in any sequence.

    Results are memoized with a bounded LRU cache, so both arguments must be
    hashable (tuples).

    Parameters:
        subsequence (Tuple[str, ...]): The ordered items to look for.
        sequence (Tuple[str, ...]): The sequence to search in.

    Returns:
        bool: True if all items are found in order, False otherwise.
    """
    # A single shared iterator: each membership test consumes `sequence` up to
    # and including the match, so the next item is only searched for after it.
    remaining = iter(sequence)
    for wanted in subsequence:
        if wanted not in remaining:
            return False
    # Also reached when `subsequence` is empty (the loop body never runs).
    return True
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns False for an empty subsequence. By definition, the empty subsequence exists in every sequence. This edge case doesn’t affect the current algorithm (it never checks empty candidates), but a trivial fix would return True when subsequence is empty. A bounded lru_cache (maxsize) could also prevent unbounded memory use.


def generate_candidates_from_previous(prev_patterns: Dict[Tuple[str, ...], int]) -> List[Tuple[str, ...]]:
"""
Expand Down
Loading