There is no such thing as the right choice; we can only work hard to make the choice we made turn out to be right.

—— a thoughtful programmer

[Machine Learning]

Tags:
supervised learning
fitting function
loss function
gradient descent
Machine learning divides into supervised learning and unsupervised learning. Here we start with supervised learning.
Supervised learning means you have data with known labels; you fit a function to that data and then use the function to predict unknown values.
For example: we have three known values 1, 2, 3 whose labels are 2, 3, 4, i.e. 1->2, 2->3, 3->4. How do we predict the label of 5?
Treat each value/label pair as a sample point; the three sample points are (1, 2), (2, 3), (3, 4).
We know a straight line can be written as y = kx + b. Substituting the points above and solving gives k = 1, b = 1, i.e. y = x + 1.
Plugging x = 5 into the line gives y = 6, so we have successfully predicted that the label of 5 is 6.
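As a quick sanity check, the same fit-and-predict step can be reproduced in a few lines of NumPy (a minimal sketch; np.polyfit performs a degree-1 least-squares fit, which for perfectly collinear points recovers exactly k = 1, b = 1):

import numpy as np

x = [1, 2, 3]
y = [2, 3, 4]
k, b = np.polyfit(x, y, 1)   # least-squares line fit: k ≈ 1.0, b ≈ 1.0
print(k * 5 + b)             # predicted label for 5: 6.0
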
Here is the catch: real-world values rarely fall so perfectly onto a single line.
For example, suppose our points are (1, 2), (2, 2.8), (3, 4.2). What then? We look for the line that stays as close as possible to all of the points, so that the error is as small as possible. As Figure 5 shows, the green line in the middle is the best one. How do we find such a line?
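To make "as close as possible to all the points" concrete, here is a small sketch (sse is a hypothetical helper, not part of the program below) that scores candidate lines by summed squared vertical error; the lower the score, the better the line. For these three points the least-squares line works out to y = 1.1x + 0.8:

points = [(1, 2), (2, 2.8), (3, 4.2)]

def sse(k, b):
    # sum of squared vertical gaps between each point and y = kx + b
    return sum((k * x + b - y) ** 2 for x, y in points)

print(sse(1.0, 1.0))   # candidate y = x + 1 scores ≈ 0.08
print(sse(1.1, 0.8))   # the least-squares line scores ≈ 0.06, the better fit
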
Let us start from the simplest case. Suppose the three sample points are (1, 1), (2, 1.8), (3, 3.2), as shown in Figure 6:
Take the hypothesis y = kx. In machine learning the hypothesis function is usually written like this:

h_θ(x) = θ·x        (θ plays the role of the slope k)

This brings in the concept of the "loss function": a measure that totals how far the line misses each sample point (here, the squared vertical difference between point and line). It is easy to see that when the loss is smallest, i.e. when the total of the per-point errors (the arrow lengths in Figure 7) is smallest, we have the best-fitting line: the green line in Figure 8.
We define the loss function over the m sample points as:

J(θ) = 1/(2m) · Σ_{i=1..m} (h_θ(x_i) − y_i)^2
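In code, this loss for the one-parameter hypothesis h_θ(x) = θ·x is a direct transcription of the formula (a minimal sketch over the three points above):

points = [(1, 1), (2, 1.8), (3, 3.2)]

def J(theta):
    # squared-error loss: 1/(2m) · Σ (θ·x_i − y_i)^2
    m = len(points)
    return sum((theta * x - y) ** 2 for x, y in points) / (2 * m)
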
How do we make the loss function as small as possible?
Taking the three sample points above and substituting them in (m = 3):

J(θ) = 1/6 · [ (θ·1 − 1)^2 + (θ·2 − 1.8)^2 + (θ·3 − 3.2)^2 ]

Expanding:

J(θ) = 1/6 · [ (θ^2 − 2θ + 1) + (4θ^2 − 7.2θ + 3.24) + (9θ^2 − 19.2θ + 10.24) ]

Simplifying:

J(θ) = 1/6 · (14θ^2 − 28.4θ + 14.48)
Plotting this loss function, we find it is a parabola opening upward, and a parabola attains its minimum at its vertex: the red point in the figure.
The red point is exactly where the derivative equals 0. The derivative of the loss function is:

J'(θ) = 1/6 · (28θ − 28.4)
Setting it equal to 0 and solving:

θ = 28.4/28 = 71/70 ≈ 1.014
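For a line through the origin this matches the closed-form least-squares slope θ = Σxy / Σx², which gives an easy numeric cross-check (sketch):

xs = [1, 2, 3]
ys = [1, 1.8, 3.2]
theta = sum(x * y for x, y in zip(xs, ys)) / sum(x * x for x in xs)
print(theta)   # 14.2 / 14 ≈ 1.0142857
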
That θ is where the loss reaches its minimum, so the best-fitting line is y ≈ 1.014x.
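Solving J'(θ) = 0 in closed form works for one parameter, but the program below instead finds the minimum numerically with gradient descent: start from an initial guess and repeatedly step against the derivative, θ := θ − α·J'(θ), where α is the learning rate. A minimal one-parameter sketch of the idea (assuming α = 0.1, using J'(θ) from the simplification above):

theta, alpha = 0.0, 0.1
for _ in range(1000):
    grad = (28 * theta - 28.4) / 6   # J'(θ) for our three sample points
    theta -= alpha * grad            # step downhill
print(theta)                         # ≈ 1.0142857, i.e. 71/70

The full program generalizes this to the two-parameter hypothesis h(x) = θ0 + θ1·x, descending the 3-D cost surface along both partial derivatives ∂J/∂θ0 and ∂J/∂θ1.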
Python code:
import numpy as np
from matplotlib import pyplot as plt
from mpl_toolkits import mplot3d  # registers the '3d' projection on older matplotlib versions


# Linear regression fitted by gradient descent, with live visualization
class LinearRegression:
    def __init__(self, x_points, y_points):
        self.X = np.array(x_points)
        self.Y = np.array(y_points)
        self.point_size = len(x_points)

        # ================= user-tunable parameters =====================
        self.SLEEP_TIME = 0.001             # pause between animation frames (seconds)
        self.STEP_ALPHA = 0.1               # gradient descent learning rate (step size)
        self.show_view = True               # whether to draw the visualization

        # =================== figure setup =======================
        _fig = plt.figure(figsize=(13.3, 5))                            # canvas size
        try:
            mngr = plt.get_current_fig_manager()                        # current figure manager
            mngr.window.wm_geometry("+10+110")                          # window position on screen (Tk backend only)
        except AttributeError:
            pass                                                        # other backends do not expose wm_geometry
        plt.subplots_adjust(left=0.05, right=0.98, bottom=0.1, top=0.8) # subplot margins
        plt.ion()                                                       # interactive mode, so plots update live
        plt.rcParams['font.sans-serif'] = ['SimHei']                    # a font that can render the θ glyphs
        plt.rcParams['axes.unicode_minus'] = False                      # render minus signs correctly

        # ==================== subplots ========================
        self.ax1 = _fig.add_subplot(131)                      # subplot 1: data points and fitted line
        plt.title('Fitted data points')
        self.ax1.set_xlabel('x label', color='r')
        self.ax1.set_ylabel('y label', color='g')

        self.ax2_3d = _fig.add_subplot(132, projection='3d')  # subplot 2: 3-D cost surface
        self.ax2_3d.set_title('Squared-error loss function')
        self.ax2_3d.set_xlabel('x label', color='r')
        self.ax2_3d.set_ylabel('y label', color='g')
        self.ax2_3d.set_zlabel('z label', color='b')

        # subplot 3: scatter of θ values over the updates
        self.ax3 = _fig.add_subplot(133)
        self.ax3.set_title('θ values')
        self.ax3.set_xlabel('update times', color='r')
        self.ax3.set_ylabel('θ value', color='g')

        # gradient descent starting values
        self.theta0 = 0
        self.theta1 = 0

        # =================== coefficient initialization ===================
        # polynomial coefficients of the cost function J(θ0, θ1)
        self.theta0_power2 = 0
        self.theta1_power2 = 0
        self.theta0_theta1 = 0
        self.theta0_power1 = 0
        self.theta1_power1 = 0
        self.power0 = 0

        # coefficients of the partial derivative ∂J/∂θ0
        self.iteration_theta0_0 = 0
        self.iteration_theta0_1 = 0
        self.iteration_theta0_2 = 0

        # coefficients of the partial derivative ∂J/∂θ1
        self.iteration_theta1_0 = 0
        self.iteration_theta1_1 = 0
        self.iteration_theta1_2 = 0

        # pre-processing: expand the cost function and collect its coefficients
        self.__analysis_cost_function(x_points, y_points)

    # Run the regression, animating each gradient descent step
    def regression(self, show_view=True):
        self.show_view = show_view

        if self.show_view:
            # scatter the raw data points
            self.ax1.scatter(self.X, self.Y)
            plt.pause(self.SLEEP_TIME)

            # draw the 3-D surface of the cost function
            X1 = np.linspace(-1.3, 1.2, 100, endpoint=True)
            X2 = np.linspace(-0.3, 2.5, 100, endpoint=True)
            X1, X2 = np.meshgrid(X1, X2)
            Z = self.__get_power2_y(X1, X2)
            self.ax2_3d.plot_surface(X1, X2, Z, alpha=0.7)
            plt.pause(self.SLEEP_TIME)

        theta0_old = self.theta0
        theta1_old = self.theta1
        last_obvious_change0 = self.theta0
        last_obvious_change1 = self.theta1
        last_obvious_change0_1 = self.theta0
        last_obvious_change1_1 = self.theta1
        update_times = 0

        # fit by gradient descent
        print(f'Gradient descent step {update_times}: θ0 = {self.theta0},  θ1 = {self.theta1}')
        self.__show_current_theta_line(self.theta0, self.theta1)
        self.__show_current_theta_value(update_times, self.theta0, self.theta1)

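        # Each pass of the loop applies the simultaneous update
        #   θ0 := θ0 - α·∂J/∂θ0,  θ1 := θ1 - α·∂J/∂θ1
        # computing both steps from the old (θ0, θ1) before overwriting either.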
        while True:
            update_times += 1

            # gradient descent parameter update
            tmp0 = self.theta0 - self.get_dx_theta0()
            tmp1 = self.theta1 - self.get_dx_theta1()
            self.theta0 = tmp0
            self.theta1 = tmp1
            print(f'Gradient descent step {update_times}: θ0 = {self.theta0},  θ1 = {self.theta1}')

            # redraw the fitted line only when θ has changed noticeably
            if abs(self.theta0 - last_obvious_change0) > 0.1 or abs(self.theta1 - last_obvious_change1) > 0.1:
                self.__show_current_theta_line(self.theta0, self.theta1)
                last_obvious_change0 = self.theta0
                last_obvious_change1 = self.theta1

            # plot the θ values when they changed noticeably, or every 30 steps
            if abs(self.theta0 - last_obvious_change0_1) > 0.1 or abs(self.theta1 - last_obvious_change1_1) > 0.1:
                self.__show_current_theta_value(update_times, self.theta0, self.theta1)
                last_obvious_change0_1 = self.theta0
                last_obvious_change1_1 = self.theta1
            elif update_times % 30 == 0:
                self.__show_current_theta_value(update_times, self.theta0, self.theta1)

            # stop if we have hit the minimum exactly
            if self.theta0 == theta0_old and self.theta1 == theta1_old:
                print('Converged exactly, stopping')
                break

            # stop if the change is vanishingly small
            if abs(self.theta0-theta0_old) < 1e-8 and abs(self.theta1-theta1_old) < 1e-8:
                print('Update smaller than 1e-8, stopping')
                break

            theta0_old = self.theta0
            theta1_old = self.theta1

        print(f'Final convergence: θ0 = {self.theta0}, θ1 = {self.theta1}')

        # snap near-integer results, hiding floating-point round-off
        fixed1 = abs(self.theta0-round(self.theta0)) < 1e-5
        fixed2 = abs(self.theta1-round(self.theta1)) < 1e-5
        if fixed1:
            self.theta0 = round(self.theta0)
        if fixed2:
            self.theta1 = round(self.theta1)
        if fixed1 or fixed2:
            print(f'After rounding: θ0 = {self.theta0}, θ1 = {self.theta1}')
    
        # draw the final fitted line in red
        self.__show_current_theta_line(self.theta0, self.theta1, color='r')

    # Pre-processing: expand J(θ0, θ1) and its two partial derivatives into
    # polynomials of θ0 and θ1, and store their coefficients.
    def __analysis_cost_function(self, x_points, y_points):
        self.theta0_power2 = len(x_points)                       # coefficient of θ0^2
        self.iteration_theta0_0 = len(x_points)
        for i, x in enumerate(x_points):
            # cost function polynomial coefficients
            self.theta1_power2 += x**2                           # coefficient of θ1^2
            self.theta0_theta1 += 2 * x                          # coefficient of θ0*θ1
            self.theta0_power1 += -2 * y_points[i]               # coefficient of θ0
            self.theta1_power1 += -2 * x * y_points[i]           # coefficient of θ1
            self.power0 += y_points[i] ** 2                      # constant term

            # partial derivative polynomial coefficients
            self.iteration_theta0_1 += x
            self.iteration_theta0_2 += -y_points[i]
            self.iteration_theta1_0 += x
            self.iteration_theta1_1 += x**2
            self.iteration_theta1_2 += -x * y_points[i]

        str_cost = f'Squared-error loss function: 1/{2*len(x_points)}({self.theta0_power2}θ0^2 {"+" if self.theta1_power2 >= 0 else "-"} {abs(self.theta1_power2)}θ1^2 {"+" if self.theta0_theta1 >= 0 else "-"} {abs(self.theta0_theta1)}θ0θ1 {"+" if self.theta0_power1 >= 0 else "-"} {abs(self.theta0_power1)}θ0 {"+" if self.theta1_power1 >= 0 else "-"} {abs(self.theta1_power1)}θ1 {"+" if self.power0 >= 0 else "-"} {abs(self.power0)})'
        str_d0 = f'Partial derivative in θ0:    1/{len(x_points)}({self.iteration_theta0_0}θ0 {"+" if self.iteration_theta0_1 >= 0 else "-"} {abs(self.iteration_theta0_1)}θ1 {"+" if self.iteration_theta0_2 >= 0 else "-"} {abs(self.iteration_theta0_2)})'
        str_d1 = f'Partial derivative in θ1:    1/{len(x_points)}({self.iteration_theta1_0}θ0 {"+" if self.iteration_theta1_1 >= 0 else "-"} {abs(self.iteration_theta1_1)}θ1 {"+" if self.iteration_theta1_2 >= 0 else "-"} {abs(self.iteration_theta1_2)})'

        # fold in the 1/(2m) factor
        _2m_1 = 1 / (2 * len(x_points))
        self.theta0_power2 *= _2m_1
        self.theta1_power2 *= _2m_1
        self.theta0_theta1 *= _2m_1
        self.theta0_power1 *= _2m_1
        self.theta1_power1 *= _2m_1
        self.power0 *= _2m_1

        # fold in the 1/m factor
        _m_1 = 1 / len(x_points)
        self.iteration_theta0_0 *= _m_1
        self.iteration_theta0_1 *= _m_1
        self.iteration_theta0_2 *= _m_1
        self.iteration_theta1_0 *= _m_1
        self.iteration_theta1_1 *= _m_1
        self.iteration_theta1_2 *= _m_1

        # show the expanded functions on the plot
        if self.show_view:
            self.ax2_3d.set_title(f'{str_cost}\n{str_d0}\n{str_d1}', fontsize='x-small', loc='left')

        # and print them to the console
        print(str_cost)
        print(str_d0)
        print(str_d1)

    # gradient descent step for θ0: α · ∂J/∂θ0
    # (the 1/m factor is already folded into the iteration_* coefficients,
    #  so it must not be applied a second time here)
    def get_dx_theta0(self):
        return self.STEP_ALPHA * (self.iteration_theta0_0*self.theta0 + self.iteration_theta0_1*self.theta1 + self.iteration_theta0_2)

    # gradient descent step for θ1: α · ∂J/∂θ1
    def get_dx_theta1(self):
        return self.STEP_ALPHA * (self.iteration_theta1_0*self.theta0 + self.iteration_theta1_1*self.theta1 + self.iteration_theta1_2)

    # cost function J(θ0, θ1), evaluated from the stored coefficients
    def __get_power2_y(self, theta0, theta1):
        return self.theta0_power2*theta0**2 + self.theta1_power2*theta1**2 + self.theta0_theta1*theta0*theta1 + self.theta0_power1*theta0 + self.theta1_power1*theta1 + self.power0

    # Mark the current θ point on the cost surface and draw the line it implies
    def __show_current_theta_line(self, theta0, theta1, color=None):
        if not self.show_view:
            return

        # ================ mark the gradient descent position ==================
        self.ax2_3d.scatter3D(theta0, theta1, self.__get_power2_y(theta0, theta1), color='r')

        # ================ draw the currently fitted line ================
        # remove the previous line (works on matplotlib versions where
        # Axes.lines is read-only and .pop() is unavailable)
        if self.ax1.lines:
            self.ax1.lines[0].remove()

        # draw
        if color is None:
            self.ax1.plot(self.X, theta0+theta1*self.X, alpha=0.5)
        else:
            self.ax1.plot(self.X, theta0+theta1*self.X, color=color)
        self.ax1.set_title(f'Current fit: y = {theta0:.1f} + {theta1:.1f}x')
        plt.pause(self.SLEEP_TIME)

    def __show_current_theta_value(self, update_times, current_theta0, current_theta1):
        if not self.show_view:
            return

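        # red: θ0, green: θ1, plotted against the number of updates so far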
        self.ax3.scatter(update_times, current_theta0, color='r', alpha=0.5)
        self.ax3.scatter(update_times, current_theta1, color='g', alpha=0.5)

        plt.pause(self.SLEEP_TIME)


if __name__ == '__main__':
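    # a few alternative sample datasets to try: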
    #line_regression = LinearRegression([0, 1, 2, 3], [0, 0.8, 2.2, 3.0])
    #line_regression = LinearRegression([0, 1, 2, 3, 4, 5], [0, 1.8, 4.2, 5.8, 8.2, 10])
    #line_regression = LinearRegression([0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5])
    #line_regression = LinearRegression([0, 1, 2, 3], [1, 2, 3, 4])
    #line_regression = LinearRegression([1, 2, 3], [2, 3, 4])
    line_regression = LinearRegression([1, 2, 3], [1, 2, 3])
    line_regression.regression()
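
For the active dataset the three points lie exactly on y = x, so the run should converge to θ0 ≈ 0 and θ1 ≈ 1, with the final red line reading y = 0.0 + 1.0x; with one of the commented-out noisier datasets, the θ values settle at the least-squares fit instead of exact integers.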

  

Run result: