Skip to content

Commit 2dfc181

Browse files
[wip]
1 parent b6e8893 commit 2dfc181

File tree

7 files changed

+1189
-740
lines changed

7 files changed

+1189
-740
lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mypy = "*"
2121
pre-commit = "*"
2222
pytest = "*"
2323
ruff = "*"
24+
timdex-dataset-api = { git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}
2425
vcrpy = "*"
2526

2627
[requires]

Pipfile.lock

Lines changed: 968 additions & 739 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/conftest.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import vcr
33
from click.testing import CliRunner
44

5+
from tests.utils import generate_sample_records
56
from tim.opensearch import configure_opensearch_client
7+
from timdex_dataset_api import TIMDEXDataset
68

79
EXIT_CODES = {
810
"success": 0,
@@ -33,3 +35,30 @@ def test_opensearch_client():
3335
@pytest.fixture
3436
def runner():
3537
return CliRunner()
38+
39+
40+
@pytest.fixture
def local_dataset_location(tmp_path):
    """Return a string path under pytest's tmp_path for a throwaway TIMDEX dataset."""
    dataset_path = tmp_path / "fixed_local_dataset/"
    return str(dataset_path)
43+
44+
45+
@pytest.fixture
def fixed_local_dataset(local_dataset_location):
    """Create and load a small local TIMDEXDataset fixture.

    Writes two runs of 1,000 sample 'alma' records each -- one 'index' run and
    one 'delete' run -- so tests can exercise both bulk code paths.
    """
    dataset = TIMDEXDataset(local_dataset_location)

    runs = [
        ("alma", "2024-12-01", "abc123", "index"),
        ("alma", "2024-12-02", "def456", "delete"),
    ]
    for source, run_date, run_id, action in runs:
        sample_records = generate_sample_records(
            num_records=1_000,
            timdex_record_id_prefix=source,
            source=source,
            run_date=run_date,
            action=action,
            run_id=run_id,
        )
        dataset.write(sample_records)

    dataset.load()
    return dataset

tests/test_cli.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import re
2+
from unittest.mock import patch
23

34
from freezegun import freeze_time
45

56
from tim.cli import main
7+
from tim.errors import BulkIndexingError
68

79
from .conftest import EXIT_CODES, my_vcr
810

@@ -256,3 +258,85 @@ def test_bulk_delete_with_source_success(caplog, runner):
256258
"from index 'alma-2022-09-01t00-00-00'" in caplog.text
257259
)
258260
assert "Bulk deletion complete!" in caplog.text
261+
262+
263+
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_success(
    mock_bulk_index,
    mock_bulk_delete,
    mock_validate_bulk_cli_options,
    caplog,
    monkeypatch,
    runner,
    fixed_local_dataset,
):
    """bulk-update succeeds and logs summed index + delete results."""
    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)

    # stub out OpenSearch interactions and index-name validation
    mock_validate_bulk_cli_options.return_value = "index"
    mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
    mock_bulk_index.return_value = {
        "created": 1000,
        "updated": 0,
        "errors": 0,
        "total": 1000,
    }

    cli_args = [
        "bulk-update",
        "--source",
        "alma",
        "--run-date",
        "2024-12-01",
        "--run-id",
        "abc123",
        fixed_local_dataset.location,
    ]
    result = runner.invoke(main, cli_args)

    assert result.exit_code == EXIT_CODES["success"]
    expected_summary = (
        "Bulk update complete: {'created': 1000, 'updated': 0, 'deleted': 0, 'total': 1000, 'bulk_index_errors': 0, 'bulk_delete_errors': 0}"
    )
    assert expected_summary in caplog.text
303+
304+
305+
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_raise_bulk_indexing_error(
    mock_bulk_index,
    mock_bulk_delete,
    mock_validate_bulk_cli_options,
    caplog,
    monkeypatch,
    runner,
    fixed_local_dataset,
):
    """A BulkIndexingError is caught: the command still exits 0 with zeroed counts."""
    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)

    # indexing blows up; deletes proceed normally
    mock_bulk_index.side_effect = BulkIndexingError(
        record="alma:0", index="index", error="exception"
    )
    mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
    mock_validate_bulk_cli_options.return_value = "index"

    cli_args = [
        "bulk-update",
        "--source",
        "alma",
        "--run-date",
        "2024-12-01",
        "--run-id",
        "abc123",
        fixed_local_dataset.location,
    ]
    result = runner.invoke(main, cli_args)

    assert result.exit_code == EXIT_CODES["success"]
    expected_summary = (
        "Bulk update complete: {'created': 0, 'updated': 0, 'deleted': 0, 'total': 0, 'bulk_index_errors': 0, 'bulk_delete_errors': 0}"
    )
    assert expected_summary in caplog.text

tests/utils.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import uuid
2+
from collections.abc import Iterator
3+
4+
from timdex_dataset_api import DatasetRecord
5+
6+
7+
def generate_sample_records(
    num_records: int,
    timdex_record_id_prefix: str = "alma",
    source: str | None = "alma",
    run_date: str | None = "2024-12-01",
    run_type: str | None = "daily",
    action: str | None = "index",
    run_id: str | None = None,
) -> Iterator[DatasetRecord]:
    """Yield `num_records` sample DatasetRecords for tests.

    Record ids are `{timdex_record_id_prefix}:{n}` for n in 0..num_records-1.
    If no run_id is given, a fresh UUID4 string is used for the whole run.
    """
    run_id = run_id or str(uuid.uuid4())

    for record_number in range(num_records):
        yield DatasetRecord(
            timdex_record_id=f"{timdex_record_id_prefix}:{record_number}",
            source_record=b"<record><title>Hello World.</title></record>",
            transformed_record=b"""{"title":["Hello World."]}""",
            source=source,
            run_date=run_date,
            run_type=run_type,
            action=action,
            run_id=run_id,
        )

