自动微分是什么?(续)

date
Nov 2, 2023
slug
Automatic-Differentiation
status
Published
tags
Neural Network
Machine Learn
Python
算法
Compute Science
summary
都知道DL中参数是通过梯度下降迭代求得的,那梯度又是如何计算的呢?自动微分!说到自动微分自然离不开计算图。本系列文章将深入讲解现代深度学习框架中扩展计算图的实现细节,并尝试在NumPy的基础上动手实现一个可以求解自动微分的计算图。
type
Post
👉
💿
自动微分是什么?
中,从理论层面介绍了为什么需要自动微分,又解释了什么是自动微分,以及自动微分是如何计算梯度的。本文将续接前文,动手实现一个可以求解自动微分的计算图。当然,为了步跑偏主题,将借助NumPy来实现。且需要说明的是,本文将全力模仿PyTorch框架的接口,这样,可以对PyTorch的用法又更加深刻的认识,尤其是在每次loss.backward()的时候。

数据结构设计

要想实现自动微分,首先要理解透彻前文介绍的计算图。回想一下,计算图中主要包括了节点,和节点之间的运算。那么首先想到的便是封装这个节点的class,然后还需要实现各种基础运算和对应的梯度,毕竟,自动微分初始的想法便是把数值计算的本质看作是一系列可微分算子的组合,因此这些可微分算子便作为了最小基础组件,需要手动去实现正向和反向计算。
此时,我们来设计节点类应该包含的方法与属性,首先一个节点即为一个张量,因此需要一个属性来保存该张量的元数据,比如张量内元素、张量类型、张量维度、形状等,为了不偏移主题,采用Numpyndarray对象来存储该张量;然后,节点还有输入,考虑到节点输入张量的数量的不同,采用数组来保存该节点的所有输入;再然后,节点还对应运算操作,因为只有有了运算操作,才能知道如何从该节点的输入计算得到该节点的数值。此时,对于计算图中的一个节点,有大致如下的结构:
notion imagenotion image
接下来,轮到了运算操作的代码设计了,运算操作即可微分算子,因此,至少需要包含两个方法:前向计算和反向计算,前向节点接受输入,然后返回计算的结果,反向计算接受计算图后面传递回来的梯度,然后计算好当前节点的梯度之后,再沿着计算图传递回去。
notion imagenotion image
需要注意的是,在前向计算的时候,该方法接受的输入不应该是一个节点,而只能是节点上的张量(即节点输入的Tensor上的张量),在计算完成后,再将计算得到结果赋到该节点的张量上。即假设 a为一节点,在计算图上运算时,a.array = a.op.compute(*[t.array for t in a.inputs])。当然,如果以节点没有输入及运算操作时(叶子节点),其内部属性array应该在构造的时候传入并赋值。因此,便对应了两种创建节点的方式:
  • 显示的创建一个节点,此时需要传入array至Tensor初始化函数中
  • 由别的Tensor对象通过运算得到的新Tensor,此时,需要利用a b以及运算操作一起来构建Tensor。
