# 开发者使用指南 此章以Allgather算子为例,介绍基于AclShmem工程的[Allgather算子](https://gitcode.com/cann/aclshmem-dev/blob/master/examples/allgather/README.md)具体实现流程。 ## 初始化所需的基础数据类型 ```c++ typedef struct { uint32_t rank_size; // PE总数 uint32_t rank; // 当前PE编号 void *rank_table_path{nullptr}; // 环境配置文件路径 void *root_info{nullptr}; // hccl生成的root info } aclshmem_init_attr_t; ``` ```c++ typedef struct { int32_t my_pe; // 当前PE编号 int32_t n_pes; // PE总数 int64_t pe_base_offset[ACLSHMEM_MAX_PES]; // device侧各PE对称内存基地址偏移量 int64_t host_pe_base_offset[ACLSHMEM_MAX_PES]; // host侧各PE对称内存基地址偏移量 } aclshmem_host_aicore_context_t; ``` ## 初始化ACLSHMEM ```c++ int32_t status = 0; // 初始化ACL status = aclInit(nullptr); CHECK_ACL(status, ERROR_LOG("aclInit failed"), return status); status = aclrtSetDevice(rank_id); CHECK_ACL(status, ERROR_LOG("aclrtSetDevice failed"), return status); status = aclrtCreateStream(&stream); CHECK_ACL(status, ERROR_LOG("aclrtCreateStream failed"), return status); // 初始化ACLSHMEM aclshmem_init_attr_t init_attr; init_attr.rank_size = n_ranks; init_attr.rank = rank_id; init_attr.rank_table_path = const_cast(rank_table_path.c_str()); CHECK_ACL(aclshmem_init_attr(1, &init_attr), ERROR_LOG("aclshmem_init_attr failed"), return -1); // 初始化context aclshmem_host_aicore_context_t* host_aicore_context = aclshmem_get_context(); uint8_t *context; status = aclrtMalloc((void **)&context, sizeof(aclshmem_host_aicore_context_t), ACL_MEM_MALLOC_HUGE_FIRST); CHECK_ACL(status, ERROR_LOG("aclrtMalloc failed"), return status); status = aclrtMemcpy(context, sizeof(aclshmem_host_aicore_context_t), host_aicore_context, sizeof(aclshmem_host_aicore_context_t), ACL_MEMCPY_HOST_TO_DEVICE); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); // 初始化对称内存 constexpr uint64_t aclshmem_base_size = 2 * 2048 * 2048; uint8_t *ptr = static_cast(aclshmem_malloc(aclshmem_base_size)); if (ptr == nullptr) { ERROR_LOG("aclshmem_malloc failed"); return -1; } ``` 注:可通过SYMMETRIC_SIZE环境变量设置每个PE的内存堆上限(不设置默认为1G,至少大于2M大小),aclshmem_base_size设置上限为SYMMETRIC_SIZE值大小。 ## 初始化数据 ```c++ // 划分当前PE的mask掩码和signals信号存放地址 uint8_t *mask = ptr + aclshmem_base_size / 4 * 2; // 把分配内存划分为4等份,第二等份用于存放mask uint8_t *signals = ptr + aclshmem_base_size / 4 * 3; // 把分配内存划分为4等份,第三等份用于存放signals // 自定义传输数据内容:此处为TRANS_SIZE大小的数组,值为当前PE编号+自定义基数NUM int32_t status = 0; std::vector input(TRANS_SIZE, 0); // TRANS_SIZE为自定义数组大小 for (int32_t i = 0; i < TRANS_SIZE; i++) { input[i] = rank_id + NUM; } // 把每个PE的传输数据拷贝到各个PE对应地址上 status = aclrtMemcpy(ptr + rank_id * TRANS_SIZE * sizeof(int32_t), TRANS_SIZE * sizeof(int32_t), input.data(), TRANS_SIZE * sizeof(int32_t), ACL_MEMCPY_HOST_TO_DEVICE); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); // 初始化mast掩码值为0(除本PE对应掩码为1) input.reserve(n_ranks); std::fill(input.begin(), input.end(), 0); input[rank_id] = 1; status = aclrtMemcpy(mask, n_ranks * sizeof(int32_t), input.data(), n_ranks * sizeof(int32_t), ACL_MEMCPY_HOST_TO_DEVICE); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); // 初始化signals信号为全0 input[rank_id] = 0; status = aclrtMemcpy(signals, n_ranks * sizeof(int32_t), input.data(), n_ranks * sizeof(int32_t), ACL_MEMCPY_HOST_TO_DEVICE); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); ``` ## kernel实现 ```c++ extern "C" __global__ __aicore__ void device_all_gather_test(GM_ADDR gva, int32_t message_length, GM_ADDR context, GM_ADDR mask, GM_ADDR signals) { if ASCEND_IS_AIC { return; } if (AscendC::GetBlockIdx() > 0) { return; } // device侧初始化context aclshmem_context_init(context); // device侧初始化 constexpr uint32_t BYTE_LEN = 192; AscendC::TPipe pipe; AscendC::TBuf buf; pipe.InitBuffer(buf, BYTE_LEN); // 需要用户指定一个长度大于等于64字节的LocalTensor用于RMA任务下发 AscendC::LocalTensor ub_local = buf.Get(); aclshmem_aicore_init((__ubuf__ uint8_t*)(ub_local.GetPhyAddr()), BYTE_LEN, 5); int64_t my_rank = aclshmem_my_pe(); // 获取当前PE编号 int64_t pe_size = aclshmem_n_pes(); // 获取PE总数量 uint32_t signal = 1; // 设置信号量 int32_t sig_op = static_cast(AIC_SIGNAL_OP::SIGNAL_SET); // 设置信号类型 uint64_t ele_num = static_cast(message_length / sizeof(int32_t)); // 计算传输数据元素个数 // 计算目的地址及源地址偏移 auto dest = (__gm__ int32_t *)(gva + message_length * my_rank); auto source = dest; // All Gather简易实现 for (int32_t i = 0; i < pe_size; i++) { if (i == my_rank) { continue; } // 将本PE的数据发送给其他所有PE,并在目标PE的signals中本PE对应的信号位标记完成 aclshmem_int32_put_signal(dest, source, ele_num, ((__gm__ uint32_t *)signals) + my_rank, signal, sig_op, i); } // 等待本PE接收到其他所有PE的消息后结束 aclshmem_uint32_wait_until_all((__gm__ uint32_t*)signals, pe_size, (__gm__ const int32_t *)mask, AIC_CMP_TYPE::EQ, signal); } ``` ## 结果校验 ```c++ int32_t status = 0; // 在host侧分配mask掩码存放内存大小,将device侧掩码数据拷贝至host侧,并打印 int32_t *mask_host; status = aclrtMallocHost(reinterpret_cast(&mask_host), n_ranks * sizeof(int32_t)); CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status); status = aclrtMemcpy(mask_host, n_ranks * sizeof(int32_t), mask, n_ranks * sizeof(int32_t), ACL_MEMCPY_DEVICE_TO_HOST); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); std::cout << "Rank-" << rank_id << " mask: ["; for (int32_t j = 0; j < n_ranks; j++) { std::cout << mask_host[j] << ", "; } std::cout << "]" << std::endl; // 在host侧分配signals信号存放内存大小,将device侧signals数据拷贝至host侧,并打印 int32_t *signals_host; status = aclrtMallocHost(reinterpret_cast(&signals_host), n_ranks * sizeof(int32_t)); CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status); status = aclrtMemcpy(signals_host, n_ranks * sizeof(int32_t), signals, n_ranks * sizeof(int32_t), ACL_MEMCPY_DEVICE_TO_HOST); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); std::cout << "Rank-" << rank_id << " signals: ["; for (int32_t j = 0; j < n_ranks; j++) { std::cout << signals_host[j] << ", "; } std::cout << "]" << std::endl; // 在host侧分配Allgather传输数据存放内存大小,将device侧Allgather传输数据结果拷贝至host侧 int32_t *y_host; size_t input_size = n_ranks * TRANS_SIZE * sizeof(int32_t); status = aclrtMallocHost(reinterpret_cast(&y_host), input_size); CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status); status = aclrtMemcpy(y_host, input_size, ptr, input_size, ACL_MEMCPY_DEVICE_TO_HOST); CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status); // 校验Allgather结果是否符合预期 bool correct_result = true; for (int32_t i = 0; i < n_ranks; i++) { if (y_host[TRANS_SIZE * i] != i + NUM) { std::cout << "Data of rank " << rank_id << " from " << i << ": " << y_host[TRANS_SIZE * i] << " != " << i + NUM << std::endl; correct_result = false; } } // 打印Allgather结果数据 std::cout << "Rank-" << rank_id << " data: ["; for (int32_t j = 0; j < TRANS_SIZE * n_ranks; j++) { std::cout << y_host[j] << ", "; } std::cout << "]" << std::endl; // 打印结果校验是否成功 if (rank_id == 0 && correct_result) { INFO_LOG("[SUCCESS] Result Correct!"); } ``` ## 清理释放资源 ```c++ status = aclrtFreeHost(y_host); // 释放host侧数据内存 CHECK_ACL(status, ERROR_LOG("aclrtFreeHost failed"), return status); // 清理ACLSHMEM相关资源 aclshmem_free(ptr); // aclshmem内存释放接口调用 status = aclrtFree(context); CHECK_ACL(status, ERROR_LOG("aclrtFree failed"), return status); aclshmem_finalize(); // aclshmem销毁接口调用 // 清理ACL相关资源 status = aclrtDestroyStream(stream); CHECK_ACL(status, ERROR_LOG("aclrtDestroyStream failed"), return status); status = aclrtResetDevice(rank_id); CHECK_ACL(status, ERROR_LOG("aclrtResetDevice failed"), return status); status = aclFinalize(); CHECK_ACL(status, ERROR_LOG("aclFinalize failed"), return status); ```