Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 185 additions & 1 deletion libraries/dagster-teradata/dagster_teradata/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
from contextlib import closing, contextmanager
from datetime import datetime
from textwrap import dedent
from typing import Any, List, Mapping, Optional, Sequence, Union, TYPE_CHECKING, Literal
from typing import (
Any,
List,
Mapping,
Optional,
Sequence,
Union,
TYPE_CHECKING,
Literal,
)

import re
import dagster._check as check
Expand All @@ -19,6 +28,7 @@

from . import constants
from dagster_teradata.ttu.bteq import Bteq
from dagster_teradata.ttu.tpt import DdlOperator, TdLoadOperator
from dagster_teradata.teradata_compute_cluster_manager import (
TeradataComputeClusterManager,
)
Expand Down Expand Up @@ -253,6 +263,10 @@ def __init__(
self.teradata_connection_resource = teradata_connection_resource
self.compute_cluster_manager = TeradataComputeClusterManager(self, log)
self.bteq = Bteq(self, teradata_connection_resource, log)
self.ddl = DdlOperator(self, teradata_connection_resource, log)
self.tdload = TdLoadOperator(
self, teradata_connection_resource, teradata_connection_resource, log
)
self.log = log

@public
Expand Down Expand Up @@ -491,6 +505,176 @@ def bteq_operator(
temp_file_read_encoding=temp_file_read_encoding,
)

def ddl_operator(
    self,
    ddl: Optional[List[str]] = None,
    error_list: Optional[Union[int, List[int]]] = None,
    remote_working_dir: str = "/tmp",
    ddl_job_name: Optional[str] = None,
    remote_host: Optional[str] = None,
    remote_user: Optional[str] = None,
    remote_password: Optional[str] = None,
    ssh_key_path: Optional[str] = None,
    remote_port: int = 22,
) -> Optional[int]:
    """
    Execute one or more DDL (Data Definition Language) statements on Teradata.

    The arguments are validated here and then forwarded unchanged to the
    underlying ``self.ddl.ddl_operator`` implementation, which performs the
    actual (local or remote) execution.

    Args:
        ddl (Optional[List[str]]): DDL statements to execute. Although the
            parameter defaults to None, a non-empty list of non-blank
            strings is required; anything else raises ValueError.
        error_list (Optional[Union[int, List[int]]]): Error codes to ignore during execution.
        remote_working_dir (str): Working directory on remote machine. Defaults to '/tmp'.
        ddl_job_name (Optional[str]): Name for the DDL job.
        remote_host (Optional[str]): Hostname or IP for remote execution. None for local execution.
        remote_user (Optional[str]): Username for remote authentication. Required if remote_host specified.
        remote_password (Optional[str]): Password for remote authentication. Alternative to ssh_key_path.
        ssh_key_path (Optional[str]): Path to SSH private key for authentication. Alternative to remote_password.
        remote_port (int): SSH port for remote connection. Defaults to 22.

    Returns:
        Optional[int]: Whatever the underlying DDL operator returns
        (its return code, or None if not applicable).

    Raises:
        ValueError: If ``ddl`` is not a non-empty list of non-blank strings,
            if ``remote_host`` is given without ``remote_user``, without
            exactly one of ``remote_password`` / ``ssh_key_path``, or if
            ``remote_port`` is outside 1-65535.
    """
    # Reject None, empty lists, bare strings and lists containing non-string
    # or whitespace-only entries before anything is handed to the operator.
    ddl_is_valid = (
        bool(ddl)
        and isinstance(ddl, list)
        and all(isinstance(stmt, str) and stmt.strip() for stmt in ddl)
    )
    if not ddl_is_valid:
        raise ValueError(
            "ddl parameter must be a non-empty list of non-empty strings representing DDL statements."
        )

    # Remote execution needs a user and exactly one authentication method.
    if remote_host:
        if not remote_user:
            raise ValueError("remote_user must be provided for remote execution")
        if not ssh_key_path and not remote_password:
            raise ValueError(
                "Either ssh_key_path or remote_password must be provided for remote execution"
            )
        if remote_password and ssh_key_path:
            raise ValueError(
                "Cannot specify both remote_password and ssh_key_path for remote execution"
            )

    # Validate network port
    if not 1 <= remote_port <= 65535:
        raise ValueError("remote_port must be a valid port number (1-65535)")

    # Delegate execution to the underlying DDL implementation
    return self.ddl.ddl_operator(
        ddl=ddl,
        error_list=error_list,
        remote_working_dir=remote_working_dir,
        ddl_job_name=ddl_job_name,
        remote_host=remote_host,
        remote_user=remote_user,
        remote_password=remote_password,
        ssh_key_path=ssh_key_path,
        remote_port=remote_port,
    )

def tdload_operator(
    self,
    source_table: Optional[str] = None,
    select_stmt: Optional[str] = None,
    insert_stmt: Optional[str] = None,
    target_table: Optional[str] = None,
    source_file_name: Optional[str] = None,
    target_file_name: Optional[str] = None,
    source_format: Optional[str] = None,
    source_text_delimiter: Optional[str] = None,
    target_format: Optional[str] = None,
    target_text_delimiter: Optional[str] = None,
    tdload_options: Optional[str] = None,
    tdload_job_name: Optional[str] = None,
    tdload_job_var_file: Optional[str] = None,
    remote_working_dir: Optional[str] = None,
    remote_host: Optional[str] = None,
    remote_user: Optional[str] = None,
    remote_password: Optional[str] = None,
    ssh_key_path: Optional[str] = None,
    remote_port: int = 22,
) -> Optional[int]:
    """
    Execute a TDLoad operation with the provided parameters.

    Remote-execution arguments are validated here; every argument is then
    forwarded unchanged to the underlying ``self.tdload.tdload_operator``
    implementation.

    Args:
        source_table (Optional[str]): Source table name.
        select_stmt (Optional[str]): SELECT statement for data extraction.
        insert_stmt (Optional[str]): INSERT statement for data loading.
        target_table (Optional[str]): Target table name.
        source_file_name (Optional[str]): Source file name.
        target_file_name (Optional[str]): Target file name.
        source_format (Optional[str]): Source file format.
        source_text_delimiter (Optional[str]): Delimiter for source file.
        target_format (Optional[str]): Target file format.
        target_text_delimiter (Optional[str]): Delimiter for target file.
        tdload_options (Optional[str]): Additional TDLoad options.
        tdload_job_name (Optional[str]): TDLoad job name.
        tdload_job_var_file (Optional[str]): TDLoad job variable file.
        remote_working_dir (Optional[str]): Remote working directory.
        remote_host (Optional[str]): Remote host for execution. None for local execution.
        remote_user (Optional[str]): Remote user for execution. Required if remote_host specified.
        remote_password (Optional[str]): Remote password for execution. Alternative to ssh_key_path.
        ssh_key_path (Optional[str]): SSH key path for remote execution. Alternative to remote_password.
        remote_port (int): Remote port for SSH. Defaults to 22.

    Returns:
        Optional[int]: Return code from the TDLoad operation, as returned by
        the underlying operator.

    Raises:
        ValueError: If ``remote_host`` is given without ``remote_user``,
            without exactly one of ``remote_password`` / ``ssh_key_path``,
            or if ``remote_port`` is outside 1-65535.
    """
    # Remote execution needs a user and exactly one authentication method.
    if remote_host:
        if not remote_user:
            raise ValueError("remote_user must be provided for remote execution")
        if not ssh_key_path and not remote_password:
            raise ValueError(
                "Either ssh_key_path or remote_password must be provided for remote execution"
            )
        if remote_password and ssh_key_path:
            raise ValueError(
                "Cannot specify both remote_password and ssh_key_path for remote execution"
            )

    # Validate network port
    if not 1 <= remote_port <= 65535:
        raise ValueError("remote_port must be a valid port number (1-65535)")

    # Delegate execution to the underlying TDLoad implementation
    return self.tdload.tdload_operator(
        source_table=source_table,
        select_stmt=select_stmt,
        insert_stmt=insert_stmt,
        target_table=target_table,
        source_file_name=source_file_name,
        target_file_name=target_file_name,
        source_format=source_format,
        source_text_delimiter=source_text_delimiter,
        target_format=target_format,
        target_text_delimiter=target_text_delimiter,
        tdload_options=tdload_options,
        tdload_job_name=tdload_job_name,
        tdload_job_var_file=tdload_job_var_file,
        remote_working_dir=remote_working_dir,
        remote_host=remote_host,
        remote_user=remote_user,
        remote_password=remote_password,
        ssh_key_path=ssh_key_path,
        remote_port=remote_port,
    )

def drop_database(self, databases: Union[str, Sequence[str]]) -> None:
"""
Drop one or more databases in Teradata.
Expand Down
3 changes: 3 additions & 0 deletions libraries/dagster-teradata/dagster_teradata/ttu/bteq.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ def _transfer_to_and_execute_bteq_on_remote(
raise DagsterError(
"Failed to establish SSH connection. `ssh_client` is None."
)
self.log.info(
"Successfully established SSH connection with host: %s", remote_host
)
verify_bteq_installed_remote(self.ssh_client)
password = generate_random_password() # Encryption/Decryption password
encrypted_file_path = os.path.join(tmp_dir, "bteq_script.enc")
Expand Down
Loading
Loading