Skip to content

Commit 34cda39

Browse files
Add bulk-update command to index/delete records from TIMDEX parquet dataset
Why these changes are being introduced: * The timdex-index-manager (TIM) needs to support the v2 parquet dataset, which now contains records for both indexing and deleting. The new CLI command performs a "bulk update" given a subset of the dataset (filtered by 'run_date' and 'run_id') and uses the timdex-dataset-api library to read records from the TIMDEXDataset. Introducing a new CLI command avoids the feature-flagging approach, allowing the existing CLI commands and helper functions to remain untouched for v1 purposes. How this addresses that need: * Implement 'bulk-update' CLI command * Add unit tests for 'bulk-update' Side effects of this change: * TIM remains backward compatible with v1 but will now also support v2 runs. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-428
1 parent b6e8893 commit 34cda39

File tree

6 files changed

+1122
-740
lines changed

6 files changed

+1122
-740
lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mypy = "*"
2121
pre-commit = "*"
2222
pytest = "*"
2323
ruff = "*"
24+
timdex-dataset-api = { git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}
2425
vcrpy = "*"
2526

2627
[requires]

Pipfile.lock

Lines changed: 968 additions & 739 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ services:
1010
- discovery.type=single-node
1111
- bootstrap.memory_lock=true
1212
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
13+
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
1314
volumes:
1415
- opensearch-local-data:/usr/share/opensearch/data
1516
networks:
@@ -21,6 +22,7 @@ services:
2122
environment:
2223
- "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
2324
- 'OPENSEARCH_HOSTS=["http://opensearch:9200"]'
25+
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
2426
networks:
2527
- opensearch-local-net
2628
volumes:

tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from click.testing import CliRunner
44

55
from tim.opensearch import configure_opensearch_client
6+
from timdex_dataset_api import TIMDEXDataset
67

78
EXIT_CODES = {
89
"success": 0,

tests/test_cli.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import re
2+
from unittest.mock import MagicMock, patch
23

34
from freezegun import freeze_time
45

56
from tim.cli import main
7+
from tim.errors import BulkIndexingError
68

79
from .conftest import EXIT_CODES, my_vcr
810

@@ -256,3 +258,88 @@ def test_bulk_delete_with_source_success(caplog, runner):
256258
"from index 'alma-2022-09-01t00-00-00'" in caplog.text
257259
)
258260
assert "Bulk deletion complete!" in caplog.text
261+
262+
263+
@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_success(
    bulk_index_mock,
    bulk_delete_mock,
    validate_options_mock,
    dataset_load_mock,
    caplog,
    monkeypatch,
    runner,
):
    """'bulk-update' happy path: index/delete result counts are summarized in the log."""
    # Force the CLI to use the locally configured OpenSearch client.
    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)

    # Stub out OpenSearch bulk helpers, option validation, and dataset loading.
    bulk_index_mock.return_value = {
        "created": 1000,
        "updated": 0,
        "errors": 0,
        "total": 1000,
    }
    bulk_delete_mock.return_value = {"deleted": 0, "errors": 0, "total": 0}
    validate_options_mock.return_value = "alma"
    dataset_load_mock.return_value = MagicMock()

    cli_args = [
        "bulk-update",
        "--source",
        "alma",
        "--run-date",
        "2024-12-01",
        "--run-id",
        "abc123",
        "s3://test-timdex-bucket/dataset",
    ]
    result = runner.invoke(main, cli_args)

    assert result.exit_code == EXIT_CODES["success"]
    expected_summary = (
        'Bulk update complete: {"index": {"created": 1000, "updated": 0, "errors": 0, "total": 1000}, "delete": {"deleted": 0, "errors": 0, "total": 0}'
    )
    assert expected_summary in caplog.text
305+
306+
307+
@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_raise_bulk_indexing_error(
    bulk_index_mock,
    bulk_delete_mock,
    validate_options_mock,
    dataset_load_mock,
    caplog,
    monkeypatch,
    runner,
):
    """A BulkIndexingError is swallowed: the command still exits 0, deletes still
    run, and the summary logs zeroed index counts."""
    # Force the CLI to use the locally configured OpenSearch client.
    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)

    # Indexing raises; deletion succeeds with nothing to delete.
    bulk_index_mock.side_effect = BulkIndexingError(
        record="alma:0", index="index", error="exception"
    )
    bulk_delete_mock.return_value = {"deleted": 0, "errors": 0, "total": 0}
    validate_options_mock.return_value = "alma"
    dataset_load_mock.return_value = MagicMock()

    cli_args = [
        "bulk-update",
        "--source",
        "alma",
        "--run-date",
        "2024-12-01",
        "--run-id",
        "abc123",
        "s3://test-timdex-bucket/dataset",
    ]
    result = runner.invoke(main, cli_args)

    assert result.exit_code == EXIT_CODES["success"]
    expected_summary = (
        'Bulk update complete: {"index": {"created": 0, "updated": 0, "errors": 0, "total": 0}, "delete": {"deleted": 0, "errors": 0, "total": 0}'
    )
    assert expected_summary in caplog.text

