33import subprocess
44import tempfile
55from contextlib import contextmanager
6- from typing import Optional , List , Union , Literal
6+ from typing import Optional , List , Union , Literal , cast
77
88import paramiko
99from dagster import DagsterError
@@ -201,16 +201,20 @@ def bteq_operator(
201201 if self .file_path :
202202 if not self ._setup_ssh_connection (
203203 host = self .remote_host ,
204- user = self .remote_user ,
204+ user = cast ( str , self .remote_user ) ,
205205 password = self .remote_remote_password ,
206206 key_path = self .ssh_key_path ,
207207 port = self .remote_port ,
208208 ):
209209 raise DagsterError (
210210 "Failed to establish SSH connection. Please check the provided credentials."
211211 )
212- if self .file_path and is_valid_remote_bteq_script_file (
213- self .ssh_client , self .file_path
212+ if (
213+ self .file_path
214+ and self .ssh_client
215+ and is_valid_remote_bteq_script_file (
216+ self .ssh_client , self .file_path
217+ )
214218 ):
215219 return self ._handle_remote_bteq_file (
216220 ssh_client = self .ssh_client ,
@@ -230,7 +234,7 @@ def execute_bteq_script(
230234 bteq_script : str ,
231235 remote_working_dir : str | None ,
232236 bteq_script_encoding : str | None ,
233- timeout : int ,
237+ timeout : Optional [ int ], # or: timeout: int | None
234238 timeout_rc : int | None ,
235239 bteq_session_encoding : str | None ,
236240 bteq_quit_rc : int | list [int ] | tuple [int , ...] | None ,
@@ -297,7 +301,7 @@ def execute_bteq_script_at_remote(
297301 bteq_script : str ,
298302 remote_working_dir : str | None ,
299303 bteq_script_encoding : str | None ,
300- timeout : int ,
304+ timeout : Optional [ int ], # or: timeout: int | None
301305 timeout_rc : int | None ,
302306 bteq_session_encoding : str | None ,
303307 bteq_quit_rc : int | list [int ] | tuple [int , ...] | None ,
@@ -347,8 +351,8 @@ def execute_bteq_script_at_remote(
347351 bteq_quit_rc ,
348352 bteq_session_encoding ,
349353 tmp_dir ,
350- remote_host ,
351- remote_user ,
354+ cast ( str , remote_host ) ,
355+ cast ( str , remote_user ) ,
352356 remote_password ,
353357 ssh_key_path ,
354358 remote_port ,
@@ -359,7 +363,7 @@ def _transfer_to_and_execute_bteq_on_remote(
359363 file_path : str ,
360364 remote_working_dir : str | None ,
361365 bteq_script_encoding : str | None ,
362- timeout : int ,
366+ timeout : Optional [ int ], # or: timeout: int | None
363367 timeout_rc : int | None ,
364368 bteq_quit_rc : int | list [int ] | tuple [int , ...] | None ,
365369 bteq_session_encoding : str | None ,
@@ -498,7 +502,7 @@ def execute_bteq_script_at_local(
498502 self ,
499503 bteq_script : str ,
500504 bteq_script_encoding : str | None ,
501- timeout : int ,
505+ timeout : Optional [ int ], # or: timeout: int | None
502506 timeout_rc : int | None ,
503507 bteq_quit_rc : int | list [int ] | tuple [int , ...] | None ,
504508 bteq_session_encoding : str | None ,
@@ -547,7 +551,7 @@ def execute_bteq_script_at_local(
547551 try :
548552 # https://docs.python.org/3.10/library/subprocess.html#subprocess.Popen.wait timeout is in seconds
549553 process .wait (
550- timeout = timeout + 60
554+ timeout = ( timeout or 0 ) + 60
551555 ) # Adding 1 minute extra for BTEQ script timeout
552556 except subprocess .TimeoutExpired :
553557 self .on_kill ()
@@ -583,10 +587,6 @@ def execute_bteq_script_at_local(
583587
584588 return process .returncode
585589
586- def contains_template (parameter_value ):
587- """Check if the parameter contains Jinja templating syntax."""
588- return "{{" in parameter_value and "}}" in parameter_value
589-
590590 def on_kill (self ):
591591 """Terminate the subprocess if running."""
592592 conn = self .connection
@@ -677,7 +677,7 @@ def _handle_remote_bteq_file(
677677 remote_user = self .remote_user ,
678678 remote_password = self .remote_remote_password ,
679679 ssh_key_path = self .ssh_key_path ,
680- remote_port = self .remote_port ,
680+ remote_port = self .remote_port or 22 ,
681681 )
682682 else :
683683 raise ValueError (
@@ -718,7 +718,7 @@ def _handle_local_bteq_file(self, file_path: str) -> int | None:
718718 def _setup_ssh_connection (
719719 self ,
720720 host : str ,
721- user : str ,
721+ user : Optional [ str ] ,
722722 password : Optional [str ],
723723 key_path : Optional [str ],
724724 port : int ,
@@ -753,6 +753,11 @@ def _setup_ssh_connection(
753753 self .ssh_client .connect (host , port = port , username = user , pkey = key )
754754 else :
755755 if not password :
756+ if user is None :
757+ raise ValueError (
758+ "Username is required to fetch stored credentials"
759+ )
760+ # Attempt to retrieve stored credentials
756761 creds = get_stored_credentials (self , host , user )
757762 password = (
758763 self .cred_manager .decrypt (creds ["password" ]) if creds else None
0 commit comments