diff --git a/skew-detector/README.md b/skew-detector/README.md
new file mode 100644
index 000000000000..28df2c2a24e0
--- /dev/null
+++ b/skew-detector/README.md
@@ -0,0 +1,184 @@
# Spark Scheduling Skew Demo

This project demonstrates a **driver-side scheduling skew detector** for Apache Spark, implemented as a custom `SparkListener`.
It shows how Spark can be extended to automatically detect **load imbalance / data skew** and emit a **machine-readable event** suitable for automated remediation.

The demo includes:

- A Spark job that deliberately creates a skewed workload.
- A `SparkListener` that detects skew in task execution times.
- A simple external “self-healing” script that reacts to skew events.

---

## 1. Motivation

Scheduling-related performance issues such as **data skew** and **load imbalance** remain difficult to diagnose automatically in open-source systems like Spark.

Current limitations:

- detecting skew requires **manual inspection** of the Spark UI and logs;
- Spark does not emit a clear **real-time signal** meaning
  *“this stage is suffering from scheduling-related performance problems”*;
- automated remediation tools cannot act without such a signal.

This project implements a small but practical step towards **automated remediation**:

> A lightweight driver-side skew detector that emits a structured event whenever scheduling-related issues occur.

---

## 2. How it works

### 2.1. SkewDetectionListener (core component)

The listener:

1. Subscribes to Spark events:
   - `SparkListenerTaskEnd`
   - `SparkListenerStageCompleted`
2. Collects per-task execution durations.
3. On stage completion:
   - computes the mean and the variance of the task durations;
   - if **variance > mean × threshold**, it considers the stage skewed.
     This is a deliberately simple heuristic (see §7 for planned refinements). For example, with task durations of 100 ms, 100 ms, 100 ms and 5000 ms, the mean is 1325 ms and the population variance is about 4.5 × 10⁶, far above the mean with the default factor of 1.0, so the stage is flagged.
4. Emits:
   - a human-readable warning, and
   - a machine-readable event:

   ```
   SCHEDULING_SKEW_DETECTED stageId=... tasks=... meanMs=... variance=... factor=...
   ```

This event can be consumed by external controllers (Kubernetes operators, alerting systems, etc.).
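To illustrate what such a consumer might look like, here is a minimal sketch that turns one event line into a field map. It is not part of this patch; only the field names come from the format above, the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer: parses one SCHEDULING_SKEW_DETECTED log line into key/value pairs. */
public class SkewEventParser {

    public static Map<String, String> parse(String logLine) {
        Map<String, String> fields = new HashMap<>();
        int start = logLine.indexOf("SCHEDULING_SKEW_DETECTED");
        if (start < 0) {
            return fields; // not a skew event
        }
        // Everything after the marker is a space-separated list of key=value pairs.
        for (String token : logLine.substring(start).split("\\s+")) {
            int eq = token.indexOf('=');
            if (eq > 0) {
                fields.put(token.substring(0, eq), token.substring(eq + 1));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> f = parse(
            "WARN SCHEDULING_SKEW_DETECTED stageId=1 tasks=100 meanMs=523.4 variance=81234.5 factor=1.0");
        System.out.println("stage " + f.get("stageId") + ", mean " + f.get("meanMs") + " ms");
    }
}
```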
---

### 2.2. SkewDemoJob (Spark job with real skew)

The demo job creates a highly skewed dataset:

- the key `"HOT_KEY"` appears **1,000,000** times,
- the keys `"a"`, `"b"`, `"c"`, `"d"`, `"e"` each appear once.

A `reduceByKey` operation therefore produces:

- one extremely heavy task,
- many trivial tasks.

This reliably triggers **task duration skew**.

---

### 2.3. watch_skew.sh (self-healing demo)

This small script:

- reads Spark logs from `stdin`,
- prints them unchanged,
- and whenever it sees `SCHEDULING_SKEW_DETECTED`, prints an additional message:

```
>>> [SELF-HEALING DEMO] Detected scheduling skew event!
>>> Here we could automatically trigger remediation logic.
```

This simulates how an external self-healing system could react.

---

## 3. Project structure

```text
skew-detector/
  pom.xml
  watch_skew.sh
  src/
    main/
      java/
        org/example/spark/skew/
          SkewDetectionListener.java
          SkewDemoJob.java
```

---

## 4. Build instructions

Requirements:

- Java 11+
- Maven
- Apache Spark 3.x (e.g. spark-3.5.x-bin-hadoop3)

Build:

```bash
mvn clean package
```

---

## 5. Run the demo

### 5.1. Basic run

```bash
/path/to/spark/bin/spark-submit \
  --class org.example.spark.skew.SkewDemoJob \
  --master "local[*]" \
  --conf spark.extraListeners=org.example.spark.skew.SkewDetectionListener \
  target/spark-scheduling-skew-demo-1.0-SNAPSHOT.jar
```

Expected output includes:

the aggregation results:

```text
HOT_KEY -> 1000000
a -> 1
...
```

the skew warning:

```text
WARN Scheduling skew detected in stage 1: ...
```

the machine-readable event:

```text
SCHEDULING_SKEW_DETECTED stageId=1 tasks=100 ...
```
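The `spark.extraListeners` approach instantiates the listener through its zero-argument constructor, so the default threshold applies. To use custom settings, the listener can instead be registered programmatically on the driver via `SparkContext.addSparkListener`; a minimal sketch (the threshold values here are illustrative only):

```java
import org.apache.spark.sql.SparkSession;

public class CustomThresholdExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SkewDemoJob")
                .getOrCreate();

        // Register the detector by hand: flag a stage only when the variance
        // exceeds 2x the mean, and only for stages with at least 10 tasks.
        spark.sparkContext().addSparkListener(
                new SkewDetectionListener(2.0, 10));

        // ... run the skewed workload as in SkewDemoJob ...

        spark.stop();
    }
}
```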
### 5.2. Run with the self-healing demo

Make the script executable:

```bash
chmod +x watch_skew.sh
```

Run:

```bash
/path/to/spark/bin/spark-submit \
  --class org.example.spark.skew.SkewDemoJob \
  --master "local[*]" \
  --conf spark.extraListeners=org.example.spark.skew.SkewDetectionListener \
  target/spark-scheduling-skew-demo-1.0-SNAPSHOT.jar \
  2>&1 | ./watch_skew.sh
```

You will see the self-healing message:

```text
>>> [SELF-HEALING DEMO] Detected scheduling skew event!
>>> Here we could automatically trigger remediation logic.
```

---

## 6. Why this matters (link to automated remediation)

This contribution addresses a known challenge in open-source distributed systems:

> Spark does not provide real-time, machine-readable detection of scheduling-related performance issues.

This project fills that gap:

- automatic detection of skew → no manual UI inspection;
- structured events → easy integration with monitoring / operators;
- safe, driver-side logic → no changes to the scheduler needed.

It provides the missing building block needed to implement:

- dynamic repartitioning,
- auto-tuning of Spark configs,
- intelligent speculative execution,
- job resubmission policies,
- Kubernetes operators for self-healing pipelines.

---

## 7. Future improvements

- Export skew data via Spark’s `MetricsSystem`.
- Add Prometheus metrics for integration with alerting systems (see the sketch below).
- Use more advanced statistical methods for skew detection.
- Implement a real Kubernetes operator that reacts to skew events.
- Integrate with adaptive query execution (AQE) studies.
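As a taste of the Prometheus item above, the listener’s skew branch could increment a counter next to the log lines; a minimal sketch, assuming the `io.prometheus:simpleclient` dependency were added to the pom (this is a hypothetical extension, not part of this patch):

```java
import io.prometheus.client.Counter;

/** Hypothetical helper: exposes skew events as a Prometheus counter. */
public class SkewMetrics {

    // One counter per driver, labelled by stage, e.g. for alerting rules like
    // increase(spark_scheduling_skew_events_total[5m]) > 0.
    private static final Counter SKEW_EVENTS = Counter.build()
            .name("spark_scheduling_skew_events_total")
            .help("Total SCHEDULING_SKEW_DETECTED events emitted by SkewDetectionListener.")
            .labelNames("stage_id")
            .register();

    /** Would be called from the listener's skew branch, next to the WARN logs. */
    public static void recordSkew(int stageId) {
        SKEW_EVENTS.labels(String.valueOf(stageId)).inc();
    }
}
```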
diff --git a/skew-detector/pom.xml b/skew-detector/pom.xml
new file mode 100644
index 000000000000..42f9ca13977a
--- /dev/null
+++ b/skew-detector/pom.xml
@@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-scheduling-skew-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spark.version>3.5.0</spark.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.spark.skew.SkewDemoJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
diff --git a/skew-detector/src/main/java/org/example/spark/skew/SkewDemoJob.java b/skew-detector/src/main/java/org/example/spark/skew/SkewDemoJob.java
new file mode 100644
index 000000000000..93c81a50e750
--- /dev/null
+++ b/skew-detector/src/main/java/org/example/spark/skew/SkewDemoJob.java
@@ -0,0 +1,53 @@
package org.example.spark.skew;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * A demo Spark job that deliberately creates data skew:
 *
 * - the key "HOT_KEY" appears 1_000_000 times
 * - the keys "a", "b", "c", "d", "e" are used once each
 *
 * reduceByKey will therefore produce a skew in task execution times.
 */
public class SkewDemoJob {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SkewDemoJob")
                .getOrCreate();

        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        List<String> data = new ArrayList<>();

        // One very hot key...
        for (int i = 0; i < 1_000_000; i++) {
            data.add("HOT_KEY");
        }
        // ...and several "cold" keys
        data.add("a");
        data.add("b");
        data.add("c");
        data.add("d");
        data.add("e");

        // 100 partitions so that task durations can diverge
        JavaRDD<String> rdd = sc.parallelize(data, 100);

        JavaPairRDD<String, Integer> pairs = rdd.mapToPair(key -> new Tuple2<>(key, 1));

        JavaPairRDD<String, Integer> reduced = pairs.reduceByKey(Integer::sum);

        // Materialize the result so the job actually runs
        reduced.collect().forEach(t -> System.out.println(t._1 + " -> " + t._2));

        spark.stop();
    }
}
diff --git a/skew-detector/src/main/java/org/example/spark/skew/SkewDetectionListener.java b/skew-detector/src/main/java/org/example/spark/skew/SkewDetectionListener.java
new file mode 100644
index 000000000000..8506208d5ceb
--- /dev/null
+++ b/skew-detector/src/main/java/org/example/spark/skew/SkewDetectionListener.java
@@ -0,0 +1,105 @@
package org.example.spark.skew;

import org.apache.spark.scheduler.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Driver-side scheduling skew detector for Spark.
 *
 * Idea:
 * - Collect task durations by stageId
 * - Compute mean and variance at stage completion
 * - If the variance is significantly greater than the mean, assume there is skew
 * - Log a warning + a structured event (for a "self-healing" agent)
 */
public class SkewDetectionListener extends SparkListener {

    private static final Logger LOG = LoggerFactory.getLogger(SkewDetectionListener.class);

    // stageId -> task durations (ms); only touched from the listener bus thread
    private final Map<Integer, List<Long>> stageDurations = new HashMap<>();

    private final double varianceFactorThreshold;
    private final int minTasksForAnalysis;

    public SkewDetectionListener() {
        this(1.0, 3); // default: variance > 1 * mean, min 3 tasks
    }

    public SkewDetectionListener(double varianceFactorThreshold, int minTasksForAnalysis) {
        this.varianceFactorThreshold = varianceFactorThreshold;
        this.minTasksForAnalysis = minTasksForAnalysis;
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        int stageId = taskEnd.stageId();
        long duration = taskEnd.taskInfo().duration();

        stageDurations
                .computeIfAbsent(stageId, k -> new ArrayList<>())
                .add(duration);
    }

    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        int stageId = stageCompleted.stageInfo().stageId();

        List<Long> durations = stageDurations.get(stageId);
        if (durations == null || durations.size() < minTasksForAnalysis) {
            return;
        }

        Stats stats = computeStats(durations);

        boolean skewSuspected = stats.variance > stats.mean * varianceFactorThreshold;

        if (skewSuspected) {
            // 1) Human-readable log
            LOG.warn(
                    "Scheduling skew detected in stage {}: tasks={}, meanDurationMs={}, variance={}, thresholdFactor={}",
                    stageId, durations.size(), stats.mean, stats.variance, varianceFactorThreshold
            );

            // 2) Machine-readable log
            LOG.warn("SCHEDULING_SKEW_DETECTED "
                            + "stageId={} tasks={} meanMs={} variance={} factor={}",
                    stageId, durations.size(), stats.mean, stats.variance, varianceFactorThreshold
            );
        } else {
            LOG.info(
                    "Stage {} completed without significant skew: tasks={}, meanDurationMs={}, variance={}",
                    stageId, durations.size(), stats.mean, stats.variance
            );
        }

        // Drop the stage's samples so the map does not grow without bound
        stageDurations.remove(stageId);
    }

    private Stats computeStats(List<Long> values) {
        int n = values.size();
        double mean = values.stream().mapToLong(v -> v).average().orElse(0.0);

        // Population variance of the task durations
        double variance = 0.0;
        for (Long v : values) {
            double diff = v - mean;
            variance += diff * diff;
        }
        variance = variance / n;

        return new Stats(mean, variance);
    }

    /** Immutable holder for the per-stage statistics. */
    private static class Stats {
        final double mean;
        final double variance;

        Stats(double mean, double variance) {
            this.mean = mean;
            this.variance = variance;
        }
    }
}
diff --git a/skew-detector/watch_skew.sh b/skew-detector/watch_skew.sh
new file mode 100755
index 000000000000..4e79d5ce48db
--- /dev/null
+++ b/skew-detector/watch_skew.sh
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

# A demo "controller" that listens to Spark logs
# and responds to the SCHEDULING_SKEW_DETECTED event.

while IFS= read -r line; do
    # Pass every log line through unchanged...
    echo "$line"
    # ...and react when the structured skew event appears.
    if echo "$line" | grep -q "SCHEDULING_SKEW_DETECTED"; then
        echo
        echo ">>> [SELF-HEALING DEMO] Detected scheduling skew event!"
        echo ">>> Here we could automatically trigger remediation logic (e.g. resubmit the job with different settings)."
        echo
    fi
done