Source code for mlcluster

import math

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import (calinski_harabasz_score, davies_bouldin_score,
from sklearn.model_selection import ParameterGrid
from sklearn.pipeline import Pipeline

# 2021.09.14 Created by Daniel SY wang

[docs]class ClusterScorePlot(Pipeline): """ ClusterScorePlot inherited from Pipeline,score and plot cluster without labels. It's same as pipeline+gridseach. Parameters ---------- steps : list List of (name, transform) tuples (implementing fit/transform) that are chained, in the order in which they are chained, with the last object a cluster estimator. param_grid : list of dict usage reference example part. scoring : list list of 'si','ca','da' for cluster scoring, 'si' means metrics.silhouette_score, 'ca' means metrics.calinski_harabasz_score, 'da' means metrics.davies_bouldin_score. memory : str or object with the joblib.Memory interface, default=None Used to cache the fitted transformers of the pipeline. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute ``named_steps`` or ``steps`` to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time consuming. verbose : bool, default=False If True, the time elapsed while fitting each step will be printed as it is completed. Attributes ---------- named_steps : :class:`~sklearn.utils.Bunch` Dictionary-like object, with the following attributes. Read-only attribute to access any step parameter by user given name. Keys are step names and values are steps parameters. Examples -------- Get the best socres and paramters :: #setup steps: cls = [('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler()), ('cluster',cluster.AgglomerativeClustering(n_clusters=5))] #setup param_grid: lr = LinearRegression() imp = IterativeImputer(estimator=lr,missing_values=np.nan, max_iter=10, verbose=2, imputation_order='roman',random_state=0) knn = KNNImputer(n_neighbors=2, add_indicator=True) param_grid = [ { 'imputer': [SimpleImputer(strategy='median'),imp,knn, None], 'scaler': [MinMaxScaler(),StandardScaler(),RobustScaler(),None], 'cluster':[cluster.AgglomerativeClustering(n_clusters=5), cluster.DBSCAN(eps=0.30, min_samples=10), cluster.KMeans(n_clusters=5), GaussianMixture(n_components=5)]}] #setup scoring: scoring = ['da', 'si', 'ca'] #get the best scores: clustersearch=Cluster_Score_Plot(steps=cls,param_grid=param_grid,scoring=scoring) score_df=clustersearch.get_score(data) score_df Polt scatter diagram for best paramters :: #param_grid : setup param_grid using best_score_df param list best_score_df=clustersearch.get_best_score(score_df,n_best_score=3) param_grid=list(best_score_df['param']) #plot scatter using best_score_df param clustersearch=Cluster_Score_Plot(steps=cls,param_grid=best_score_df,scoring=scoring) clustersearch.plot(data,param_type='',ncol=1) """ def __init__(self, steps, param_grid, scoring, memory=None, verbose=False): self.param_grid = param_grid self.scoring = scoring self.X_transformed = None super().__init__(steps=steps, memory=memory, verbose=verbose)
[docs] def fit(self, X, y=None, **fit_params): """Fit the model Fit all the transforms one after the other and transform the data, then fit the transformed data using the final estimator. Parameters ---------- X : iterable Training data. Must fulfill input requirements of first step of the pipeline. y : iterable, default=None Training targets. Must fulfill label requirements for all steps of the pipeline. **fit_params : dict of string -> object Parameters passed to the ``fit`` method of each step, where each parameter name is prefixed such that parameter ``p`` for step ``s`` has key ``s__p``. Returns ------- self : Pipeline This estimator """ fit_params_steps = self._check_fit_params(**fit_params) Xt = self._fit(X, y, **fit_params_steps) # 从数组删除含有nan的行 self.X_transformed = Xt[~np.isnan(Xt).any(axis=1)] if self._final_estimator != 'passthrough': fit_params_last_step = fit_params_steps[self.steps[-1][0]] self.X_transformed, y, **fit_params_last_step) return self
[docs] def predict(self, X=None, **predict_params): """Apply transforms to the data, and predict with the final estimator Parameters ---------- X : iterable Data to predict on. Must fulfill input requirements of first step of the pipeline. **predict_params : dict of string -> object Parameters to the ``predict`` called at the end of all transformations in the pipeline. Note that while this may be used to return uncertainties from some models with return_std or return_cov, uncertainties that are generated by the transformations in the pipeline are not propagated to the final estimator. .. versionadded:: 0.20 Returns ------- y_pred : array-like """ last_step = self._final_estimator fit_params_steps = self._check_fit_params(**predict_params) fit_params_last_step = fit_params_steps[self.steps[-1][0]] if hasattr(last_step, 'fit_predict'): return last_step.fit_predict(self.X_transformed, **fit_params_last_step) else: return last_step.predict(self.X_transformed, **predict_params)
[docs] def get_score(self, X): """Caculate X's cluster score Parameters ---------- X : df DataFrame Returns ------- df_score : df DataFrame contain param and scores """ all_score_list = [] param_grid_list = list(ParameterGrid(self.param_grid)) for param in param_grid_list: self.set_params(**param) labels = self.predict() score_list = [] # score_list.append(self.steps) score_list.append(param) for scorer in self.scoring: try: if scorer == 'si': cluster_score = silhouette_score( self.X_transformed, labels) elif scorer == 'ca': cluster_score = calinski_harabasz_score( self.X_transformed, labels) elif scorer == 'da': cluster_score = davies_bouldin_score( self.X_transformed, labels) except: cluster_score = np.nan score_list.append(cluster_score) #score_list.insert(0, self.named_steps) all_score_list.append(score_list) column_list = self.scoring.copy() column_list.insert(0, 'param') df_score = pd.DataFrame(all_score_list, columns=column_list) return df_score
[docs] def get_best_score(self, data, n_best_score=3): """Return the best score DataFrame Parameters ---------- data : df DataFrame of param and scores n_best_score : n best score for each scoring method Returns ------- data_best_score : df DataFrame of the best score and param """ score_list = list(data.columns) score_list.remove('param') str_a = score_list.pop() if str_a == 'da': idx1 = data.sort_values(str_a, ascending=True).head(n_best_score).index else: idx1 = data.sort_values(str_a, ascending=False).head(n_best_score).index while score_list: str_a = score_list.pop() if str_a == 'da': idx2 = data.sort_values(str_a, ascending=True).head(n_best_score).index else: idx2 = data.sort_values(str_a, ascending=False).head(n_best_score).index idx1 = idx1.union(idx2) data_best_score = data.iloc[idx1] return data_best_score
[docs] def plot(self, X, param_type='grid', ncol=1, figsize_x=10, scale_y=1): """Preprocessing transform X,then TSNE transform, plot scatterplot Parameters ---------- X : data DataFrame param_type : string 'grid'(default) for param_grid,'' for best_score param. ncol: int figure number in a row figsize_x : float sub-figure width scale_y : float scale height Returns ------- return : None plot multiple figures """ if param_type == 'grid': param_grid_list = list(ParameterGrid(self.param_grid)) else: param_grid_list = self.param_grid nrow = math.ceil(len(param_grid_list)/ncol) figsize_y = nrow*figsize_x/ncol*scale_y fig, axes = plt.subplots( nrows=nrow, ncols=ncol, figsize=(figsize_x, figsize_y)) for param, ax in zip(param_grid_list, axes.flat): self.set_params(**param) labels = self.predict() tsne = TSNE(random_state=42) X_tsne = tsne.fit_transform(self.X_transformed) ax.set_title(str(param)) sns.scatterplot(x=X_tsne[:, 0], y=X_tsne[:, 1], hue=labels, size=labels, style=labels, data=X_tsne, ax=ax) return