因此,在Tensor内,需要实现两种不同的构造函数。一种是常规的由参数构造,这种实现通常就是把参数传到初始化函数__init__()中;另外一种是通过函数make_form_op来构造对象。此外,需要考虑的是,如果通过Tensor对象的运算自动的来构建,而不是手动调用某函数来构造对象,对于该问题,可以采用Python的魔法方法__call__(),来实现运算时自动调用make_form_op进而创建出新的Tensor对象。
最后,在形成一张计算图之后,每个节点还应该有backward函数,以表明从该节点开始反向计算,具体算法将在后文具体展开。
代码框架如下:
class Tensor: op: Optional[Op] inputs: Optional[List["Tensor"]] cached_data: Optional[np.ndarray] requires_grad: Optional[bool]=False grad: Optional["Tensor"] = None def __init__(self, array, *, dtype, requires_grad): pass @staticmethod def make_from_op(op: Op, inputs: List["Tensor"]) -> "Tensor": pass def realized_cached_data(self): """执行具体的运算""" if self.cached_data is not None: return self.cached_data else: self.cached_data = self.op.compute( *[x.realized_cached_data() for x in self.inputs] ) return self.cached_data @property def shape(self): return self.realized_cached_data().shape @property def is_leaf(self): return self.op is None def backward(self, out_grad: "Tensor"): pass class Op: def __call__(self, *args): raise NotImplementedError() def compute(self, *args: List[np.ndarray]) -> np.ndarray: """计算前向运算 Parameter: *args: 参与前向运算计算的数组 Return: output: 前向传播计算好的结果 """ raise NotImplementedError() def gradient( self, out_grad: "Tensor", node: "Tensor" ) -> Union["Tensor", Tuple["Tensor"]]: """计算反向传播梯度计算 Parameters: out_grad: 从后面节点传到当前节点的梯度 node: 前向传播的节点, 因为使用扩展计算图, 所以需要原节点 Return: 对节点node的输入节点的偏梯度 """ raise NotImplementedError() def __call__(self, *args): """保证了直接调用运算操作的时候, 是创建一个新的Tensor对象,且该对象里面包括了具体操作""" return Tensor.make_from_op(self, args)
在上述代码的框架下,假设有 Tensor 对象 ab,计算c = a + b,过程包括:
计算 c = OpAdd()(a, b) 首先执行 c = Tensor.make_form_op(OpAdd, a, b) 然后有: c.inputs = [a, b] c.op = OpAdd 此时 c 内部张量cached_data还为 None, 因为还没有开始真正计算 a + b 当 c 内调用 realized_cached_data() 时,有 self.cached_data = self.op.compute(*[x.realized_cached_data() for x in self.inputs]) 此时,才真正得到了 cached_data

操作运算细节

实现常规基础运算操作的前向和反向计算,每种操作均继承自Op类,因此,只需要实现computegradient方法即可。
除了transpose、reshape、 summary、broadcast_to 和 matmul操作之外,别的操作通常很容易想到,因此直接给出代码。

逐元素相加

考虑两个节点相加得到另外一个节点,计算compute时,直接利用numpy的接口相加减即可,反向计算gradient的时候,显然,均为1,因此只需要把后面传后来的梯度继续传递回去即可。
notion imagenotion image
class EWiseAdd(Op): def compute(self, a: np.ndarray, b: np.ndarray): return a + b def gradient(self, out_grad: "Tensor", node: "Tensor"): return out_grad, out_grad

与标量相加

不同于逐元素相加的是,在计算图中,标量是不算进去的,因为标量不计算梯度。因此一方面,节点的内部,标量是不会存储在属性inputs的数组中的;另一方面,计算梯度的时候,也不需要给标量返回梯度。
class AddScalar(Op): def __init__(self, scalar): self.scalar = scalar def compute(self, a: np.ndarray): return a + self.scalar def gradient(self, out_grad: "Tensor", node: "Tensor"): return out_grad

取反操作

取反操作是逐元素取反的,因此配合逐元素相加,即可实现逐元素相减了。
class Negative(Op): def compute(self, a: np.ndarray): return -a def gradient(self, out_grad: "Tensor", node: "Tnesor"): return Negative()(out_grad)

逐元素乘除

本质上,逐元素乘除因为不改变张量的shape,因此求梯度和普通变量求梯度是一样的,无非是需要乘上从后面传回来的梯度。
此外,需要注意的是,因为是实现扩展计算图,即在反向计算的时候,会重新生成一张计算图,因此,gradient内的运算,是需要生成新节点的。
class EWiseMul(Op): def compute(self, a: np.ndarray, b: np.ndarray): return a * b def gradient(self, out_grad: "Tensor", node: "Tensor"): lhs, rhs = node.inputs return EWiseMul()(out_grad, rhs), EWiseMul()(out_grad, lhs)
class EWiseDiv(Op): def compute(self, a: np.ndarray, b: np.ndarray): return a / b def gradient(self, out_grad: "Tensor", node: "Tensor"): lhs, rhs = node.inputs return EWiseDiv()(out_grad, rhs), Negative()( EWiseDiv()(EWiseMul()(out_grad, lhs), PowerScalar(2)(rhs)) )

