@@ -593,3 +593,215 @@ def predict_proba(self, X):

        proba = self._sigmoid(predictions)
        return np.vstack([1 - proba, proba]).T


# Assumes `import numpy as np` and `from enum import Enum` at the top of the
# module; numpy is already used by the surrounding code, while the Enum
# import is an assumption, since the file header lies outside this hunk.
class XGBoostObjective(Enum):
    REG_SQUAREDERROR = "reg:squarederror"
    BINARY_LOGISTIC = "binary:logistic"


class XGBoostNode:
    def __init__(self):
        self.feature_idx: 'int | None' = None
        self.threshold: 'float | None' = None
        self.left: 'XGBoostNode | None' = None
        self.right: 'XGBoostNode | None' = None
        self.value: 'float | None' = None  # set only for leaves
        self.gain: float = 0.0
        self.cover: float = 0.0  # sum of hessians reaching this node


class XGBoostTree:
    def __init__(self, max_depth: int = 6, min_child_weight: float = 1.0,
                 lambda_: float = 1.0, gamma: float = 0.0):
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.lambda_ = lambda_
        self.gamma = gamma
        self.root = None

    def _calc_leaf_value(self, grad: np.ndarray, hess: np.ndarray) -> float:
        return -np.sum(grad) / (np.sum(hess) + self.lambda_)

    def _calc_gain(self, grad: np.ndarray, hess: np.ndarray) -> float:
        G, H = np.sum(grad), np.sum(hess)
        return (G * G) / (H + self.lambda_)

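    # Math note (added commentary, not part of the original diff): the two
    # helpers above implement the second-order approximation from the XGBoost
    # paper. For a leaf collecting gradient sum G and hessian sum H, the
    # optimal weight is w* = -G / (H + lambda_), and the corresponding
    # structure score is G^2 / (H + lambda_). The paper's 1/2 factor on the
    # score is omitted here, so a given gamma prunes roughly half as
    # aggressively as the paper's formulation would suggest.
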
    def _find_best_split(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray) -> tuple:
        best_gain = 0.0
        best_feature_idx = None
        best_threshold = None
        total_gain = self._calc_gain(grad, hess)
        n_features = X.shape[1]

        for feature_idx in range(n_features):
            feature_values = X[:, feature_idx]
            unique_values = np.unique(feature_values)

            if len(unique_values) <= 1:
                continue

            # Candidate thresholds: midpoints between consecutive unique values
            thresholds = (unique_values[:-1] + unique_values[1:]) / 2

            for threshold in thresholds:
                left_mask = feature_values <= threshold
                right_mask = ~left_mask

                # Require a minimum hessian mass (cover) in each child
                if (np.sum(hess[left_mask]) < self.min_child_weight or
                        np.sum(hess[right_mask]) < self.min_child_weight):
                    continue

                left_gain = self._calc_gain(grad[left_mask], hess[left_mask])
                right_gain = self._calc_gain(grad[right_mask], hess[right_mask])
                gain = left_gain + right_gain - total_gain - self.gamma

                if gain > best_gain:
                    best_gain = gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        return best_feature_idx, best_threshold, best_gain

    def _build_tree(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray,
                    depth: int = 0) -> XGBoostNode:
        node = XGBoostNode()
        node.cover = np.sum(hess)

        if depth >= self.max_depth:
            node.value = self._calc_leaf_value(grad, hess)
            return node

        feature_idx, threshold, gain = self._find_best_split(X, grad, hess)

        # No admissible positive-gain split: make this node a leaf
        if feature_idx is None or gain <= 0:
            node.value = self._calc_leaf_value(grad, hess)
            return node

        left_mask = X[:, feature_idx] <= threshold
        right_mask = ~left_mask

        node.feature_idx = feature_idx
        node.threshold = threshold
        node.gain = gain
        node.left = self._build_tree(X[left_mask], grad[left_mask],
                                     hess[left_mask], depth + 1)
        node.right = self._build_tree(X[right_mask], grad[right_mask],
                                      hess[right_mask], depth + 1)

        return node

    def _predict_sample(self, x: np.ndarray, node: XGBoostNode) -> float:
        # Leaves carry a value; internal nodes carry a split
        if node.value is not None:
            return node.value

        if x[node.feature_idx] <= node.threshold:
            return self._predict_sample(x, node.left)
        return self._predict_sample(x, node.right)

    def fit(self, X: np.ndarray, grad: np.ndarray, hess: np.ndarray) -> 'XGBoostTree':
        self.root = self._build_tree(X, grad, hess)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.array([self._predict_sample(x, self.root) for x in X])

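# Illustrative usage of a single tree (added sketch, not in the original
# diff): with squared error and predictions initialised to zero, the
# gradients are grad = pred - y = -y and hess = 1, so the fitted leaves
# approximate the targets, shrunk toward zero by lambda_.
#
#     X = np.array([[1.0], [2.0], [3.0], [4.0]])
#     y = np.array([1.0, 1.0, 10.0, 10.0])
#     tree = XGBoostTree(max_depth=2).fit(X, -y, np.ones_like(y))
#     tree.predict(X)  # ~[0.67, 0.67, 6.67, 6.67] with lambda_=1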

class XGBoost:
    def __init__(self, objective: str = "reg:squarederror", n_estimators: int = 100,
                 learning_rate: float = 0.3, max_depth: int = 6,
                 min_child_weight: float = 1.0, subsample: float = 1.0,
                 colsample_bytree: float = 1.0, lambda_: float = 1.0,
                 gamma: float = 0.0, random_state: 'int | None' = None):
        self.objective = XGBoostObjective(objective)
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.lambda_ = lambda_
        self.gamma = gamma
        self.random_state = random_state
        self.rng = np.random.default_rng(random_state)

        # Each fitted tree is stored with the column subset it was trained on
        self.trees = []
        self.base_score = 0.5

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        # Clip to avoid overflow warnings in np.exp for large negative inputs
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

    def _compute_gradients(self, y: np.ndarray, pred: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        if self.objective == XGBoostObjective.REG_SQUAREDERROR:
            grad = pred - y
            hess = np.ones_like(y)
        else:
            prob = self._sigmoid(pred)
            grad = prob - y
            hess = prob * (1 - prob)
        return grad, hess

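    # Note on the gradients above (added commentary): both pairs follow from
    # differentiating the loss w.r.t. the raw score f. Squared error
    # L = (1/2)(f - y)^2 gives dL/df = f - y and d2L/df2 = 1; log loss with
    # p = sigmoid(f) gives dL/df = p - y and d2L/df2 = p(1 - p).
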
    def _subsample_data(self, X: np.ndarray, grad: np.ndarray,
                        hess: np.ndarray) -> tuple:
        # Row subsampling
        if self.subsample < 1.0:
            n_samples = int(X.shape[0] * self.subsample)
            indices = self.rng.choice(X.shape[0], size=n_samples, replace=False)
            X = X[indices]
            grad = grad[indices]
            hess = hess[indices]

        # Column subsampling; keep the chosen indices so prediction can map
        # the tree's split features back onto the full matrix
        feature_indices = np.arange(X.shape[1])
        if self.colsample_bytree < 1.0:
            n_features = max(1, int(X.shape[1] * self.colsample_bytree))
            feature_indices = self.rng.choice(X.shape[1], size=n_features, replace=False)
            X = X[:, feature_indices]

        return X, grad, hess, feature_indices

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'XGBoost':
        if self.objective == XGBoostObjective.BINARY_LOGISTIC:
            y = (y > 0).astype(np.float64)
            # Start from the log-odds of the positive rate, clipped away from 0 and 1
            p = np.clip(np.mean(y), 1e-6, 1 - 1e-6)
            self.base_score = np.log(p / (1 - p))
        else:
            self.base_score = np.mean(y)

        predictions = np.full(X.shape[0], self.base_score)
        self.trees = []

        for _ in range(self.n_estimators):
            grad, hess = self._compute_gradients(y, predictions)

            X_tree, grad_tree, hess_tree, feature_indices = self._subsample_data(X, grad, hess)

            tree = XGBoostTree(
                max_depth=self.max_depth,
                min_child_weight=self.min_child_weight,
                lambda_=self.lambda_,
                gamma=self.gamma,
            )
            tree.fit(X_tree, grad_tree, hess_tree)
            self.trees.append((tree, feature_indices))

            # Update predictions on the full data, restricted to the columns
            # this tree was trained on (its split indices refer to that subset)
            predictions += self.learning_rate * tree.predict(X[:, feature_indices])

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        predictions = np.full(X.shape[0], self.base_score)

        for tree, feature_indices in self.trees:
            predictions += self.learning_rate * tree.predict(X[:, feature_indices])

        if self.objective == XGBoostObjective.BINARY_LOGISTIC:
            return (self._sigmoid(predictions) >= 0.5).astype(int)
        return predictions

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        if self.objective != XGBoostObjective.BINARY_LOGISTIC:
            raise ValueError("predict_proba is only available for binary classification")

        predictions = np.full(X.shape[0], self.base_score)

        for tree, feature_indices in self.trees:
            predictions += self.learning_rate * tree.predict(X[:, feature_indices])

        proba = self._sigmoid(predictions)
        return np.vstack([1 - proba, proba]).T
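

# Smoke test of the full booster: a hedged sketch with synthetic data and
# illustrative hyperparameters, not part of the original diff.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 4))
    y = (X[:, 0] + X[:, 1] > 0).astype(np.float64)

    model = XGBoost(objective="binary:logistic", n_estimators=20,
                    max_depth=3, random_state=0)
    model.fit(X, y)

    print("train accuracy:", np.mean(model.predict(X) == y))
    print("proba shape:", model.predict_proba(X).shape)  # (200, 2)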