tim/cli.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
# ruff: noqa: TRY003, EM101
2+
import json
23
import logging
34
from datetime import timedelta
45
from time import perf_counter
56

67
import rich_click as click
7-
88
from tim import errors, helpers
99
from tim import opensearch as tim_os
1010
from tim.config import PRIMARY_ALIAS, VALID_SOURCES, configure_logger, configure_sentry
11+
from tim.errors import BulkIndexingError
12+
from timdex_dataset_api import TIMDEXDataset
1113

1214
logger = logging.getLogger(__name__)
1315

@@ -252,6 +254,7 @@ def promote(ctx: click.Context, index: str, alias: list[str]) -> None:
252254
# Bulk record processing commands
253255

254256

257+
# NOTE: FEATURE FLAG: 'bulk_index' may be removed entirely when v2 work done
255258
@main.command()
256259
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
257260
@click.option(
@@ -295,6 +298,7 @@ def bulk_index(ctx: click.Context, index: str, source: str, filepath: str) -> No
295298
)
296299

297300

301+
# NOTE: FEATURE FLAG: 'bulk_index' may be removed entirely when v2 work done
298302
@main.command()
299303
@click.option("-i", "--index", help="Name of the index to bulk delete records from.")
300304
@click.option(
@@ -334,3 +338,62 @@ def bulk_delete(ctx: click.Context, index: str, source: str, filepath: str) -> N
334338
results["deleted"],
335339
results["total"],
336340
)
341+
342+
343+
@main.command()
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
@click.option(
    "-s",
    "--source",
    type=click.Choice(VALID_SOURCES),
    help="Source whose primary-aliased index to bulk index records into.",
)
@click.option("-d", "--run-date", help="Run date, formatted as YYYY-MM-DD.")
@click.option("-rid", "--run-id", help="Run ID.")
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def bulk_update(
    ctx: click.Context,
    index: str,
    source: str,
    run_date: str,
    run_id: str,
    dataset_path: str,
) -> None:
    """Bulk update records for an index.

    Must provide either the name of an existing index in the cluster or a valid source.
    If source is provided, it will perform indexing and/or deletion of records for
    the primary-aliased index for the source.

    The method will read transformed records from a TIMDEXDataset
    located at dataset_path using the 'timdex-dataset-api' library. The dataset
    is filtered by run date and run ID.

    Logs an error and aborts if the provided index doesn't exist in the cluster.
    """
    client = ctx.obj["CLIENT"]
    index = helpers.validate_bulk_cli_options(index, source, client)

    logger.info(f"Bulk updating records from dataset '{dataset_path}' into '{index}'")

    # counters default to zero so a failed/empty phase still sums cleanly below
    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
    delete_results = {"deleted": 0, "errors": 0, "total": 0}

    td = TIMDEXDataset(location=dataset_path)
    td.load(run_date=run_date, run_id=run_id)

    # bulk index records; deletes are still attempted if indexing fails
    records_to_index = td.read_transformed_records_iter(action="index")
    try:
        index_results.update(tim_os.bulk_index(client, index, records_to_index))
    except BulkIndexingError as exception:
        # logged at ERROR (was info): an indexing failure is an error condition,
        # even though the command continues with deletes
        logger.error(f"Bulk indexing failed: {exception}")

    # bulk delete records
    records_to_delete = td.read_transformed_records_iter(action="delete")
    delete_results.update(tim_os.bulk_delete(client, index, records_to_delete))

    logger.info(
        f"Bulk update complete: {helpers.sum_bulk_update_results(index_results, delete_results)}"
    )

tim/helpers.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,19 @@ def parse_deleted_records(filepath: str) -> Generator[dict, None, None]:
8383
yield {"timdex_record_id": item.rstrip()}
8484

8585

86+
def sum_bulk_update_results(
    index_results: dict[str, int], delete_results: dict[str, int]
) -> dict[str, int]:
    """Combine bulk index and bulk delete result counts into one summary dict.

    Args:
        index_results: counts from bulk indexing, with keys
            "created", "updated", "errors", and "total".
        delete_results: counts from bulk deletion, with keys
            "deleted", "errors", and "total".

    Returns:
        Summary dict with created/updated/deleted counts, a combined "total",
        and the per-phase error counts kept separate as "bulk_index_errors"
        and "bulk_delete_errors".
    """
    return {
        "created": index_results["created"],
        "updated": index_results["updated"],
        "deleted": delete_results["deleted"],
        "total": index_results["total"] + delete_results["total"],
        "bulk_index_errors": index_results["errors"],
        "bulk_delete_errors": delete_results["errors"],
    }
97+
98+
8699
def validate_bulk_cli_options(
87100
index: str | None, source: str, client: tim_os.OpenSearch
88101
) -> str:

0 commit comments

Comments
 (0)