Source code for imputation.MICE

import warnings
import numpy as np
import pandas as pd
import logging
import time
from typing import Dict, Union, Optional, List
from datetime import datetime
import os
import statsmodels.formula.api as smf

# Get a logger for this module using the proper package hierarchy
# This will inherit configuration from the package logger when configured
logger = logging.getLogger('imputation.mice')

# Check if logging has been configured; if not, provide helpful guidance
def _check_logging_configured():
    """Emit a one-time hint when the ``imputation`` logger has no usable output.

    A warning is logged only when the package logger has no handler other than
    ``logging.NullHandler`` *and* does not propagate records to its ancestors.
    The hint is shown at most once; the function attribute ``_warned`` is set
    after the first warning and its mere presence suppresses later ones.
    """
    pkg_logger = logging.getLogger('imputation')

    # A single non-Null handler is enough to consider logging configured.
    for attached in pkg_logger.handlers:
        if not isinstance(attached, logging.NullHandler):
            return

    # Records still travel up to ancestor loggers, so no hint is needed.
    if pkg_logger.propagate:
        return

    # The marker attribute only exists once the warning has been issued.
    if hasattr(_check_logging_configured, '_warned'):
        return

    logger.warning(
        "No logging configured for imputation package. "
        "Call imputation.configure_logging() to enable logging, "
        "or imputation.disable_logging() to suppress this warning."
    )
    _check_logging_configured._warned = True


from .validators import (
    validate_dataframe,
    validate_columns,
    check_n_imputations,
    check_maxit,
    check_method,
    check_initial_method,
    validate_predictor_matrix,
    check_visit_sequence,
    validate_formula,
)
from .constants import (
    InitialMethod,
    DEFAULT_METHOD,
    DEFAULT_INITIAL_METHOD,
    VisitSequence,
)

# Import concrete imputation functions
from .utils import get_imputer_func
# External helpers
from .mice_result import MICEresult
from .pooling import pool_descriptive_statistics

