# Source code for imputation.rf

import numpy as np
import pandas as pd
from typing import Union, Optional
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
import logging

# Module-level logger for this file.
# Its dotted name, 'imputation.rf', places it below the 'imputation' logger
# in the logging hierarchy; that parent logger is configured in MICE.py.
logger = logging.getLogger('imputation.rf')

def rf(
    y: Union[pd.Series, np.ndarray],
    id_obs: np.ndarray,
    x: Union[pd.DataFrame, np.ndarray],
    id_mis: Optional[np.ndarray] = None,
    n_estimators: int = 10,
    rng: Optional[np.random.Generator] = None,
    **kwargs
) -> np.ndarray:
    """
    Impute missing values using Random Forests with donor sampling.

    This function is designed to be compatible with the MICE framework,
    following the same interface as PMM, midas, CART, and sample methods.

    Parameters
    ----------
    y : Union[pd.Series, np.ndarray]
        Target variable with missing values.
    id_obs : np.ndarray
        Boolean mask of observed values in y (True = observed,
        False = missing).
    x : Union[pd.DataFrame, np.ndarray]
        Predictor variables (should be the current completed columns).
        A DataFrame containing ``object`` or ``category`` columns is
        one-hot encoded (first level dropped) before fitting.
    id_mis : np.ndarray, optional
        Boolean mask of missing values. If None, uses ``~id_obs``.
    n_estimators : int, default=10
        Number of trees in the forest.
    rng : np.random.Generator, optional
        Random number generator for reproducibility. If None, a fresh
        generator is used.
    **kwargs : dict
        Additional parameters passed to the random forest model.
        ``random_state`` is ignored: the forest seed is always drawn
        from ``rng``.

    Returns
    -------
    np.ndarray
        Imputed values for missing positions only. An empty array is
        returned when there is nothing to impute.

    Notes
    -----
    Algorithm (after Doove et al., 2014):

    1. Fit a random forest on observed data.
    2. For each missing case, find terminal nodes across all trees.
    3. For each tree, collect donors (observed cases in same node).
    4. Randomly sample one donor per tree.
    5. Take final imputation as a random draw from those donor values.

    A numeric ``y`` is imputed with a regression forest, anything else
    with a classification forest.

    Bootstrapping is inherent to Random Forest (bagging), so no additional
    bootstrap is applied. Each tree is already built on a bootstrap sample
    of the data.

    Fallback: with fewer than two observed values no forest is fitted.
    A single observed value is replicated (as its mean for numeric
    targets); with no observed values at all the result is filled with
    NaN, because there is nothing to borrow from.
    """
    logger.debug("Starting Random Forest imputation.")

    if rng is None:
        rng = np.random.default_rng()

    # One-hot encode categoricals so the forest only sees numeric columns.
    if isinstance(x, pd.DataFrame) and (
        x.select_dtypes(include=["object", "category"]).shape[1] > 0
    ):
        logger.debug("One-hot encoding categorical predictors.")
        x = pd.get_dummies(x, drop_first=True)

    x = np.asarray(x)
    id_obs = np.asarray(id_obs, bool)
    if id_mis is None:
        id_mis = ~id_obs

    x_obs, y_obs = x[id_obs], np.asarray(y)[id_obs]
    x_mis = x[id_mis]
    # Size every result from the rows actually selected, so the output
    # length always matches x_mis.
    n_mis = x_mis.shape[0]

    if n_mis == 0:
        return np.array([])

    is_numeric = pd.api.types.is_numeric_dtype(y_obs)

    if len(y_obs) < 2:
        logger.warning("Not enough observed data for RF. Using fallback imputation.")
        if len(y_obs) == 0:
            # No donor exists. Return NaN for both target kinds instead of
            # raising IndexError for non-numeric targets.
            return np.full(n_mis, np.nan, dtype=float if is_numeric else object)
        if is_numeric:
            return np.full(n_mis, np.mean(y_obs))
        # Exactly one observed value: it is trivially the most frequent one.
        return np.full(n_mis, y_obs[0])

    if is_numeric:
        logger.debug("Performing regression random forest imputation.")
        imputed_values = _rf_regression_impute(
            x_obs, x_mis, y_obs, n_estimators, rng, **kwargs
        )
    else:
        logger.debug("Performing classification random forest imputation.")
        imputed_values = _rf_classification_impute(
            x_obs, x_mis, y_obs, n_estimators, rng, **kwargs
        )

    logger.debug(
        "Random Forest imputation complete. Imputed %d values.",
        len(imputed_values),
    )
    return imputed_values
