数学推导

为了将线性模型用于分类问题,需要寻找一个单调可微的函数将分类任务的真实标签y与线性回归模型的预测值z联系起来
image-1665883409705
交叉熵损失函数是一种常用于分类任务的损失函数,设线性模型的对数几率函数输出为y ̂,而二分类标签值为y={0, 1},那么交叉熵损失为:
image-1665883480020
二值交叉熵用于二分类问题,从上面的公式和函数图像可以看出,当估计值为1,而标签值为0;或者估计值为0,而标签值为1时,将产生非常大的损失值,且损失值的大小仅与对应分类的预测值大小有关,与对其它分类的预测值大小无关。
使用线性分类器进行分类预测,就是要利用样本和标签数据,训练出一组ω∗和b∗,使得模型估计的损失值最小
image-1665883521288
根据梯度下降优化方法,可以求出函数L的梯度∂L/∂ω和∂L/∂b,然后利用梯度和学习率η不断更新ω和b,最终求得优化的ω∗和b
image-1665883544473
根据求出的梯度值可以得到ω和b 的更新公式
image-1665883580709
若有m个样本{x_1y_1,x_2y_2⋯x_my_m},训练线性模型的公式为:
image-1665883612313

代码示例

过程:
生成700 * 2的数据x,然后训练2 * 1的w,通过X*W得到700 * 1的z,再根据Y的正确标签值进行梯度下降,得到最合适的w和b。

# 对数几率回归分类器
class LogisticRegression(object):

    def __init__(self, learning_rate=0.1, max_iter=100, seed=None):
        self.seed = seed
        self.lr = learning_rate
        self.max_iter = max_iter
        self.w = None
        self.b = None
        self.x = None
        self.y = None

    # 迭代训练
    def fit(self, x, y):
        np.random.seed(self.seed)
        self.w = np.random.normal(loc=0.0, scale=1.0, size=x.shape[1])
        self.b = np.random.normal(loc=0.0, scale=1.0)
        self.x = x
        self.y = y
        for i in range(self.max_iter):
            self._update_step()
            # print('loss: \t{}'.format(self.loss()))
            # print('score: \t{}'.format(self.score()))
            print('w: \t{}'.format(self.w))
            print('b: \t{}'.format(self.b))

    def _sigmoid(self, z):
        return 1.0 / (1.0 + np.exp(-z))

    def _f(self, x, w, b):
        z = x.dot(w) + b
        return self._sigmoid(z)

    def predict_proba(self, x=None):
        if x is None:
            x = self.x
        y_pred = self._f(x, self.w, self.b)
        return y_pred

    def predict(self, x=None):
        if x is None:
            x = self.x
        y_pred_proba = self._f(x, self.w, self.b)
        y_pred = np.array([0 if y_pred_proba[i] < 0.5 else 1 for i in range(len(y_pred_proba))])
        return y_pred

    def score(self, y_true=None, y_pred=None):
        if y_true is None or y_pred is None:
            y_true = self.y
            y_pred = self.predict()
        acc = np.mean([1 if y_true[i] == y_pred[i] else 0 for i in range(len(y_true))])
        return acc

    def loss(self, y_true=None, y_pred_proba=None):
        if y_true is None or y_pred_proba is None:
            y_true = self.y
            y_pred_proba = self.predict_proba()
        return np.mean(-1.0 * (y_true * np.log(y_pred_proba) + (1.0 - y_true) * np.log(1.0 - y_pred_proba)))

    def _calc_gradient(self):
        y_pred = self.predict_proba()
        d_w = (y_pred - self.y).dot(self.x) / len(self.y)
        d_b = np.mean(y_pred - self.y)
        return d_w, d_b

    def _update_step(self):
        d_w, d_b = self._calc_gradient()
        self.w -= self.lr * d_w
        self.b -= self.lr * d_b
        return self.w, self.b

# 生成数据集
def generate_dataset(seed):
    np.random.seed(seed)
    data_size_1 = 300
    x1_1 = np.random.normal(loc=5.0, scale=1.0, size=data_size_1)
    x2_1 = np.random.normal(loc=4.0, scale=1.0, size=data_size_1)
    y_1 = [0] * data_size_1
    data_size_2 = 400
    x1_2 = np.random.normal(loc=10.0, scale=2.0, size=data_size_2)
    x2_2 = np.random.normal(loc=8.0, scale=2.0, size=data_size_2)
    y_2 = [1] * data_size_2
    x1 = np.concatenate((x1_1, x1_2), axis=0)
    x2 = np.concatenate((x2_1, x2_2), axis=0)
    x = np.hstack((x1.reshape(-1, 1), x2.reshape(-1, 1)))
    y = np.concatenate((y_1, y_2), axis=0)
    data_size_all = data_size_1 + data_size_2

    # 随机打乱样本点顺序
    shuffled_index = np.random.permutation(data_size_all)
    x = x[shuffled_index]
    y = y[shuffled_index]
    return x, y


# 将数据集分成训练和测试集
def train_test_split(x, y):
    split_index = int(len(y) * 0.7)
    x_train = x[:split_index]
    y_train = y[:split_index]
    x_test = x[split_index:]
    y_test = y[split_index:]
    return x_train, y_train, x_test, y_test


if __name__ == "__main__":
    # 产生数据集
    x, y = generate_dataset(seed=272)
    x_train, y_train, x_test, y_test = train_test_split(x, y)

    # 数据归一化
    x_train = (x_train - np.min(x_train, axis=0)) / (np.max(x_train, axis=0) - np.min(x_train, axis=0))
    x_test = (x_test - np.min(x_test, axis=0)) / (np.max(x_test, axis=0) - np.min(x_test, axis=0))

    # 训练对数几率回归分类器
    clf = LogisticRegression(learning_rate=0.1, max_iter=500, seed=272)
    clf.fit(x_train, y_train)
    print('w: \t{}'.format(clf.w))
    print('b: \t{}'.format(clf.b))
    # 拟合分界线
    split_boundary_func = lambda x: (-clf.b - clf.w[0] * x) / clf.w[1]
    xx = np.arange(0.1, 0.6, 0.1)
    plt.scatter(x_train[:, 0], x_train[:, 1], c=y_train, marker='.')
    plt.plot(xx, split_boundary_func(xx), c='red')
    plt.show()

    # 输出测试集上的损失
    y_test_pred = clf.predict(x_test)
    y_test_pred_proba = clf.predict_proba(x_test)
    print(clf.score(y_test, y_test_pred))
    print(clf.loss(y_test, y_test_pred_proba))
    # print(y_test_pred_proba)

训练结果

image-1665888178981