Labels
bug, needs triage, python
Description
Checks
- I have checked that this issue has not already been reported.
- I have confirmed this bug exists on the latest version of Polars.
Reproducible example
# test.py
import json
import sys
import polars as pl
data = dict(a=3, b=2, c=4)
data_s = json.dumps(data)
print("small")
print(pl.from_dicts([dict(s=data_s)]).select(pl.col("s").str.json_decode()).schema)
data_s = r'{"type":"inbound-rtp","id":"IT01V58405073","timestamp":1720146433875283,"ssrc":58405073,"kind":"video","transportId":"T01","codecId":"CIT01_96","jitter":0.017,"packetsLost":132,"trackIdentifier":"mcs-vtrack","mid":"0","packetsReceived":14156623,"bytesReceived":15794472419,"headerBytesReceived":283132460,"lastPacketReceivedTimestamp":1720146433869.354,"jitterBufferDelay":80889.384095,"jitterBufferTargetDelay":56671.523104,"jitterBufferMinimumDelay":56671.523104,"jitterBufferEmittedCount":1510557,"framesReceived":1510557,"frameWidth":1280,"frameHeight":720,"framesPerSecond":30,"framesDecoded":1510557,"keyFramesDecoded":506,"framesDropped":703,"totalDecodeTime":4942.540196,"totalProcessingDelay":85878.07405699999,"totalAssemblyTime":26023.45523,"framesAssembledFromMultiplePackets":1510553,"totalInterFrameDelay":50404.184,"totalSquaredInterFrameDelay":4493.390938020631,"pauseCount":1,"totalPausesDuration":52.935,"freezeCount":49,"totalFreezesDuration":9.993,"decoderImplementation":"libvpx","firCount":0,"pliCount":13,"nackCount":165337,"qpSum":37686223,"powerEfficientDecoder":false,"minPlayoutDelay":0}'
print("large1")
print(pl.from_dicts([dict(s=data_s)]).select(pl.col("s").str.json_decode()).schema)
pl.from_dicts([dict(s=data_s)]).select(pl.col("s").str.json_decode()).write_parquet(
    f"data{sys.argv[1]}.parquet"
)
if sys.argv[1] == "3":
    dfs = [pl.read_parquet(f"data{i}.parquet") for i in [1, 2, 3]]
    for df in dfs:
        print(df.schema)
    ddf = pl.concat(dfs)
Run the above code with the commands below; you will see that the schema of the large JSON column differs between runs, and pl.concat fails.
python test.py 1; python test.py 2; python test.py 3
Log output
small
Schema({'s': Struct({'a': Int64, 'b': Int64, 'c': Int64})})
large1
Schema({'s': Struct({'kind': String, 'jitterBufferTargetDelay': Float64, 'totalInterFrameDelay': Float64, 'totalAssemblyTime': Float64, 'id': String, 'jitterBufferMinimumDelay': Float64, 'framesPerSecond': Int64, 'freezeCount': Int64, 'totalProcessingDelay': Float64, 'jitterBufferDelay': Float64, 'framesReceived': Int64, 'type': String, 'timestamp': Int64, 'pauseCount': Int64, 'framesDropped': Int64, 'powerEfficientDecoder': Boolean, 'packetsLost': Int64, 'firCount': Int64, 'minPlayoutDelay': Int64, 'totalSquaredInterFrameDelay': Float64, 'totalDecodeTime': Float64, 'lastPacketReceivedTimestamp': Float64, 'keyFramesDecoded': Int64, 'bytesReceived': Int64, 'jitter': Float64, 'trackIdentifier': String, 'totalPausesDuration': Float64, 'qpSum': Int64, 'packetsReceived': Int64, 'codecId': String, 'jitterBufferEmittedCount': Int64, 'headerBytesReceived': Int64, 'framesAssembledFromMultiplePackets': Int64, 'ssrc': Int64, 'frameHeight': Int64, 'pliCount': Int64, 'frameWidth': Int64, 'mid': String, 'totalFreezesDuration': Float64, 'decoderImplementation': String, 'framesDecoded': Int64, 'nackCount': Int64, 'transportId': String})})
small
Schema({'s': Struct({'a': Int64, 'b': Int64, 'c': Int64})})
large1
Schema({'s': Struct({'framesReceived': Int64, 'totalSquaredInterFrameDelay': Float64, 'type': String, 'pauseCount': Int64, 'freezeCount': Int64, 'qpSum': Int64, 'jitterBufferMinimumDelay': Float64, 'totalInterFrameDelay': Float64, 'jitterBufferEmittedCount': Int64, 'totalProcessingDelay': Float64, 'framesDecoded': Int64, 'kind': String, 'powerEfficientDecoder': Boolean, 'headerBytesReceived': Int64, 'firCount': Int64, 'id': String, 'ssrc': Int64, 'pliCount': Int64, 'bytesReceived': Int64, 'totalPausesDuration': Float64, 'decoderImplementation': String, 'packetsReceived': Int64, 'frameWidth': Int64, 'nackCount': Int64, 'totalFreezesDuration': Float64, 'lastPacketReceivedTimestamp': Float64, 'jitterBufferTargetDelay': Float64, 'minPlayoutDelay': Int64, 'transportId': String, 'mid': String, 'jitterBufferDelay': Float64, 'jitter': Float64, 'totalAssemblyTime': Float64, 'codecId': String, 'packetsLost': Int64, 'frameHeight': Int64, 'timestamp': Int64, 'trackIdentifier': String, 'framesDropped': Int64, 'framesPerSecond': Int64, 'keyFramesDecoded': Int64, 'totalDecodeTime': Float64, 'framesAssembledFromMultiplePackets': Int64})})
small
Schema({'s': Struct({'a': Int64, 'b': Int64, 'c': Int64})})
large1
Schema({'s': Struct({'jitter': Float64, 'pauseCount': Int64, 'pliCount': Int64, 'ssrc': Int64, 'firCount': Int64, 'powerEfficientDecoder': Boolean, 'framesAssembledFromMultiplePackets': Int64, 'totalAssemblyTime': Float64, 'transportId': String, 'kind': String, 'totalPausesDuration': Float64, 'jitterBufferEmittedCount': Int64, 'mid': String, 'framesPerSecond': Int64, 'framesDecoded': Int64, 'jitterBufferTargetDelay': Float64, 'headerBytesReceived': Int64, 'totalSquaredInterFrameDelay': Float64, 'framesReceived': Int64, 'frameWidth': Int64, 'id': String, 'frameHeight': Int64, 'packetsReceived': Int64, 'freezeCount': Int64, 'qpSum': Int64, 'timestamp': Int64, 'decoderImplementation': String, 'totalDecodeTime': Float64, 'packetsLost': Int64, 'totalProcessingDelay': Float64, 'codecId': String, 'bytesReceived': Int64, 'jitterBufferMinimumDelay': Float64, 'totalFreezesDuration': Float64, 'type': String, 'minPlayoutDelay': Int64, 'jitterBufferDelay': Float64, 'trackIdentifier': String, 'keyFramesDecoded': Int64, 'lastPacketReceivedTimestamp': Float64, 'framesDropped': Int64, 'totalInterFrameDelay': Float64, 'nackCount': Int64})})
Schema({'s': Struct({'kind': String, 'jitterBufferTargetDelay': Float64, 'totalInterFrameDelay': Float64, 'totalAssemblyTime': Float64, 'id': String, 'jitterBufferMinimumDelay': Float64, 'framesPerSecond': Int64, 'freezeCount': Int64, 'totalProcessingDelay': Float64, 'jitterBufferDelay': Float64, 'framesReceived': Int64, 'type': String, 'timestamp': Int64, 'pauseCount': Int64, 'framesDropped': Int64, 'powerEfficientDecoder': Boolean, 'packetsLost': Int64, 'firCount': Int64, 'minPlayoutDelay': Int64, 'totalSquaredInterFrameDelay': Float64, 'totalDecodeTime': Float64, 'lastPacketReceivedTimestamp': Float64, 'keyFramesDecoded': Int64, 'bytesReceived': Int64, 'jitter': Float64, 'trackIdentifier': String, 'totalPausesDuration': Float64, 'qpSum': Int64, 'packetsReceived': Int64, 'codecId': String, 'jitterBufferEmittedCount': Int64, 'headerBytesReceived': Int64, 'framesAssembledFromMultiplePackets': Int64, 'ssrc': Int64, 'frameHeight': Int64, 'pliCount': Int64, 'frameWidth': Int64, 'mid': String, 'totalFreezesDuration': Float64, 'decoderImplementation': String, 'framesDecoded': Int64, 'nackCount': Int64, 'transportId': String})})
Schema({'s': Struct({'framesReceived': Int64, 'totalSquaredInterFrameDelay': Float64, 'type': String, 'pauseCount': Int64, 'freezeCount': Int64, 'qpSum': Int64, 'jitterBufferMinimumDelay': Float64, 'totalInterFrameDelay': Float64, 'jitterBufferEmittedCount': Int64, 'totalProcessingDelay': Float64, 'framesDecoded': Int64, 'kind': String, 'powerEfficientDecoder': Boolean, 'headerBytesReceived': Int64, 'firCount': Int64, 'id': String, 'ssrc': Int64, 'pliCount': Int64, 'bytesReceived': Int64, 'totalPausesDuration': Float64, 'decoderImplementation': String, 'packetsReceived': Int64, 'frameWidth': Int64, 'nackCount': Int64, 'totalFreezesDuration': Float64, 'lastPacketReceivedTimestamp': Float64, 'jitterBufferTargetDelay': Float64, 'minPlayoutDelay': Int64, 'transportId': String, 'mid': String, 'jitterBufferDelay': Float64, 'jitter': Float64, 'totalAssemblyTime': Float64, 'codecId': String, 'packetsLost': Int64, 'frameHeight': Int64, 'timestamp': Int64, 'trackIdentifier': String, 'framesDropped': Int64, 'framesPerSecond': Int64, 'keyFramesDecoded': Int64, 'totalDecodeTime': Float64, 'framesAssembledFromMultiplePackets': Int64})})
Schema({'s': Struct({'jitter': Float64, 'pauseCount': Int64, 'pliCount': Int64, 'ssrc': Int64, 'firCount': Int64, 'powerEfficientDecoder': Boolean, 'framesAssembledFromMultiplePackets': Int64, 'totalAssemblyTime': Float64, 'transportId': String, 'kind': String, 'totalPausesDuration': Float64, 'jitterBufferEmittedCount': Int64, 'mid': String, 'framesPerSecond': Int64, 'framesDecoded': Int64, 'jitterBufferTargetDelay': Float64, 'headerBytesReceived': Int64, 'totalSquaredInterFrameDelay': Float64, 'framesReceived': Int64, 'frameWidth': Int64, 'id': String, 'frameHeight': Int64, 'packetsReceived': Int64, 'freezeCount': Int64, 'qpSum': Int64, 'timestamp': Int64, 'decoderImplementation': String, 'totalDecodeTime': Float64, 'packetsLost': Int64, 'totalProcessingDelay': Float64, 'codecId': String, 'bytesReceived': Int64, 'jitterBufferMinimumDelay': Float64, 'totalFreezesDuration': Float64, 'type': String, 'minPlayoutDelay': Int64, 'jitterBufferDelay': Float64, 'trackIdentifier': String, 'keyFramesDecoded': Int64, 'lastPacketReceivedTimestamp': Float64, 'framesDropped': Int64, 'totalInterFrameDelay': Float64, 'nackCount': Int64})})
Traceback (most recent call last):
  File "/volume/workspace/test.py", line 24, in <module>
    ddf = pl.concat(dfs)
          ^^^^^^^^^^^^^^
  File "/root/micromamba/envs/lib/python3.12/site-packages/polars/functions/eager.py", line 182, in concat
    out = wrap_df(plr.concat_df(elems))
                  ^^^^^^^^^^^^^^^^^^^^
polars.exceptions.SchemaError: type Int64 is incompatible with expected type String
Issue description
The same data is written to Parquet 3 times in different processes, and json_decode on a large JSON string may randomize the order of the struct fields, which results in different schemas.
When the 3 Parquet files are loaded and passed to pl.concat, it fails with a SchemaError.
It seems that the schema check in pl.concat treats a Struct as an ordered dict?
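For reference, a minimal sketch of what I believe is happening (the struct contents below are made up for illustration; only the field order differs between the two frames):

import polars as pl

# Two frames whose "s" structs have the same field names and dtypes,
# but in a different order.
df1 = pl.select(s=pl.struct(a=pl.lit(1), b=pl.lit("x")))
df2 = pl.select(s=pl.struct(b=pl.lit("y"), a=pl.lit(2)))

# If the schema check treats the struct as an ordered dict, this raises
# SchemaError even though the field sets are identical.
pl.concat([df1, df2])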
Expected behavior
The expected behavior is that pl.concat does not fail.
There are two ways to achieve this:
- Treat Struct as an unordered dict in the schema check. (I prefer this way, as it is more intuitive.)
- Preserve the order of fields in json_decode, or provide an option to control it. (This may hurt performance?)
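In the meantime, a possible workaround is to normalize the struct field order after reading each Parquet file and before concatenating. A minimal sketch, assuming the struct column is named "s" and that sorting the fields alphabetically is acceptable (the helper name is mine):

import polars as pl

def sort_struct_fields(df: pl.DataFrame, col: str) -> pl.DataFrame:
    # Rebuild the struct column with its fields in alphabetical order so that
    # all frames share the same (ordered) struct schema.
    names = sorted(f.name for f in df.schema[col].fields)
    return df.with_columns(
        pl.struct([pl.col(col).struct.field(n) for n in names]).alias(col)
    )

dfs = [sort_struct_fields(pl.read_parquet(f"data{i}.parquet"), "s") for i in [1, 2, 3]]
ddf = pl.concat(dfs)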
Installed versions
Python 3.12.4 | packaged by conda-forge | (main, Jun 17 2024, 10:23:07) [GCC 12.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import polars
>>> polars.show_versions()
--------Version info---------
Polars: 1.0.0
Index type: UInt32
Platform: Linux-3.10.0-957.21.3.el7.x86_64-x86_64-with-glibc2.35
Python: 3.12.4 | packaged by conda-forge | (main, Jun 17 2024, 10:23:07) [GCC 12.3.0]
----Optional dependencies----
adbc_driver_manager: <not installed>
cloudpickle: <not installed>
connectorx: <not installed>
deltalake: <not installed>
fastexcel: <not installed>
fsspec: <not installed>
gevent: <not installed>
great_tables: <not installed>
hvplot: 0.10.0
matplotlib: 3.8.4
nest_asyncio: 1.6.0
numpy: 1.26.4
openpyxl: <not installed>
pandas: 2.2.2
pyarrow: 16.1.0
pydantic: <not installed>
pyiceberg: <not installed>
sqlalchemy: <not installed>
torch: <not installed>
xlsx2csv: <not installed>
xlsxwriter: <not installed>
>>>