# License: BSD-3-Clause
import os
from os.path import dirname
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors
from ..impute import get_observed_mod_indicator
from ..utils import check_Xs_y
from .. import octavemodule_installed, oct2py_module_error, Tensor
if octavemodule_installed:
import oct2py
[docs]
class PIMVC(BaseEstimator, ClusterMixin):
r"""
Projective Incomplete Multi-View Clustering (PIMVC). [#pimvcpaper]_ [#pimvccode]_
The objective of PIMVC is to simultaneously discover the projection matrix for each modality and establish a unified
feature representation shared across incomplete multiple views, facilitating clustering. Essentially, PIMVC
transforms the traditional multi-modality matrix factorization model into a multi-modality projection learning model. By
consolidating various modality-specific objective losses into a cohesive subspace of equal dimensions, it adeptly
handles the challenge where a single modality might overly influence consensus representation learning due to
imbalanced information across views stemming from diverse dimensions. Furthermore, to capture the data geometric
structure, PIMVC incorporates a penalty term for graph regularization.
Parameters
----------
n_clusters : int, default=8
The number of clusters to generate.
dele : float, default=0.1
nonnegative.
lamb : float, default=100000
Penalty parameters. Should be greather than 0.
beta : float, default=1
Trade-off parameter.
k : int, default=3
Parameter k of KNN graph.
neighbor_mode : str, default='knn'
Indicates how to construct the graph. Options are 'knn' (default), and 'supervised'.
weight_mode : str, default='binary'
Indicates how to assign weights for each edge in the graph. Options are 'binary' (default), 'cosine' and 'heatkernel'.
max_iter : int, default=100
Maximum number of iterations.
random_state : int, default=None
Determines the randomness. Use an int to make the randomness deterministic.
engine : str, default='python'
Engine to use for computing the model. Options are 'octave' or 'python'.
verbose : bool, default=False
Verbosity mode.
clean_space : bool, default=True
If engine is 'octave' and clean_space is True, the session will be closed after fitting the model.
Attributes
----------
labels_ : array-like of shape (n_samples,)
Labels of each point in training data.
embedding_ : array-like of shape (n_samples, n_clusters)
Consensus clustering matrix to be used as input for the KMeans clustering step.
loss_ : array-like of shape (n_iter_,)
Values of the loss function.
n_iter_ : int
Number of iterations.
References
----------
.. [#pimvcpaper] S. Deng, J. Wen, C. Liu, K. Yan, G. Xu and Y. Xu, "Projective Incomplete Multi-View Clustering,"
in IEEE Transactions on Neural Networks and Learning Systems, doi: 10.1109/TNNLS.2023.3242473.
.. [#pimvccode] https://github.com/Dshijie/PIMVC
Example
--------
>>> import numpy as np
>>> import pandas as pd
>>> from imml.cluster import PIMVC
>>> Xs = [pd.DataFrame(np.random.default_rng(42).random((20, 10))) for i in range(3)]
>>> estimator = PIMVC(n_clusters = 2)
>>> labels = estimator.fit_predict(Xs)
"""
def __init__(self, n_clusters: int = 8, dele: float = 0.1, lamb: int = 100000, beta: int = 1, k: int = 3,
neighbor_mode: str = 'knn', weight_mode: str = 'binary', max_iter: int = 100,
random_state: int = None, engine: str = "python", verbose = False, clean_space: bool = True):
if not isinstance(n_clusters, int):
raise ValueError(f"Invalid n_clusters. It must be an int. A {type(n_clusters)} was passed.")
if n_clusters < 2:
raise ValueError(f"Invalid n_clusters. It must be an greater than 1. {n_clusters} was passed.")
engines_options = ["octave", "python"]
if engine not in engines_options:
raise ValueError(f"Invalid engine. Expected one of {engines_options}. {engine} was passed.")
if (engine == "octave") and (not octavemodule_installed):
raise ImportError(oct2py_module_error)
if lamb <= 0:
raise ValueError(f"Invalid lamb. It must be a positive value. {lamb} was passed.")
if k <= 0:
raise ValueError(f"Invalid k. It must be a positive value. {k} was passed.")
if neighbor_mode not in ["knn", "supervised"]:
raise ValueError(f"Invalid neighbor_mode. Expected one of ['knn', 'supervised']. {neighbor_mode} was passed.")
if weight_mode not in ["binary", "cosine", "heatkernel"]:
raise ValueError(f"Invalid weight_mode. Expected one of ['binary', 'cosine', 'heatkernel']. {weight_mode} was passed.")
if max_iter <= 0:
raise ValueError(f"Invalid max_iter. It must be a positive value. {max_iter} was passed.")
if not isinstance(clean_space, bool):
raise ValueError(f"Invalid clean_space. It must be a boolean. {clean_space} was passed.")
self.n_clusters = n_clusters
self.dele = dele
self.lamb = lamb
self.beta = beta
self.k = k
self.neighbor_mode = neighbor_mode
self.weight_mode = weight_mode
self.max_iter = max_iter
self.random_state = random_state
self.engine = engine
self.verbose = verbose
self.clean_space = clean_space
if self.engine == "octave":
octave_folder = dirname(__file__)
octave_folder = os.path.join(octave_folder, "_" + (os.path.basename(__file__).split(".")[0]))
self._octave_folder = octave_folder
octave_files = [x for x in os.listdir(octave_folder) if x.endswith(".m")]
self._oc = oct2py.Oct2Py(temp_dir= octave_folder)
for octave_file in octave_files:
with open(os.path.join(octave_folder, octave_file)) as f:
self._oc.eval(f.read())
[docs]
def fit(self, Xs, y=None):
r"""
Fit the transformer to the input data.
Parameters
----------
Xs : list of array-likes objects
- Xs length: n_mods
- Xs[i] shape: (n_samples, n_features_i)
A list of different modalities.
y : Ignored
Not used, present here for API consistency by convention.
Returns
-------
self : Fitted estimator.
"""
Xs = check_Xs_y(Xs, ensure_all_finite='allow-nan')
try:
assert self.n_clusters <= min([X.shape[1] for X in Xs])
except AssertionError:
raise ValueError(f"n_clusters ({self.n_clusters}) should be smaller or equal to " +
f"the smallest n_features_i ({min([X.shape[1] for X in Xs])}).")
transformed_Xs, observed_mod_indicator = self._process_xs(Xs)
if self.engine == "octave":
if self.random_state is not None:
self._oc.rand('seed', self.random_state)
v, loss = self._oc.PIMVC(tuple(transformed_Xs), self.n_clusters, observed_mod_indicator, self.lamb,
self.beta, self.max_iter, {"NeighborMode": self.neighbor_mode,
"WeightMode": self.weight_mode,
"k": self.k}, nout=2)
if self.clean_space:
self._clean_space()
elif self.engine == "python":
v, loss = self._pimvc(transformed_Xs, observed_mod_indicator.astype(float))
model = KMeans(n_clusters=self.n_clusters, n_init="auto", random_state=self.random_state)
v = v.T
self.labels_ = model.fit_predict(X=v)
self.embedding_ = v
self.loss_ = np.ravel(loss)
self.n_iter_ = len(self.loss_)
return self
def _predict(self, Xs):
r"""
Return clustering results for samples.
Parameters
----------
Xs : list of array-likes objects
- Xs length: n_mods
- Xs[i] shape: (n_samples, n_features_i)
A list of different modalities.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
return self.labels_
[docs]
def fit_predict(self, Xs, y=None):
r"""
Fit the model and return clustering results.
Convenience method; equivalent to calling fit(X) followed by predict(X).
Parameters
----------
Xs : list of array-likes objects
- Xs length: n_mods
- Xs[i] shape: (n_samples, n_features_i)
A list of different modalities.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
labels = self.fit(Xs)._predict(Xs)
return labels
def _clean_space(self):
[os.remove(os.path.join(self._octave_folder, x)) for x in ["reader.mat", "writer.mat"]]
self._oc.exit()
del self._oc
return None
def _process_xs(self, Xs):
if isinstance(Xs[0], pd.DataFrame):
transformed_Xs = [X.T.values for X in Xs]
observed_mod_indicator = get_observed_mod_indicator(Xs).values
elif isinstance(Xs[0], np.ndarray):
transformed_Xs = [X.T for X in Xs]
observed_mod_indicator = get_observed_mod_indicator(Xs)
elif isinstance(Xs[0], Tensor):
transformed_Xs = [X.T.numpy() for X in Xs]
observed_mod_indicator = get_observed_mod_indicator(Xs).numpy()
return transformed_Xs, observed_mod_indicator
def _pimvc(self, Xs_T, ind_folds):
n_mods = len(Xs_T)
n_samples = ind_folds.shape[0]
norm_x = 0.0
sum_obs_counts = np.zeros(n_samples) # linshi_GG
sum_laplacians = np.zeros((n_samples, n_samples)) # linshi_LS
Piv = [] # per-view projection matrices, (dim, n_features_v)
XG = [] # X_obs @ G.T, (n_features_v, n_samples)
St2_list = [] # whitening matrices, (n_features_v, n_features_v)
St3_list = [] # St2 @ X_obs @ G.T, (n_features_v, n_samples)
G_list = [] # selection matrices, (n_samples, n_observed_v)
X_obs_list = [] # observed data, (n_features_v, n_observed_v)
for iv in range(n_mods):
obs_idx = np.where(ind_folds[:, iv] == 1)[0]
n_obs = len(obs_idx)
# Remove missing sample columns: X{iv}(:, ind_0) = []
X_iv_obs = Xs_T[iv][:, obs_idx].copy()
X_obs_list.append(X_iv_obs)
n_features = X_iv_obs.shape[0]
# G{iv} = diag(ind_folds[:,iv]) with missing columns deleted
# → (n_samples, n_obs) matrix: G[obs_idx[j], j] = 1
G_iv = np.zeros((n_samples, n_obs))
G_iv[obs_idx, np.arange(n_obs)] = 1.0
G_list.append(G_iv)
# St2{iv} = (X_obs @ X_obs.T + lamb * I)^{-0.5}
St = X_iv_obs @ X_iv_obs.T + self.lamb * np.eye(n_features)
eigvals, eigvecs = np.linalg.eigh(St)
St2_iv = eigvecs @ np.diag(eigvals ** (-0.5)) @ eigvecs.T
St2_list.append(St2_iv)
# St3{iv} = St2{iv} * X_obs * G{iv}'
St3_list.append(St2_iv @ X_iv_obs @ G_iv.T)
# linshi_GG += ind_folds[:,iv]
sum_obs_counts += ind_folds[:, iv]
# Construct KNN graph on observed samples, then embed into full space
fea = X_iv_obs.T # (n_obs, n_features)
W_obs = self._construct_w(fea) # (n_obs, n_obs)
W_graph = (W_obs + W_obs.T) * 0.5
W_full = G_iv @ W_graph @ G_iv.T # (n_samples, n_samples)
sum_laplacians += np.diag(W_full.sum(axis=1)) - W_full
# PCA init: Piv{iv} = PCA1(X_obs.T, options_dim)'
eigvec = self._pca_init(X_iv_obs, self.n_clusters) # (n_features, dim)
Piv.append(eigvec.T) # (dim, n_features)
# XG{iv} = X_obs * G{iv}'
XG.append(X_iv_obs @ G_iv.T) # (n_features, n_samples)
norm_x += np.linalg.norm(X_iv_obs, 'fro') ** 2
# inv_GS = inv(linshi_LS * beta + diag(linshi_GG))
inv_GS = np.linalg.inv(sum_laplacians * self.beta + np.diag(sum_obs_counts))
# Y = rand(dim, numInst)
rng = np.random.default_rng(self.random_state)
Y = rng.random((self.n_clusters, n_samples))
obj = []
for iter_idx in range(self.max_iter):
# Update Y: Y = (sum_iv Piv_iv @ XG_iv) @ inv_GS
H1 = sum(Piv[iv] @ XG[iv] for iv in range(n_mods))
Y = H1 @ inv_GS
# Update Piv: closed-form via SVD
for iv in range(n_mods):
M = St3_list[iv] @ Y.T # (n_features, dim)
M = np.nan_to_num(M)
# svd(M', 'econ') where M' is (dim, n_features)
U, _, Vt = np.linalg.svd(M.T, full_matrices=False)
U = np.nan_to_num(U)
Vt = np.nan_to_num(Vt)
# Piv{iv} = U * V' * St2{iv} (V' in MATLAB = Vt in numpy)
Piv[iv] = U @ Vt @ St2_list[iv]
# Compute objective
obj_val = 0.0
for iv in range(n_mods):
diff = Piv[iv] @ X_obs_list[iv] - Y @ G_list[iv]
obj_val += np.sum(diff ** 2) + self.lamb * np.sum(Piv[iv] ** 2)
obj_val = (obj_val + self.beta * np.trace(Y @ sum_laplacians @ Y.T)) / norm_x
obj.append(obj_val)
if iter_idx > 2 and abs(obj[-1] - obj[-2]) < 1e-5:
break
return Y, np.array(obj)
@staticmethod
def _pca_init(X_obs, dim):
data = X_obs.T # (n_obs, n_features), rows are samples
data = data - data.mean(axis=0)
X = data.T # (n_features, n_obs) — argument passed to mySVD
n_smp, m_fea = X.shape # n_smp=n_features, m_fea=n_obs
if m_fea / n_smp > 1.0713:
# Case 1: eig(X @ X.T)
ddata = X @ X.T
ddata = (ddata + ddata.T) * 0.5 # enforce numerical symmetry
eigvals, U = np.linalg.eigh(ddata) # ascending order
idx = np.argsort(-eigvals) # sort descending
eigvals, U = eigvals[idx], U[:, idx]
else:
# Case 2: eig(X.T @ X), then U = X @ V / sqrt(eigvals)
ddata = X.T @ X
ddata = (ddata + ddata.T) * 0.5
eigvals, V = np.linalg.eigh(ddata)
idx = np.argsort(-eigvals)
eigvals, V = eigvals[idx], V[:, idx]
U = X @ (V / np.sqrt(np.maximum(eigvals, 0)))
# Remove near-zero eigenvectors (matching MATLAB's eigIdx filter)
max_eig = np.max(np.abs(eigvals))
keep = np.abs(eigvals) / max_eig >= 1e-10
U = U[:, keep]
return U[:, :dim] # (n_features, dim)
def _construct_w(self, fea):
n = fea.shape[0]
# Find k+1 neighbours (including self); octave code does the same then
# removes self-connections via W = W - diag(diag(W)).
nbrs = NearestNeighbors(n_neighbors=self.k + 1, algorithm='brute', metric='euclidean').fit(fea)
distances, indices = nbrs.kneighbors(fea)
rows = np.repeat(np.arange(n), self.k + 1)
cols = indices.ravel()
W = np.zeros((n, n))
if self.weight_mode == 'binary':
W[rows, cols] = 1.0
elif self.weight_mode == 'heatkernel':
# Compute t as mean of pairwise squared distances, matching MATLAB's EuDist2 + mean
if n > 3000:
sub = fea[:3000]
D2 = np.maximum(
np.sum(sub ** 2, axis=1, keepdims=True)
+ np.sum(sub ** 2, axis=1)
- 2 * sub @ sub.T, 0)
else:
D2 = np.maximum(
np.sum(fea ** 2, axis=1, keepdims=True)
+ np.sum(fea ** 2, axis=1)
- 2 * fea @ fea.T, 0)
t = D2.mean()
W[rows, cols] = np.exp(-distances.ravel() ** 2 / (2 * t ** 2))
elif self.weight_mode == 'cosine':
norms = np.linalg.norm(fea, axis=1, keepdims=True)
norms = np.where(norms == 0, 1e-12, norms)
fea_norm = fea / norms
W[rows, cols] = np.einsum('ij,ij->i', fea_norm[rows], fea_norm[cols])
# Remove self-connections and symmetrise (W = max(W, W'))
np.fill_diagonal(W, 0.0)
W = np.maximum(W, W.T)
return W