Skip to content

Commit 0c4f648

Browse files
authored
Merge pull request #36 from softeerbootcamp-7th/feat/stage1-anomaly-detection
[REFACTOR] stage1 anomaly detection optimize
2 parents e448716 + ab62143 commit 0c4f648

File tree

4 files changed

+165
-159
lines changed

4 files changed

+165
-159
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
storage:
2+
input_base_path: "./data/raw-sensor-data"
3+
output_base_path: "./data/stage1_anomaly_detected"
4+
speed_bump_path: "./data/yeosu_provincial_road_speed_bump_data.csv"
5+
input_partition_format: "dt=YYYY-MM-DD"
6+
ttl_days: 14
7+
8+
spark:
9+
master: "local[*]"
10+
app_name: "Stage1-AnomalyDetection-Local"
11+
partition_size_mb: 128
12+
min_partition_size_mb: 128
13+
shuffle_partitions: 4
14+
coalesce_partitions: 4
15+
adaptive_enabled: true
16+
adaptive_coalesce_enabled: true
17+
skew_join_enabled: true
18+
target_file_size_mb: 256
19+
20+
context_filtering:
21+
velocity_threshold: 5.0
22+
hdop_threshold: 5.0
23+
min_satellites: 4
24+
25+
normalization:
26+
sensor_columns:
27+
- accel_z
28+
- gyro_y
29+
partition_by: trip_id
30+
31+
impact_score:
32+
weights:
33+
accel_z: 0.7
34+
gyro_y: 0.3
35+
threshold: 3.0
36+
37+
logging:
38+
level: INFO
39+
log_metrics: true
40+
log_sample_size: 5

processing_service/stage1/config_stage1.yaml

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
storage:
22
input_base_path: "s3a://softeer-7-de3-bucket/raw-sensor-data"
33
output_base_path: "s3a://softeer-7-de3-bucket/stage1_anomaly_detected"
4+
speed_bump_path: "s3a://softeer-7-de3-bucket/speed-bump/yeosu_provincial_road_speed_bump_data.csv"
45
input_partition_format: "dt=YYYY-MM-DD"
56
ttl_days: 14
67

78
spark:
8-
master: null
9-
app_name: "Stage1-AnomalyDetection-Prod"
9+
master: null # spark-submit 클러스터 사용
10+
app_name: "Stage1-AnomalyDetection-Prod"
1011
partition_size_mb: 128
1112
min_partition_size_mb: 128
12-
shuffle_partitions: "auto"
13-
coalesce_partitions: 16
13+
shuffle_partitions: 16
14+
coalesce_partitions: 4
1415
adaptive_enabled: true
1516
adaptive_coalesce_enabled: true
1617
skew_join_enabled: true
@@ -33,15 +34,6 @@ impact_score:
3334
gyro_y: 0.3
3435
threshold: 3.0
3536

36-
stage2:
37-
enable_salting: true
38-
salting_buckets: 10
39-
road_network_format: parquet
40-
41-
rdb:
42-
url: ""
43-
pothole_segments_table: pothole_segments
44-
4537
logging:
4638
level: INFO
4739
log_metrics: true
Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,3 @@
1-
"""
2-
Stage 1: 설정 및 Spark 연결 관리 모듈
3-
4-
이 모듈은 Stage 1 Anomaly Detection 파이프라인의 초기화를 담당합니다.
5-
YAML 설정 파일 로드 및 Spark 세션 생성, Adaptive Query Execution 최적화를 제공합니다.
6-
7-
Module Functions:
8-
load_config: YAML 설정 파일을 로컬 경로 또는 S3에서 로드하고 파싱
9-
get_spark_session: Spark 설정을 기반으로 SparkSession 인스턴스 생성
10-
11-
Example:
12-
>>> config = load_config("config_prod.yaml")
13-
>>> spark = get_spark_session(config)
14-
>>> input_df = spark.read.parquet("s3a://bucket/data")
15-
16-
Supported Config Formats:
17-
- Local: /path/to/config.yaml
18-
- S3: s3a://bucket-name/path/to/config.yaml
19-
20-
Error Handling:
21-
- 설정 파일 로드 실패 시 경고 로그 기록 후 빈 dict 반환
22-
- KeyError는 상위 호출자가 처리 (pipeline 유효성 검증)
23-
"""
24-
251
import os
262
import yaml
273
import logging
@@ -32,9 +8,6 @@
328

339

3410
def load_config(config_path: str) -> Dict[str, Any]:
35-
"""
36-
YAML 설정 파일 로드 (로컬 또는 s3a://)
37-
"""
3811
path = config_path.strip()
3912
try:
4013
if path.startswith("s3a://"):
@@ -56,9 +29,7 @@ def load_config(config_path: str) -> Dict[str, Any]:
5629

5730

5831
def get_spark_session(config: Dict[str, Any]) -> SparkSession:
59-
"""
60-
config의 spark 설정으로 SparkSession 생성
61-
"""
32+
# Build a SparkSession from the config's "spark" settings
6233
spark_cfg = config.get("spark", {})
6334
app_name = spark_cfg.get("app_name", "Stage1-AnomalyDetection")
6435
master = spark_cfg.get("master")
@@ -69,19 +40,27 @@ def get_spark_session(config: Dict[str, Any]) -> SparkSession:
6940
logger.info("Spark Master: %s", master)
7041
else:
7142
logger.info("Spark Master: 미설정 (클러스터 모드)")
72-
7343
builder = (
7444
builder
75-
.config("spark.sql.adaptive.enabled", spark_cfg.get("adaptive_enabled", True))
76-
.config("spark.sql.adaptive.coalescePartitions.enabled", spark_cfg.get("adaptive_coalesce_enabled", True))
77-
.config("spark.sql.adaptive.skewJoin.enabled", spark_cfg.get("skew_join_enabled", True))
78-
.config("spark.sql.files.openCostInBytes", f"{spark_cfg.get('min_partition_size_mb', 128) * 1024 * 1024}")
45+
.config("spark.sql.adaptive.enabled", spark_cfg.get("adaptive_enabled", False))
46+
.config("spark.sql.adaptive.coalescePartitions.enabled", spark_cfg.get("adaptive_coalesce_enabled", False))
47+
.config("spark.sql.adaptive.skewJoin.enabled", spark_cfg.get("skew_join_enabled", False))
7948
.config("spark.sql.files.maxPartitionBytes", f"{spark_cfg.get('partition_size_mb', 128) * 1024 * 1024}")
8049
)
8150
shuffle = spark_cfg.get("shuffle_partitions")
8251
if shuffle is not None and shuffle != "auto":
8352
builder = builder.config("spark.sql.shuffle.partitions", shuffle)
8453

8554
spark = builder.getOrCreate()
55+
56+
conf = spark.sparkContext.getConf()
57+
logger.info("executor-memory : %s", conf.get("spark.executor.memory", "미설정"))
58+
logger.info("executor-cores : %s", conf.get("spark.executor.cores", "미설정"))
59+
logger.info("num-executors : %s", conf.get("spark.executor.instances", "미설정"))
60+
logger.info("driver-memory : %s", conf.get("spark.driver.memory", "미설정"))
61+
logger.info("shuffle.partitions : %s", conf.get("spark.sql.shuffle.partitions", "미설정"))
62+
logger.info("adaptive.enabled : %s", conf.get("spark.sql.adaptive.enabled", "미설정"))
63+
logger.info("maxPartitionBytes : %s", conf.get("spark.sql.files.maxPartitionBytes", "미설정"))
64+
8665
logger.info("SparkSession 생성 완료")
8766
return spark

0 commit comments

Comments
 (0)