|
| 1 | +<!-- title: How to detect anomalies in a Kafka topic with Flink SQL --> |
| 2 | +<!-- description: In this tutorial, learn how to detect anomalies in a Kafka topic with Flink SQL, with step-by-step instructions and supporting code. --> |
| 3 | + |
| 4 | +# How to detect anomalies in a Kafka topic with Flink SQL |
| 5 | + |
| 6 | +This tutorial demonstrates how to detect anomalies in a Kafka topic using Confluent Cloud for Apache Flink®. You will set up the necessary resources in Confluent Cloud and run built-in ARIMA model-based anomaly detection queries against example order data. |
| 7 | + |
| 8 | +## Prerequisites |
| 9 | + |
| 10 | +* A [Confluent Cloud](https://confluent.cloud/signup) account |
| 11 | +* The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html) installed on your machine |
| 12 | + |
| 13 | +## Create Confluent Cloud resources |
| 14 | + |
| 15 | +Login to your Confluent Cloud account: |
| 16 | + |
| 17 | +```shell |
| 18 | +confluent login --prompt --save |
| 19 | +``` |
| 20 | + |
| 21 | +Install a CLI plugin that streamlines resource creation in Confluent Cloud: |
| 22 | + |
| 23 | +```shell |
| 24 | +confluent plugin install confluent-quickstart |
| 25 | +``` |
| 26 | + |
| 27 | +Run the plugin from the top-level directory of the `tutorials` repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (`gcp` or `azure`) or region. You can find supported regions in a given cloud provider by running `confluent kafka region list --cloud <CLOUD>`. |
| 28 | + |
| 29 | +```shell |
| 30 | +confluent quickstart \ |
| 31 | + --environment-name anomaly-detection-env \ |
| 32 | + --kafka-cluster-name anomaly-detection-cluster \ |
| 33 | + --compute-pool-name anomaly-detection-pool |
| 34 | +``` |
| 35 | + |
| 36 | +The plugin should complete in under a minute. |
| 37 | + |
| 38 | +## Open Flink shell |
| 39 | + |
| 40 | +Start a Flink SQL shell: |
| 41 | + |
| 42 | +```shell |
| 43 | +confluent flink shell --compute-pool \ |
| 44 | + $(confluent flink compute-pool list -o json | jq -r ".[0].id") |
| 45 | +``` |
| 46 | + |
| 47 | +## Run anomaly detection queries |
| 48 | + |
| 49 | +Next, you’ll run a series of anomaly detection queries against [mock data streams](https://docs.confluent.io/cloud/current/flink/reference/example-data.html) in Confluent Cloud, specifically the [`orders` stream](https://docs.confluent.io/cloud/current/flink/reference/example-data.html#orders-table). |
| 50 | + |
| 51 | +Confluent Cloud for Apache Flink® provides a built-in anomaly detection function, [`ML_DETECT_ANOMALIES`](https://docs.confluent.io/cloud/current/flink/reference/functions/model-inference-functions.html#flink-sql-ml-anomaly-detect-function), which identifies outliers in a data stream based on an [ARIMA model](https://docs.confluent.io/cloud/current/ai/builtin-functions/detect-anomalies.html#arima-model). |
| 52 | + |
| 53 | +Anomaly detection operates over a defined window. For example, to find global outliers, specify an `OVER` window that includes previous rows: |
| 54 | + |
| 55 | +```sql |
| 56 | +OVER (ORDER BY $rowtime |
| 57 | + RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) |
| 58 | +``` |
| 59 | + |
| 60 | +The following query extracts the Boolean value that indicates whether a record is an anomaly, ignoring rows for which no prediction is made (because there isn’t enough data yet). Here, we set two [model hyperparameters](https://docs.confluent.io/cloud/current/ai/builtin-functions/detect-anomalies.html#anomaly-detection-parameters): |
| 61 | + - horizon = 1 (forecast one time period ahead) |
| 62 | + - confidencePercentage = 90.0 |
| 63 | + |
| 64 | +```sql |
| 65 | +SELECT |
| 66 | + customer_id, |
| 67 | + ts, |
| 68 | + price, |
| 69 | + anomaly_results[6] AS is_anomaly |
| 70 | +FROM ( |
| 71 | + SELECT |
| 72 | + customer_id, |
| 73 | + $rowtime as ts, |
| 74 | + price, |
| 75 | + ML_DETECT_ANOMALIES(price, $rowtime, JSON_OBJECT('horizon' VALUE 1, 'confidencePercentage' VALUE 90.0)) |
| 76 | + OVER (ORDER BY $rowtime |
| 77 | + RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly_results |
| 78 | + FROM `examples`.`marketplace`.`orders`) |
| 79 | +WHERE anomaly_results[6] IS NOT NULL; |
| 80 | +``` |
| 81 | + |
| 82 | +You’ll notice that some very large or small orders are flagged as anomalies, for example: |
| 83 | + |
| 84 | +```plaintext |
| 85 | +customer_id ts price is_anomaly |
| 86 | +3012 2025-11-03 10:19:34.558 17.37 FALSE |
| 87 | +3180 2025-11-03 10:19:34.598 10.43 TRUE |
| 88 | +3218 2025-11-03 10:19:34.558 27.97 FALSE |
| 89 | +3161 2025-11-03 10:19:34.559 63.72 FALSE |
| 90 | +3171 2025-11-03 10:19:34.578 69.95 FALSE |
| 91 | +3163 2025-11-03 10:19:34.598 79.18 FALSE |
| 92 | +3063 2025-11-03 10:19:34.618 40.93 FALSE |
| 93 | +3058 2025-11-03 10:19:34.638 99.69 TRUE |
| 94 | +``` |
| 95 | + |
| 96 | +Because anomaly detection works as an `OVER` aggregation query, you can define exogenous variables that establish different cohorts with unique definitions of "outlier." For example, a large order from a platinum member who frequently places large orders may be typical, whereas a large order from a new anonymous customer may be anomalous. |
| 97 | + |
| 98 | +The following query is similar to the previous one, but partitions the window by `customer_id` and lowers the `minTrainingSize` hyperparameter to 16 to get results sooner. This effectively defines anomalous behavior per customer: |
| 99 | + |
| 100 | +```sql |
| 101 | +SELECT |
| 102 | + customer_id, |
| 103 | + ts, |
| 104 | + price, |
| 105 | + anomaly_results[6] AS is_anomaly |
| 106 | +FROM ( |
| 107 | + SELECT |
| 108 | + customer_id, |
| 109 | + $rowtime as ts, |
| 110 | + price, |
| 111 | + ML_DETECT_ANOMALIES(price, $rowtime, JSON_OBJECT('horizon' VALUE 1, 'minTrainingSize' VALUE 16, 'confidencePercentage' VALUE 90.0)) |
| 112 | + OVER (PARTITION BY customer_id |
| 113 | + ORDER BY $rowtime |
| 114 | + RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly_results |
| 115 | + FROM `examples`.`marketplace`.`orders`) |
| 116 | +WHERE anomaly_results[6] IS NOT NULL; |
| 117 | +``` |
| 118 | + |
| 119 | +If you let the query run long enough, you’ll observe that some customers who usually place small orders will have large outlier orders. Others who typically place large orders will have small outlier orders. For example: |
| 120 | + |
| 121 | +```plaintext |
| 122 | +customer_id ts price is_anomaly |
| 123 | +... |
| 124 | +3020 2025-11-03 11:15:46.524 43.48 FALSE |
| 125 | +3020 2025-11-03 11:16:10.421 30.02 FALSE |
| 126 | +3020 2025-11-03 11:15:08.424 21.39 FALSE |
| 127 | +3020 2025-11-03 11:16:10.918 97.86 TRUE |
| 128 | +... |
| 129 | +3183 2025-11-03 11:10:08.829 70.91 FALSE |
| 130 | +3183 2025-11-03 11:12:11.514 10.53 TRUE |
| 131 | +3183 2025-11-03 11:13:21.092 91.22 FALSE |
| 132 | +3183 2025-11-03 11:16:09.783 87.10 FALSE |
| 133 | +... |
| 134 | +``` |
| 135 | + |
| 136 | +## Clean up |
| 137 | + |
| 138 | +When you are finished, delete the `anomaly-detection-env` environment by first getting the environment ID of the form `env-123456` corresponding to it: |
| 139 | + |
| 140 | +```shell |
| 141 | +confluent environment list |
| 142 | +``` |
| 143 | + |
| 144 | +Delete the environment, including all resources created for this tutorial: |
| 145 | + |
| 146 | +```shell |
| 147 | +confluent environment delete <ENVIRONMENT ID> |
| 148 | +``` |
0 commit comments