There is no such thing as a correct choice; we can only make the choice we made turn out right by working hard at it.
— a thoughtful programmer
Machine learning falls into two camps: supervised learning and unsupervised learning. Here we start with supervised learning, which means you have data with known labels, you fit a function to that data, and you then use the function to predict labels for values it has not seen.

For example, say we have three known values 1, 2, 3 whose labels are 2, 3, 4, i.e. 1 -> 2, 2 -> 3, 3 -> 4. How do we predict the label of 5? Treat each value together with its label as a sample point; the three sample points are (1, 2), (2, 3), (3, 4). We know a straight line can be written as y = kx + b. Substituting the sample points and solving gives k = 1, b = 1, i.e. y = x + 1. Plugging x = 5 into the line yields y = 6, so we have successfully predicted the label of 5 to be 6.
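That little derivation can be checked in a few lines of NumPy. This is only a sanity-check sketch (the variable names are mine, not from the demo program further down):

```python
import numpy as np

# Two of the sample points pin down k and b in y = kx + b:
#   k*1 + b = 2
#   k*2 + b = 3
A = np.array([[1, 1],
              [2, 1]])              # each row is [x, 1]
labels = np.array([2, 3])
k, b = np.linalg.solve(A, labels)   # k = 1.0, b = 1.0

print(k * 5 + b)                    # 6.0 -- the predicted label for 5
```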
Now the catch: real-world values rarely fall this neatly onto a single line. Suppose instead our points are (1, 2), (2, 2.8), (3, 4.2). What then? We look for the line that gets as close as possible to all of the points at once, the one whose total error is smallest. As Figure 5 shows, the green line in the middle is the best of the candidates. How do we find such a line?
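For reference, NumPy can produce this least-squares line directly; the walkthrough below derives the same kind of answer by hand. A minimal sketch:

```python
import numpy as np

x = np.array([1, 2, 3])
y = np.array([2, 2.8, 4.2])

# least-squares fit of a degree-1 polynomial, i.e. a straight line
k, b = np.polyfit(x, y, deg=1)
print(k, b)   # k = 1.1, b = 0.8, so the best line is y = 1.1x + 0.8
```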
Take the hypothesis y = kx + b. In machine learning the hypothesis is usually written as

h_θ(x) = θ0 + θ1·x

where θ0 plays the role of b and θ1 the role of k. This brings us to the concept of a loss function. The loss function totals up how far the line is from the sample points: for each point we take the vertical distance to the line (the arrow lengths in Figure 7), square it, and sum over all points. It is easy to see that when the loss is smallest, i.e. when those distances are collectively smallest, we have the best-fitting line, the green line in Figure 8. We define the squared-error loss as

J(θ0, θ1) = 1/(2m) · Σᵢ (h_θ(x⁽ⁱ⁾) − y⁽ⁱ⁾)²

where m is the number of sample points. How do we make the loss as small as possible? Take the three sample points (1, 2), (2, 3), (3, 4) from the first example and substitute them in:

J(θ0, θ1) = 1/6 · [(θ0 + θ1 − 2)² + (θ0 + 2θ1 − 3)² + (θ0 + 3θ1 − 4)²]

Expanding the squares and collecting terms, this simplifies to:

J(θ0, θ1) = 1/6 · (3θ0² + 14θ1² + 12θ0θ1 − 18θ0 − 40θ1 + 29)

The plot of this loss function looks like this:
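A quick numerical check that the expanded polynomial really equals the definition (a throwaway sketch; the two function names are mine):

```python
import numpy as np

x = np.array([1, 2, 3])
y = np.array([2, 3, 4])
m = len(x)

def loss(theta0, theta1):
    """Squared-error loss straight from the definition."""
    return np.sum((theta0 + theta1 * x - y) ** 2) / (2 * m)

def loss_expanded(theta0, theta1):
    """The hand-expanded polynomial from the text."""
    return (3 * theta0**2 + 14 * theta1**2 + 12 * theta0 * theta1
            - 18 * theta0 - 40 * theta1 + 29) / 6

print(loss(0, 0), loss_expanded(0, 0))   # both 29/6
print(loss(1, 1), loss_expanded(1, 1))   # both 0.0 -- a perfect fit
```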
We can see that the loss function is a bowl-shaped surface opening upward (with a single parameter it would be an upward-opening parabola), so its lowest point, the red dot in the figure, is the minimum. The minimum is the point where both partial derivatives are zero. Differentiating the loss gives:

∂J/∂θ0 = 1/3 · (3θ0 + 6θ1 − 9)
∂J/∂θ1 = 1/3 · (6θ0 + 14θ1 − 20)

Setting both to zero and solving the resulting system gives θ0 = 1 and θ1 = 1 at the minimum, i.e. the best-fitting line is y = x + 1.
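Solving that 2-by-2 system is a single np.linalg.solve call if you want to check the arithmetic (again just a sketch):

```python
import numpy as np

# dJ/dθ0 = 0  ->  3*θ0 +  6*θ1 =  9
# dJ/dθ1 = 0  ->  6*θ0 + 14*θ1 = 20
A = np.array([[3, 6],
              [6, 14]])
rhs = np.array([9, 20])

theta0, theta1 = np.linalg.solve(A, rhs)
print(theta0, theta1)   # 1.0 1.0  ->  y = x + 1
```

Setting the derivative to zero works nicely for a toy example, but the program below finds the same minimum numerically with gradient descent: starting from θ0 = θ1 = 0, it repeatedly nudges each parameter downhill with the update θj := θj − α·∂J/∂θj, where α is the step size (STEP_ALPHA in the code), and stops once the parameters essentially stop changing. The full demo, which also animates the descent with Matplotlib, is below: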
```python
import numpy as np
from matplotlib import pyplot as plt
from mpl_toolkits import mplot3d  # registers the '3d' projection on older Matplotlib


class LinearRegression:
    """Fit y = θ0 + θ1·x to the given points with gradient descent,
    animating each step with Matplotlib."""

    def __init__(self, x_points, y_points):
        self.X = np.array(x_points)
        self.Y = np.array(y_points)
        self.point_size = len(x_points)

        # ================= user-tunable parameters =================
        self.SLEEP_TIME = 0.001  # pause between animation frames
        self.STEP_ALPHA = 0.1    # gradient-descent step size (learning rate)
        self.show_view = True    # whether to draw the plots

        # ======================= figure setup ======================
        _fig = plt.figure(figsize=(13.3, 5))
        mngr = plt.get_current_fig_manager()
        try:
            mngr.window.wm_geometry("+10+110")  # window position (Tk backend only)
        except AttributeError:
            pass
        plt.subplots_adjust(left=0.05, right=0.98, bottom=0.1, top=0.8)
        plt.ion()  # interactive mode, so plots refresh while we iterate

        # ======================== subplots =========================
        self.ax1 = _fig.add_subplot(131)  # data points and the current fitted line
        self.ax1.set_title('fitting the data points')
        self.ax1.set_xlabel('x', color='r')
        self.ax1.set_ylabel('y', color='g')

        self.ax2_3d = _fig.add_subplot(132, projection='3d')  # loss surface
        self.ax2_3d.set_title('squared-error loss')
        self.ax2_3d.set_xlabel('θ0', color='r')
        self.ax2_3d.set_ylabel('θ1', color='g')
        self.ax2_3d.set_zlabel('J', color='b')

        self.ax3 = _fig.add_subplot(133)  # θ values over the iterations
        self.ax3.set_title('θ values')
        self.ax3.set_xlabel('update times', color='r')
        self.ax3.set_ylabel('θ', color='g')

        # starting point for gradient descent
        self.theta0 = 0
        self.theta1 = 0

        # ========= polynomial coefficients, filled in below =========
        # loss: J = a·θ0² + b·θ1² + c·θ0θ1 + d·θ0 + e·θ1 + f
        self.theta0_power2 = 0
        self.theta1_power2 = 0
        self.theta0_theta1 = 0
        self.theta0_power1 = 0
        self.theta1_power1 = 0
        self.power0 = 0
        # coefficients of ∂J/∂θ0
        self.iteration_theta0_0 = 0
        self.iteration_theta0_1 = 0
        self.iteration_theta0_2 = 0
        # coefficients of ∂J/∂θ1
        self.iteration_theta1_0 = 0
        self.iteration_theta1_1 = 0
        self.iteration_theta1_2 = 0
        # preprocessing: derive all coefficients from the sample points
        self.__analysis_cost_function(x_points, y_points)

    def regression(self, show_view=True):
        """Run gradient descent, drawing the progress as it happens."""
        self.show_view = show_view
        if self.show_view:
            # scatter plot of the data
            self.ax1.scatter(self.X, self.Y)
            plt.pause(self.SLEEP_TIME)
            # 3D surface of the loss function
            X1 = np.linspace(-1.3, 1.2, 100, endpoint=True)
            X2 = np.linspace(-0.3, 2.5, 100, endpoint=True)
            X1, X2 = np.meshgrid(X1, X2)
            Z = self.__get_power2_y(X1, X2)
            self.ax2_3d.plot_surface(X1, X2, Z, alpha=0.7)
            plt.pause(self.SLEEP_TIME)

        theta0_old = self.theta0
        theta1_old = self.theta1
        # θ values at the last time each subplot was refreshed
        last_line_theta0, last_line_theta1 = self.theta0, self.theta1
        last_dot_theta0, last_dot_theta1 = self.theta0, self.theta1
        update_times = 0

        print(f'step {update_times}: θ0 = {self.theta0}, θ1 = {self.theta1}')
        self.__show_current_theta_line(self.theta0, self.theta1)
        self.__show_current_theta_value(update_times, self.theta0, self.theta1)

        while True:
            update_times += 1
            # update both parameters simultaneously
            tmp0 = self.theta0 - self.get_dx_theta0()
            tmp1 = self.theta1 - self.get_dx_theta1()
            self.theta0 = tmp0
            self.theta1 = tmp1
            print(f'step {update_times}: θ0 = {self.theta0}, θ1 = {self.theta1}')

            # redraw the fitted line only on a noticeable change
            if (abs(self.theta0 - last_line_theta0) > 0.1
                    or abs(self.theta1 - last_line_theta1) > 0.1):
                self.__show_current_theta_line(self.theta0, self.theta1)
                last_line_theta0, last_line_theta1 = self.theta0, self.theta1

            # likewise for the θ-value plot, plus a point every 30 steps
            if (abs(self.theta0 - last_dot_theta0) > 0.1
                    or abs(self.theta1 - last_dot_theta1) > 0.1):
                self.__show_current_theta_value(update_times, self.theta0, self.theta1)
                last_dot_theta0, last_dot_theta1 = self.theta0, self.theta1
            elif update_times % 30 == 0:
                self.__show_current_theta_value(update_times, self.theta0, self.theta1)

            # stop once the minimum is found exactly...
            if self.theta0 == theta0_old and self.theta1 == theta1_old:
                print('converged exactly, stopping')
                break
            # ...or once the updates become vanishingly small
            if (abs(self.theta0 - theta0_old) < 1e-8
                    and abs(self.theta1 - theta1_old) < 1e-8):
                print('update smaller than 1e-8, stopping')
                break
            theta0_old = self.theta0
            theta1_old = self.theta1

        print(f'converged: θ0 = {self.theta0}, θ1 = {self.theta1}')

        # snap to the nearest integer when only floating-point error remains
        fixed1 = abs(self.theta0 - round(self.theta0)) < 1e-5
        fixed2 = abs(self.theta1 - round(self.theta1)) < 1e-5
        if fixed1:
            self.theta0 = round(self.theta0)
        if fixed2:
            self.theta1 = round(self.theta1)
        if fixed1 or fixed2:
            print(f'after rounding: θ0 = {self.theta0}, θ1 = {self.theta1}')

        # draw the final fitted line in red
        self.__show_current_theta_line(self.theta0, self.theta1, color='r')

    def __analysis_cost_function(self, x_points, y_points):
        """Expand J(θ0, θ1) = 1/2m · Σ(θ0 + θ1·x − y)² into polynomial
        coefficients, along with both partial derivatives."""
        m = len(x_points)
        self.theta0_power2 = m       # coefficient of θ0²
        self.iteration_theta0_0 = m
        for i, x in enumerate(x_points):
            # loss-polynomial coefficients
            self.theta1_power2 += x ** 2                 # θ1²
            self.theta0_theta1 += 2 * x                  # θ0·θ1
            self.theta0_power1 += -2 * y_points[i]       # θ0
            self.theta1_power1 += -2 * x * y_points[i]   # θ1
            self.power0 += y_points[i] ** 2              # constant term
            # partial-derivative coefficients
            self.iteration_theta0_1 += x
            self.iteration_theta0_2 += -y_points[i]
            self.iteration_theta1_0 += x
            self.iteration_theta1_1 += x ** 2
            self.iteration_theta1_2 += -x * y_points[i]

        str1 = (f'loss: 1/{2 * m}({self.theta0_power2}θ0² '
                f'{"+" if self.theta1_power2 >= 0 else "-"} {abs(self.theta1_power2)}θ1² '
                f'{"+" if self.theta0_theta1 >= 0 else "-"} {abs(self.theta0_theta1)}θ0θ1 '
                f'{"+" if self.theta0_power1 >= 0 else "-"} {abs(self.theta0_power1)}θ0 '
                f'{"+" if self.theta1_power1 >= 0 else "-"} {abs(self.theta1_power1)}θ1 '
                f'{"+" if self.power0 >= 0 else "-"} {abs(self.power0)})')
        str2 = (f'∂J/∂θ0: 1/{m}({self.iteration_theta0_0}θ0 '
                f'{"+" if self.iteration_theta0_1 >= 0 else "-"} {abs(self.iteration_theta0_1)}θ1 '
                f'{"+" if self.iteration_theta0_2 >= 0 else "-"} {abs(self.iteration_theta0_2)})')
        str3 = (f'∂J/∂θ1: 1/{m}({self.iteration_theta1_0}θ0 '
                f'{"+" if self.iteration_theta1_1 >= 0 else "-"} {abs(self.iteration_theta1_1)}θ1 '
                f'{"+" if self.iteration_theta1_2 >= 0 else "-"} {abs(self.iteration_theta1_2)})')

        # fold the 1/2m factor into the loss coefficients
        _2m_1 = 1 / (2 * m)
        self.theta0_power2 *= _2m_1
        self.theta1_power2 *= _2m_1
        self.theta0_theta1 *= _2m_1
        self.theta0_power1 *= _2m_1
        self.theta1_power1 *= _2m_1
        self.power0 *= _2m_1
        # and the 1/m factor into the derivative coefficients
        _m_1 = 1 / m
        self.iteration_theta0_0 *= _m_1
        self.iteration_theta0_1 *= _m_1
        self.iteration_theta0_2 *= _m_1
        self.iteration_theta1_0 *= _m_1
        self.iteration_theta1_1 *= _m_1
        self.iteration_theta1_2 *= _m_1

        # show the derived formulas above the surface plot
        if self.show_view:
            self.ax2_3d.set_title(f'{str1}\n{str2}\n{str3}',
                                  fontsize='x-small', loc='left')
        print(str1)
        print(str2)
        print(str3)

    def get_dx_theta0(self):
        """Step size × ∂J/∂θ0 at the current θ (the 1/m factor is already
        folded into the coefficients during preprocessing)."""
        return self.STEP_ALPHA * (self.iteration_theta0_0 * self.theta0
                                  + self.iteration_theta0_1 * self.theta1
                                  + self.iteration_theta0_2)

    def get_dx_theta1(self):
        """Step size × ∂J/∂θ1 at the current θ."""
        return self.STEP_ALPHA * (self.iteration_theta1_0 * self.theta0
                                  + self.iteration_theta1_1 * self.theta1
                                  + self.iteration_theta1_2)

    def __get_power2_y(self, theta0, theta1):
        """Evaluate the loss J(θ0, θ1) from its polynomial coefficients."""
        return (self.theta0_power2 * theta0 ** 2
                + self.theta1_power2 * theta1 ** 2
                + self.theta0_theta1 * theta0 * theta1
                + self.theta0_power1 * theta0
                + self.theta1_power1 * theta1
                + self.power0)

    def __show_current_theta_line(self, theta0, theta1, color=None):
        """Mark (θ0, θ1, J) on the loss surface and redraw the fitted line."""
        if not self.show_view:
            return
        # point on the loss surface for the current parameters
        self.ax2_3d.scatter3D(theta0, theta1,
                              self.__get_power2_y(theta0, theta1), color='r')
        # remove the previously drawn line before drawing the new one
        if self.ax1.lines:
            self.ax1.lines[0].remove()
        if color is None:
            self.ax1.plot(self.X, theta0 + theta1 * self.X, alpha=0.5)
        else:
            self.ax1.plot(self.X, theta0 + theta1 * self.X, color=color)
        self.ax1.set_title(f'current fit: y = {theta0:.1f} + {theta1:.1f}x')
        plt.pause(self.SLEEP_TIME)

    def __show_current_theta_value(self, update_times, current_theta0, current_theta1):
        """Plot the current θ0 (red) and θ1 (green) against the step count."""
        if not self.show_view:
            return
        self.ax3.scatter(update_times, current_theta0, color='r', alpha=0.5)
        self.ax3.scatter(update_times, current_theta1, color='g', alpha=0.5)
        plt.pause(self.SLEEP_TIME)


if __name__ == '__main__':
    # other data sets to try:
    # line_regression = LinearRegression([0, 1, 2, 3], [0, 0.8, 2.2, 3.0])
    # line_regression = LinearRegression([0, 1, 2, 3, 4, 5], [0, 1.8, 4.2, 5.8, 8.2, 10])
    # line_regression = LinearRegression([1, 2, 3], [2, 3, 4])
    line_regression = LinearRegression([1, 2, 3], [1, 2, 3])
    line_regression.regression()
```
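With the default data set [1, 2, 3] -> [1, 2, 3] the descent converges to θ0 = 0, θ1 = 1, i.e. y = x; switching to the commented-out data set [1, 2, 3] -> [2, 3, 4] reproduces the θ0 = 1, θ1 = 1 solution derived above. The animation needs an interactive Matplotlib backend such as TkAgg; if you only want the printed descent, the regression method already takes a flag for that:

```python
# skip the animation and just print each gradient-descent step
LinearRegression([1, 2, 3], [2, 3, 4]).regression(show_view=False)
```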
What a run looks like: