@@ -1097,12 +1097,13 @@ def send_stream_data(
1097
1097
elif stream_config_file_path is not None :
1098
1098
stream_config = utils .read_yaml (stream_config_file_path )
1099
1099
1100
- stream_config ["label" ] = "production"
1100
+ stream_config_to_validate = dict (stream_config )
1101
+ stream_config_to_validate ["label" ] = "production"
1101
1102
1102
1103
# Validate stream of data
1103
1104
stream_validator = dataset_validators .get_validator (
1104
1105
task_type = task_type ,
1105
- dataset_config = stream_config ,
1106
+ dataset_config = stream_config_to_validate ,
1106
1107
dataset_config_file_path = stream_config_file_path ,
1107
1108
dataset_df = stream_df ,
1108
1109
)
@@ -1115,13 +1116,9 @@ def send_stream_data(
1115
1116
) from None
1116
1117
1117
1118
# Load dataset config and augment with defaults
1118
- stream_data = DatasetSchema ().load (
1119
- {"task_type" : task_type .value , ** stream_config }
1120
- )
1119
+ stream_data = dict (stream_config )
1121
1120
1122
1121
# Add default columns if not present
1123
- if stream_data .get ("columnNames" ) is None :
1124
- stream_data ["columnNames" ] = list (stream_df .columns )
1125
1122
columns_to_add = {"timestampColumnName" , "inferenceIdColumnName" }
1126
1123
for column in columns_to_add :
1127
1124
if stream_data .get (column ) is None :
@@ -1131,10 +1128,12 @@ def send_stream_data(
1131
1128
1132
1129
1133
1130
body = {
1134
- "datasetConfig " : stream_data ,
1135
- "dataset " : stream_df .to_dict (orient = "records" ),
1131
+ "config " : stream_data ,
1132
+ "rows " : stream_df .to_dict (orient = "records" ),
1136
1133
}
1137
-
1134
+
1135
+ print ("This is the body!" )
1136
+ print (body )
1138
1137
self .api .post_request (
1139
1138
endpoint = f"inference-pipelines/{ inference_pipeline_id } /data-stream" ,
1140
1139
body = body ,
@@ -1182,11 +1181,6 @@ def publish_batch_data(
1182
1181
"Make sure to fix all of the issues listed above before the upload." ,
1183
1182
) from None
1184
1183
1185
- # Load dataset config and augment with defaults
1186
- batch_data = DatasetSchema ().load (
1187
- {"task_type" : task_type .value , ** batch_config }
1188
- )
1189
-
1190
1184
# Add default columns if not present
1191
1185
if batch_data .get ("columnNames" ) is None :
1192
1186
batch_data ["columnNames" ] = list (batch_df .columns )
0 commit comments