tim/cli.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
# ruff: noqa: TRY003, EM101
2+
import json
23
import logging
34
from datetime import timedelta
45
from time import perf_counter
56

67
import rich_click as click
7-
88
from tim import errors, helpers
99
from tim import opensearch as tim_os
1010
from tim.config import PRIMARY_ALIAS, VALID_SOURCES, configure_logger, configure_sentry
11+
from tim.errors import BulkIndexingError
12+
from timdex_dataset_api import TIMDEXDataset
1113

1214
logger = logging.getLogger(__name__)
1315

@@ -252,6 +254,7 @@ def promote(ctx: click.Context, index: str, alias: list[str]) -> None:
252254
# Bulk record processing commands
253255

254256

257+
# NOTE: FEATURE FLAG: 'bulk_index' may be removed entirely when v2 work done
255258
@main.command()
256259
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
257260
@click.option(
@@ -295,6 +298,7 @@ def bulk_index(ctx: click.Context, index: str, source: str, filepath: str) -> No
295298
)
296299

297300

301+
# NOTE: FEATURE FLAG: 'bulk_delete' may be removed entirely when v2 work done
298302
@main.command()
299303
@click.option("-i", "--index", help="Name of the index to bulk delete records from.")
300304
@click.option(
@@ -334,3 +338,61 @@ def bulk_delete(ctx: click.Context, index: str, source: str, filepath: str) -> N
334338
results["deleted"],
335339
results["total"],
336340
)
341+
342+
343+
@main.command()
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
@click.option(
    "-s",
    "--source",
    type=click.Choice(VALID_SOURCES),
    help="Source whose primary-aliased index to bulk index records into.",
)
@click.option("-d", "--run-date", help="Run date, formatted as YYYY-MM-DD.")
@click.option("-rid", "--run-id", help="Run ID.")
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def bulk_update(
    ctx: click.Context,
    index: str,
    source: str,
    run_date: str,
    run_id: str,
    dataset_path: str,
) -> None:
    """Bulk update records for an index.

    Must provide either the name of an existing index in the cluster or a valid source.
    If source is provided, it will perform indexing and/or deletion of records for
    the primary-aliased index for the source.

    The method will read transformed records from a TIMDEXDataset
    located at dataset_path using the 'timdex-dataset-api' library. The dataset
    is filtered by run date and run ID.

    Logs an error and aborts if the provided index doesn't exist in the cluster.
    """
    client = ctx.obj["CLIENT"]
    # Resolves the target index from --index/--source; aborts if it doesn't exist.
    index = helpers.validate_bulk_cli_options(index, source, client)

    logger.info(f"Bulk updating records from dataset '{dataset_path}' into '{index}'")

    # Zeroed defaults ensure the final summary is complete even if a phase fails
    # or yields no records.
    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
    delete_results = {"deleted": 0, "errors": 0, "total": 0}

    # Load only the dataset partition for this pipeline run.
    td = TIMDEXDataset(location=dataset_path)
    td.load(run_date=run_date, run_id=run_id)

    # bulk index records
    records_to_index = td.read_transformed_records_iter(action="index")
    try:
        index_results.update(tim_os.bulk_index(client, index, records_to_index))
    except BulkIndexingError as exception:
        # Log at error level (this is a failure, not informational), but continue:
        # deletions should still be attempted even if indexing fails.
        logger.error(f"Bulk indexing failed: {exception}")

    # bulk delete records
    records_to_delete = td.read_dicts_iter(columns=["timdex_record_id"], action="delete")
    delete_results.update(tim_os.bulk_delete(client, index, records_to_delete))

    summary_results = {"index": index_results, "delete": delete_results}
    logger.info(f"Bulk update complete: {json.dumps(summary_results)}")

0 commit comments

Comments
 (0)