Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
460 changes: 297 additions & 163 deletions popframe/method/agglomeration.py

Large diffs are not rendered by default.

198 changes: 98 additions & 100 deletions popframe/method/anchor_settlement.py

Large diffs are not rendered by default.

161 changes: 66 additions & 95 deletions popframe/method/city_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pandas as pd
import json


class CityPopulationScorer:
"""
Calculates population-based scores for hexagonal grid cells based on municipal data.
Expand All @@ -24,11 +25,9 @@ class CityPopulationScorer:
2: "Территория отличается относительно низкой численностью и плотностью населения, что ограничивает возможности развитие инфраструктуры и экономики.",
3: "Территория имеет средние показатели численности и плотности населения, что создаёт возможность развитие инфраструктуры и экономики.",
4: "Территория имеет высокие показатели численности и плотности населения, что способствует развитие инфраструктуры и экономики.",
5: "Территория имеет очень высокими показателями численности и плотности, что указывает на высокий потенциал развития инфраструктуры и экономики."
5: "Территория имеет очень высокими показателями численности и плотности, что указывает на высокий потенциал развития инфраструктуры и экономики.",
}



def __init__(self, gdf_mo: gpd.GeoDataFrame, gdf_hex: gpd.GeoDataFrame, target_crs: int = 3857):
"""
Initializes the HexPopulationScorer.
Expand Down Expand Up @@ -58,9 +57,9 @@ def compute_mo_density(self):
self.gdf_mo: Adds columns 'area_m2', 'area_km2', and 'density_mo'.
"""
self.gdf_mo = self.gdf_mo.to_crs(epsg=self.target_crs)
self.gdf_mo['area_m2'] = self.gdf_mo.geometry.area
self.gdf_mo['area_km2'] = self.gdf_mo['area_m2'] / 1_000_000
self.gdf_mo['density_mo'] = self.gdf_mo['population'] / self.gdf_mo['area_km2']
self.gdf_mo["area_m2"] = self.gdf_mo.geometry.area
self.gdf_mo["area_km2"] = self.gdf_mo["area_m2"] / 1_000_000
self.gdf_mo["density_mo"] = self.gdf_mo["population"] / self.gdf_mo["area_km2"]

def assign_hex_to_mo(self):
"""
Expand All @@ -77,84 +76,61 @@ def assign_hex_to_mo(self):
"""
self.gdf_hex = self.gdf_hex.to_crs(epsg=self.target_crs)

mo_small = self.gdf_mo[['geometry', 'territory_id', 'population', 'density_mo']]
mo_small = self.gdf_mo[["geometry", "territory_id", "population", "density_mo"]]

hex_mo = gpd.overlay(
self.gdf_hex[['geometry', 'hexagon_id']],
mo_small,
how='intersection'
)
hex_mo['inter_area'] = hex_mo.geometry.area
hex_mo = gpd.overlay(self.gdf_hex[["geometry", "hexagon_id"]], mo_small, how="intersection")
hex_mo["inter_area"] = hex_mo.geometry.area

idx = hex_mo.groupby('hexagon_id')['inter_area'].idxmax()
hex_max = hex_mo.loc[idx, ['hexagon_id', 'territory_id', 'population', 'density_mo']]
idx = hex_mo.groupby("hexagon_id")["inter_area"].idxmax()
hex_max = hex_mo.loc[idx, ["hexagon_id", "territory_id", "population", "density_mo"]]

self.gdf_hex = self.gdf_hex.merge(hex_max, on='hexagon_id', how='left')
self.gdf_hex = self.gdf_hex.rename(columns={'density_mo': 'density'})
self.gdf_hex = self.gdf_hex.merge(hex_max, on="hexagon_id", how="left")
self.gdf_hex = self.gdf_hex.rename(columns={"density_mo": "density"})

def normalize_and_score(self):
"""
Нормализует population и density, считает combined_norm,
присваивает score = 1–5 для ячеек с данными и 0 для ячеек без данных.
"""
# 1. Нормируем population
pop_min, pop_max = self.gdf_hex['population'].min(), self.gdf_hex['population'].max()
if pop_max == pop_min:
self.gdf_hex['norm_pop'] = 0.5
else:
self.gdf_hex['norm_pop'] = (
self.gdf_hex['population'] - pop_min
) / (pop_max - pop_min)

# 2. Нормируем density
dens_min, dens_max = self.gdf_hex['density'].min(), self.gdf_hex['density'].max()
if dens_max == dens_min:
self.gdf_hex['norm_dens'] = 0.5
else:
self.gdf_hex['norm_dens'] = (
self.gdf_hex['density'] - dens_min
) / (dens_max - dens_min)

# Заменяем NaN (в том числе от ячеек без данных) на 0
self.gdf_hex['norm_pop'] = self.gdf_hex['norm_pop'].fillna(0)
self.gdf_hex['norm_dens'] = self.gdf_hex['norm_dens'].fillna(0)

# 3. Вычисляем сырое значение
self.gdf_hex['combined_raw'] = (
self.gdf_hex['norm_pop'] + self.gdf_hex['norm_dens']
) / 2

# 4. Мин–макс нормируем combined_raw
raw_min, raw_max = (
self.gdf_hex['combined_raw'].min(),
self.gdf_hex['combined_raw'].max()
)
if raw_max == raw_min:
self.gdf_hex['combined_norm'] = 0.5
else:
self.gdf_hex['combined_norm'] = (
self.gdf_hex['combined_raw'] - raw_min
) / (raw_max - raw_min)

self.gdf_hex['combined_norm'] = self.gdf_hex['combined_norm'].fillna(0)

# 5. Присваиваем оценку:
# – для ячеек с данными: масштабируем в [1;5]
# – для ячеек без данных (вода): оставляем 0
# 5.1 Сначала проставляем всем 1–5
self.gdf_hex['score'] = (
(self.gdf_hex['combined_norm'] * 4 + 1)
.round()
.clip(lower=1, upper=5)
.astype(int)
)

# 5.2 Выставляем 0 там, где нет ни population, ни density
mask_no_data = (
self.gdf_hex['population'].isna()
& self.gdf_hex['density'].isna()
)
self.gdf_hex.loc[mask_no_data, 'score'] = 0
"""
Нормализует population и density, считает combined_norm,
присваивает score = 1–5 для ячеек с данными и 0 для ячеек без данных.
"""
# 1. Нормируем population
pop_min, pop_max = self.gdf_hex["population"].min(), self.gdf_hex["population"].max()
if pop_max == pop_min:
self.gdf_hex["norm_pop"] = 0.5
else:
self.gdf_hex["norm_pop"] = (self.gdf_hex["population"] - pop_min) / (pop_max - pop_min)

# 2. Нормируем density
dens_min, dens_max = self.gdf_hex["density"].min(), self.gdf_hex["density"].max()
if dens_max == dens_min:
self.gdf_hex["norm_dens"] = 0.5
else:
self.gdf_hex["norm_dens"] = (self.gdf_hex["density"] - dens_min) / (dens_max - dens_min)

# Заменяем NaN (в том числе от ячеек без данных) на 0
self.gdf_hex["norm_pop"] = self.gdf_hex["norm_pop"].fillna(0)
self.gdf_hex["norm_dens"] = self.gdf_hex["norm_dens"].fillna(0)

# 3. Вычисляем сырое значение
self.gdf_hex["combined_raw"] = (self.gdf_hex["norm_pop"] + self.gdf_hex["norm_dens"]) / 2

# 4. Мин–макс нормируем combined_raw
raw_min, raw_max = (self.gdf_hex["combined_raw"].min(), self.gdf_hex["combined_raw"].max())
if raw_max == raw_min:
self.gdf_hex["combined_norm"] = 0.5
else:
self.gdf_hex["combined_norm"] = (self.gdf_hex["combined_raw"] - raw_min) / (raw_max - raw_min)

self.gdf_hex["combined_norm"] = self.gdf_hex["combined_norm"].fillna(0)

# 5. Присваиваем оценку:
# – для ячеек с данными: масштабируем в [1;5]
# – для ячеек без данных (вода): оставляем 0
# 5.1 Сначала проставляем всем 1–5
self.gdf_hex["score"] = (self.gdf_hex["combined_norm"] * 4 + 1).round().clip(lower=1, upper=5).astype(int)

# 5.2 Выставляем 0 там, где нет ни population, ни density
mask_no_data = self.gdf_hex["population"].isna() & self.gdf_hex["density"].isna()
self.gdf_hex.loc[mask_no_data, "score"] = 0

def assign_interpretations(self):
"""
Expand All @@ -167,7 +143,7 @@ def assign_interpretations(self):
Modifies:
self.gdf_hex: Adds column 'interpretation'.
"""
self.gdf_hex['interpretation'] = self.gdf_hex['score'].apply(
self.gdf_hex["interpretation"] = self.gdf_hex["score"].apply(
lambda v: CityPopulationScorer.INTERPRETATIONS[int(v)] if pd.notna(v) else None
)

Expand All @@ -188,20 +164,16 @@ def generate_output(self) -> list:
"""
output_list = []
for _, row in self.gdf_hex.iterrows():
output_list.append({
'hexagon_id': row['hexagon_id'],
'project': None,
'average_population_density': (
round(row['density'], 1) if pd.notna(row['density']) else None
),
'total_population': (
int(row['population']) if pd.notna(row['population']) else None
),
'score': (
float(row['score']) if pd.notna(row['score']) else None
),
'interpretation': row['interpretation']
})
output_list.append(
{
"hexagon_id": row["hexagon_id"],
"project": None,
"average_population_density": (round(row["density"], 1) if pd.notna(row["density"]) else None),
"total_population": (int(row["population"]) if pd.notna(row["population"]) else None),
"score": (float(row["score"]) if pd.notna(row["score"]) else None),
"interpretation": row["interpretation"],
}
)
self.output = output_list
return output_list