与标量乘除

class MulScalar(Op): def __init__(self, scalar): self.scalar = scalar def compute(self, a: np.ndarray): return a * self.scalar def gradient(self, out_grad: "Tensor", node: "Tensor"): return MulScalar(self.scalar)(out_grad)
class DivScalar(Op): def __init__(self, scalar): self.scalar = scalar def compute(self, a: np.ndarray): return a / self.scalar def gradient(self, out_grad: "Tensor", node: "Tensor"): return DivScalar(self.scalar)(out_grad)

Log / Exp 运算

class Log(Op): def compute(self, a: np.ndarray) -> np.ndarray: return np.log(a) def gradient(self, out_grad: "Tensor", node: "Tensor") -> "Tensor": return EWiseDiv()(out_grad, node.inputs[0])
class Exp(TensorOp): def compute(self, a: np.ndarray): np.exp(a) def gradient(self, out_grad: "Tensor", node: "Tensor"): return EWiseMul()(out_grad, Exp()(node.inputs[0]))

指数运算

class PowerScalar(Op): def __init__(self, scalar): self.scalar = scalar def compute(self, a: np.ndarray): return np.power(a, self.scalar) def gradient(self, out_grad: "Tensor", node: "Tensor"): return EWiseMul()( out_grad, MulScalar(self.scalar)(PowerScalar(self.scalar - 1)(node.inputs[0])), )

Transpose操作

Pytorch的transpose运算和NumPy略有不同,如下,在这里,我选择了模仿PyTorch,因此,在compute的时候,便需要对参数axis做一些操作。
torch.transpose(inputdim0dim1) → Tensor Returns a tensor that is a transposed version of input. The given dimensions dim0 and dim1 are swapped. numpy.transpose(aaxes=None) Returns an array with axes transposed. Thi’th axis of the returned array will correspond to the axis numbered axes[i] of the input.
具体而言,Pytorch中的transpose,传入的参数是dim0, dim1,代表需要交换的两个维度;而Numpy中的transpose传入的axis是长度为ndim的元组,代表原先的维度在交换之后的维度位置,且当axis为None时,默认交换最后两个维度。因此,对于compute有如下代码:
def compute(self, a: np.ndarray): # 构建一个长度为 a.ndim的元组,作为np.transpose的参数 axis_ = [i for i in range(a.ndim)] if self.axis is None: axis_[-1], axis_[-2] = axis_[-2], axis_[-1] else: axis_[self.axis[0]], axis_[self.axis[1]] = ( axis_[self.axis[1]], axis_[self.axis[0]], ) return np.transpose(a, axis_)
transpose的梯度呢?想象一下,前向的时候,交换了两个维度,反向回传梯度的时候,是不是要再换回来呢?这样才那能保证梯度可以到正确的位置。或者直观上理解,transpose只是改变了轴的顺序,并没有涉及到 数值运算,因此微分数值不会改变,仅仅再transpose一下即可。因此,transpose的梯度如下:
def gradient(self, out_grad: "Tensor", node: "Tensor"): return Transpose(self.axis)(out_grad)

Reshape操作

reshape操作和transpose差不多,都没有改变tensor.size,即Tensor中元素数量没变化,因此,对于这种size没变的转换操作,计算梯度的时候,直接恢复原先的shape即可。
class Reshape(Op): def __init__(self, shape): self.shape = shape def compute(self, a: np.ndarray): return np.reshape(a, self.shape) def gradient(self, out_grad: "Tensor", node: "Tensor"): ori_shape = node.inputs[0].shape return Reshape(ori_shape)(out_grad)