class MICE:
    """
    Multiple Imputation by Chained Equations (MICE) class.

    This class implements the MICE algorithm for handling missing data through
    multiple imputations using chained equations.

    Parameters
    ----------
    data : pd.DataFrame
        Input data with missing values. Must be a pandas DataFrame.

    Attributes
    ----------
    data : pd.DataFrame
        The validated and cleaned input data
    id_obs : Dict[str, pd.Series]
        Dictionary mapping column names to a boolean mask that is True where
        the column is observed
    id_mis : Dict[str, pd.Series]
        Dictionary mapping column names to a boolean mask that is True where
        the column is missing
    """

    # Keyword arguments accepted by `impute` that are not "<method>_<param>"
    # imputer options; they must not trigger the unused-method warning.
    _NON_METHOD_KWARGS = frozenset({
        'output_dir',
        'min_cor',
        'min_puc',
        'include',
        'exclude',
        'correlation_method',
    })

    def __init__(self, data):
        """
        Initialize the MICE object.

        Parameters
        ----------
        data : pd.DataFrame
            Input data with missing values. Must be a pandas DataFrame.

        Raises
        ------
        ValueError
            If data is not a pandas DataFrame or contains duplicate column
            names (raised by the validators)
        """
        # Check if logging has been configured and provide guidance if needed
        _check_logging_configured()

        logger.info("Initializing MICE object")
        self.data = validate_dataframe(data)
        self.data = validate_columns(self.data)
        logger.debug(f"Input data shape: {self.data.shape}")

        # Per-column boolean masks of observed / missing rows, computed once
        # on the original data and reused by every chain and iteration.
        self.id_obs = {}
        self.id_mis = {}
        missing_stats = {}
        for col in self.data.columns:
            notna = self.data[col].notna()
            self.id_obs[col] = notna
            self.id_mis[col] = ~notna
            missing_stats[col] = {
                'missing_count': (~notna).sum(),
                'missing_percentage': (~notna).mean() * 100
            }

        logger.debug("Missing value statistics:")
        for col, stats in missing_stats.items():
            logger.debug(f" {col}: {stats['missing_count']} values ({stats['missing_percentage']:.2f}%) missing")

        # Container for pooled results
        self.result = None  # Will hold the pooled `MICEresult` instance
        self.run_output_dir = None

        # For storing analysis model results
        self.model_results = []

        # Required by statsmodels result wrappers
        self.nobs = self.data.shape[0]

        logger.info("MICE object initialized successfully")

    def impute(
        self,
        n_imputations: int = 5,
        maxit: int = 10,
        predictor_matrix: Optional[pd.DataFrame] = None,
        initial: str = DEFAULT_INITIAL_METHOD,
        method: Optional[Union[str, Dict[str, str]]] = None,
        visit_sequence: Union[str, List[str]] = "monotone",
        **kwargs
    ) -> List[pd.DataFrame]:
        """
        Perform multiple imputation by chained equations.

        Parameters
        ----------
        n_imputations : int, default=5
            Number of imputations to perform
        maxit : int, default=10
            Maximum number of iterations for each imputation cycle.
            Must be a positive integer.
        predictor_matrix : pd.DataFrame, optional
            Binary matrix indicating which variables should be used as
            predictors for each target variable. Should have column names as
            both index and columns. A 1 indicates that the column variable is
            used as predictor for the index variable. If None, a predictor
            matrix is estimated using `_quickpred`.
        initial : str, default=DEFAULT_INITIAL_METHOD
            Initial imputation method. Must be one of SUPPORTED_INITIAL_METHODS.
        method : Union[str, Dict[str, str]], optional
            Imputation method(s) to use:
            - str: use the same method for all columns
            - Dict[str, str]: dictionary mapping column names to their methods
            - None: use default method for all columns
            Must be one of SUPPORTED_METHODS.
        visit_sequence : Union[str, List[str]], default="monotone"
            Sequence in which variables should be visited during imputation:
            - str: "monotone" for monotone missing data pattern
            - List[str]: list of column names specifying the order to visit
              variables
        **kwargs : dict
            Additional keyword arguments.

            - `output_dir` (str, optional): accepted and stored together with
              the other keyword arguments in `self.imputation_params`; this
              method itself does not read it.

            Parameters for specific imputation methods can also be passed.
            These should be prefixed with the method name and an underscore,
            e.g., `pmm_donors=5` to pass `donors=5` to the `pmm` imputer.

            When `predictor_matrix` is not specified, the following can be
            passed for `_quickpred`:

            - `min_cor` (float, default=0.1): Minimum correlation for a
              predictor.
            - `min_puc` (float, default=0.0): Minimum proportion of usable
              cases.
            - `include` (list, optional): Columns to always include as
              predictors.
            - `exclude` (list, optional): Columns to always exclude as
              predictors.
            - `correlation_method` (str, default="pearson"): Correlation
              method used to compute the correlation matrix inside
              `_quickpred`.

        Returns
        -------
        List[pd.DataFrame]
            One completed dataset per imputation chain (also stored in
            `self.imputed_datasets`).
        """
        logger.info("Starting imputation process")
        logger.debug(f"Parameters: n_imputations={n_imputations}, maxit={maxit}, "
                     f"initial={initial}, method={method}, visit_sequence={visit_sequence}")

        start_time = time.time()

        check_n_imputations(n_imputations)
        check_maxit(maxit)
        check_initial_method(initial)

        if predictor_matrix is None:
            # The quickpred options are consumed here so they are not later
            # mistaken for imputer parameters.
            min_cor = kwargs.pop('min_cor', 0.1)
            min_puc = kwargs.pop('min_puc', 0.0)
            include = kwargs.pop('include', None)
            exclude = kwargs.pop('exclude', None)
            correlation_method = kwargs.pop('correlation_method', 'pearson')
            predictor_matrix = self._quickpred(
                min_cor=min_cor,
                min_puc=min_puc,
                include=include,
                exclude=exclude,
                method=correlation_method
            )
        else:
            predictor_matrix = validate_predictor_matrix(
                predictor_matrix, list(self.data.columns), self.data
            )
            logger.debug("Predictor matrix validated successfully")

        if method is not None:
            self.method = check_method(method, list(self.data.columns))
        else:
            self.method = check_method(DEFAULT_METHOD, list(self.data.columns))
        logger.debug(f"Using imputation methods: {self.method}")

        # Store imputation parameters before using them
        self.imputation_params = kwargs

        # Warn if user provided method-specific parameters for methods not
        # used. Known non-method options (e.g. `output_dir`) are skipped so
        # they are not reported as parameters of a bogus method.
        if self.imputation_params:
            provided_prefixes = {
                key.split('_', 1)[0]
                for key in self.imputation_params
                if '_' in key and key not in self._NON_METHOD_KWARGS
            }
            used_methods = set(self.method.values())
            unused_provided = provided_prefixes - used_methods
            if unused_provided:
                logger.warning(
                    "Method-specific parameters were provided for unused methods: %s. "
                    "These parameters will be ignored.",
                    sorted(unused_provided)
                )

        self.n_imputations = n_imputations
        self.maxit = maxit
        self.predictor_matrix = predictor_matrix
        self.initial = initial

        self._set_visit_sequence(visit_sequence)
        logger.debug(f"Visit sequence set to: {self.visit_sequence}")

        # Prepare chain statistics containers
        # Only track statistics for numeric columns that will be imputed
        # (i.e., have missing values)
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        numeric_cols_to_impute = [col for col in self.visit_sequence if col in numeric_cols]
        self.chain_mean = {
            col: np.full((self.maxit, self.n_imputations), np.nan, dtype=float)
            for col in numeric_cols_to_impute
        }
        self.chain_var = {
            col: np.full((self.maxit, self.n_imputations), np.nan, dtype=float)
            for col in numeric_cols_to_impute
        }

        self.imputed_datasets = []
        individual_times = []

        for chain_idx in range(self.n_imputations):
            chain_start_time = time.time()
            logger.info(f"Starting imputation chain {chain_idx + 1}/{self.n_imputations}")
            self.imputed_datasets.append(self._impute_once(chain_idx))
            chain_end_time = time.time()
            chain_duration = chain_end_time - chain_start_time
            individual_times.append(chain_duration)
            logger.info(f"Completed imputation chain {chain_idx + 1} in {chain_duration:.2f} seconds")

        end_time = time.time()
        total_duration = end_time - start_time
        avg_chain_time = sum(individual_times) / len(individual_times)

        logger.info(f"All {self.n_imputations} imputations completed in {total_duration:.2f} seconds")
        logger.info(f"Average time per imputation chain: {avg_chain_time:.2f} seconds")
        logger.debug(f"Individual chain times: {[f'{t:.2f}s' for t in individual_times]}")

        logger.debug("Final imputation statistics:")
        logger.debug(f" - Number of imputations: {self.n_imputations}")
        logger.debug(f" - Maximum iterations: {self.maxit}")
        logger.debug(f" - Initial method: {self.initial}")
        logger.debug(f" - Method: {self.method}")
        logger.debug(f" - Visit sequence: {self.visit_sequence}")
        logger.debug(f" - Predictor matrix provided: {self.predictor_matrix is not None}")

        # Create a simple result object to hold the imputed datasets for
        # backward compatibility
        class ImputationResult:
            def __init__(self, imputed_datasets):
                self.imputed_datasets = imputed_datasets

        self.result = ImputationResult(self.imputed_datasets)
        logger.debug("Created result object with imputed datasets")

        return self.imputed_datasets

    def _quickpred(
        self,
        min_cor: float = 0.1,
        min_puc: float = 0.0,
        include: Optional[List[str]] = None,
        exclude: Optional[List[str]] = None,
        method: str = "pearson"
    ) -> pd.DataFrame:
        """
        Generate a predictor matrix based on correlation and proportion of
        usable cases.

        This method is inspired by the `quickpred` function from the R `mice`
        package.

        Parameters
        ----------
        min_cor : float, default=0.1
            The minimum absolute correlation required to be included as a
            predictor.
        min_puc : float, default=0.0
            The minimum proportion of usable cases (rows where both target
            and predictor are observed) for a pair to be considered.
        include : list of str, optional
            Columns to always include as predictors.
        exclude : list of str, optional
            Columns to always exclude as predictors.
        method : str, default="pearson"
            The correlation method to use ('pearson', 'kendall', 'spearman').

        Returns
        -------
        pd.DataFrame
            A square binary matrix indicating predictor relationships
            (rows are targets, columns are predictors).

        Raises
        ------
        ValueError
            If `include` or `exclude` names a column that is not in the data.
        """
        logger.info(f"Estimating predictor matrix with min_cor={min_cor}, min_puc={min_puc}, method='{method}'")

        columns = self.data.columns
        predictor_matrix = pd.DataFrame(0, index=columns, columns=columns)

        # Calculate correlation matrix only for numeric columns
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        cor_matrix = self.data[numeric_cols].corr(method=method)

        # Observed-value mask, computed once instead of once per column pair
        observed = self.data.notna()

        for target_col in columns:
            # Skip targets with no missing values
            if self.id_obs[target_col].all():
                continue

            for predictor_col in columns:
                if target_col == predictor_col:
                    continue

                # Proportion of usable cases
                puc = (observed[target_col] & observed[predictor_col]).mean()
                if puc < min_puc:
                    continue

                # Only use correlation if both columns are numeric
                if target_col in cor_matrix.index and predictor_col in cor_matrix.columns:
                    correlation = cor_matrix.loc[target_col, predictor_col]
                    # A NaN correlation fails this comparison and is skipped
                    if abs(correlation) >= min_cor:
                        predictor_matrix.loc[target_col, predictor_col] = 1
                else:
                    # For non-numeric columns, use them as predictors
                    predictor_matrix.loc[target_col, predictor_col] = 1

        # Handle include and exclude lists with validation for unknown columns
        if include:
            unknown_includes = [c for c in include if c not in predictor_matrix.columns]
            if unknown_includes:
                raise ValueError(f"_quickpred include contains unknown columns: {unknown_includes}")
            predictor_matrix.loc[:, include] = 1

        if exclude:
            unknown_excludes = [c for c in exclude if c not in predictor_matrix.columns]
            if unknown_excludes:
                raise ValueError(f"_quickpred exclude contains unknown columns: {unknown_excludes}")
            predictor_matrix.loc[:, exclude] = 0

        # Ensure diagonal is zero. Work on an explicit copy and rebuild the
        # frame: `DataFrame.values` is not guaranteed to be a writable view
        # (it is read-only under pandas Copy-on-Write).
        matrix_values = predictor_matrix.to_numpy(copy=True)
        np.fill_diagonal(matrix_values, 0)
        predictor_matrix = pd.DataFrame(
            matrix_values,
            index=predictor_matrix.index,
            columns=predictor_matrix.columns,
        )

        logger.debug(f"Estimated predictor matrix:\n{predictor_matrix}")
        return predictor_matrix

    def fit(self, formula: str) -> list:
        """
        Fit a statistical model to each imputed dataset using the specified
        formula.

        This method fits an OLS model to each dataset in
        self.imputed_datasets and stores the results in self.model_results.

        Parameters
        ----------
        formula : str
            A formula string in patsy syntax for statsmodels
            (e.g., 'y ~ x1 + x2')

        Returns
        -------
        list
            The fitted statsmodels results, one per imputed dataset (also
            stored in `self.model_results`).

        Raises
        ------
        ValueError
            If no imputed datasets are available or if variables in formula
            are not in data
        RuntimeError
            If fitting fails on any dataset; the original exception is
            chained as the cause.

        Examples
        --------
        >>> mice_obj = MICE(data)
        >>> mice_obj.impute(n_imputations=5)
        >>> mice_obj.fit('outcome ~ predictor1 + predictor2')
        """
        logger.info(f"Starting analysis with formula: {formula}")

        # Check if imputation has been performed
        if not hasattr(self, 'imputed_datasets') or not self.imputed_datasets:
            msg = "No imputed datasets found. Please run .impute() first."
            logger.error(msg)
            raise ValueError(msg)

        # Validate formula
        validate_formula(formula, list(self.data.columns))

        # Clear any previous model results
        self.model_results = []

        # Fit model to each imputed dataset
        n_datasets = len(self.imputed_datasets)
        logger.info(f"Fitting model to {n_datasets} imputed datasets")

        for i, dataset in enumerate(self.imputed_datasets):
            logger.debug(f"Fitting model to dataset {i + 1}/{n_datasets}")
            try:
                # Fit OLS model using statsmodels
                model = smf.ols(formula, data=dataset)
                fitted_model = model.fit()
            except Exception as e:
                logger.error(f"Error fitting model to dataset {i + 1}: {str(e)}")
                # Chain the cause so the original traceback is preserved
                raise RuntimeError(f"Failed to fit model to dataset {i + 1}: {str(e)}") from e
            self.model_results.append(fitted_model)
            logger.debug(f"Successfully fitted model to dataset {i + 1}")

        # Store formula for potential later use
        self.formula = formula

        logger.info(f"Analysis completed successfully. Fitted models to {len(self.model_results)} datasets")
        logger.debug(f"Model results stored in self.model_results with {len(self.model_results)} entries")

        return self.model_results

    def pool(self, summ: bool = False):
        """
        Pool parameter estimates from fitted models using Rubin's rules.

        This method combines parameter estimates and their uncertainties from
        multiple imputed datasets according to Rubin's (1987) rules for
        multiple imputation inference.

        Parameters
        ----------
        summ : bool, default=False
            If True, returns a summary of the pooled results

        Returns
        -------
        dict or summary
            If summ=False, returns a dictionary with the keys
            'pooled_result' (the MICEresult object), 'pooled_params',
            'pooled_covariance', 'within_covariance', 'between_covariance',
            'fraction_missing_info', 'pooled_scale', 'n_imputations',
            'parameter_names' and 'formula'.
            If summ=True, returns `pooled_result.summary()`.

        Raises
        ------
        ValueError
            If no model results are available from analysis

        Notes
        -----
        Rubin's pooling rules combine:
        - Point estimates: average across imputations
        - Within-imputation variance: average of individual model variances
        - Between-imputation variance: variance of point estimates across
          imputations
        - Total variance: within + (1 + 1/m) * between
        - Fraction of missing information (FMI): proportion of uncertainty
          due to missingness

        References
        ----------
        Rubin, D.B. (1987). Multiple Imputation for Nonresponse in Surveys.
        New York: John Wiley and Sons.
        """
        logger.info("Starting pooling of model results using Rubin's rules")

        # Check if analysis has been performed
        if not hasattr(self, 'model_results') or not self.model_results:
            msg = "No model results found. Please run .fit() first."
            logger.error(msg)
            raise ValueError(msg)

        # Check if formula was stored (should be set by fit())
        if not hasattr(self, 'formula'):
            logger.warning("No formula found. This may indicate .fit() was not called properly.")

        m = len(self.model_results)  # Number of imputations
        logger.info(f"Pooling estimates from {m} fitted models")

        # Extract parameters, covariances, and scales from each model
        params_list = []
        cov_within_list = []
        scale_list = []

        for i, model_result in enumerate(self.model_results):
            logger.debug(f"Extracting results from model {i + 1}")
            # Extract parameter estimates
            params_list.append(model_result.params.values)
            # Extract covariance matrix (within-imputation variance)
            cov_within_list.append(model_result.cov_params().values)
            # Extract scale (residual variance)
            scale_list.append(model_result.scale)

        # Convert to numpy arrays for easier computation
        params_array = np.array(params_list)  # Shape: (m, p) where p = number of parameters
        cov_within_array = np.array(cov_within_list)  # Shape: (m, p, p)
        scale_array = np.array(scale_list)

        logger.debug(f"Parameter array shape: {params_array.shape}")
        logger.debug(f"Covariance array shape: {cov_within_array.shape}")

        # Apply Rubin's pooling rules
        # 1. Pooled point estimates (qbar): average of individual estimates
        pooled_params = np.mean(params_array, axis=0)
        logger.debug(f"Computed pooled parameter estimates: {pooled_params}")

        # 2. Within-imputation variance (ubar): average of individual covariances
        cov_within = np.mean(cov_within_array, axis=0)

        # 3. Between-imputation variance (b): covariance of parameter
        #    estimates across imputations
        if m > 1:
            # np.cov squeezes its result, so a single-parameter model yields
            # a 0-d array; force (p, p) so np.diag below keeps working.
            cov_between = np.atleast_2d(np.cov(params_array, rowvar=False, ddof=1))
        else:
            cov_between = np.zeros_like(cov_within)
            logger.warning("Only one imputation available. Between-imputation variance set to zero.")

        # 4. Total covariance matrix using Rubin's rules
        #    Total variance = within + (1 + 1/m) * between
        f = 1.0 + 1.0 / m  # Adjustment factor
        cov_total = cov_within + f * cov_between

        # 5. Fraction of missing information (FMI)
        #    FMI = (1 + 1/m) * diag(between) / diag(total)
        if m > 1:
            fmi = f * np.diag(cov_between) / np.diag(cov_total)
            # Ensure FMI is between 0 and 1
            fmi = np.clip(fmi, 0.0, 1.0)
        else:
            fmi = np.zeros(len(pooled_params))

        # 6. Pooled scale (average of individual scales)
        pooled_scale = np.mean(scale_array)

        logger.debug(f"Computed within-imputation variance diagonal: {np.diag(cov_within)}")
        logger.debug(f"Computed between-imputation variance diagonal: {np.diag(cov_between)}")
        logger.debug(f"Computed total variance diagonal: {np.diag(cov_total)}")
        logger.debug(f"Computed fraction of missing information: {fmi}")
        logger.debug(f"Computed pooled scale: {pooled_scale}")

        # Create parameter names (use from first model)
        first_result = self.model_results[0]
        param_names = list(first_result.params.index)
        logger.debug(f"Parameter names: {param_names}")

        # Store results for backward compatibility
        self.exog_names = param_names
        if hasattr(first_result, 'model') and hasattr(first_result.model, 'endog_names'):
            self.endog_names = first_result.model.endog_names

        # Create MICEresult object
        logger.debug("Creating MICEresult object")

        # The MICEresult expects normalized covariance params (divided by scale)
        normalized_cov_params = cov_total / pooled_scale
        pooled_result = MICEresult(self, pooled_params, normalized_cov_params)
        pooled_result.scale = pooled_scale
        pooled_result.frac_miss_info = fmi

        # Store additional pooling diagnostics
        pooled_result.cov_within = cov_within
        pooled_result.cov_between = cov_between
        pooled_result.cov_total = cov_total
        pooled_result.m = m

        # Store the result
        self.pooled_result = pooled_result

        logger.info("Pooling completed successfully using Rubin's rules")
        logger.debug(f"Pooled estimates: {dict(zip(param_names, pooled_params))}")
        logger.debug(f"Fraction of missing information: {dict(zip(param_names, fmi))}")

        if summ:
            logger.debug("Generating summary")
            return pooled_result.summary()

        # Return comprehensive results for analysis
        comprehensive_result = {
            'pooled_result': pooled_result,
            'pooled_params': pooled_params,
            'pooled_covariance': cov_total,
            'within_covariance': cov_within,
            'between_covariance': cov_between,
            'fraction_missing_info': fmi,
            'pooled_scale': pooled_scale,
            'n_imputations': m,
            'parameter_names': param_names,
            'formula': getattr(self, 'formula', None)
        }
        return comprehensive_result

    def _impute_once(self, chain_idx: int):
        """
        Perform one complete imputation cycle.

        Parameters
        ----------
        chain_idx : int
            Zero-based index of the chain; also offsets the RNG seed so each
            chain draws a different but reproducible stream.

        Returns
        -------
        pd.DataFrame
            A copy of the data with one complete imputation cycle applied
        """
        logger.debug(f"Starting imputation cycle for chain {chain_idx}")
        current_data = self.data.copy(deep=True)

        logger.debug("Performing initial imputation")
        # Create ONE RNG for this entire chain (matching R behavior)
        # The RNG state will advance through all iterations
        rng = np.random.default_rng(42 + chain_idx)
        self._initial_imputation(current_data, rng)

        for iter_idx in range(self.maxit):
            logger.debug(f"Starting iteration {iter_idx + 1}/{self.maxit} for chain {chain_idx}")
            # Pass the same RNG to each iteration - it will advance with each call
            current_data = self._iterate(current_data, iter_idx, chain_idx, rng)
            logger.debug(f"Completed iteration {iter_idx + 1}")

        logger.debug(f"Completed imputation cycle for chain {chain_idx}")
        return current_data

    def _iterate(self, data: pd.DataFrame, iter_idx: int, chain_idx: int, rng: np.random.Generator):
        """
        Perform one iteration of the imputation cycle.

        Parameters
        ----------
        data : pd.DataFrame
            Data to iterate over. It is updated in place.
        iter_idx : int
            Current iteration index
        chain_idx : int
            Current chain index
        rng : np.random.Generator
            Random number generator for this chain (state advances across
            iterations)

        Returns
        -------
        pd.DataFrame
            The same `data` object, with the originally missing cells of
            every visited column re-imputed.
        """
        updated_data = data
        iteration_start_time = time.time()

        for col in self.visit_sequence:
            logger.debug(f"Processing column '{col}' (iteration {iter_idx + 1}, chain {chain_idx})")
            method_name = self.method[col]

            # Determine predictors
            if self.predictor_matrix is not None:
                predictor_flags = self.predictor_matrix.loc[col]
                predictor_cols = predictor_flags[predictor_flags == 1].index.tolist()
                predictor_cols = [c for c in predictor_cols if c != col]
            else:
                predictor_cols = [c for c in updated_data.columns if c != col]
            logger.debug(f"Using {len(predictor_cols)} predictors for column '{col}'")
            predictors = updated_data[predictor_cols]

            # Prepare arrays/masks
            y = updated_data[col].to_numpy()
            id_obs_mask = self.id_obs[col]
            id_mis_mask = self.id_mis[col]
            id_obs = id_obs_mask.to_numpy()
            id_mis = id_mis_mask.to_numpy()

            # Get imputer function and perform imputation
            imputer_func = get_imputer_func(method_name)
            logger.debug(f"Using imputation method '{method_name}' for column '{col}'")

            # Extract method-specific parameters from kwargs
            prefix = f"{method_name}_"
            method_params = {
                key[len(prefix):]: value
                for key, value in self.imputation_params.items()
                if key.startswith(prefix)
            }

            # Pass rng to all imputation methods for reproducibility
            method_params['rng'] = rng
            logger.debug(f"Passing parameters to imputer: {method_params}")

            imputed_values = imputer_func(y=y, id_obs=id_obs, id_mis=id_mis, x=predictors, **method_params)
            logger.debug(f"Successfully imputed {len(imputed_values)} values for column '{col}'")

            # Assign imputed values
            updated_data.loc[id_mis_mask, col] = imputed_values

            # Record chain statistics (only for numeric columns)
            if id_mis.sum() > 0 and col in self.chain_mean:
                imputed_arr = np.asarray(imputed_values, dtype=float)
                mean_val = np.nanmean(imputed_arr)
                self.chain_mean[col][iter_idx, chain_idx] = mean_val
                if imputed_arr.size > 1:
                    var_val = np.nanvar(imputed_arr, ddof=1)
                    self.chain_var[col][iter_idx, chain_idx] = var_val
                    logger.debug(f"Chain statistics for '{col}': mean={mean_val:.4f}, variance={var_val:.4f}")
                else:
                    # Sample variance is undefined for a single value
                    self.chain_var[col][iter_idx, chain_idx] = np.nan
                    logger.debug(f"Chain statistics for '{col}': mean={mean_val:.4f}, variance=N/A (single value)")

        iteration_time = time.time() - iteration_start_time
        logger.debug(f"Iteration {iter_idx + 1} completed in {iteration_time:.2f} seconds")
        return updated_data

    def _initial_imputation(self, data, rng=None):
        """
        Initialize missing values based on the initial method.

        Parameters
        ----------
        data : pd.DataFrame
            Data whose missing cells are filled in place.
        rng : np.random.Generator, optional
            Generator used for the sampling-based method. If None, a fresh
            unseeded generator is created.

        Notes
        -----
        - `InitialMethod.SAMPLE`: each missing cell is filled with a random
          draw (with replacement) from the column's observed values.
        - `InitialMethod.MEANOBS`: every missing cell is filled with the
          observed value closest to the column mean.
        - Any other value of `self.initial` leaves `data` unchanged.
        """
        if rng is None:
            rng = np.random.default_rng()  # fresh random generator

        if self.initial == InitialMethod.SAMPLE.value:
            for col in data.columns:
                if data[col].isna().any():
                    observed_values = data.loc[self.id_obs[col], col].values
                    data.loc[self.id_mis[col], col] = rng.choice(
                        observed_values,
                        size=self.id_mis[col].sum()
                    )
        elif self.initial == InitialMethod.MEANOBS.value:
            for col in data.columns:
                if data[col].isna().any():
                    col_mean = data[col].mean()
                    observed_values = data.loc[self.id_obs[col], col]
                    closest_idx = (observed_values - col_mean).abs().idxmin()
                    closest_value = data.loc[closest_idx, col]
                    data.loc[self.id_mis[col], col] = closest_value

    def _set_visit_sequence(self, visit_sequence):
        """
        Set the visit sequence for imputation based on the input parameter.

        Parameters
        ----------
        visit_sequence : Union[str, List[str]]
            Visit sequence specification. Can be:
            - str: "monotone" or "random" for predefined sequences
            - List[str]: list of column names specifying the order to visit
              variables. Must include all columns with missing values.
              Columns without missing values will be ignored with a warning.

        Notes
        -----
        The result is stored in `self.visit_sequence`. Validation is
        delegated to `check_visit_sequence`; a string that is neither the
        RANDOM nor the MONOTONE value leaves `self.visit_sequence` unset.
        """
        columns_with_missing = [col for col in self.data.columns if self.data[col].isna().any()]

        # Validate using centralized validator
        validated_sequence, cols_without_missing = check_visit_sequence(
            visit_sequence, list(self.data.columns), columns_with_missing
        )

        if isinstance(visit_sequence, list):
            # Warn about columns without missing values if any were filtered out
            if cols_without_missing:
                logger.warning(
                    f"Visit sequence includes columns without missing values: {cols_without_missing}. "
                    f"These columns will be ignored during imputation."
                )
            self.visit_sequence = validated_sequence
        else:
            # For string sequences, generate based on type
            if visit_sequence == VisitSequence.RANDOM.value:
                # Use seeded RNG for reproducible random visit sequence
                rng = np.random.default_rng(42)
                self.visit_sequence = list(rng.permutation(columns_with_missing))
            elif visit_sequence == VisitSequence.MONOTONE.value:
                # Visit columns in increasing order of missing-value count
                nmis = np.array([self.id_mis[col].sum() for col in columns_with_missing])
                ii = np.argsort(nmis)
                self.visit_sequence = [columns_with_missing[i] for i in ii]