import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import (calinski_harabasz_score, davies_bouldin_score,
silhouette_score)
from sklearn.model_selection import ParameterGrid
from sklearn.pipeline import Pipeline
# 2021.09.14 Created by Daniel SY wang
[docs]class ClusterScorePlot(Pipeline):
"""
ClusterScorePlot inherited from Pipeline,score and plot cluster without labels.
It's same as pipeline+gridseach.
Parameters
----------
steps : list
List of (name, transform) tuples (implementing fit/transform) that are
chained, in the order in which they are chained, with the last object
a cluster estimator.
param_grid : list of dict
usage reference example part.
scoring : list
list of 'si','ca','da' for cluster scoring,
'si' means metrics.silhouette_score,
'ca' means metrics.calinski_harabasz_score,
'da' means metrics.davies_bouldin_score.
memory : str or object with the joblib.Memory interface, default=None
Used to cache the fitted transformers of the pipeline. By default,
no caching is performed. If a string is given, it is the path to
the caching directory. Enabling caching triggers a clone of
the transformers before fitting. Therefore, the transformer
instance given to the pipeline cannot be inspected
directly. Use the attribute ``named_steps`` or ``steps`` to
inspect estimators within the pipeline. Caching the
transformers is advantageous when fitting is time consuming.
verbose : bool, default=False
If True, the time elapsed while fitting each step will be printed as it
is completed.
Attributes
----------
named_steps : :class:`~sklearn.utils.Bunch`
Dictionary-like object, with the following attributes.
Read-only attribute to access any step parameter by user given name.
Keys are step names and values are steps parameters.
Examples
--------
Get the best socres and paramters
::
#setup steps:
cls = [('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('cluster',cluster.AgglomerativeClustering(n_clusters=5))]
#setup param_grid:
lr = LinearRegression()
imp = IterativeImputer(estimator=lr,missing_values=np.nan, max_iter=10, verbose=2, imputation_order='roman',random_state=0)
knn = KNNImputer(n_neighbors=2, add_indicator=True)
param_grid = [
{ 'imputer': [SimpleImputer(strategy='median'),imp,knn, None],
'scaler': [MinMaxScaler(),StandardScaler(),RobustScaler(),None],
'cluster':[cluster.AgglomerativeClustering(n_clusters=5),
cluster.DBSCAN(eps=0.30, min_samples=10),
cluster.KMeans(n_clusters=5),
GaussianMixture(n_components=5)]}]
#setup scoring:
scoring = ['da', 'si', 'ca']
#get the best scores:
clustersearch=Cluster_Score_Plot(steps=cls,param_grid=param_grid,scoring=scoring)
score_df=clustersearch.get_score(data)
score_df
Polt scatter diagram for best paramters
::
#param_grid : setup param_grid using best_score_df param list
best_score_df=clustersearch.get_best_score(score_df,n_best_score=3)
param_grid=list(best_score_df['param'])
#plot scatter using best_score_df param
clustersearch=Cluster_Score_Plot(steps=cls,param_grid=best_score_df,scoring=scoring)
clustersearch.plot(data,param_type='',ncol=1)
"""
def __init__(self, steps, param_grid, scoring, memory=None, verbose=False):
self.param_grid = param_grid
self.scoring = scoring
self.X_transformed = None
super().__init__(steps=steps, memory=memory, verbose=verbose)
[docs] def fit(self, X, y=None, **fit_params):
"""Fit the model
Fit all the transforms one after the other and transform the
data, then fit the transformed data using the final estimator.
Parameters
----------
X : iterable
Training data. Must fulfill input requirements of first step of the
pipeline.
y : iterable, default=None
Training targets. Must fulfill label requirements for all steps of
the pipeline.
**fit_params : dict of string -> object
Parameters passed to the ``fit`` method of each step, where
each parameter name is prefixed such that parameter ``p`` for step
``s`` has key ``s__p``.
Returns
-------
self : Pipeline
This estimator
"""
fit_params_steps = self._check_fit_params(**fit_params)
Xt = self._fit(X, y, **fit_params_steps)
# 从数组删除含有nan的行
self.X_transformed = Xt[~np.isnan(Xt).any(axis=1)]
if self._final_estimator != 'passthrough':
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
self._final_estimator.fit(
self.X_transformed, y, **fit_params_last_step)
return self
[docs] def predict(self, X=None, **predict_params):
"""Apply transforms to the data, and predict with the final estimator
Parameters
----------
X : iterable
Data to predict on. Must fulfill input requirements of first step
of the pipeline.
**predict_params : dict of string -> object
Parameters to the ``predict`` called at the end of all
transformations in the pipeline. Note that while this may be
used to return uncertainties from some models with return_std
or return_cov, uncertainties that are generated by the
transformations in the pipeline are not propagated to the
final estimator.
.. versionadded:: 0.20
Returns
-------
y_pred : array-like
"""
last_step = self._final_estimator
fit_params_steps = self._check_fit_params(**predict_params)
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
if hasattr(last_step, 'fit_predict'):
return last_step.fit_predict(self.X_transformed, **fit_params_last_step)
else:
return last_step.predict(self.X_transformed, **predict_params)
[docs] def get_score(self, X):
"""Caculate X's cluster score
Parameters
----------
X : df
DataFrame
Returns
-------
df_score : df
DataFrame contain param and scores
"""
all_score_list = []
param_grid_list = list(ParameterGrid(self.param_grid))
for param in param_grid_list:
self.set_params(**param)
self.fit(X)
labels = self.predict()
score_list = []
# score_list.append(self.steps)
score_list.append(param)
for scorer in self.scoring:
try:
if scorer == 'si':
cluster_score = silhouette_score(
self.X_transformed, labels)
elif scorer == 'ca':
cluster_score = calinski_harabasz_score(
self.X_transformed, labels)
elif scorer == 'da':
cluster_score = davies_bouldin_score(
self.X_transformed, labels)
except:
cluster_score = np.nan
score_list.append(cluster_score)
#score_list.insert(0, self.named_steps)
all_score_list.append(score_list)
column_list = self.scoring.copy()
column_list.insert(0, 'param')
df_score = pd.DataFrame(all_score_list, columns=column_list)
return df_score
[docs] def get_best_score(self, data, n_best_score=3):
"""Return the best score DataFrame
Parameters
----------
data : df
DataFrame of param and scores
n_best_score : n best score for each scoring method
Returns
-------
data_best_score : df
DataFrame of the best score and param
"""
score_list = list(data.columns)
score_list.remove('param')
str_a = score_list.pop()
if str_a == 'da':
idx1 = data.sort_values(str_a, ascending=True).head(n_best_score).index
else:
idx1 = data.sort_values(str_a, ascending=False).head(n_best_score).index
while score_list:
str_a = score_list.pop()
if str_a == 'da':
idx2 = data.sort_values(str_a, ascending=True).head(n_best_score).index
else:
idx2 = data.sort_values(str_a, ascending=False).head(n_best_score).index
idx1 = idx1.union(idx2)
data_best_score = data.iloc[idx1]
return data_best_score
[docs] def plot(self, X, param_type='grid', ncol=1, figsize_x=10, scale_y=1):
"""Preprocessing transform X,then TSNE transform, plot scatterplot
Parameters
----------
X : data
DataFrame
param_type : string
'grid'(default) for param_grid,'' for best_score param.
ncol: int
figure number in a row
figsize_x : float
sub-figure width
scale_y : float
scale height
Returns
-------
return : None
plot multiple figures
"""
if param_type == 'grid':
param_grid_list = list(ParameterGrid(self.param_grid))
else:
param_grid_list = self.param_grid
nrow = math.ceil(len(param_grid_list)/ncol)
figsize_y = nrow*figsize_x/ncol*scale_y
fig, axes = plt.subplots(
nrows=nrow, ncols=ncol, figsize=(figsize_x, figsize_y))
for param, ax in zip(param_grid_list, axes.flat):
self.set_params(**param)
self.fit(X)
labels = self.predict()
tsne = TSNE(random_state=42)
X_tsne = tsne.fit_transform(self.X_transformed)
ax.set_title(str(param))
sns.scatterplot(x=X_tsne[:, 0], y=X_tsne[:, 1], hue=labels, size=labels, style=labels,
data=X_tsne, ax=ax)
return