|
5 | 5 | import logging
|
6 | 6 | import re
|
7 | 7 | import unicodedata
|
8 |
| -from typing import Any, Dict, Iterator, List, Optional, Tuple |
| 8 | +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union |
9 | 9 | from urllib.parse import quote_plus
|
10 | 10 |
|
11 | 11 | import boto3 # type: ignore
|
@@ -978,11 +978,21 @@ def _create_table(
|
978 | 978 | session: boto3.Session = _utils.ensure_session(session=boto3_session)
|
979 | 979 | client_glue: boto3.client = _utils.client(service_name="glue", session=session)
|
980 | 980 | exist: bool = does_table_exist(database=database, table=table, boto3_session=session)
|
981 |
| - if mode not in ("overwrite", "append"): # pragma: no cover |
982 |
| - raise exceptions.InvalidArgument(f"{mode} is not a valid mode. It must be 'overwrite' or 'append'.") |
| 981 | + if mode not in ("overwrite", "append", "overwrite_partitions"): # pragma: no cover |
| 982 | + raise exceptions.InvalidArgument( |
| 983 | + f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'." |
| 984 | + ) |
983 | 985 | if (exist is True) and (mode == "overwrite"):
|
984 | 986 | skip_archive: bool = not catalog_versioning
|
| 987 | + partitions_values: List[List[str]] = list( |
| 988 | + _get_partitions(database=database, table=table, boto3_session=session).values() |
| 989 | + ) |
| 990 | + client_glue.batch_delete_partition( |
| 991 | + DatabaseName=database, TableName=table, PartitionsToDelete=[{"Values": v} for v in partitions_values] |
| 992 | + ) |
985 | 993 | client_glue.update_table(DatabaseName=database, TableInput=table_input, SkipArchive=skip_archive)
|
| 994 | + elif (exist is True) and (mode in ("append", "overwrite_partitions")) and (parameters is not None): |
| 995 | + upsert_table_parameters(parameters=parameters, database=database, table=table, boto3_session=session) |
986 | 996 | elif exist is False:
|
987 | 997 | client_glue.create_table(DatabaseName=database, TableInput=table_input)
|
988 | 998 |
|
@@ -1327,3 +1337,155 @@ def extract_athena_types(
|
1327 | 1337 | return _data_types.athena_types_from_pandas_partitioned(
|
1328 | 1338 | df=df, index=index, partition_cols=partition_cols, dtype=dtype, index_left=index_left
|
1329 | 1339 | )
|
| 1340 | + |
| 1341 | + |
| 1342 | +def get_table_parameters( |
| 1343 | + database: str, table: str, catalog_id: Optional[str] = None, boto3_session: Optional[boto3.Session] = None |
| 1344 | +) -> Dict[str, str]: |
| 1345 | + """Get all parameters. |
| 1346 | +
|
| 1347 | + Parameters |
| 1348 | + ---------- |
| 1349 | + database : str |
| 1350 | + Database name. |
| 1351 | + table : str |
| 1352 | + Table name. |
| 1353 | + catalog_id : str, optional |
| 1354 | + The ID of the Data Catalog from which to retrieve Databases. |
| 1355 | + If none is provided, the AWS account ID is used by default. |
| 1356 | + boto3_session : boto3.Session(), optional |
| 1357 | + Boto3 Session. The default boto3 session will be used if boto3_session receive None. |
| 1358 | +
|
| 1359 | + Returns |
| 1360 | + ------- |
| 1361 | + Dict[str, str] |
| 1362 | + Dictionary of parameters. |
| 1363 | +
|
| 1364 | + Examples |
| 1365 | + -------- |
| 1366 | + >>> import awswrangler as wr |
| 1367 | + >>> pars = wr.catalog.get_table_parameters(database="...", table="...") |
| 1368 | +
|
| 1369 | + """ |
| 1370 | + client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session) |
| 1371 | + args: Dict[str, str] = {} |
| 1372 | + if catalog_id is not None: |
| 1373 | + args["CatalogId"] = catalog_id # pragma: no cover |
| 1374 | + args["DatabaseName"] = database |
| 1375 | + args["Name"] = table |
| 1376 | + response: Dict[str, Any] = client_glue.get_table(**args) |
| 1377 | + parameters: Dict[str, str] = response["Table"]["Parameters"] |
| 1378 | + return parameters |
| 1379 | + |
| 1380 | + |
| 1381 | +def upsert_table_parameters( |
| 1382 | + parameters: Dict[str, str], |
| 1383 | + database: str, |
| 1384 | + table: str, |
| 1385 | + catalog_id: Optional[str] = None, |
| 1386 | + boto3_session: Optional[boto3.Session] = None, |
| 1387 | +) -> Dict[str, str]: |
| 1388 | + """Insert or Update the received parameters. |
| 1389 | +
|
| 1390 | + Parameters |
| 1391 | + ---------- |
| 1392 | + parameters : Dict[str, str] |
| 1393 | + e.g. {"source": "mysql", "destination": "datalake"} |
| 1394 | + database : str |
| 1395 | + Database name. |
| 1396 | + table : str |
| 1397 | + Table name. |
| 1398 | + catalog_id : str, optional |
| 1399 | + The ID of the Data Catalog from which to retrieve Databases. |
| 1400 | + If none is provided, the AWS account ID is used by default. |
| 1401 | + boto3_session : boto3.Session(), optional |
| 1402 | + Boto3 Session. The default boto3 session will be used if boto3_session receive None. |
| 1403 | +
|
| 1404 | + Returns |
| 1405 | + ------- |
| 1406 | + Dict[str, str] |
| 1407 | + All parameters after the upsert. |
| 1408 | +
|
| 1409 | + Examples |
| 1410 | + -------- |
| 1411 | + >>> import awswrangler as wr |
| 1412 | + >>> pars = wr.catalog.upsert_table_parameters( |
| 1413 | + ... parameters={"source": "mysql", "destination": "datalake"}, |
| 1414 | + ... database="...", |
| 1415 | + ... table="...") |
| 1416 | +
|
| 1417 | + """ |
| 1418 | + session: boto3.Session = _utils.ensure_session(session=boto3_session) |
| 1419 | + pars: Dict[str, str] = get_table_parameters( |
| 1420 | + database=database, table=table, catalog_id=catalog_id, boto3_session=session |
| 1421 | + ) |
| 1422 | + for k, v in parameters.items(): |
| 1423 | + pars[k] = v |
| 1424 | + overwrite_table_parameters( |
| 1425 | + parameters=pars, database=database, table=table, catalog_id=catalog_id, boto3_session=session |
| 1426 | + ) |
| 1427 | + return pars |
| 1428 | + |
| 1429 | + |
| 1430 | +def overwrite_table_parameters( |
| 1431 | + parameters: Dict[str, str], |
| 1432 | + database: str, |
| 1433 | + table: str, |
| 1434 | + catalog_id: Optional[str] = None, |
| 1435 | + boto3_session: Optional[boto3.Session] = None, |
| 1436 | +) -> Dict[str, str]: |
| 1437 | + """Overwrite all existing parameters. |
| 1438 | +
|
| 1439 | + Parameters |
| 1440 | + ---------- |
| 1441 | + parameters : Dict[str, str] |
| 1442 | + e.g. {"source": "mysql", "destination": "datalake"} |
| 1443 | + database : str |
| 1444 | + Database name. |
| 1445 | + table : str |
| 1446 | + Table name. |
| 1447 | + catalog_id : str, optional |
| 1448 | + The ID of the Data Catalog from which to retrieve Databases. |
| 1449 | + If none is provided, the AWS account ID is used by default. |
| 1450 | + boto3_session : boto3.Session(), optional |
| 1451 | + Boto3 Session. The default boto3 session will be used if boto3_session receive None. |
| 1452 | +
|
| 1453 | + Returns |
| 1454 | + ------- |
| 1455 | + Dict[str, str] |
| 1456 | + All parameters after the overwrite (The same received). |
| 1457 | +
|
| 1458 | + Examples |
| 1459 | + -------- |
| 1460 | + >>> import awswrangler as wr |
| 1461 | + >>> pars = wr.catalog.overwrite_table_parameters( |
| 1462 | + ... parameters={"source": "mysql", "destination": "datalake"}, |
| 1463 | + ... database="...", |
| 1464 | + ... table="...") |
| 1465 | +
|
| 1466 | + """ |
| 1467 | + client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session) |
| 1468 | + args: Dict[str, str] = {} |
| 1469 | + if catalog_id is not None: |
| 1470 | + args["CatalogId"] = catalog_id # pragma: no cover |
| 1471 | + args["DatabaseName"] = database |
| 1472 | + args["Name"] = table |
| 1473 | + response: Dict[str, Any] = client_glue.get_table(**args) |
| 1474 | + response["Table"]["Parameters"] = parameters |
| 1475 | + if "DatabaseName" in response["Table"]: |
| 1476 | + del response["Table"]["DatabaseName"] |
| 1477 | + if "CreateTime" in response["Table"]: |
| 1478 | + del response["Table"]["CreateTime"] |
| 1479 | + if "UpdateTime" in response["Table"]: |
| 1480 | + del response["Table"]["UpdateTime"] |
| 1481 | + if "CreatedBy" in response["Table"]: |
| 1482 | + del response["Table"]["CreatedBy"] |
| 1483 | + if "IsRegisteredWithLakeFormation" in response["Table"]: |
| 1484 | + del response["Table"]["IsRegisteredWithLakeFormation"] |
| 1485 | + args2: Dict[str, Union[str, Dict[str, Any]]] = {} |
| 1486 | + if catalog_id is not None: |
| 1487 | + args2["CatalogId"] = catalog_id # pragma: no cover |
| 1488 | + args2["DatabaseName"] = database |
| 1489 | + args2["TableInput"] = response["Table"] |
| 1490 | + client_glue.update_table(**args2) |
| 1491 | + return parameters |
0 commit comments