Expand All @@ -224,4 +196,3 @@ def run(self) -> list:
self.normalize_and_score()
self.assign_interpretations()
return self.generate_output()

33 changes: 17 additions & 16 deletions popframe/method/engineer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Dict, Any, Set
from popframe.utils.const import RADIUS_NPP_M, RADIUS_HPP_M, RADIUS_DEFAULT_INFRA_M


class InfrastructureAnalyzer:
def __init__(self, infrastructure_gdf: gpd.GeoDataFrame, assessment_areas_gdf: gpd.GeoDataFrame) -> None:
"""
Expand All @@ -18,12 +19,12 @@ def __init__(self, infrastructure_gdf: gpd.GeoDataFrame, assessment_areas_gdf: g
# Convert coordinate systems to metric (EPSG:3857) for calculations
self.infrastructure_gdf = infrastructure_gdf.to_crs(epsg=3857)
self.assessment_areas_gdf = assessment_areas_gdf.to_crs(epsg=3857)

# Initialize columns and start analysis
self.assessment_areas_gdf['score'] = 0
self.assessment_areas_gdf['types_in_radius'] = None
self.assessment_areas_gdf["score"] = 0
self.assessment_areas_gdf["types_in_radius"] = None
self._analyze_infrastructure()

@staticmethod
def get_radius(physical_object_type: Dict[str, Any]) -> float:
"""
Expand All @@ -39,13 +40,13 @@ def get_radius(physical_object_type: Dict[str, Any]) -> float:
float
The radius in meters for the given object type.
"""
name = physical_object_type.get('name', '')
name = physical_object_type.get("name", "")
if "Атомная электростанция" in name:
return float(RADIUS_NPP_M)
if "Гидроэлектростанция" in name:
return float(RADIUS_HPP_M)
return float(RADIUS_DEFAULT_INFRA_M)

def _analyze_infrastructure(self) -> None:
"""
Analyzes infrastructure for each territory and adds assessment attributes.
Expand All @@ -58,24 +59,24 @@ def _analyze_infrastructure(self) -> None:
for index, area in self.assessment_areas_gdf.iterrows():
# Create a temporary set of unique types for this assessment area
unique_types_in_radius: Set[str] = set()

