Skip to content

Commit 7e62c90

Browse files
fix(materialize tests): remove and refactor try blocks, address feedback
1 parent 10779b3 commit 7e62c90

File tree

12 files changed

+803
-1012
lines changed

12 files changed

+803
-1012
lines changed

compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,8 @@ services:
570570
test:
571571
- CMD-SHELL
572572
- bash -c 'printf "GET / HTTP/1.1\n\n" > /dev/tcp/127.0.0.1/6875; exit $$?;'
573+
- bash -c 'printf "GET / HTTP/1.1\n\n" > /dev/tcp/127.0.0.1:6876; exit $$?;'
574+
- bash -c 'printf "GET / HTTP/1.1\n\n" > /dev/tcp/127.0.0.1:6874; exit $$?;'
573575
interval: 2s
574576
retries: 30
575577
timeout: 5s

ibis/backends/materialize/__init__.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing import Any
66

77
import sqlglot as sg
8+
from psycopg.errors import InternalError_, ProgrammingError, UndefinedFunction
89
from sqlglot import expressions as sge
910

1011
import ibis
@@ -141,8 +142,13 @@ def version(self):
141142
else version_str
142143
)
143144
return "unknown"
144-
except Exception: # noqa: BLE001
145-
# Fallback to server_version if mz_version() doesn't work
145+
except (
146+
UndefinedFunction,
147+
InternalError_,
148+
ProgrammingError,
149+
):
150+
# Fallback to server_version if mz_version() doesn't exist or fails
151+
# Note: Materialize returns InternalError_ for undefined functions
146152
return super().version
147153

148154
@property
@@ -1242,14 +1248,13 @@ def subscribe(
12421248
while True:
12431249
# Fetch a batch of rows
12441250
cursor.execute(f"FETCH {batch_size} {cursor_name}")
1245-
rows = cursor.fetchall()
12461251

12471252
# Get columns from first fetch
12481253
if columns is None and cursor.description:
12491254
columns = [desc[0] for desc in cursor.description]
12501255

12511256
# If no rows and up_to was specified, we're done
1252-
if not rows:
1257+
if not (rows := cursor.fetchall()):
12531258
if up_to is not None:
12541259
break
12551260
# Otherwise wait a bit and try again
@@ -1939,7 +1944,7 @@ def alter_connection(
19391944
- For SSH key rotation, update bastion server keys manually after rotation
19401945
19411946
"""
1942-
if not any([set_options, reset_options, rotate_keys]):
1947+
if not (set_options or reset_options or rotate_keys):
19431948
raise ValueError(
19441949
"Must specify at least one of: set_options, reset_options, rotate_keys"
19451950
)

ibis/backends/materialize/tests/conftest.py

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -82,27 +82,21 @@ def _load_data(self, **_: Any) -> None:
8282
for csv_file in self.test_files:
8383
table_name = csv_file.stem
8484
if table_name in self.connection.list_tables() and csv_file.exists():
85-
try:
86-
# Get column list from schema
87-
schema = self.connection.get_schema(table_name)
88-
columns = list(schema.keys())
89-
col_list = ", ".join(f'"{c}"' for c in columns)
90-
91-
# Use COPY FROM STDIN with CSV format (psycopg3 API)
92-
copy_sql = f'COPY "{table_name}" ({col_list}) FROM STDIN WITH (FORMAT CSV, HEADER true)'
93-
94-
with con.cursor() as cur:
95-
# Open CSV file and use copy() context manager for psycopg3
96-
with open(csv_file) as f:
97-
with cur.copy(copy_sql) as copy:
98-
while data := f.read(8192):
99-
copy.write(data)
100-
con.commit()
101-
except Exception as e: # noqa: BLE001
102-
# Log but don't fail - some tables might be pre-populated
103-
import warnings
104-
105-
warnings.warn(f"Could not load {table_name}: {e}")
85+
# Get column list from schema
86+
schema = self.connection.get_schema(table_name)
87+
columns = list(schema.keys())
88+
col_list = ", ".join(f'"{c}"' for c in columns)
89+
90+
# Use COPY FROM STDIN with CSV format (psycopg3 API)
91+
copy_sql = f'COPY "{table_name}" ({col_list}) FROM STDIN WITH (FORMAT CSV, HEADER true)'
92+
93+
with con.cursor() as cur:
94+
# Open CSV file and use copy() context manager for psycopg3
95+
with open(csv_file) as f:
96+
with cur.copy(copy_sql) as copy:
97+
while data := f.read(8192):
98+
copy.write(data)
99+
con.commit()
106100

107101
@staticmethod
108102
def connect(*, tmpdir, worker_id, **kw): # noqa: ARG004

0 commit comments

Comments
 (0)