Summary

因为是对张量的summary,就必然涉及到了ndim及shape的变化,甚至如果参数axis没有指定的话,求和结果就是一个标量了,此种情况下,求gradient时,回传的一定是原先的shape,但是该如何恢复到原先的shape呢?首先想到的是broadcast to,但是broadcast也是有局限性的,它只能将维度长度为1的broadcast到原先的长度,因此需要先将ndim补回到原先的ndim,在运算summary之后消失的维度上补1。因此,summary的梯度回传包括两部分:
  • 补全ndim,将回传梯度reshape到该现状
  • broadcast_to到原先的shape
第二步很容易实现,那第一步如何补全呢?分为两种情况,如果运算的时候,axis没有指定,那么给所有维度均先补为1即可;如果有指定axis,那么将指定axis的位置补1即可。例:假设原先 a.shape = (5, 10, 2),经过 b = summation(a, axes=(1)) 之后,b.shape = (5, 2), 从后面传回来的梯度 out_grad.shape =(5, 2)。 此时如何计算a的梯度呢? 经过summation运算后,size变少了,所以需要用到broadacat,但是boradcast无法直接把 (5, 2) 变为 (5, 10, 2),因此,先把(5, 2) reshape 为(5, 1, 2),然后再broadcast。
class Summation(Op): def __init__(self, axis: Optional[Tuple[int]] = None) -> None: self.axis = axis def compute(self, a: np.ndarray): return np.sum(a, axis=self.axis) def gradient(self, out_grad: "Tensor", node: "Tensor") -> "Tensor": ori_shape = node.inputs[0].shape if self.axis: temp_shape = list(ori_shape) for i in self.axis: temp_shape[i] = 1 else: temp_shape = [1] * len(ori_shape) # 不涉及 out_grad.size 的改变 out_grad = Reshape(temp_shape)(out_grad) # 2.booadcast: 涉及到 out_grad.size 的改变 return BroadcastTo(ori_shape)(out_grad)

BroadcastTo操作

boardcast 通常会改变数据的size,且通常会有两种case:
  • ndim 不变的,仅仅改变 tensor 的shape,如shape 从(10, 1) 到(10, 5) 。且第二维只能从 1到5,不可能从 2到5,因为会冲突。
  • ndim改变的,如 shape从 (10, 2)到(5, 10, 2),
对 broadcast的特点说明如下:
  • broadcast 后,如果没有改变ndim,那么 shape 改变的那部分,只能是从1到别的数值。
  • broadcast 后,如果改变了ndim,那么 多出来的轴一定在前面。例如 (10, 2)到(5, 10, 2) 多出来轴在最开始。
broadcast 后,梯度回传 就是把shape上多出来的 size 累加起来,因此,需要统计 哪些 axes 发生了改变,如果 ndim 增多了,那么 range(len(a_.shape) - len(a.shape)) 即可代表多出来的axes(利用上述特点1);然后再倒着判断 shape 中的数值,看 前后是否不一致,如果不一致且原先该维度长度为1(利用上述特点1),那么就是需要累计的轴;
class BroadcastTo(Op): def __init__(self, shape): self.shape = shape def compute(self, a): return np.broadcast_to(a, self.shape) def gradient(self, out_grad, node): ori_shape = node.inputs[0].shape # 存储待压缩的轴 axes_ = [] # 1. 处理 ndim 的 broadcast, 这里 broadcast 增加的dim 必定是在 较低的轴,所以计算出来增加了几个,然后从0到几就是了 axes_ += list(range(len(self.shape)-len(ori_shape))) # 2, 处理形状的 broadcast,倒着往回检查不一样的 for i in range(-1, -len(ori_shape)-1, -1): # 不相同,必定是 broadcast了 if ori_shape[i] != self.shape[i]: assert ori_shape[i] == 1 axes_.append(i) axes_ = tuple(axes_) return Reshape(ori_shape)(Summation(axis=tuple(axis_))(out_grad))