# Check each object in infrastructure_gdf for inclusion in the buffer
for _, obj in self.infrastructure_gdf.iterrows():
# Extract `physical_object_type` dictionary to determine the radius
physical_object_info: Dict[str, Any] = obj['physical_object_type']
physical_object_info: Dict[str, Any] = obj["physical_object_type"]
buffer_distance: float = self.get_radius(physical_object_info)

# Create a buffer for the current assessment area
area_buffer: BaseGeometry = area.geometry.buffer(buffer_distance)

# If the object is within the buffer, add its `type` to the set
if obj.geometry.within(area_buffer):
unique_types_in_radius.add(obj['type'])
unique_types_in_radius.add(obj["type"])

# Count unique types and assign them to 'score' and 'types_in_radius' attributes
self.assessment_areas_gdf.at[index, 'score'] = len(unique_types_in_radius)
self.assessment_areas_gdf.at[index, 'types_in_radius'] = list(unique_types_in_radius)
self.assessment_areas_gdf.at[index, "score"] = len(unique_types_in_radius)
self.assessment_areas_gdf.at[index, "types_in_radius"] = list(unique_types_in_radius)

def get_results(self) -> gpd.GeoDataFrame:
"""
Returns the result with columns 'id', 'score', 'types_in_radius', and 'geometry' in CRS 4326.
Expand All @@ -86,4 +87,4 @@ def get_results(self) -> gpd.GeoDataFrame:
GeoDataFrame with the results in CRS 4326.
"""
# Convert back to CRS 4326 before returning
return self.assessment_areas_gdf[['score', 'types_in_radius', 'geometry']].to_crs(epsg=4326)
return self.assessment_areas_gdf[["score", "types_in_radius", "geometry"]].to_crs(epsg=4326)
Loading
Loading