from typing import Callable, List, Optional, Tuple, TypeVar
import numpy as np
import scipy.sparse as sps
from ._myfm import (
ConfigBuilder,
FMLearningConfig,
RelationBlock,
VariationalFM,
VariationalFMHyperParameters,
VariationalLearningHistory,
VariationalPredictor,
create_train_vfm,
)
from .base import (
REAL,
ArrayLike,
ClassifierMixin,
MyFMBase,
RegressorMixin,
check_data_consistency,
)
ArrayOrDenseArray = TypeVar("ArrayOrDenseArray", np.ndarray, float)
def runtime_error_to_optional(
fm: "MyFMVariationalBase",
retrieve_method: Callable[[VariationalFM], ArrayOrDenseArray],
) -> Optional[ArrayOrDenseArray]:
try:
predictor = fm._fetch_predictor()
except:
return None
weights = predictor.weights()
return retrieve_method(weights)
class MyFMVariationalBase(
MyFMBase[
VariationalFM,
VariationalFMHyperParameters,
VariationalPredictor,
VariationalLearningHistory,
]
):
@property
def w0_mean(self) -> Optional[float]:
r"""Mean of variational posterior distribution of global bias `w0`.
If the model is not fit yet, returns `None`.
Returns:
Mean of variational posterior distribution of global bias `w0`.
"""
def _retrieve(fm: VariationalFM) -> float:
return fm.w0
return runtime_error_to_optional(self, _retrieve)
@property
def w0_var(self) -> Optional[float]:
r"""Variance of variational posterior distribution of global bias `w0`.
If the model is not fit yet, returns `None`.
Returns:
Variance of variational posterior distribution of global bias `w0`.
"""
def _retrieve(fm: VariationalFM) -> float:
return fm.w0_var
return runtime_error_to_optional(self, _retrieve)
@property
def w_mean(self) -> Optional[np.ndarray]:
r"""Mean of variational posterior distribution of linear coefficnent `w`.
If the model is not fit yet, returns `None`.
Returns:
Mean of variational posterior distribution of linear coefficnent `w`.
"""
def _retrieve(fm: VariationalFM) -> np.ndarray:
return fm.w
return runtime_error_to_optional(self, _retrieve)
@property
def w_var(self) -> Optional[np.ndarray]:
r"""Variance of variational posterior distribution of linear coefficnent `w`.
If the model is not fit yet, returns `None`.
Returns:
Variance of variational posterior distribution of linear coefficnent `w`.
"""
def _retrieve(fm: VariationalFM) -> np.ndarray:
return fm.w_var
return runtime_error_to_optional(self, _retrieve)
@property
def V_mean(self) -> Optional[np.ndarray]:
r"""Mean of variational posterior distribution of factorized quadratic coefficnent `V`.
If the model is not fit yet, returns `None`.
Returns:
Mean of variational posterior distribution of factorized quadratic coefficient `V`.
"""
def _retrieve(fm: VariationalFM) -> np.ndarray:
return fm.V
return runtime_error_to_optional(self, _retrieve)
@property
def V_var(self) -> Optional[np.ndarray]:
r"""Variance of variational posterior distribution of factorized quadratic coefficnent `V`.
If the model is not fit yet, returns `None`.
Returns:
Variance of variational posterior distribution of factorized quadratic coefficient `V`.
"""
def _retrieve(fm: VariationalFM) -> np.ndarray:
return fm.V_var
return runtime_error_to_optional(self, _retrieve)
@classmethod
def _train_core(
cls,
rank: int,
init_stdev: float,
X: sps.csr_matrix,
X_rel: List[RelationBlock],
y: np.ndarray,
random_seed: int,
config: FMLearningConfig,
callback: Callable[
[
int,
VariationalFM,
VariationalFMHyperParameters,
VariationalLearningHistory,
],
bool,
],
) -> Tuple[VariationalPredictor, VariationalLearningHistory]:
return create_train_vfm(
rank, init_stdev, X, X_rel, y, random_seed, config, callback
)
def _predict_core(
self,
X: Optional[ArrayLike],
X_rel: List[RelationBlock] = [],
) -> np.ndarray:
predictor = self._fetch_predictor()
shape = check_data_consistency(X, X_rel)
if X is None:
X = sps.csr_matrix((shape, 0), dtype=REAL)
else:
X = sps.csr_matrix(X)
return predictor.predict(X, X_rel)
[docs]class VariationalFMRegressor(
RegressorMixin[VariationalFM, VariationalFMHyperParameters],
MyFMVariationalBase,
):
"""Variational Inference for Regression Task."""
[docs] def fit(
self,
X: ArrayLike,
y: np.ndarray,
X_rel: List[RelationBlock] = [],
X_test: Optional[ArrayLike] = None,
y_test: Optional[np.ndarray] = None,
X_rel_test: List[RelationBlock] = [],
n_iter: int = 100,
grouping: Optional[List[int]] = None,
group_shapes: Optional[List[int]] = None,
callback: Optional[
Callable[
[
int,
VariationalFM,
VariationalFMHyperParameters,
VariationalLearningHistory,
],
Tuple[bool, Optional[str]],
]
] = None,
config_builder: Optional[ConfigBuilder] = None,
) -> "VariationalFMRegressor":
r"""Performs batch variational inference fit the data.
Parameters
----------
X : 2D array-like.
Input variable.
y : 1D array-like.
Target variable.
X_rel: list of RelationBlock, optional (default=[])
Relation blocks which supplements X.
n_iter : int, optional (default = 100)
Iterations to perform.
grouping: Integer List, optional (default = None)
If not `None`, this specifies which column of X belongs to which group.
That is, if grouping[i] is g, then, :math:`w_i` and :math:`V_{i, r}`
will be distributed according to
:math:`\mathcal{N}(\mu_w[g], \lambda_w[g])` and :math:`\mathcal{N}(\mu_V[g, r], \lambda_V[g,r])`,
respectively.
If `None`, all the columns of X are assumed to belong to a single group, 0.
group_shapes: Integer array, optional (default = None)
If not `None`, this specifies each variable group's size.
Ignored if grouping is not None.
For example, if ``group_shapes = [n_1, n_2]``,
this is equivalent to ``grouping = [0] * n_1 + [1] * n_2``
callback: function(int, fm, hyper, history) -> bool, optional(default = None)
Called at the every end of each Gibbs iteration.
"""
self._fit(
X,
y,
X_rel=X_rel,
X_test=X_test,
X_rel_test=X_rel_test,
y_test=y_test,
n_iter=n_iter,
grouping=grouping,
callback=callback,
group_shapes=group_shapes,
config_builder=config_builder,
)
return self
[docs] def predict(
self, X: Optional[ArrayLike], X_rel: List[RelationBlock] = []
) -> np.ndarray:
r"""Make a prediction based on variational mean.
Parameters
----------
X : Optional[ArrayLike]
Main Table. When None, treated as a matrix without columns.
X_rel : List[RelationBlock], optional
Relations, by default []
Returns
-------
np.ndarray
[description]
"""
return self._predict_core(X, X_rel)
[docs]class VariationalFMClassifier(
ClassifierMixin[VariationalFM, VariationalFMHyperParameters],
MyFMVariationalBase,
):
"""Variational Inference for Classification Task."""
[docs] def fit(
self,
X: ArrayLike,
y: np.ndarray,
X_rel: List[RelationBlock] = [],
X_test: Optional[ArrayLike] = None,
y_test: Optional[np.ndarray] = None,
X_rel_test: List[RelationBlock] = [],
n_iter: int = 100,
grouping: Optional[List[int]] = None,
group_shapes: Optional[List[int]] = None,
callback: Optional[
Callable[
[
int,
VariationalFM,
VariationalFMHyperParameters,
VariationalLearningHistory,
],
Tuple[bool, Optional[str]],
]
] = None,
config_builder: Optional[ConfigBuilder] = None,
) -> "VariationalFMClassifier":
r"""Performs batch variational inference fit the data.
Parameters
----------
X : Optional[ArrayLike].
Main table. When None, treated as a matrix without columns.
y : 1D array-like.
Target variable.
X_rel: list of RelationBlock, optional (default=[])
Relation blocks which supplements X.
n_iter : int, optional (default = 100)
Iterations to perform.
grouping: Integer List, optional (default = None)
If not `None`, this specifies which column of X belongs to which group.
That is, if grouping[i] is g, then, :math:`w_i` and :math:`V_{i, r}`
will be distributed according to
:math:`\mathcal{N}(\mu_w[g], \lambda_w[g])` and :math:`\mathcal{N}(\mu_V[g, r], \lambda_V[g,r])`,
respectively.
If `None`, all the columns of X are assumed to belong to a single group, 0.
group_shapes: Integer array, optional (default = None)
If not `None`, this specifies each variable group's size.
Ignored if grouping is not None.
For example, if ``group_shapes = [n_1, n_2]``,
this is equivalent to ``grouping = [0] * n_1 + [1] * n_2``
callback: function(int, fm, hyper) -> bool, optional(default = None)
Called at the every end of each Gibbs iteration.
"""
self._fit(
X,
y,
X_rel=X_rel,
X_test=X_test,
X_rel_test=X_rel_test,
y_test=y_test,
n_iter=n_iter,
grouping=grouping,
callback=callback,
group_shapes=group_shapes,
config_builder=config_builder,
)
return self
[docs] def predict(
self, X: Optional[ArrayLike], X_rel: List[RelationBlock] = []
) -> np.ndarray:
r"""Based on the class probability, return binary classified outcome based on threshold = 0.5.
If you want class probability instead, use `predict_proba` method.
Parameters
----------
X : Optional[ArrayLike]
Main Table. When None, treated as a matrix without columns.
X_rel : List[RelationBlock], optional
Relations, by default []
Returns
-------
np.ndarray
0/1 predictions based on the probability.
"""
return self.predict_proba(X, X_rel) > 0.5
[docs] def predict_proba(
self, X: Optional[ArrayLike], X_rel: List[RelationBlock] = []
) -> np.ndarray:
r"""Compute the probability that the outcome will be 1 based on variational mean.
Parameters
----------
X : Optional[ArrayLike]
Main Table. When None, treated as a matrix without columns.
X_rel : List[RelationBlock], optional
Relations, by default []
Returns
-------
np.ndarray
the probability.
"""
return self._predict_core(X, X_rel)