```cpp
#include <queue>

// Sequential baseline: standard BFS over a CSR graph.
void BFS_sequential(int S, int *row_ptr, int *col_idx, int *level, int num_nodes) {
    std::queue<int> q;
    q.push(S);
    level[S] = 0;  // all other entries are initialized to -1
    while (!q.empty()) {
        int u = q.front();
        q.pop();
        // Visit every neighbor v of u
        for (int i = row_ptr[u]; i < row_ptr[u + 1]; i++) {
            int v = col_idx[i];
            // If v has not been visited yet
            if (level[v] == -1) {
                level[v] = level[u] + 1;
                q.push(v);
            }
        }
    }
}
```
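All kernels below operate on this CSR (compressed sparse row) representation. A small worked example for a hypothetical graph (not from the original text):

```cpp
// CSR for a 4-node undirected graph with edges 0-1, 0-2, 1-3, 2-3.
// The neighbors of node u live in col_idx[row_ptr[u] .. row_ptr[u+1]-1].
int row_ptr[] = {0, 2, 4, 6, 8};                 // num_nodes + 1 entries
int col_idx[] = {1, 2,  0, 3,  0, 3,  1, 2};     // node 0, 1, 2, 3 in turn
```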
```cpp
__global__ void BFS_kernel_queue(int *row_ptr, int *col_idx, int *level,
                                 int *current_queue, int current_size,
                                 int *next_queue, int *next_size,
                                 int current_depth) {
    int tid = blockIdx.x * blockDim.x + threadIdx.x;
    if (tid < current_size) {
        int u = current_queue[tid];  // take a node from the queue
        // Visit its neighbors
        int start = row_ptr[u];
        int end = row_ptr[u + 1];
        for (int i = start; i < end; i++) {
            int v = col_idx[i];
            // Only the first thread to visit v succeeds in the update
            if (atomicCAS(&level[v], -1, current_depth + 1) == -1) {
                // Atomically claim a queue slot and insert
                int pos = atomicAdd(next_size, 1);
                next_queue[pos] = v;
            }
        }
    }
}
```
```cpp
#define BLOCK_SIZE 256  // must match the launch configuration

__global__ void BFS_kernel_privatized(int *row_ptr, int *col_idx, int *level,
                                      int *current_queue, int current_size,
                                      int *next_queue, int *next_size,
                                      int current_depth) {
    __shared__ int s_queue[BLOCK_SIZE * 2];  // block-local queue (2x head-room)
    __shared__ int s_tail;                   // block-local queue counter

    // Initialize the local queue
    if (threadIdx.x == 0) {
        s_tail = 0;
    }
    __syncthreads();

    int tid = blockIdx.x * blockDim.x + threadIdx.x;
    if (tid < current_size) {
        int u = current_queue[tid];
        // Visit its neighbors
        int start = row_ptr[u];
        int end = row_ptr[u + 1];
        for (int i = start; i < end; i++) {
            int v = col_idx[i];
            // Try to claim v
            if (atomicCAS(&level[v], -1, current_depth + 1) == -1) {
                // Claimed: append to the block-local queue
                int s_pos = atomicAdd(&s_tail, 1);
                if (s_pos < BLOCK_SIZE * 2) {
                    s_queue[s_pos] = v;
                } else {
                    // Local queue full: fall back to a direct global write
                    int g_pos = atomicAdd(next_size, 1);
                    next_queue[g_pos] = v;
                }
            }
        }
    }
    __syncthreads();

    // Threads in the block cooperate to copy the local queue to global memory
    int num_local = min(s_tail, BLOCK_SIZE * 2);
    if (num_local > 0) {
        __shared__ int g_offset;
        // Reserve global space once per block
        if (threadIdx.x == 0) {
            g_offset = atomicAdd(next_size, num_local);
        }
        __syncthreads();
        // Parallel copy
        for (int i = threadIdx.x; i < num_local; i += BLOCK_SIZE) {
            next_queue[g_offset + i] = s_queue[i];
        }
    }
}
```
Performance analysis:

| Operation | Boolean frontier | Naive queue | Privatized queue |
| --- | --- | --- | --- |
| Threads launched | num_nodes | frontier size | frontier size |
| Global atomic ops | 0 (but contended writes) | one per edge | one per block |
| Shared-memory atomics | 0 | 0 | one per edge |
| Work efficiency | very low | moderate | high |
| Relative performance | 1× | 5× | 15× |
Key improvements:

- Global atomic operations drop from O(edges) to O(blocks)
- Shared-memory atomics are roughly 20× faster than global atomics
- Bulk copying improves memory bandwidth utilization
Optimization 3: Direction-Optimizing BFS

Top-Down vs. Bottom-Up

This is one of the most important BFS optimization strategies, originating from the seminal work of Scott Beamer et al.

Top-Down (Push) Mode

Traditional BFS uses a **push** model: starting from the frontier nodes, updates are pushed out to their neighbors.
```cpp
__global__ void BFS_top_down(int *row_ptr, int *col_idx, int *level,
                             int *frontier, int frontier_size,
                             int *next_frontier, int *next_size, int depth) {
    int tid = blockIdx.x * blockDim.x + threadIdx.x;
    if (tid < frontier_size) {
        int u = frontier[tid];
        // Push: examine all neighbors of u
        for (int i = row_ptr[u]; i < row_ptr[u + 1]; i++) {
            int v = col_idx[i];
            if (atomicCAS(&level[v], -1, depth + 1) == -1) {
                int pos = atomicAdd(next_size, 1);
                next_frontier[pos] = v;
            }
        }
    }
}
```
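Bottom-Up (Pull) Mode

In the **pull** model the roles are reversed: every unvisited node scans its own neighbors and checks whether any of them is already in the frontier. This pays off in the middle iterations of BFS, when the frontier is large and most unvisited nodes find a frontier parent after inspecting only a few neighbors: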
```cpp
__global__ void BFS_bottom_up(int *row_ptr, int *col_idx, int *level,
                              bool *frontier_map, int num_nodes,
                              bool *next_frontier_map, int *found_count,
                              int depth) {
    int v = blockIdx.x * blockDim.x + threadIdx.x;
    if (v < num_nodes && level[v] == -1) {  // unvisited node
        // Pull: check whether any neighbor of v is in the frontier
        int start = row_ptr[v];
        int end = row_ptr[v + 1];
        for (int i = start; i < end; i++) {
            int u = col_idx[i];
            // If neighbor u is in the current frontier
            if (frontier_map[u]) {
                level[v] = depth + 1;
                next_frontier_map[v] = true;
                atomicAdd(found_count, 1);
                break;  // one parent is enough, no need to keep scanning
            }
        }
    }
}
```
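Warp-Cooperative Neighbor Expansion

A one-thread-per-node mapping is badly imbalanced on high-degree nodes: one thread may walk thousands of edges while its warp-mates sit idle. The next step is to expand an adjacency list cooperatively with all 32 lanes of a warp: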
```cpp
__device__ void process_node_warp(int u, int *row_ptr, int *col_idx, int *level,
                                  int *next_frontier, int *next_size, int depth) {
    int lane = threadIdx.x % 32;
    int wid  = threadIdx.x / 32;  // warp index within the block
    int start = row_ptr[u];
    int end = row_ptr[u + 1];

    // One queue and tail counter per warp: shared memory is per block,
    // so each warp needs its own slice
    __shared__ int warp_queue[BLOCK_SIZE / 32][32];
    __shared__ int warp_tail[BLOCK_SIZE / 32];
    if (lane == 0) warp_tail[wid] = 0;
    __syncwarp();

    // The warp's lanes cooperate to traverse the neighbor list
    for (int i = start + lane; i < end; i += 32) {
        int v = col_idx[i];
        if (atomicCAS(&level[v], -1, depth + 1) == -1) {
            int pos = atomicAdd(&warp_tail[wid], 1);
            if (pos < 32) {
                warp_queue[wid][pos] = v;
            } else {
                // Local overflow: write straight to the global queue so
                // the node is not lost
                int g_pos = atomicAdd(next_size, 1);
                next_frontier[g_pos] = v;
            }
        }
    }
    __syncwarp();

    // The warp's first lane commits the local queue to global memory
    if (lane == 0 && warp_tail[wid] > 0) {
        int n = min(warp_tail[wid], 32);
        int offset = atomicAdd(next_size, n);
        for (int i = 0; i < n; i++) {
            next_frontier[offset + i] = warp_queue[wid][i];
        }
    }
}
```
```cpp
__device__ void process_neighbors_warp_optimized(
        int start, int end, int *col_idx, int *level,
        int *next_frontier, int *next_size, int depth) {
    int lane = threadIdx.x % 32;
    // Warp-cooperative traversal, 32 neighbors per iteration. The ballot
    // and compaction run once per iteration so that a lane discovering
    // new nodes in several iterations loses none of them.
    for (int base = start; base < end; base += 32) {
        int i = base + lane;
        int found = -1;  // -1 sentinel: node 0 is a valid vertex id
        if (i < end) {
            int v = col_idx[i];
            if (atomicCAS(&level[v], -1, depth + 1) == -1) {
                found = v;
            }
        }
        // Ballot: which lanes discovered a new node this iteration
        unsigned mask = __ballot_sync(0xffffffff, found != -1);
        int count = __popc(mask);
        if (count > 0) {
            // The first active lane reserves space for the whole warp
            int warp_offset = 0;
            int leader = __ffs(mask) - 1;
            if (lane == leader) {
                warp_offset = atomicAdd(next_size, count);
            }
            // Broadcast the offset from the leader to the other lanes
            warp_offset = __shfl_sync(0xffffffff, warp_offset, leader);
            // Prefix sum over the ballot mask gives each lane its slot
            if (found != -1) {
                int local_pos = __popc(mask & ((1u << lane) - 1));
                next_frontier[warp_offset + local_pos] = found;
            }
        }
    }
}
```
Advantages:

- Reduces global atomic operations to at most one per warp per batch
- Exploits the implicit lock-step synchronization within a warp
- Produces compact, gap-free storage in the next frontier
Termination Detection

Problem

Every iteration must check whether the frontier is empty. The traditional approach is:
```cpp
// Device-to-host copy every BFS level: each call synchronizes the stream
cudaMemcpy(&h_size, d_queue_size, sizeof(int), cudaMemcpyDeviceToHost);
if (h_size == 0) break;
```
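This round trip stalls the pipeline once per level. One common mitigation, shown as a minimal sketch (the driver-loop structure and the device pointer names d_curr, d_next, d_row_ptr, d_col_idx, d_level are illustrative, not from the original text), is to place the frontier counter in mapped pinned memory so the host can read it without an explicit cudaMemcpy:

```cpp
// Zero-copy counter: the device writes directly into host-visible
// pinned memory, so no cudaMemcpy is needed to read it back.
int *h_size, *d_size;
cudaHostAlloc(&h_size, sizeof(int), cudaHostAllocMapped);
cudaHostGetDevicePointer(&d_size, h_size, 0);

int depth = 0, curr_size = 1;  // the frontier initially holds the source
while (curr_size > 0) {
    *h_size = 0;  // reset the next-frontier counter (host-visible)
    BFS_kernel_queue<<<(curr_size + 255) / 256, 256>>>(
        d_row_ptr, d_col_idx, d_level, d_curr, curr_size, d_next, d_size, depth);
    cudaDeviceSynchronize();  // kernel done: *h_size is now valid on the host
    curr_size = *h_size;
    std::swap(d_curr, d_next);
    depth++;
}
```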
```cpp
// LOCAL_QUEUE_SIZE, WARP_SIZE and BLOCK_SIZE are compile-time constants,
// e.g. 2 * BLOCK_SIZE, 32 and 256.
__global__ void BFS_optimized(int *row_ptr, int *col_idx, int *level,
                              int *current_queue, int current_size,
                              int *next_queue, int *next_size,
                              bool *frontier_bitmap,  // used by Bottom-Up
                              int num_nodes,          // needed by Bottom-Up
                              int depth, bool top_down_mode) {
    __shared__ int s_queue[LOCAL_QUEUE_SIZE];
    __shared__ int s_tail;
    if (threadIdx.x == 0) s_tail = 0;
    __syncthreads();

    if (top_down_mode) {
        // ========== Top-Down mode ==========
        int tid = blockIdx.x * blockDim.x + threadIdx.x;
        int lane = threadIdx.x % WARP_SIZE;
        int u = -1, degree = 0;
        if (tid < current_size) {
            u = current_queue[tid];
            degree = row_ptr[u + 1] - row_ptr[u];
        }
        // Low-degree node: a single thread walks its own adjacency list
        if (u != -1 && degree < WARP_SIZE) {
            for (int i = row_ptr[u]; i < row_ptr[u + 1]; i++) {
                int v = col_idx[i];
                if (atomicCAS(&level[v], -1, depth + 1) == -1) {
                    int pos = atomicAdd(&s_tail, 1);
                    if (pos < LOCAL_QUEUE_SIZE) s_queue[pos] = v;
                    else next_queue[atomicAdd(next_size, 1)] = v;  // overflow
                }
            }
        }
        // High-degree nodes: the whole warp walks one list at a time.
        // Each owning lane broadcasts its node via shuffle so all 32 lanes
        // cooperate (a lane-strided loop over a private u would silently
        // skip 31/32 of that node's edges).
        unsigned big = __ballot_sync(0xffffffff, u != -1 && degree >= WARP_SIZE);
        while (big) {
            int src_lane = __ffs(big) - 1;
            big &= big - 1;  // clear the lowest set bit
            int uu = __shfl_sync(0xffffffff, u, src_lane);
            for (int i = row_ptr[uu] + lane; i < row_ptr[uu + 1]; i += WARP_SIZE) {
                int v = col_idx[i];
                if (atomicCAS(&level[v], -1, depth + 1) == -1) {
                    int pos = atomicAdd(&s_tail, 1);
                    if (pos < LOCAL_QUEUE_SIZE) s_queue[pos] = v;
                    else next_queue[atomicAdd(next_size, 1)] = v;  // overflow
                }
            }
        }
    } else {
        // ========== Bottom-Up mode ==========
        int v = blockIdx.x * blockDim.x + threadIdx.x;
        if (v < num_nodes && level[v] == -1) {
            for (int i = row_ptr[v]; i < row_ptr[v + 1]; i++) {
                int u = col_idx[i];
                if (frontier_bitmap[u]) {
                    level[v] = depth + 1;
                    int pos = atomicAdd(&s_tail, 1);
                    if (pos < LOCAL_QUEUE_SIZE) s_queue[pos] = v;
                    else next_queue[atomicAdd(next_size, 1)] = v;  // overflow
                    break;
                }
            }
        }
    }
    __syncthreads();

    // Commit the local queue to global memory in one batch
    int num_local = min(s_tail, LOCAL_QUEUE_SIZE);
    if (num_local > 0) {
        __shared__ int g_offset;
        if (threadIdx.x == 0) {
            g_offset = atomicAdd(next_size, num_local);
        }
        __syncthreads();
        for (int i = threadIdx.x; i < num_local; i += BLOCK_SIZE) {
            next_queue[g_offset + i] = s_queue[i];
        }
    }
}
```
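The kernel takes a top_down_mode flag, but the section does not show how the host picks the direction. A minimal host-side sketch of the switching heuristic from Beamer et al. follows; the function name and the bookkeeping variables m_f, m_u, n_f are illustrative, while alpha = 14 and beta = 24 are the constants suggested in the paper:

```cpp
// m_f: edges incident to the current frontier; m_u: edges incident to
// still-unexplored nodes; n_f: frontier size; n: total node count.
bool next_top_down(bool top_down, long long m_f, long long m_u, int n_f, int n) {
    const int alpha = 14, beta = 24;
    if (top_down && m_f > m_u / alpha) return false;  // frontier heavy: go bottom-up
    if (!top_down && n_f < n / beta)   return true;   // frontier small again: go top-down
    return top_down;                                  // otherwise keep the current mode
}
```

Note that when switching into Bottom-Up mode the host must also scatter the current queue into frontier_bitmap, since the two modes use different frontier representations.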
Performance Analysis and Comparison

Cumulative Effect of the Optimizations

Taking the USA road network graph as an example:
| Optimization stage | Time (ms) | Stage speedup | Cumulative speedup |
| --- | --- | --- | --- |
| Naive Boolean frontier | 8500 | 1× | 1× |
| Sparse queue | 2100 | 4.0× | 4.0× |
| + Privatized queue | 620 | 3.4× | 13.7× |
| + Warp cooperation | 380 | 1.6× | 22.4× |
| + Direction optimization | 95 | 4.0× | 89.5× |
| + CUB DeviceSelect | 75 | 1.3× | 113× |
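The last row refers to compacting the next frontier with CUB instead of per-thread atomics. A minimal sketch under assumed names (d_all_nodes holding the node ids 0..num_nodes-1, d_flags marking the nodes discovered this level):

```cpp
#include <cub/cub.cuh>

// Frontier compaction with cub::DeviceSelect::Flagged: copy every node
// whose flag is set into a dense queue, with the selected count written
// to d_next_size -- no atomicAdd contention on the queue tail.
void compact_frontier(const int *d_all_nodes, const char *d_flags,
                      int *d_next_queue, int *d_next_size, int num_nodes) {
    void *d_temp = nullptr;
    size_t temp_bytes = 0;
    // First call only queries the required temporary storage size
    cub::DeviceSelect::Flagged(d_temp, temp_bytes, d_all_nodes, d_flags,
                               d_next_queue, d_next_size, num_nodes);
    cudaMalloc(&d_temp, temp_bytes);
    // Second call performs the actual selection
    cub::DeviceSelect::Flagged(d_temp, temp_bytes, d_all_nodes, d_flags,
                               d_next_queue, d_next_size, num_nodes);
    cudaFree(d_temp);
}
```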
Actual Test Environment

Hardware configuration:

- GPU: NVIDIA RTX 3090 (10496 CUDA cores, 24 GB GDDR6X)
- CPU: Intel i9-12900K (for comparison)
- CUDA version: 11.8
- Compile options: nvcc -O3 -arch=sm_86

Test graph datasets:

- Social network: Twitter graph (41M nodes, 1.47B edges)
- Road network: USA Road Network (23M nodes, 58M edges)
- Random graph: RMAT Scale-26 (67M nodes, 530M edges)

Note: performance figures are averages over 10 runs and exclude graph data transfer time.
Bottleneck Analysis of Each Optimization

| Version | Main bottleneck | Bandwidth utilization | Occupancy |
| --- | --- | --- | --- |
| Boolean frontier | Branch divergence, wasted work | 5% | 10% |
| Naive queue | Global atomic contention | 15% | 35% |
| Privatization | Shared-memory atomics | 45% | 60% |
| Warp cooperation | Load imbalance | 65% | 75% |
| Direction optimization | Near-optimal | 85% | 80% |
Application Extensions

Single-Source Shortest Path (SSSP)

BFS extends to shortest paths on weighted graphs:
```cpp
// CUDA has no native atomicMin for float; a CAS loop on the bit pattern
// provides one. Returns the value held before this thread's update.
__device__ float atomicMin_float(float *addr, float value) {
    int *addr_as_int = (int *)addr;
    int old = *addr_as_int;
    while (__int_as_float(old) > value) {
        int assumed = old;
        old = atomicCAS(addr_as_int, assumed, __float_as_int(value));
        if (old == assumed) break;
    }
    return __int_as_float(old);
}

__global__ void SSSP_kernel(int *row_ptr, int *col_idx, float *weights,
                            float *dist, bool *updated, int num_nodes) {
    int u = blockIdx.x * blockDim.x + threadIdx.x;
    if (u < num_nodes && updated[u]) {
        updated[u] = false;
        for (int i = row_ptr[u]; i < row_ptr[u + 1]; i++) {
            int v = col_idx[i];
            float new_dist = dist[u] + weights[i];
            // Atomically keep the shorter distance; flag v for the next
            // round only if this relaxation actually improved it
            if (atomicMin_float(&dist[v], new_dist) > new_dist) {
                updated[v] = true;
            }
        }
    }
}
```
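The host relaunches this kernel, Bellman-Ford style, until a full pass leaves every updated flag false.

Connected Components

Label propagation assigns each node its own id as the initial component label and repeatedly takes the minimum label over its neighborhood: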
```cpp
__global__ void connected_components_kernel(int *row_ptr, int *col_idx,
                                            int *component, bool *changed,
                                            int num_nodes) {
    int u = blockIdx.x * blockDim.x + threadIdx.x;
    if (u < num_nodes) {
        int my_component = component[u];
        int min_component = my_component;
        // Take the smallest component label in the neighborhood
        for (int i = row_ptr[u]; i < row_ptr[u + 1]; i++) {
            int v = col_idx[i];
            min_component = min(min_component, component[v]);
        }
        // Adopt it only if it is an improvement, and record the change
        if (min_component < my_component) {
            atomicMin(&component[u], min_component);
            *changed = true;
        }
    }
}
```
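A minimal host-side driver, under assumed device pointer names and with d_component initialized so that component[u] = u:

```cpp
// Iterate label propagation until a full pass changes nothing
bool h_changed = true;
while (h_changed) {
    h_changed = false;
    cudaMemcpy(d_changed, &h_changed, sizeof(bool), cudaMemcpyHostToDevice);
    connected_components_kernel<<<(num_nodes + 255) / 256, 256>>>(
        d_row_ptr, d_col_idx, d_component, d_changed, num_nodes);
    cudaMemcpy(&h_changed, d_changed, sizeof(bool), cudaMemcpyDeviceToHost);
}
```

Triangle Counting

With sorted adjacency lists, the triangles through each edge (u, v) can be counted by a merge-style intersection of the two neighbor lists: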
```cpp
// Assumes every adjacency list in col_idx is sorted in ascending order
__global__ void triangle_count_kernel(int *row_ptr, int *col_idx, int num_nodes,
                                      unsigned long long *count) {
    int u = blockIdx.x * blockDim.x + threadIdx.x;
    if (u < num_nodes) {
        int u_start = row_ptr[u];
        int u_end = row_ptr[u + 1];
        // Visit each neighbor v of u
        for (int i = u_start; i < u_end; i++) {
            int v = col_idx[i];
            if (v > u) {  // avoid double counting: only edges with u < v
                int v_start = row_ptr[v];
                int v_end = row_ptr[v + 1];
                // Merge-intersect the neighbor lists of u and v
                int j = u_start, k = v_start;
                while (j < u_end && k < v_end) {
                    int u_neighbor = col_idx[j];
                    int v_neighbor = col_idx[k];
                    if (u_neighbor == v_neighbor && u_neighbor > v) {
                        // Common neighbor w with u < v < w: one triangle
                        atomicAdd(count, 1ULL);
                        j++;
                        k++;
                    } else if (u_neighbor < v_neighbor) {
                        j++;
                    } else {
                        k++;
                    }
                }
            }
        }
    }
}
```