矩阵相乘运算

常规的矩阵相乘不管是前向计算还是梯度回传都比较简单,直接计算即可。此外,这里还处理了维度不匹配的情况。
class MatMul(Op): def compute(self, a, b): return np.matmul(a, b) def gradient(self, out_grad, node): lhs, rhs = node.inputs l_grad = matmul(out_grad, transpose(rhs)) r_grad = matmul(transpose(lhs), out_grad) # 如果 ndim 不一样,把多出来的 dim 对应的轴sum起来 ,多出来的轴一定是前面的 l_grad_ndim, r_grad_ndim = len(l_grad.shape), len(r_grad.shape) lhs_ndim, rhs_ndim = len(lhs.shape), len(rhs.shape) if l_grad_ndim > lhs_ndim: axes = tuple(range(l_grad_ndim - lhs_ndim)) l_grad = summation(l_grad, axes) if r_grad_ndim > rhs_ndim: axes = tuple(range(r_grad_ndim - rhs_ndim)) r_grad = summation(r_grad, axes) return l_grad, r_grad

反向传播算法实现

在设计完基础数据结构之后,就可以开始实现梯度反向传播了,在前文中讲到,梯度反向传播,本质上不过是拓扑排序的逆向遍历,然后遍历的时候,把给个节点上,回传回来的偏梯度相加,即得到了当前节点的梯度,然后再接着往前传。
👉
某人说过,拓扑排序是最不像排序算法的排序算法了,因为它并不是按照元素的大小来排序,而是按照拓扑序来排序,不太了解的建议去Leetcode上写几道题熟悉一下。或者去我的笔记上看一下。
按照前一篇的理论推导,以下直接实现完整的反向传播:
def compute_gradient_of_variables(output_tensor:"Tensor", out_grad:"Tensor"): node_to_output_grads_list:Dict["Tensor", List["Tensor"]] = defaultdict(list) node_to_output_grads_list[output_tensor] = [out_grad] reversr_topo_order = list(reversed(find_topo_sort([output_tensor]))) for node in reversr_topo_order: # 先计算得到当前节点的梯度 node.grad = sum_node_list(node_to_output_grads_list[node]) if node.is_leaf: continue # 然后再把待回传的梯度赋给当前节点的输入节点 in_nodes_grads = node.op.gradient_as_tuple(node.grad, node) for in_node, grad in zip(node.inputs, in_nodes_grads): node_to_output_grads_list[in_node].append(grad) def find_topo_sort(node_list:List["Tensor"]): topo_order = [] visited = set() for cur_node in node_list: topo_sort_dfs(cur_node, topo_order, visited) return topo_order def topo_sort_dfs(cur_node, res, visited): if cur_node in visited: return for in_node in cur_node.inputs: topo_sort_dfs(in_node, res, visited) res.append(cur_node) visited.add(cur_node) def sum_node_list(node_list:List["Tensor"]): # 这里需要深拷贝,因为需要创建扩展计算图 if len(node_list) == 1: return copy.deepcopy(node_list[0]) res = node_list[0] for node in node_list[1:]: res = res + node return res

小结

至此,已经完成了自动微分的所有代码实现,完整代码参见, 在该仓库中不仅有本文介绍的所有代码,还有完整的pytest测试案例,以及标准的代码注释;甚至,还利用graphviz库实现了计算图的可视化;最后还用所实现的此自动微分框架,手写了两层神经网络。
自动微分机制是计算梯度的一种方式,它是借助计算图来高效地计算梯度。通过构建计算图,可以跟踪变量之间的依赖关系,从而在反向传播过程中自动计算出梯度,进而利用梯度下来来优化参数。也就是说,自动微分框机制是深度学习框架的基础,因此,后续可以在此基础上,实现类似PyTorch的简易深度学习框架。