这是 数学重学路线图 阶段三的子页面
📌 已有笔记:图论基础,本页补充Python实战
本页核心:用图的语言描述”事物之间的关系”,再用算法在关系中找答案
一、图是什么?——从直觉开始 直觉比喻:图就是”关系的地图”
社交网络:人是点,好友关系是线
城市交通:路口是点,道路是线(可以有方向、有距离)
微服务架构:服务是点,调用关系是线
1.1 基本术语 顶点(Vertex / Node) :图中的对象
边(Edge) :两个顶点之间的关系
有向图 :边有方向(A→B不等于B→A),如微服务调用
无向图 :边没方向(A—B等于B—A),如好友关系
加权图 :边有权重(距离、耗时、带宽)
无权图 :边没有权重,或认为权重都是1
度(Degree) :一个顶点连了几条边
有向图分入度 (指向自己的边)和出度 (从自己出去的边)
1.2 数学表示 图 G = (V, E),V是顶点集合,E是边集合
无向图:E ⊆ { {u,v} | u,v ∈ V }
有向图:E ⊆ { (u,v) | u,v ∈ V }
顶点数 |V|,边数 |E|
无向图:所有顶点度之和 = 2|E|(每条边贡献两个度)
二、图的存储:邻接矩阵 vs 邻接表 2.1 邻接矩阵 V×V 的二维数组,matrix[i][j]=1 表示 i→j 有边
空间复杂度:O(V²)
查询”i和j之间有边吗”:O(1)
遍历某顶点的所有邻居:O(V)
适合:稠密图 (边很多),小规模图
2.2 邻接表 每个顶点维护一个邻居列表
空间复杂度:O(V+E)
查询”i和j之间有边吗”:O(度)
遍历某顶点的所有邻居:O(度)
适合:稀疏图 (边远少于V²),大规模图
2.3 怎么选? 大多数实际场景是稀疏图 → 邻接表更常用
社交网络10亿用户,平均好友200 → 邻接矩阵10^18不可能;邻接表只需10^11
Python实现两种表示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class AdjMatrix :def __init__ (self, n ): self .n = n self .matrix = [[0 ]*n for _ in range (n)] def add_edge (self, u, v, directed=False ): self .matrix[u][v] = 1 if not directed: self .matrix[v][u] = 1 def has_edge (self, u, v ): return self .matrix[u][v] == 1 def neighbors (self, u ): return [v for v in range (self .n) if self .matrix[u][v]] class AdjList :def __init__ (self ): self .graph = {} def add_vertex (self, v ): if v not in self .graph: self .graph[v] = [] def add_edge (self, u, v, directed=False ): self .add_vertex(u) self .add_vertex(v) self .graph[u].append(v) if not directed: self .graph[v].append(u) def has_edge (self, u, v ): return v in self .graph.get(u, []) def neighbors (self, u ): return self .graph.get(u, []) g = AdjList() edges = [(0 ,1 ), (0 ,2 ), (1 ,3 ), (2 ,3 ), (3 ,4 )] for u, v in edges:g.add_edge(u, v) print (f"顶点0的邻居: {g.neighbors(0 )} " ) print (f"1和3有边? {g.has_edge(1 , 3 )} " )
三、BFS——广度优先搜索 3.1 直觉 像水波一样层层扩展 :先访问距离为1的,再距离为2的…
数据结构:队列 (先进先出)
结果:发现每个可达顶点时的距离就是最短跳数
3.2 无权图最短路径 BFS天然求解无权图的最短路径(每条边权重=1)
3.3 时间复杂度:O(V+E) Python实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from collections import dequedef bfs (graph, start ):""" BFS遍历,返回每个顶点到start的距离 graph: 邻接表字典 {node: [neighbors]} """ visited = {start} queue = deque([(start, 0 )]) distances = {start: 0 } order = [] while queue: node, dist = queue.popleft() order.append(node) for neighbor in graph.get(node, []): if neighbor not in visited: visited.add(neighbor) distances[neighbor] = dist + 1 queue.append((neighbor, dist + 1 )) return order, distancesgraph = { 'A' : ['B' , 'C' ],'B' : ['A' , 'D' , 'E' ],'C' : ['A' , 'F' ],'D' : ['B' ],'E' : ['B' , 'F' ],'F' : ['C' , 'E' ]} order, dist = bfs(graph, 'A' ) print (f"BFS顺序: {order} " )print (f"到A的距离: {dist} " )
BFS求最短路径(带路径回溯)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def bfs_shortest_path (graph, start, end ):"""返回start到end的最短路径""" visited = {start} queue = deque([(start, [start])]) while queue: node, path = queue.popleft() if node == end: return path for neighbor in graph.get(node, []): if neighbor not in visited: visited.add(neighbor) queue.append((neighbor, path + [neighbor])) return None path = bfs_shortest_path(graph, 'A' , 'F' ) print (f"A到F最短路径: {' → ' .join(path)} " )
四、DFS——深度优先搜索 4.1 直觉 一条路走到底,走不通再回头 (回溯)
数据结构:栈 (递归调用栈或显式栈)
4.2 用途 连通性检测:从一点能否到达另一点
环检测:有向图中是否存在环
拓扑排序(DFS后序的逆序)
4.3 时间复杂度:O(V+E) Python实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 def dfs_recursive (graph, node, visited=None ):"""DFS递归版""" if visited is None : visited = set () visited.add(node) print (node, end=' ' )for neighbor in graph.get(node, []): if neighbor not in visited: dfs_recursive(graph, neighbor, visited) return visiteddef dfs_iterative (graph, start ):"""DFS迭代版(显式栈)""" visited = set () stack = [start] order = [] while stack: node = stack.pop() if node in visited: continue visited.add(node) order.append(node) for neighbor in reversed (graph.get(node, [])): if neighbor not in visited: stack.append(neighbor) return orderprint ("DFS递归: " , end='' )dfs_recursive(graph, 'A' ) print ()print (f"DFS迭代: {dfs_iterative(graph, 'A' )} " )
环检测(有向图)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def has_cycle (graph ):"""有向图环检测,三色标记法""" WHITE, GRAY, BLACK = 0 , 1 , 2 color = {node: WHITE for node in graph} def dfs (node ): color[node] = GRAY for neighbor in graph.get(node, []): if color.get(neighbor, WHITE) == GRAY: return True if color.get(neighbor, WHITE) == WHITE: if dfs(neighbor): return True color[node] = BLACK return False return any (dfs(node) for node in graph if color[node] == WHITE)cyclic = {'A' : ['B' ], 'B' : ['C' ], 'C' : ['A' ]} print (f"cyclic图有环? {has_cycle(cyclic)} " ) acyclic = {'A' : ['B' ], 'B' : ['C' ], 'C' : []} print (f"acyclic图有环? {has_cycle(acyclic)} " )
五、最短路径——Dijkstra算法 5.1 直觉 贪心策略:每次选当前已知距离最短的未确定顶点
像水流一样,水先到达近的地方,再从近处继续扩展
前提条件:所有边权重 ≥ 0
5.2 时间复杂度 朴素实现:O(V²)
优先队列(最小堆):O((V+E)log V)
Python实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import heapqdef dijkstra (graph, start ):""" Dijkstra最短路径算法 graph: {node: [(neighbor, weight), ...]} 返回: distances字典 和 predecessors字典 """ distances = {start: 0 } predecessors = {start: None } pq = [(0 , start)] visited = set () while pq: dist, node = heapq.heappop(pq) if node in visited: continue visited.add(node) for neighbor, weight in graph.get(node, []): new_dist = dist + weight if new_dist < distances.get(neighbor, float ('inf' )): distances[neighbor] = new_dist predecessors[neighbor] = node heapq.heappush(pq, (new_dist, neighbor)) return distances, predecessorsdef reconstruct_path (predecessors, end ):"""根据predecessors回溯最短路径""" path = [] node = end while node is not None : path.append(node) node = predecessors.get(node) return list (reversed (path))weighted_graph = { 'A' : [('B' , 4 ), ('C' , 2 )],'B' : [('A' , 4 ), ('D' , 3 ), ('E' , 1 )],'C' : [('A' , 2 ), ('D' , 8 ), ('F' , 5 )],'D' : [('B' , 3 ), ('C' , 8 ), ('E' , 2 ), ('F' , 1 )],'E' : [('B' , 1 ), ('D' , 2 )],'F' : [('C' , 5 ), ('D' , 1 )]} dist, pred = dijkstra(weighted_graph, 'A' ) print ("从A出发的最短距离:" )for node in sorted (dist):path = reconstruct_path(pred, node) print (f" A → {node} : 距离={dist[node]} , 路径={'→' .join(path)} " )
六、拓扑排序——DAG的依赖顺序 6.1 直觉 穿衣服:先穿内裤再穿外裤,先穿袜子再穿鞋
编译依赖:A依赖B,B依赖C → 编译顺序必须 C → B → A
前提条件:有向无环图(DAG)
6.2 Kahn算法(基于入度)
找所有入度为0的顶点入队
出队一个顶点,移除它的所有出边,更新邻居入度
入度变为0的新顶点入队
重复直到队列空。如果处理了所有顶点 → 排序成功;否则 → 有环
Python实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from collections import dequedef topological_sort (graph ):""" Kahn算法拓扑排序 graph: {node: [依赖的后继节点]} 返回: 拓扑排序结果 或 None(有环) """ in_degree = {node: 0 for node in graph} for node in graph: for neighbor in graph[node]: in_degree[neighbor] = in_degree.get(neighbor, 0 ) + 1 queue = deque([n for n in in_degree if in_degree[n] == 0 ]) result = [] while queue: node = queue.popleft() result.append(node) for neighbor in graph.get(node, []): in_degree[neighbor] -= 1 if in_degree[neighbor] == 0 : queue.append(neighbor) if len (result) == len (in_degree): return result else : return None compile_deps = { 'main.py' : ['utils.py' , 'config.py' ],'utils.py' : ['config.py' ],'config.py' : [],'api.py' : ['utils.py' , 'main.py' ],} order = topological_sort(compile_deps) print (f"编译顺序: {order} " )
七、后端应用 7.1 微服务调用链(有向图) 每个微服务是顶点,RPC调用是有向边
检测循环依赖 → 环检测
调用链追踪 → BFS/DFS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 services = { 'api-gateway' : ['user-service' , 'order-service' ],'user-service' : ['db-service' , 'cache-service' ],'order-service' : ['user-service' , 'payment-service' , 'db-service' ],'payment-service' :['db-service' , 'notify-service' ],'notify-service' : [],'db-service' : [],'cache-service' : [],} print (f"有循环依赖? {has_cycle(services)} " )deploy_order = topological_sort(services) print (f"安全部署顺序: {deploy_order} " )
7.2 任务编排DAG(Airflow风格) 数据管道中各task之间的依赖关系就是DAG
拓扑排序决定执行顺序
并行度分析:同一层的task可以并行
7.3 数据库表依赖 外键关系形成有向图
拓扑排序确定安全的删表/建表顺序
八、大数据应用 8.1 社交网络分析 度中心性:度最高的人是”社交达人”
最短路径:六度分隔理论
连通分量:找出独立的社区
核心思想:一个网页的重要性取决于有多少重要网页链接它
迭代公式:PR(A) = (1-d)/N + d × Σ PR(Ti)/C(Ti)
d = 阻尼系数(通常0.85)
Ti = 链接到A的页面
C(Ti) = Ti的出链数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def pagerank (graph, damping=0.85 , iterations=50 ):""" 简易PageRank实现 graph: {node: [该节点指向的节点]} """ nodes = list (graph.keys()) n = len (nodes) rank = {node: 1.0 /n for node in nodes} for _ in range (iterations): new_rank = {} for node in nodes: incoming = [src for src in nodes if node in graph.get(src, [])] score = sum (rank[src] / len (graph[src]) for src in incoming if graph[src]) new_rank[node] = (1 - damping) / n + damping * score rank = new_rank return rankweb_graph = { 'A' : ['B' , 'C' ],'B' : ['C' ],'C' : ['A' ],'D' : ['C' ],} ranks = pagerank(web_graph) for page, score in sorted (ranks.items(), key=lambda x: -x[1 ]):print (f" {page} : {score:.4 f} " )
8.3 知识图谱 三元组:(实体, 关系, 实体) → 有向标记图
图数据库(Neo4j)做查询
九、安全应用 9.1 攻击路径分析 网络拓扑是一张图,入口点到目标资产的路径就是”攻击路径”
最短路径 = 最少步骤的攻击链
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 network = { 'internet' : [('dmz-web' , 1 )],'dmz-web' : [('app-server' , 2 ), ('dmz-mail' , 1 )],'dmz-mail' : [('app-server' , 3 )],'app-server' : [('db-server' , 1 ), ('internal-net' , 2 )],'db-server' : [('backup-server' , 1 )],'internal-net' : [('domain-controller' , 1 )],'domain-controller' : [],'backup-server' : [],} dist, pred = dijkstra(network, 'internet' ) target = 'domain-controller' path = reconstruct_path(pred, target) print (f"到{target} 的最短攻击路径:" )print (f" 路径: {'→' .join(path)} " )print (f" 代价: {dist[target]} " )
9.2 网络拓扑扫描 用BFS从已知主机探测可达性
连通分量分析:哪些网段是隔离的
9.3 调用链追踪异常 服务调用图中DFS追踪异常传播路径
找到异常的根因服务
十、用networkx快速建图和可视化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import networkx as nximport matplotlib.pyplot as pltG = nx.DiGraph() G.add_edges_from([ ('gateway' , 'user-svc' ), ('gateway' , 'order-svc' ), ('order-svc' , 'user-svc' ), ('order-svc' , 'pay-svc' ), ('user-svc' , 'db' ), ('pay-svc' , 'db' ), ('pay-svc' , 'notify-svc' ), ]) print (f"节点数: {G.number_of_nodes()} " )print (f"边数: {G.number_of_edges()} " )print (f"是否DAG: {nx.is_directed_acyclic_graph(G)} " )print (f"拓扑排序: {list (nx.topological_sort(G))} " )if nx.has_path(G, 'gateway' , 'db' ):path = nx.shortest_path(G, 'gateway' , 'db' ) print (f"gateway→db最短路径: {path} " )pos = nx.spring_layout(G, seed=42 ) plt.figure(figsize=(10 , 6 )) nx.draw(G, pos, with_labels=True , node_color='lightblue' , node_size=2000 , font_size=10 , arrows=True , arrowsize=20 , edge_color='gray' ) plt.title("微服务调用关系图" ) plt.tight_layout() plt.savefig("service_graph.png" , dpi=100 ) plt.show()
十一、练习题 练习1:图的基本概念 一个无向图有7个顶点,10条边,所有顶点度之和是多少?
答案:无向图所有顶点度之和 = 2×|E| = 2×10 = 20
练习2:BFS最短路径 在以下无向图中,从A到F的最短路径(跳数)是多少?
A-B, A-C, B-D, C-D, D-E, E-F, C-F
BFS从A出发:A(0) → B,C(1) → D,F(2)
A到F最短路径 = 2(A→C→F)
练习3:拓扑排序 任务依赖:D→B, D→C, B→A, C→A, E→C
给出一个合法的拓扑排序
入度分析:A=2, B=1, C=2, D=0, E=0
一种合法顺序:D, E, B, C, A
另一种合法顺序:E, D, B, C, A(拓扑排序不唯一)
练习4:Dijkstra 加权图:A→B(2), A→C(5), B→C(1), B→D(7), C→D(3)
求A到D的最短路径及距离
Dijkstra过程:A(0) → B(2) → C(3=2+1) → D(6=3+3)
最短路径:A→B→C→D,距离=6
练习5:安全应用 设计一个Python函数:给定微服务调用图,找出所有”单点故障”服务
(即移除后会导致某些服务不可达的服务)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def find_single_points_of_failure (graph, entry_point ):"""找出所有单点故障节点""" all_reachable = set (bfs(graph, entry_point)[0 ]) critical = [] for node in list (graph.keys()): if node == entry_point: continue temp_graph = {k: [v for v in vs if v != node] for k, vs in graph.items() if k != node} reachable = set (bfs(temp_graph, entry_point)[0 ]) if len (reachable) < len (all_reachable) - 1 : critical.append(node) return critical
十二、本章小结 图 = 顶点 + 边,描述任意事物之间的关系
邻接表适合稀疏图(大多数场景),邻接矩阵适合稠密图
BFS层层扩展求无权最短路径,DFS一路到底检测连通性和环
Dijkstra用贪心+优先队列求加权最短路径
拓扑排序处理DAG的依赖顺序(部署、编译、任务编排)
图论在后端(微服务依赖)、大数据(PageRank)、安全(攻击路径)中无处不在
➡️ 下一页:18-树与递归结构