Python自定義指標聚類實例代碼

前言

最近在研究 Yolov2 論文的時候,發現作者在做先驗框聚類使用的指標並非歐式距離,而是IOU。在找瞭很多資料之後,基本確定 Python 沒有自定義指標聚類的函數,所以打算自己做一個

設訓練集的 shape 是 [n_sample, n_feature],基本思路是:

  • 簇中心初始化:第 1 個簇中心取樣本的特征均值,shape = [n_feature, ];從第 2 個簇中心開始,用距離函數 (自定義) 計算每個樣本到最近中心點的距離,歸一化後作為選取下一個簇中心的概率 —— 迭代到選取到足夠的簇中心為止
  • 簇中心調整:訓練多輪,每一輪以樣本點到最近中心點的距離之和作為 loss,梯度下降法 + Adam 優化器逼近最優解,在 loss 浮動值小於閾值的次數達到一定值時停止訓練

因為設計之初就打算使用自定義距離函數,所以求導是很大的難題。筆者不才,最終決定借助 PyTorch 自動求導的天然優勢

先給出歐式距離的計算函數

def Eu_dist(data, center):
    """ 以 歐氏距離 為聚類準則的距離計算函數
        data: 形如 [n_sample, n_feature] 的 tensor
        center: 形如 [n_cluster, n_feature] 的 tensor"""
    data = data.unsqueeze(1)
    center = center.unsqueeze(0)
    dist = ((data - center) ** 2).sum(dim=2)
    return dist

然後就是聚類器的代碼:使用時隻需關註 __init__、fit、classify 函數

import torch
import numpy as np
import matplotlib.pyplot as plt
Adam = torch.optim.Adam
 
def get_progress(current, target, bar_len=30):
    """ current: 當前完成任務數
        target: 任務總數
        bar_len: 進度條長度
        return: 進度條字符串"""
    assert current <= target
    percent = round(current / target * 100, 1)
    unit = 100 / bar_len
    solid = int(percent / unit)
    hollow = bar_len - solid
    return "■" * solid + "□" * hollow + f" {current}/{target}({percent}%)"
 
 
