Skip to content

Commit 998e6d3

Browse files
#1622: Fix more cases (including CI)
# Summary The CI tests identified some issues that don't show up on a normal test run. This commit fixes those issues. It also highlighted that there were numerous areas that didn't have sufficient test coverage for the case that the caller had already opened the resource. The indexer has some notable changes, but the biggest area affected is the parsers when writing from an already opened source. This commit adds unit tests for the index and all the parser formats for this case, and fixes the code to support the lack of nested contexts. # Tests - Set up the required databases for CI by copying the commands in the GitHub Actions workflow - Run `hatch run +py=3.11 ci:test` and ensure all tests pass and coverage remains sufficient - Run `hatch run test` in case it is different and ensure all tests pass and coverage remains sufficient This also means that all linting etc. has been run too.
1 parent bdccca0 commit 998e6d3

File tree

31 files changed

+405
-49
lines changed

31 files changed

+405
-49
lines changed

frictionless/analyzer/analyzer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ def analyze_table_resource(
3434
# Iterate rows
3535
columns_data: Dict[str, List[Any]] = {}
3636
numeric = ["integer", "numeric", "number"]
37-
with resource:
37+
# Use a copy of the resource to avoid side effects (see #1622)
38+
with resource.to_copy() as resource:
3839
for row in resource.row_stream:
3940
null_columns = 0
4041
for field_name in row:

frictionless/formats/gsheets/parser.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ def write_row_stream(self, source: TableResource):
5353
sh = gc.open_by_key(key)
5454
wks = sh.worksheet_by_id(gid) if gid else sh[0] # type: ignore
5555
data: List[Any] = []
56-
with source:
56+
# Use a copy of the source to avoid side effects (see #1622)
57+
with source.to_copy() as source:
5758
data.append(source.schema.field_names)
5859
for row in source.row_stream:
5960
data.append(row.to_list())

frictionless/formats/html/parser.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def read_cell_stream_create(self) -> types.ICellStream:
5757
# It will give us an ability to support HtmlDialect
5858
def write_row_stream(self, source: TableResource):
5959
html = "<html><body><table>\n"
60-
with source:
60+
# Use a copy of the source to avoid side effects (see #1622)
61+
with source.to_copy() as source:
6162
html += "<tr>"
6263
for name in source.schema.field_names:
6364
html += f"<td>{name}</td>"

frictionless/formats/pandas/parser.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ def write_row_stream(self, source: TableResource):
128128
data_rows: List[Tuple[Any]] = []
129129
index_rows: List[Tuple[Any]] = []
130130
fixed_types = {}
131-
with source:
131+
# Use a copy of the source to avoid side effects (see #1622)
132+
with source.to_copy() as source:
132133
for row in source.row_stream:
133134
data_values: List[Any] = []
134135
index_values: List[Any] = []

frictionless/formats/qsv/adapter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ def read_schema(self, resource: Resource) -> Schema:
2727
command = [self.qsv_path, "stats", "--infer-dates", "--dates-whitelist", "all"]
2828
process = sp.Popen(command, stdout=sp.PIPE, stdin=sp.PIPE)
2929
# TODO: Use FileResource here (or future resource.stream_bytes())
30-
with resource:
30+
# Use a copy of the resource to avoid side effects (see #1622)
31+
with resource.to_copy() as resource:
3132
while True:
3233
chunk = resource.read_bytes(size=BLOCK_SIZE)
3334
if not chunk:

frictionless/formats/spss/parser.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ def write_row_stream(self, source: TableResource):
9999

100100
# Write rows
101101
with sav.SavWriter(self.resource.normpath, ioUtf8=True, **spss_schema) as writer: # type: ignore
102-
with source:
102+
# Use a copy of the source to avoid side effects (see #1622)
103+
with source.to_copy() as source:
103104
for row in source.row_stream: # type: ignore
104105
cells: List[Any] = []
105106
for field in source.schema.fields: # type: ignore
@@ -130,7 +131,8 @@ def __write_convert_schema(self, source: TableResource):
130131
"varTypes": {},
131132
"formats": {},
132133
}
133-
with source:
134+
# Use a copy of the source to avoid side effects (see #1622)
135+
with source.to_copy() as source:
134136
# Add fields
135137
sizes: Dict[str, int] = {}
136138
mapping = self.__write_convert_type()

frictionless/formats/sql/adapter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ def write_package(self, package: Package):
109109
for table in self.metadata.sorted_tables:
110110
if package.has_table_resource(table.name):
111111
resource = package.get_table_resource(table.name)
112-
with resource:
112+
# Use a copy of the resource to avoid side effects (see #1622)
113+
with resource.to_copy() as resource:
113114
self.write_row_stream(resource.row_stream, table_name=table.name)
114115
return models.PublishResult(
115116
url=self.engine.url.render_as_string(hide_password=True),

frictionless/formats/sql/parser.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def write_row_stream(self, source: TableResource):
5151
adapter = SqlAdapter(engine, control=control)
5252
if not adapter:
5353
raise FrictionlessException(f"Not supported source: {self.resource.normpath}")
54-
with source:
54+
# Write from a copy to prevent side effects (see #1622)
55+
with source.to_copy() as source:
5556
adapter.write_schema(source.schema, table_name=control.table)
5657
adapter.write_row_stream(source.row_stream, table_name=control.table)

frictionless/indexer/indexer.py

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,24 @@ def __attrs_post_init__(self):
4545

4646
def index(self) -> Optional[Report]:
4747
self.prepare_resource()
48-
with self.resource:
49-
# Index is a resource-based operation not supporting FKs
50-
if self.resource.schema.foreign_keys:
51-
self.resource.schema.foreign_keys = []
52-
self.create_table()
53-
while True:
54-
try:
55-
return self.populate_table()
56-
except Exception:
57-
if self.fast and self.use_fallback:
58-
self.fast = False
59-
continue
60-
self.delete_table()
61-
raise
48+
49+
# Infer resource if needed
50+
if self.resource.closed:
51+
self.resource.infer()
52+
53+
# Index is a resource-based operation not supporting FKs
54+
if self.resource.schema.foreign_keys:
55+
self.resource.schema.foreign_keys = []
56+
self.create_table()
57+
while True:
58+
try:
59+
return self.populate_table()
60+
except Exception:
61+
if self.fast and self.use_fallback:
62+
self.fast = False
63+
continue
64+
self.delete_table()
65+
raise
6266

6367
def prepare_resource(self):
6468
if self.qsv_path:
@@ -108,25 +112,29 @@ def populate_table_fast_sqlite(self):
108112
sql_command = f".import '|cat -' \"{self.table_name}\""
109113
command = ["sqlite3", "-csv", self.adapter.engine.url.database, sql_command]
110114
process = subprocess.Popen(command, stdin=PIPE, stdout=PIPE)
111-
for line_number, line in enumerate(self.resource.byte_stream, start=1):
112-
if line_number > 1:
113-
process.stdin.write(line) # type: ignore
114-
self.report_progress(f"{self.resource.stats.bytes} bytes")
115+
# Iterate over a copy of the resource to avoid side effects (see #1622)
116+
with self.resource.to_copy() as resource:
117+
for line_number, line in enumerate(resource.byte_stream, start=1):
118+
if line_number > 1:
119+
process.stdin.write(line) # type: ignore
120+
self.report_progress(f"{self.resource.stats.bytes} bytes")
115121
process.stdin.close() # type: ignore
116122
process.wait()
117123

118124
def populate_table_fast_postgresql(self):
119125
database_url = self.adapter.engine.url.render_as_string(hide_password=False)
120126
with platform.psycopg.connect(database_url) as connection:
121127
with connection.cursor() as cursor:
122-
query = 'COPY "%s" FROM STDIN CSV HEADER' % self.table_name
123-
with cursor.copy(query) as copy: # type: ignore
124-
while True:
125-
chunk = self.resource.read_bytes(size=settings.BLOCK_SIZE)
126-
if not chunk:
127-
break
128-
copy.write(chunk)
129-
self.report_progress(f"{self.resource.stats.bytes} bytes")
128+
# Iterate over a copy of the resource to avoid side effects (see #1622)
129+
with self.resource.to_copy() as resource:
130+
query = 'COPY "%s" FROM STDIN CSV HEADER' % self.table_name
131+
with cursor.copy(query) as copy: # type: ignore
132+
while True:
133+
chunk = resource.read_bytes(size=settings.BLOCK_SIZE)
134+
if not chunk:
135+
break
136+
copy.write(chunk)
137+
self.report_progress(f"{self.resource.stats.bytes} bytes")
130138

131139
def delete_table(self):
132140
self.adapter.delete_resource(self.table_name)

frictionless/steps/table/table_debug.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ def transform_resource(self, resource: Resource):
3333

3434
# Data
3535
def data(): # type: ignore
36-
with current:
37-
for row in current.row_stream: # type: ignore
36+
# Use a copy of the source to avoid side effects (see #1622)
37+
with current.to_copy() as current_copy:
38+
for row in current_copy.row_stream: # type: ignore
3839
self.function(row) # type: ignore
3940
yield row
4041

0 commit comments

Comments
 (0)