开发者使用指南
此章以Allgather算子为例,介绍基于AclShmem工程的Allgather算子具体实现流程。
初始化所需的基础数据类型
typedef struct {
uint32_t rank_size; // PE总数
uint32_t rank; // 当前PE编号
void *rank_table_path{nullptr}; // 环境配置文件路径
void *root_info{nullptr}; // hccl生成的root info
} aclshmem_init_attr_t;
typedef struct {
int32_t my_pe; // 当前PE编号
int32_t n_pes; // PE总数
int64_t pe_base_offset[ACLSHMEM_MAX_PES]; // device侧各PE对称内存基地址偏移量
int64_t host_pe_base_offset[ACLSHMEM_MAX_PES]; // host侧各PE对称内存基地址偏移量
} aclshmem_host_aicore_context_t;
初始化ACLSHMEM
int32_t status = 0;
// 初始化ACL
status = aclInit(nullptr);
CHECK_ACL(status, ERROR_LOG("aclInit failed"), return status);
status = aclrtSetDevice(rank_id);
CHECK_ACL(status, ERROR_LOG("aclrtSetDevice failed"), return status);
status = aclrtCreateStream(&stream);
CHECK_ACL(status, ERROR_LOG("aclrtCreateStream failed"), return status);
// 初始化ACLSHMEM
aclshmem_init_attr_t init_attr;
init_attr.rank_size = n_ranks;
init_attr.rank = rank_id;
init_attr.rank_table_path = const_cast<char *>(rank_table_path.c_str());
CHECK_ACL(aclshmem_init_attr(1, &init_attr), ERROR_LOG("aclshmem_init_attr failed"), return -1);
// 初始化context
aclshmem_host_aicore_context_t* host_aicore_context = aclshmem_get_context();
uint8_t *context;
status = aclrtMalloc((void **)&context, sizeof(aclshmem_host_aicore_context_t), ACL_MEM_MALLOC_HUGE_FIRST);
CHECK_ACL(status, ERROR_LOG("aclrtMalloc failed"), return status);
status = aclrtMemcpy(context, sizeof(aclshmem_host_aicore_context_t), host_aicore_context,
sizeof(aclshmem_host_aicore_context_t), ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
// 初始化对称内存
constexpr uint64_t aclshmem_base_size = 2 * 2048 * 2048;
uint8_t *ptr = static_cast<uint8_t *>(aclshmem_malloc(aclshmem_base_size));
if (ptr == nullptr) {
ERROR_LOG("aclshmem_malloc failed");
return -1;
}
注:可通过SYMMETRIC_SIZE环境变量设置每个PE的内存堆上限(不设置默认为1G,至少大于2M大小),aclshmem_base_size设置上限为SYMMETRIC_SIZE值大小。
初始化数据
// 划分当前PE的mask掩码和signals信号存放地址
uint8_t *mask = ptr + aclshmem_base_size / 4 * 2; // 把分配内存划分为4等份,第二等份用于存放mask
uint8_t *signals = ptr + aclshmem_base_size / 4 * 3; // 把分配内存划分为4等份,第三等份用于存放signals
// 自定义传输数据内容:此处为TRANS_SIZE大小的数组,值为当前PE编号+自定义基数NUM
int32_t status = 0;
std::vector<int32_t> input(TRANS_SIZE, 0); // TRANS_SIZE为自定义数组大小
for (int32_t i = 0; i < TRANS_SIZE; i++) {
input[i] = rank_id + NUM;
}
// 把每个PE的传输数据拷贝到各个PE对应地址上
status = aclrtMemcpy(ptr + rank_id * TRANS_SIZE * sizeof(int32_t), TRANS_SIZE * sizeof(int32_t), input.data(),
TRANS_SIZE * sizeof(int32_t), ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
// 初始化mast掩码值为0(除本PE对应掩码为1)
input.reserve(n_ranks);
std::fill(input.begin(), input.end(), 0);
input[rank_id] = 1;
status = aclrtMemcpy(mask, n_ranks * sizeof(int32_t), input.data(), n_ranks * sizeof(int32_t),
ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
// 初始化signals信号为全0
input[rank_id] = 0;
status = aclrtMemcpy(signals, n_ranks * sizeof(int32_t), input.data(), n_ranks * sizeof(int32_t),
ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
kernel实现
extern "C" __global__ __aicore__ void device_all_gather_test(GM_ADDR gva, int32_t message_length, GM_ADDR context,
GM_ADDR mask, GM_ADDR signals)
{
if ASCEND_IS_AIC {
return;
}
if (AscendC::GetBlockIdx() > 0) {
return;
}
// device侧初始化context
aclshmem_context_init(context);
// device侧初始化
constexpr uint32_t BYTE_LEN = 192;
AscendC::TPipe pipe;
AscendC::TBuf<AscendC::TPosition::VECOUT> buf;
pipe.InitBuffer(buf, BYTE_LEN);
// 需要用户指定一个长度大于等于64字节的LocalTensor用于RMA任务下发
AscendC::LocalTensor<uint8_t> ub_local = buf.Get<uint8_t>();
aclshmem_aicore_init((__ubuf__ uint8_t*)(ub_local.GetPhyAddr()), BYTE_LEN, 5);
int64_t my_rank = aclshmem_my_pe(); // 获取当前PE编号
int64_t pe_size = aclshmem_n_pes(); // 获取PE总数量
uint32_t signal = 1; // 设置信号量
int32_t sig_op = static_cast<int32_t>(AIC_SIGNAL_OP::SIGNAL_SET); // 设置信号类型
uint64_t ele_num = static_cast<uint64_t>(message_length / sizeof(int32_t)); // 计算传输数据元素个数
// 计算目的地址及源地址偏移
auto dest = (__gm__ int32_t *)(gva + message_length * my_rank);
auto source = dest;
// All Gather简易实现
for (int32_t i = 0; i < pe_size; i++) {
if (i == my_rank) {
continue;
}
// 将本PE的数据发送给其他所有PE,并在目标PE的signals中本PE对应的信号位标记完成
aclshmem_int32_put_signal(dest, source, ele_num, ((__gm__ uint32_t *)signals) + my_rank, signal, sig_op, i);
}
// 等待本PE接收到其他所有PE的消息后结束
aclshmem_uint32_wait_until_all((__gm__ uint32_t*)signals, pe_size, (__gm__ const int32_t *)mask, AIC_CMP_TYPE::EQ,
signal);
}
结果校验
int32_t status = 0;
// 在host侧分配mask掩码存放内存大小,将device侧掩码数据拷贝至host侧,并打印
int32_t *mask_host;
status = aclrtMallocHost(reinterpret_cast<void **>(&mask_host), n_ranks * sizeof(int32_t));
CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status);
status = aclrtMemcpy(mask_host, n_ranks * sizeof(int32_t), mask, n_ranks * sizeof(int32_t),
ACL_MEMCPY_DEVICE_TO_HOST);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
std::cout << "Rank-" << rank_id << " mask: [";
for (int32_t j = 0; j < n_ranks; j++) {
std::cout << mask_host[j] << ", ";
}
std::cout << "]" << std::endl;
// 在host侧分配signals信号存放内存大小,将device侧signals数据拷贝至host侧,并打印
int32_t *signals_host;
status = aclrtMallocHost(reinterpret_cast<void **>(&signals_host), n_ranks * sizeof(int32_t));
CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status);
status = aclrtMemcpy(signals_host, n_ranks * sizeof(int32_t), signals, n_ranks * sizeof(int32_t),
ACL_MEMCPY_DEVICE_TO_HOST);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
std::cout << "Rank-" << rank_id << " signals: [";
for (int32_t j = 0; j < n_ranks; j++) {
std::cout << signals_host[j] << ", ";
}
std::cout << "]" << std::endl;
// 在host侧分配Allgather传输数据存放内存大小,将device侧Allgather传输数据结果拷贝至host侧
int32_t *y_host;
size_t input_size = n_ranks * TRANS_SIZE * sizeof(int32_t);
status = aclrtMallocHost(reinterpret_cast<void **>(&y_host), input_size);
CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status);
status = aclrtMemcpy(y_host, input_size, ptr, input_size, ACL_MEMCPY_DEVICE_TO_HOST);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
// 校验Allgather结果是否符合预期
bool correct_result = true;
for (int32_t i = 0; i < n_ranks; i++) {
if (y_host[TRANS_SIZE * i] != i + NUM) {
std::cout << "Data of rank " << rank_id << " from " << i << ": " << y_host[TRANS_SIZE * i]
<< " != " << i + NUM << std::endl;
correct_result = false;
}
}
// 打印Allgather结果数据
std::cout << "Rank-" << rank_id << " data: [";
for (int32_t j = 0; j < TRANS_SIZE * n_ranks; j++) {
std::cout << y_host[j] << ", ";
}
std::cout << "]" << std::endl;
// 打印结果校验是否成功
if (rank_id == 0 && correct_result) {
INFO_LOG("[SUCCESS] Result Correct!");
}
清理释放资源
status = aclrtFreeHost(y_host); // 释放host侧数据内存
CHECK_ACL(status, ERROR_LOG("aclrtFreeHost failed"), return status);
// 清理ACLSHMEM相关资源
aclshmem_free(ptr); // aclshmem内存释放接口调用
status = aclrtFree(context);
CHECK_ACL(status, ERROR_LOG("aclrtFree failed"), return status);
aclshmem_finalize(); // aclshmem销毁接口调用
// 清理ACL相关资源
status = aclrtDestroyStream(stream);
CHECK_ACL(status, ERROR_LOG("aclrtDestroyStream failed"), return status);
status = aclrtResetDevice(rank_id);
CHECK_ACL(status, ERROR_LOG("aclrtResetDevice failed"), return status);
status = aclFinalize();
CHECK_ACL(status, ERROR_LOG("aclFinalize failed"), return status);