def _fit_forest(forest_cls, x_obs, y_obs, n_estimators, rng, kwargs):
    """Build and fit a forest of type ``forest_cls`` on the observed data.

    ``kwargs`` is the caller's own keyword dict and is consumed in place.
    ``random_state`` is discarded because the seed is drawn from ``rng``;
    ``max_depth``, ``min_samples_leaf`` and ``n_jobs`` are popped with
    their defaults so that a caller-supplied value overrides the default
    instead of colliding with it. Everything left is forwarded unchanged.
    """
    seed = int(rng.integers(2**31 - 1))
    kwargs.pop("random_state", None)
    forest = forest_cls(
        n_estimators=n_estimators,
        max_depth=kwargs.pop("max_depth", None),
        min_samples_leaf=kwargs.pop("min_samples_leaf", 1),
        random_state=seed,
        # Popped rather than hard-coded: a user-supplied n_jobs would
        # otherwise raise "got multiple values for keyword argument".
        n_jobs=kwargs.pop("n_jobs", -1),
        **kwargs
    )
    forest.fit(x_obs, y_obs)
    return forest


def _sample_donors(forest, x_obs, x_mis, y_obs, rng, out, fallback):
    """Fill ``out`` with one donor value per row of ``x_mis``.

    For each missing row and each tree, one observed value is drawn from
    the observed cases that fall in the same leaf; the imputation is then
    a random draw among those per-tree picks. ``fallback()`` supplies the
    value when no tree yields a donor.

    Returns the number of rows for which the fallback was used.
    """
    # Leaf index of every row in every tree, shape (n_rows, n_trees).
    leaves_obs = np.array([tree.apply(x_obs) for tree in forest.estimators_]).T
    leaves_mis = np.array([tree.apply(x_mis) for tree in forest.estimators_]).T

    n_fallback = 0
    for row in range(x_mis.shape[0]):
        picks = []
        for tree_idx in range(len(forest.estimators_)):
            in_same_leaf = leaves_obs[:, tree_idx] == leaves_mis[row, tree_idx]
            if in_same_leaf.any():
                picks.append(rng.choice(y_obs[in_same_leaf]))
        if picks:
            out[row] = rng.choice(picks)
        else:
            out[row] = fallback()
            n_fallback += 1
    return n_fallback


def _rf_regression_impute(x_obs, x_mis, y_obs, n_estimators, rng, **kwargs):
    """Helper for regression random forest imputation.

    Fits a ``RandomForestRegressor`` on the observed rows and imputes each
    missing row with an observed ``y`` value sampled from matching leaves.
    If no donor is found for a row, the mean of ``y_obs`` is used.

    Returns a float array with one value per row of ``x_mis``.
    """
    forest = _fit_forest(
        RandomForestRegressor, x_obs, y_obs, n_estimators, rng, kwargs
    )

    imputed = np.empty(x_mis.shape[0])
    fallback_count = _sample_donors(
        forest, x_obs, x_mis, y_obs, rng, imputed, y_obs.mean
    )

    if fallback_count:
        logger.debug(
            "RF regression donor fallback used %d/%d times.",
            fallback_count,
            len(imputed),
        )
    return imputed


def _rf_classification_impute(x_obs, x_mis, y_obs, n_estimators, rng, **kwargs):
    """Helper for classification random forest imputation.

    Fits a ``RandomForestClassifier`` on the observed rows and imputes
    each missing row with an observed class sampled from matching leaves.
    If only one class is observed, it is returned for every row without
    fitting a forest. If no donor is found for a row, a most frequent
    class is used, with ties broken at random.

    Returns an array of dtype ``y_obs.dtype`` with one value per row of
    ``x_mis``.
    """
    unique_cats = np.unique(y_obs)
    if len(unique_cats) == 1:
        return np.repeat(unique_cats[0], x_mis.shape[0])

    forest = _fit_forest(
        RandomForestClassifier, x_obs, y_obs, n_estimators, rng, kwargs
    )

    def _modal_class():
        # Most frequent observed class; ties are broken with rng.
        vals, counts = np.unique(y_obs, return_counts=True)
        return vals[rng.choice(np.flatnonzero(counts == counts.max()))]

    imputed = np.empty(x_mis.shape[0], dtype=y_obs.dtype)
    fallback_count = _sample_donors(
        forest, x_obs, x_mis, y_obs, rng, imputed, _modal_class
    )

    if fallback_count:
        logger.debug(
            "RF classification donor fallback used %d/%d times.",
            fallback_count,
            len(imputed),
        )
    return imputed