Skip to content

Commit ca7ae36

Browse files
committed
TPT's DDL and TDLOAD operator for dagster-teradata
1 parent 26a895f commit ca7ae36

File tree

7 files changed

+1922
-9
lines changed

7 files changed

+1922
-9
lines changed

libraries/dagster-teradata/dagster_teradata/resources.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from . import constants
2121
from dagster_teradata.ttu.bteq import Bteq
22+
from dagster_teradata.ttu.tpt import DdlOperator, TdLoadOperator
2223
from dagster_teradata.teradata_compute_cluster_manager import (
2324
TeradataComputeClusterManager,
2425
)
@@ -253,6 +254,10 @@ def __init__(
253254
self.teradata_connection_resource = teradata_connection_resource
254255
self.compute_cluster_manager = TeradataComputeClusterManager(self, log)
255256
self.bteq = Bteq(self, teradata_connection_resource, log)
257+
self.ddl = DdlOperator(self, teradata_connection_resource, log)
258+
self.tdload = TdLoadOperator(
259+
self, teradata_connection_resource, teradata_connection_resource, log
260+
)
256261
self.log = log
257262

258263
@public
@@ -491,6 +496,176 @@ def bteq_operator(
491496
temp_file_read_encoding=temp_file_read_encoding,
492497
)
493498

499+
def ddl_operator(
500+
self,
501+
ddl: list[str] = None,
502+
error_list: Optional[Union[int, List[int]]] = None,
503+
remote_working_dir: str = "/tmp",
504+
ddl_job_name: Optional[str] = None,
505+
remote_host: Optional[str] = None,
506+
remote_user: Optional[str] = None,
507+
remote_password: Optional[str] = None,
508+
ssh_key_path: Optional[str] = None,
509+
remote_port: int = 22,
510+
) -> Optional[int]:
511+
"""
512+
Execute one or more DDL (Data Definition Language) statements on Teradata.
513+
514+
This method provides a unified interface to run DDL operations with various configurations:
515+
- Local or remote execution
516+
- Direct SQL input or file-based input
517+
- Custom error handling
518+
- Timeout and job naming support
519+
520+
Args:
521+
ddl (Optional[str]): DDL statement(s) to execute. Can be a single statement or a path to a SQL file.
522+
error_list (Optional[Union[int, List[int]]]): Error codes to ignore during execution.
523+
remote_working_dir (str): Working directory on remote machine. Defaults to '/tmp'.
524+
ddl_job_name (Optional[str]): Name for the DDL job.
525+
remote_host (Optional[str]): Hostname or IP for remote execution. None for local execution.
526+
remote_user (Optional[str]): Username for remote authentication. Required if remote_host specified.
527+
remote_password (Optional[str]): Password for remote authentication. Alternative to ssh_key_path.
528+
ssh_key_path (Optional[str]): Path to SSH private key for authentication. Alternative to remote_password.
529+
remote_port (int): SSH port for remote connection. Defaults to 22.
530+
531+
Returns:
532+
Optional[int]: The return code of the DDL execution, or None if not applicable.
533+
534+
Raises:
535+
ValueError: If input validation fails.
536+
Exception: If DDL execution fails and error is not in error_list.
537+
"""
538+
# Validate input parameters
539+
if (
540+
not ddl
541+
or not isinstance(ddl, list)
542+
or not all(isinstance(stmt, str) and stmt.strip() for stmt in ddl)
543+
):
544+
raise ValueError(
545+
"ddl parameter must be a non-empty list of non-empty strings representing DDL statements."
546+
)
547+
548+
# Validate remote execution parameters if needed
549+
if remote_host:
550+
if not remote_user:
551+
raise ValueError("remote_user must be provided for remote execution")
552+
if not ssh_key_path and not remote_password:
553+
raise ValueError(
554+
"Either ssh_key_path or remote_password must be provided for remote execution"
555+
)
556+
if remote_password and ssh_key_path:
557+
raise ValueError(
558+
"Cannot specify both remote_password and ssh_key_path for remote execution"
559+
)
560+
561+
# Validate network port
562+
if remote_port < 1 or remote_port > 65535:
563+
raise ValueError("remote_port must be a valid port number (1-65535)")
564+
565+
# Delegate execution to the underlying DDL implementation
566+
return self.ddl.ddl_operator(
567+
ddl=ddl,
568+
error_list=error_list,
569+
remote_working_dir=remote_working_dir,
570+
ddl_job_name=ddl_job_name,
571+
remote_host=remote_host,
572+
remote_user=remote_user,
573+
remote_password=remote_password,
574+
ssh_key_path=ssh_key_path,
575+
remote_port=remote_port,
576+
)
577+
578+
def tdload_operator(
579+
self,
580+
source_table: Optional[str] = None,
581+
select_stmt: Optional[str] = None,
582+
insert_stmt: Optional[str] = None,
583+
target_table: Optional[str] = None,
584+
source_file_name: Optional[str] = None,
585+
target_file_name: Optional[str] = None,
586+
source_format: Optional[str] = None,
587+
source_text_delimiter: Optional[str] = None,
588+
target_format: Optional[str] = None,
589+
target_text_delimiter: Optional[str] = None,
590+
tdload_options: Optional[str] = None,
591+
tdload_job_name: Optional[str] = None,
592+
tdload_job_var_file: Optional[str] = None,
593+
remote_working_dir: Optional[str] = None,
594+
remote_host: Optional[str] = None,
595+
remote_user: Optional[str] = None,
596+
remote_password: Optional[str] = None,
597+
ssh_key_path: Optional[str] = None,
598+
remote_port: int = 22,
599+
) -> int | None:
600+
"""
601+
Execute a TDLoad operation with the provided parameters.
602+
603+
Args:
604+
teradata_connection_resource: Source Teradata connection resource.
605+
target_teradata_connection_resource: Target Teradata connection resource.
606+
source_table (Optional[str]): Source table name.
607+
select_stmt (Optional[str]): SELECT statement for data extraction.
608+
insert_stmt (Optional[str]): INSERT statement for data loading.
609+
target_table (Optional[str]): Target table name.
610+
source_file_name (Optional[str]): Source file name.
611+
target_file_name (Optional[str]): Target file name.
612+
source_format (Optional[str]): Source file format.
613+
source_text_delimiter (Optional[str]): Delimiter for source file.
614+
target_format (Optional[str]): Target file format.
615+
target_text_delimiter (Optional[str]): Delimiter for target file.
616+
tdload_options (Optional[str]): Additional TDLoad options.
617+
tdload_job_name (Optional[str]): TDLoad job name.
618+
tdload_job_var_file (Optional[str]): TDLoad job variable file.
619+
remote_working_dir (Optional[str]): Remote working directory.
620+
remote_host (Optional[str]): Remote host for execution.
621+
remote_user (Optional[str]): Remote user for execution.
622+
remote_password (Optional[str]): Remote password for execution.
623+
ssh_key_path (Optional[str]): SSH key path for remote execution.
624+
remote_port (int): Remote port for SSH. Defaults to 22.
625+
626+
Returns:
627+
int | None: Return code from the TDLoad operation.
628+
"""
629+
630+
# Validate remote execution parameters if needed
631+
if remote_host:
632+
if not remote_user:
633+
raise ValueError("remote_user must be provided for remote execution")
634+
if not ssh_key_path and not remote_password:
635+
raise ValueError(
636+
"Either ssh_key_path or remote_password must be provided for remote execution"
637+
)
638+
if remote_password and ssh_key_path:
639+
raise ValueError(
640+
"Cannot specify both remote_password and ssh_key_path for remote execution"
641+
)
642+
643+
# Validate network port
644+
if remote_port < 1 or remote_port > 65535:
645+
raise ValueError("remote_port must be a valid port number (1-65535)")
646+
647+
return self.tdload.tdload_operator(
648+
source_table=source_table,
649+
select_stmt=select_stmt,
650+
insert_stmt=insert_stmt,
651+
target_table=target_table,
652+
source_file_name=source_file_name,
653+
target_file_name=target_file_name,
654+
source_format=source_format,
655+
source_text_delimiter=source_text_delimiter,
656+
target_format=target_format,
657+
target_text_delimiter=target_text_delimiter,
658+
tdload_options=tdload_options,
659+
tdload_job_name=tdload_job_name,
660+
tdload_job_var_file=tdload_job_var_file,
661+
remote_working_dir=remote_working_dir,
662+
remote_host=remote_host,
663+
remote_user=remote_user,
664+
remote_password=remote_password,
665+
ssh_key_path=ssh_key_path,
666+
remote_port=remote_port,
667+
)
668+
494669
def drop_database(self, databases: Union[str, Sequence[str]]) -> None:
495670
"""
496671
Drop one or more databases in Teradata.

0 commit comments

Comments
 (0)