Source code for imputation.midas

import numpy as np
[docs] def bootfunc_plain(n): """ Generates bootstrap weights for n observations using simple random sampling with replacement. This function simulates a nonparametric bootstrap by randomly drawing `n` integers from the range 1 to n (inclusive), with replacement. It returns the count of how many times each index (1-based) is selected, producing a frequency table that can be used as weights in e.g. MIDAS imputation. Parameters ---------- n : int The number of observations to sample and also the length of the resulting weight vector. Returns ------- weights : ndarray of shape (n,) An array of integers indicating how often each index (1-based) was selected in the bootstrap sample. """ #random n int with size n from 1:n random = np.random.choice(n, size=n, replace=True)+1 #returns histogram of drawn ints table, _ = np.histogram(random, bins=np.arange(1, n + 2)) return table
[docs] def minmax(x, domin=True, domax=True): maxx = np.sqrt(np.finfo(float).max) minx = np.sqrt(np.finfo(float).eps) if domin: x = np.minimum(x, maxx) if domax: x = np.maximum(x, minx) return x
[docs] def compute_beta(x, m): A = x[:m**2].reshape((m, m)) b = x[m**2:] return np.linalg.solve(A, b)
[docs] def midas(y, id_obs, x, id_mis=None, ridge=1e-5, midas_kappa=None, outout=True, **kwargs): """ MIDAS Imputation: Multiple Imputation with Distant Average Substitution. This function implements the MIDAS imputation algorithm for continuous variables, as introduced by Gaffert et al. (2018). It operates by weighting observed donors based on the similarity between predicted values, with optional leave-one-out model estimation for increased fidelity. Parameters ---------- y : array-like of shape (n_samples,) The target variable with missing values to be imputed. Must be numeric. id_obs : array-like of bool of shape (n_samples,) Logical array indicating observed values in `y`. True where `y` is observed, False where missing. x : array-like of shape (n_samples, n_features) Design matrix of predictor variables. Must be fully observed. id_mis : np.ndarray, optional Boolean mask of missing values to impute. If None, uses ~id_obs. ridge : float, default=1e-5 Ridge penalty used in regularized regression to stabilize the solution in the presence of multicollinearity. - Set lower (e.g. 1e-6) to reduce bias in noisy data. - Set higher (e.g. 1e-4) if collinearity is suspected. midas_kappa : float or None, default=None Controls the sharpness of donor weighting. If None, the optimal value is estimated based on R² as described by Siddique and Belin (2008). A common fallback is 3. outout : bool, default=True If True, uses leave-one-out regression for each donor (slow but MI-proper). If False, a single model is estimated for all donors and recipients. WARNING: Setting `outout=False` may produce biased estimates and is not fully supported. **kwargs : dict Additional arguments (not used in this method). Returns ------- y_imp : np.ndarray Imputed values for missing positions only (matching R implementation). Notes ----- - Based on: Gaffert, P., Meinfelder, F., & van den Bosch, V. (2018). "Towards an MI-proper Predictive Mean Matching." - Related: Siddique, J. & Belin, T. R. (2008). "Multiple Imputation Using an Iterative Hot-Deck with Distance-Based Donor Selection." Examples -------- >>> y = np.array([7, np.nan, 9, 10, 11]) >>> id_obs = ~np.isnan(y) >>> x = np.array([[1, 2], [3, 4], [5, 6], [7, 13], [11, 10]]) >>> midas(y, id_obs, x) array([9.0]) """ # Validate predictors (x): must be numeric and contain no missing values import pandas as pd if isinstance(x, pd.DataFrame): non_numeric_cols = x.select_dtypes(exclude=[np.number]).columns.tolist() if non_numeric_cols: raise ValueError( f"Predictors must be numeric for midas. Non-numeric predictors found: {non_numeric_cols}" ) missing_cols = x.columns[x.isna().any()].tolist() if missing_cols: raise ValueError( f"Predictors must not contain missing values for midas. Columns with missing values: {missing_cols}" ) x = x.to_numpy() else: x = np.asarray(x) try: _ = x.astype(float, copy=False) except Exception: raise ValueError("Predictors must be numeric for midas. Could not convert 'x' to numeric array.") if np.isnan(x).any(): raise ValueError("Predictors must not contain missing values for midas.") # Validate target (y): must be numeric for midas y_array = np.asarray(y) try: y_numeric = y_array.astype(float) except Exception: raise ValueError("Target y must be numeric for midas. Found non-numeric values.") if id_mis is None: id_mis = ~id_obs #machine epsilon sminx = np.finfo(float).eps ** (1 / 4) x = np.asarray(x, dtype=float) x = np.c_[np.ones(x.shape[0]), x] y = y_numeric.astype(float, copy=False) nobs = np.sum(id_obs) n = len(id_obs) m = x.shape[1] yobs = y[id_obs] xobs = x[id_obs, :] xmis = x[id_mis, :] #P Step omega = bootfunc_plain(nobs) CX = omega.reshape(-1, 1) * xobs XCX = xobs.T @ CX ## if ridge > 0: dia = np.diag(XCX) dia = dia * np.concatenate(([1], np.repeat(1 + ridge, m - 1))) np.fill_diagonal(XCX, dia) diag0 = np.where(np.diag(XCX) == 0)[0] if len(diag0) > 0: XCX[diag0, diag0] = max(sminx, ridge) Xy = CX.T @ yobs #CX = observed data * bootstrap frequencies #XCX = observed data * CX beta = np.linalg.solve(XCX, Xy) yhat_obs = xobs @ beta if midas_kappa is None: mean_y = np.dot(yobs, omega) / nobs eps = yobs - yhat_obs r2 = 1 - (np.dot(omega, eps ** 2) / np.dot(omega, (yobs - mean_y) ** 2)) #section 5.3.1 #min function is used correction gets active for r2>.999 only because division by 0 #if r2 cannot be determined (eg zero variance in yhat), use 3 as suggested by Siddique / Belin #if taking delta as in the paper there are numerical errors needing to be fixed if r2 < 1: midas_kappa = min((50 * r2 / (1 - r2))** (3 / 8),100) if np.isnan(midas_kappa): midas_kappa = 3 if outout: XXarray_pre = np.array([np.outer(xobs[i], xobs[i]).flatten() * omega[i] for i in range(nobs)]).T ridgeind = np.arange(1, m) * (m + 1) if ridge > 0: XXarray_pre[ridgeind, :] *= (1 + ridge) XXarray = XCX.ravel()[:, None] - XXarray_pre diag0 = np.where(np.diag(XXarray) == 0)[0] if len(diag0) > 0: XXarray[diag0, diag0] = max(sminx, ridge) Xyarray = Xy.ravel()[:, None] - (xobs * yobs[:, None] * omega[:, None]).T ##solve(a = matrix(head(x, m^2), m), b = tail(x, m)) for each column stacked_array = np.vstack((XXarray, Xyarray)) BETAarray = np.apply_along_axis(compute_beta, axis=0, arr=stacked_array, m=m) # y YHATdon = np.sum(xobs * BETAarray.T, axis=1) YHATrec = xmis @ BETAarray # distance matrix dist_mat = YHATdon - YHATrec else: yhat_mis = xmis @ beta dist_mat = (yhat_obs[:, np.newaxis] - np.tile(yhat_mis, (nobs, 1))).T delta_mat = 1 / (np.abs(dist_mat) ** midas_kappa) delta_mat = minmax(delta_mat) probs = delta_mat * omega csums = minmax(np.nansum(probs, axis=1)) probs /= csums[:, np.newaxis] probs = probs.T index = np.array([ np.random.choice(nobs, size=1, replace=False, p=probs[:, j])[0] for j in range(probs.shape[1]) ]) imputed_values = yobs[index] #PLF correction implemented needs to be saved globally over iterations #mean(1 / rowSums((t(delta.mat) / csums)^2)) #consists row_sums = np.sum((delta_mat / csums[:, np.newaxis])**2, axis=1) #mean(1 / rowSums((t(delta.mat) / csums)^2)) neff = np.mean(1 / row_sums) return imputed_values