class Cluster:
    """ 聚類器
        n_cluster: 簇中心數
        dist_fun: 距離計算函數
            kwargs:
                data: 形如 [n_sample, n_feather] 的 tensor
                center: 形如 [n_cluster, n_feature] 的 tensor
            return: 形如 [n_sample, n_cluster] 的 tensor
        init: 初始簇中心
        max_iter: 最大迭代輪數
        lr: 中心點坐標學習率
        stop_thresh: 停止訓練的loss浮動閾值
        cluster_centers_: 聚類中心
        labels_: 聚類結果"""
 
    def __init__(self, n_cluster, dist_fun, init=None, max_iter=300, lr=0.08, stop_thresh=1e-4):
        self._n_cluster = n_cluster
        self._dist_fun = dist_fun
        self._max_iter = max_iter
        self._lr = lr
        self._stop_thresh = stop_thresh
        # 初始化參數
        self.cluster_centers_ = None if init is None else torch.FloatTensor(init)
        self.labels_ = None
        self._bar_len = 20
 
    def fit(self, data):
        """ data: 形如 [n_sample, n_feature] 的 tensor
            return: loss浮動日志"""
        if self.cluster_centers_ is None:
            self._init_cluster(data, self._max_iter // 5)
        log = self._train(data, self._max_iter, self._lr)
        # 開始若幹輪次的訓練,得到loss浮動日志
        return log
 
    def classify(self, data, show=False):
        """ data: 形如 [n_sample, n_feature] 的 tensor
            show: 繪制分類結果
            return: 分類標簽"""
        dist = self._dist_fun(data, self.cluster_centers_)
        self.labels_ = dist.argmin(axis=1)
        # 將標簽加載到實例屬性
        if show:
            for idx in range(self._n_cluster):
                container = data[self.labels_ == idx]
                plt.scatter(container[:, 0], container[:, 1], alpha=0.7)
            plt.scatter(self.cluster_centers_[:, 0], self.cluster_centers_[:, 1], c="gold", marker="p", s=50)
            plt.show()
        return self.labels_
 
    def _init_cluster(self, data, epochs):
        self.cluster_centers_ = data.mean(dim=0).reshape(1, -1)
        for idx in range(1, self._n_cluster):
            dist = np.array(self._dist_fun(data, self.cluster_centers_).min(dim=1)[0])
            new_cluster = data[np.random.choice(range(data.shape[0]), p=dist / dist.sum())].reshape(1, -1)
            # 取新的中心點
            self.cluster_centers_ = torch.cat([self.cluster_centers_, new_cluster], dim=0)
            progress = get_progress(idx, self._n_cluster, bar_len=self._n_cluster if self._n_cluster <= self._bar_len else self._bar_len)
            print(f"\rCluster Init: {progress}", end="")
            self._train(data, epochs, self._lr * 2.5, init=True)
            # 初始化簇中心時使用較大的lr
 
    def _train(self, data, epochs, lr, init=False):
        center = self.cluster_centers_.cuda()
        center.requires_grad = True
        data = data.cuda()
        optimizer = Adam([center], lr=lr)
        # 將中心數據加載到 GPU 上
        init_patience = int(epochs ** 0.5)
        patience = init_patience
        update_log = []
        min_loss = np.inf
        for epoch in range(epochs):
            # 對樣本分類並更新中心點
            sample_dist = self._dist_fun(data, center).min(dim=1)
            self.labels_ = sample_dist[1]
            loss = sum([sample_dist[0][self.labels_ == idx].mean() for idx in range(len(center))])
            # loss 函數: 所有樣本到中心點的最小距離和 - 中心點間的最小間隔
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            # 反向傳播梯度更新中心點
            loss = loss.item()
            progress = min_loss - loss
            update_log.append(progress)
            if progress > 0:
                self.cluster_centers_ = center.cpu().detach()
                min_loss = loss
                # 脫離計算圖後記錄中心點
            if progress < self._stop_thresh:
                patience -= 1
                # 耐心值減少
                if patience < 0:
                    break
                    # 耐心值歸零時退出
            else:
                patience = init_patience
                # 恢復耐心值
            progress = get_progress(init_patience - patience, init_patience, bar_len=self._bar_len)
            if not init:
                print(f"\rCluster: {progress}\titer: {epoch + 1}", end="")
        if not init:
            print("")
        return torch.FloatTensor(update_log)

與KMeans++比較

KMeans++ 是以歐式距離為聚類準則的經典聚類算法。在 iris 數據集上,KMeans++ 遠遠快於我的聚類器。但在我反復對比測試的幾輪裡,我的聚類器精度也是不差的 —— 可以看到下圖裡的聚類結果完全一致

  KMeans++ My Cluster
Cost 145 ms 1597 ms
Center

[[5.9016, 2.7484, 4.3935, 1.4339],

[5.0060, 3.4280, 1.4620, 0.2460],
[6.8500, 3.0737, 5.7421, 2.0711]]

[[5.9016, 2.7485, 4.3934, 1.4338],
[5.0063, 3.4284, 1.4617, 0.2463],
[6.8500, 3.0741, 5.7420, 2.0714]]

雖然速度方面與老牌算法對比的確不行,但是我的這個聚類器最大的亮點還是自定義距離函數

Yolo 檢測框聚類

本來想用 Yolov4 檢測框聚類引入的 CIoU 做聚類,但是沒法解決梯度彌散的問題,所以退其次用瞭 DIoU

def DIoU_dist(boxes, anchor):
    """ 以 DIoU 為聚類準則的距離計算函數
        boxes: 形如 [n_sample, 2] 的 tensor
        anchor: 形如 [n_cluster, 2] 的 tensor"""
    n_sample = boxes.shape[0]
    n_cluster = anchor.shape[0]
    dist = Eu_dist(boxes, anchor)
    # 計算歐式距離
    union_inter = torch.prod(boxes, dim=1).reshape(-1, 1) + torch.prod(anchor, dim=1).reshape(1, -1)
    boxes = boxes.unsqueeze(1).repeat(1, n_cluster, 1)
    anchor = anchor.unsqueeze(0).repeat(n_sample, 1, 1)
    compare = torch.stack([boxes, anchor], dim=2)
    # 組合檢測框與 anchor 的信息
    diag = torch.sum(compare.max(dim=2)[0] ** 2, dim=2)
    dist /= diag
    # 計算外接矩形的對角線長度
    inter = torch.prod(compare.min(dim=2)[0], dim=2)
    iou = inter / (union_inter - inter)
    # 計算 IoU
    dist += 1 - iou
    return dist

我提取瞭 DroneVehicle 數據集的 650156 個預測框的尺寸做聚類,在這個過程中發現因為小尺寸的預測框過多,導致聚類中心聚集在原點附近。所以對 loss 函數做瞭改進:先分類,再計算每個分類下的最大距離之和

橫軸表示檢測框的寬度,縱軸表示檢測框的高度,其數值都是相對於原圖尺寸的比例。若原圖尺寸為 608 * 608,則得到的 9 個先驗框為:

[ 2,  3 ] [ 9,  13 ] [ 19,  35 ]
[ 10,  76 ] [ 60,  14 ] [ 25,  134 ]
[ 167,  25 ] [ 115,  54 ] [ 70, 176 ]

總結

到此這篇關於Python自定義指標聚類的文章就介紹到這瞭,更多相關Python自定義指標聚類內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: