Commit 0dec286

[data][llm][doc] Add in resiliency section and refine doc code (#60594)
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
1 parent ba39fe1 commit 0dec286

7 files changed (+259, -52 lines)

.vale/styles/config/vocabularies/Data/accept.txt

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ Modin
 [Mm]ultiget(s)?
 ndarray(s)?
 NLP
+[Oo]mni
 [Oo]utqueue(s)?
 PDFs
 PIL

doc/source/data/doc_code/working-with-llms/basic_llm_example.py

Lines changed: 64 additions & 2 deletions
@@ -4,6 +4,8 @@
 """
 
 # __basic_llm_example_start__
+import os
+import shutil
 import ray
 from ray.data.llm import vLLMEngineProcessorConfig, build_processor
 
@@ -125,6 +127,66 @@
 )
 # __s3_config_example_end__
 
+base_dir = "/tmp/llm_checkpoint_demo"
+input_path = os.path.join(base_dir, "input")
+output_path = os.path.join(base_dir, "output")
+checkpoint_path = os.path.join(base_dir, "checkpoint")
+
+# Reset directories
+for path in (input_path, output_path, checkpoint_path):
+    shutil.rmtree(path, ignore_errors=True)
+    os.makedirs(path)
+
+# __row_level_fault_tolerance_config_example_start__
+# Row-level fault tolerance configuration
+config = vLLMEngineProcessorConfig(
+    model_source="unsloth/Llama-3.1-8B-Instruct",
+    concurrency=1,
+    batch_size=64,
+    should_continue_on_error=True,
+)
+# __row_level_fault_tolerance_config_example_end__
+
+# __checkpoint_config_setup_example_start__
+from ray.data.checkpoint import CheckpointConfig
+
+ctx = ray.data.DataContext.get_current()
+ctx.checkpoint_config = CheckpointConfig(
+    id_column="id",
+    checkpoint_path=checkpoint_path,
+    delete_checkpoint_on_success=False,
+)
+# __checkpoint_config_setup_example_end__
+
+# __checkpoint_usage_example_start__
+processor_config = vLLMEngineProcessorConfig(
+    model_source="unsloth/Llama-3.1-8B-Instruct",
+    concurrency=1,
+    batch_size=16,
+)
+
+processor = build_processor(
+    processor_config,
+    preprocess=lambda row: dict(
+        id=row["id"],  # Preserve the ID column for checkpointing
+        prompt=row["prompt"],
+        sampling_params=dict(
+            temperature=0.3,
+            max_tokens=10,
+        ),
+    ),
+    postprocess=lambda row: {
+        "id": row["id"],  # Preserve the ID column for checkpointing
+        "answer": row.get("generated_text"),
+    },
+)
+
+ds = ray.data.read_parquet(input_path)
+ds = processor(ds)
+ds.write_parquet(output_path)
+# __checkpoint_usage_example_end__
+
+
 # __gpu_memory_config_example_start__
 # GPU memory management configuration
 # If you encounter CUDA out of memory errors, try these optimizations:
@@ -199,8 +261,8 @@ def create_embedding_processor():
         ),
         batch_size=8,
         concurrency=1,
-        apply_chat_template=False,
-        detokenize=False,
+        chat_template_stage=False,
+        detokenize_stage=False,
     )
 
 
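The checkpoint usage hunk above reads its rows from `input_path`, but the data that lands there isn't part of the visible diff. Below is a minimal sketch of how a compatible input could be materialized, assuming the example only needs the `id` and `prompt` columns that the preprocess and postprocess lambdas reference (the prompt text is a hypothetical placeholder):

import ray

# Assumption: a small Parquet input with "id" and "prompt" columns is all the
# checkpoint example requires; any row payload carrying those columns would work.
rows = [{"id": i, "prompt": f"Summarize the number {i} in one word."} for i in range(64)]
ray.data.from_items(rows).write_parquet(input_path)

Because `CheckpointConfig` is keyed on `id_column="id"` and keeps the checkpoint directory (`delete_checkpoint_on_success=False`), re-running the same pipeline against the same `checkpoint_path` should skip rows whose ids were already committed; treat that resume behavior as an assumption to confirm against the resiliency section this commit adds.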
doc/source/data/doc_code/working-with-llms/classification_example.py

Lines changed: 2 additions & 2 deletions
@@ -29,8 +29,8 @@
     ),
     batch_size=8,
     concurrency=1,
-    apply_chat_template=False,
-    detokenize=False,
+    chat_template_stage=False,
+    detokenize_stage=False,
 )
 
 classification_processor = build_processor(

doc/source/data/doc_code/working-with-llms/omni_audio_example.py

Lines changed: 10 additions & 8 deletions
@@ -53,9 +53,9 @@
         "enabled": True,
         "chat_template_content_format": "openai",
     },
-    chat_template_stage={"enabled": True},
-    tokenize_stage={"enabled": True},
-    detokenize_stage={"enabled": True},
+    chat_template_stage=True,
+    tokenize_stage=True,
+    detokenize_stage=True,
 )
 # __omni_audio_config_example_end__
 
@@ -112,6 +112,7 @@ def audio_postprocess(row: dict) -> dict:
 
 
 def load_audio_dataset():
+    # __omni_audio_load_dataset_example_start__
     """
     Load audio dataset from MRSAudio Hugging Face dataset.
     """
@@ -151,6 +152,7 @@ def load_audio_dataset():
     except Exception as e:
         print(f"Error loading dataset: {e}")
         return None
+    # __omni_audio_load_dataset_example_end__
 
 
 def create_omni_audio_config():
@@ -169,13 +171,13 @@ def create_omni_audio_config():
             "enabled": True,
            "chat_template_content_format": "openai",
         },
-        chat_template_stage={"enabled": True},
-        tokenize_stage={"enabled": True},
-        detokenize_stage={"enabled": True},
+        chat_template_stage=True,
+        tokenize_stage=True,
+        detokenize_stage=True,
     )
 
-
 def run_omni_audio_example():
+    # __omni_audio_run_example_start__
     """Run the complete Omni audio example workflow."""
     config = create_omni_audio_config()
     audio_dataset = load_audio_dataset()
@@ -191,7 +193,7 @@ def run_omni_audio_example():
         print(f"Has multimodal support: {config.prepare_multimodal_stage.get('enabled', False)}")
         result = processor(audio_dataset).take_all()
         return config, processor, result
-    # __omni_audio_run_example_end__
+        # __omni_audio_run_example_end__
    return None, None, None
 
 

doc/source/data/doc_code/working-with-llms/vlm_image_example.py

Lines changed: 4 additions & 12 deletions
@@ -48,27 +48,17 @@
 fs = HfFileSystem()
 vision_dataset = ray.data.read_parquet(path, filesystem=fs)
 
-HF_TOKEN = "your-hf-token-here"  # Replace with actual token if needed
-
 # __vlm_config_example_start__
 vision_processor_config = vLLMEngineProcessorConfig(
     model_source="Qwen/Qwen2.5-VL-3B-Instruct",
     engine_kwargs=dict(
         tensor_parallel_size=1,
         pipeline_parallel_size=1,
         max_model_len=4096,
-        enable_chunked_prefill=True,
-        max_num_batched_tokens=2048,
         trust_remote_code=True,
         limit_mm_per_prompt={"image": 1},
     ),
-    runtime_env=dict(
-        env_vars=dict(
-            VLLM_USE_V1="1",
-        ),
-    ),
     batch_size=16,
-    accelerator_type="L4",
     concurrency=1,
     prepare_multimodal_stage={"enabled": True},
 )
@@ -146,6 +136,7 @@ def vision_postprocess(row: dict) -> dict:
 
 
 def load_vision_dataset():
+    # __vlm_image_load_dataset_example_start__
     """
     Load vision dataset from Hugging Face.
@@ -171,6 +162,7 @@
     except Exception as e:
         print(f"Error loading dataset: {e}")
         return None
+    # __vlm_image_load_dataset_example_end__
 
 
 def create_vlm_config():
@@ -185,13 +177,13 @@
             limit_mm_per_prompt={"image": 1},
         ),
         batch_size=1,
-        accelerator_type="L4",
         concurrency=1,
         prepare_multimodal_stage={"enabled": True},
     )
 
 
 def run_vlm_example():
+    # __vlm_run_example_start__
     """Run the complete VLM example workflow."""
     config = create_vlm_config()
     vision_dataset = load_vision_dataset()
@@ -207,7 +199,7 @@
         print(f"Has multimodal support: {config.prepare_multimodal_stage.get('enabled', False)}")
         result = processor(vision_dataset).take_all()
         return config, processor, result
-    # __vlm_run_example_end__
+        # __vlm_run_example_end__
     return None, None, None
 
 

doc/source/data/doc_code/working-with-llms/vlm_video_example.py

Lines changed: 10 additions & 8 deletions
@@ -58,9 +58,9 @@
             allowed_local_media_path="/tmp",
         ),
     },
-    chat_template_stage={"enabled": True},
-    tokenize_stage={"enabled": True},
-    detokenize_stage={"enabled": True},
+    chat_template_stage=True,
+    tokenize_stage=True,
+    detokenize_stage=True,
 )
 # __vlm_video_config_example_end__
 
@@ -123,6 +123,7 @@ def video_postprocess(row: dict) -> dict:
 
 
 def load_video_dataset():
+    # __vlm_video_load_dataset_example_start__
     """
     Load video dataset from ShareGPTVideo Hugging Face dataset.
     """
@@ -166,7 +167,7 @@ def load_video_dataset():
     except Exception as e:
         print(f"Error loading dataset: {e}")
         return None
-
+    # __vlm_video_load_dataset_example_end__
 
 def create_vlm_video_config():
     """Create VLM video configuration."""
@@ -188,13 +189,14 @@ def create_vlm_video_config():
                 allowed_local_media_path="/tmp",
             ),
         },
-        chat_template_stage={"enabled": True},
-        tokenize_stage={"enabled": True},
-        detokenize_stage={"enabled": True},
+        chat_template_stage=True,
+        tokenize_stage=True,
+        detokenize_stage=True,
     )
 
 
 def run_vlm_video_example():
+    # __vlm_video_run_example_start__
     """Run the complete VLM video example workflow."""
     config = create_vlm_video_config()
     video_dataset = load_video_dataset()
@@ -210,7 +212,7 @@ def run_vlm_video_example():
         print(f"Has multimodal support: {config.prepare_multimodal_stage.get('enabled', False)}")
         result = processor(video_dataset).take_all()
         return config, processor, result
-    # __vlm_video_run_example_end__
+        # __vlm_video_run_example_end__
     return None, None, None
 
 