开发者使用指南

本章以Allgather算子为例,介绍基于AclShmem工程实现算子的具体流程。

初始化所需的基础数据类型

/**
 * Attributes passed to ACLSHMEM initialization.
 *
 * Declared as a named struct instead of `typedef struct { ... } name;`: an
 * unnamed struct that carries default member initializers is not
 * C-compatible, and giving such a type a typedef name for linkage purposes is
 * diagnosed by recent C++ compilers. The type name is unchanged, so existing
 * uses keep working.
 *
 * Every member has a default, so a default-constructed object never holds
 * indeterminate values.
 */
struct aclshmem_init_attr_t {
    uint32_t rank_size{0};           // total number of PEs
    uint32_t rank{0};                // index of the current PE
    void *rank_table_path{nullptr};  // path of the environment configuration file
    void *root_info{nullptr};        // root info generated by hccl
};
// Context describing the current PE and the per-PE symmetric-memory base offsets.
typedef struct {
  int32_t my_pe;                                  // index of the current PE
  int32_t n_pes;                                  // total number of PEs
  int64_t pe_base_offset[ACLSHMEM_MAX_PES];       // device-side symmetric-memory base address offset of each PE
  int64_t host_pe_base_offset[ACLSHMEM_MAX_PES];  // host-side symmetric-memory base address offset of each PE
} aclshmem_host_aicore_context_t;

初始化ACLSHMEM

int32_t status = 0;
// Initialize ACL: runtime, device binding, and the stream used by this rank.
status = aclInit(nullptr);
CHECK_ACL(status, ERROR_LOG("aclInit failed"), return status);
status = aclrtSetDevice(rank_id);
CHECK_ACL(status, ERROR_LOG("aclrtSetDevice failed"), return status);
status = aclrtCreateStream(&stream);
CHECK_ACL(status, ERROR_LOG("aclrtCreateStream failed"), return status);

// Initialize ACLSHMEM with the total PE count, this PE's index and the rank table path.
// NOTE(review): the meaning of the first argument (1) of aclshmem_init_attr is not visible here - confirm.
aclshmem_init_attr_t init_attr;
init_attr.rank_size = n_ranks;
init_attr.rank = rank_id;
init_attr.rank_table_path = const_cast<char *>(rank_table_path.c_str());
CHECK_ACL(aclshmem_init_attr(1, &init_attr), ERROR_LOG("aclshmem_init_attr failed"), return -1);

// Copy the host-side context into device memory so it can be handed to the kernel.
aclshmem_host_aicore_context_t *host_aicore_context = aclshmem_get_context();
if (host_aicore_context == nullptr) {
    // Checked like the aclshmem_malloc result below: the pointer is read by aclrtMemcpy right after.
    ERROR_LOG("aclshmem_get_context failed");
    return -1;
}
uint8_t *context = nullptr;
status = aclrtMalloc(reinterpret_cast<void **>(&context), sizeof(aclshmem_host_aicore_context_t),
    ACL_MEM_MALLOC_HUGE_FIRST);
CHECK_ACL(status, ERROR_LOG("aclrtMalloc failed"), return status);
status = aclrtMemcpy(context, sizeof(aclshmem_host_aicore_context_t), host_aicore_context,
    sizeof(aclshmem_host_aicore_context_t), ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);

// Allocate the symmetric memory used by the example (2 * 2048 * 2048 bytes = 8 MiB).
constexpr uint64_t aclshmem_base_size = 2 * 2048 * 2048;
uint8_t *ptr = static_cast<uint8_t *>(aclshmem_malloc(aclshmem_base_size));
if (ptr == nullptr) {
    ERROR_LOG("aclshmem_malloc failed");
    return -1;
}

注:可通过环境变量SYMMETRIC_SIZE设置每个PE的对称内存堆上限(未设置时默认为1G,设置值须大于2M);aclshmem_base_size不得超过SYMMETRIC_SIZE的值。

初始化数据

// 划分当前PE的mask掩码和signals信号存放地址
uint8_t *mask = ptr + aclshmem_base_size / 4 * 2; // 把分配内存划分为4等份,第二等份用于存放mask
uint8_t *signals = ptr + aclshmem_base_size / 4 * 3; // 把分配内存划分为4等份,第三等份用于存放signals

// 自定义传输数据内容:此处为TRANS_SIZE大小的数组,值为当前PE编号+自定义基数NUM
int32_t status = 0;
std::vector<int32_t> input(TRANS_SIZE, 0); // TRANS_SIZE为自定义数组大小
for (int32_t i = 0; i < TRANS_SIZE; i++) {
    input[i] = rank_id + NUM;
}

// 把每个PE的传输数据拷贝到各个PE对应地址上
status = aclrtMemcpy(ptr + rank_id * TRANS_SIZE * sizeof(int32_t), TRANS_SIZE * sizeof(int32_t), input.data(),
    TRANS_SIZE * sizeof(int32_t), ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);

// 初始化mast掩码值为0(除本PE对应掩码为1)
input.reserve(n_ranks);
std::fill(input.begin(), input.end(), 0);
input[rank_id] = 1;
status = aclrtMemcpy(mask, n_ranks * sizeof(int32_t), input.data(), n_ranks * sizeof(int32_t),
    ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);

// 初始化signals信号为全0
input[rank_id] = 0;
status = aclrtMemcpy(signals, n_ranks * sizeof(int32_t), input.data(), n_ranks * sizeof(int32_t),
    ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);

kernel实现

/**
 * Simple AllGather kernel built on ACLSHMEM put-with-signal.
 *
 * The calling PE sends its own slot of the data buffer to the same slot on every other PE and sets its
 * signal entry on each target, then waits until the unmasked entries of its own signals array equal the
 * signal value.
 *
 * @param gva             base address of the data buffer; the slot of PE p starts at gva + message_length * p
 * @param message_length  size in bytes of one PE's slot
 * @param context         context buffer handed to aclshmem_context_init
 * @param mask            int32 array passed as the mask of the final wait
 * @param signals         uint32 array with one signal entry per PE
 */
extern "C" __global__ __aicore__ void device_all_gather_test(GM_ADDR gva, int32_t message_length, GM_ADDR context,
    GM_ADDR mask, GM_ADDR signals)
{
    // Nothing to do when ASCEND_IS_AIC holds, nor on any block other than block 0.
    if ASCEND_IS_AIC {
        return;
    }
    if (AscendC::GetBlockIdx() > 0) {
        return;
    }

    // Set up the device-side context.
    aclshmem_context_init(context);

    // Device-side init: the caller supplies a LocalTensor of at least 64 bytes that is used to issue RMA tasks.
    // NOTE(review): the meaning of the trailing argument (5) of aclshmem_aicore_init is not visible here.
    constexpr uint32_t UB_BYTES = 192;
    AscendC::TPipe tpipe;
    AscendC::TBuf<AscendC::TPosition::VECOUT> ub_buf;
    tpipe.InitBuffer(ub_buf, UB_BYTES);
    AscendC::LocalTensor<uint8_t> scratch = ub_buf.Get<uint8_t>();
    aclshmem_aicore_init((__ubuf__ uint8_t*)(scratch.GetPhyAddr()), UB_BYTES, 5);

    int64_t self_pe = aclshmem_my_pe();   // index of the current PE
    int64_t pe_count = aclshmem_n_pes();  // total number of PEs
    uint32_t done_value = 1;              // value written into the signal entry
    int32_t set_op = static_cast<int32_t>(AIC_SIGNAL_OP::SIGNAL_SET);  // signal operation type
    uint64_t elem_count = static_cast<uint64_t>(message_length / sizeof(int32_t));  // int32 elements per slot

    // This PE's slot sits at the same offset on every PE, so it is both the source and the destination.
    auto slot = (__gm__ int32_t *)(gva + message_length * self_pe);
    // Signal entry that belongs to this PE inside the signals array.
    auto own_signal = ((__gm__ uint32_t *)signals) + self_pe;

    // Simple AllGather: push the local slot to every other PE and mark completion in that PE's signals.
    for (int32_t peer = 0; peer < pe_count; peer++) {
        if (peer != self_pe) {
            aclshmem_int32_put_signal(slot, slot, elem_count, own_signal, done_value, set_op, peer);
        }
    }

    // Return only after the signals from all other PEs have arrived.
    aclshmem_uint32_wait_until_all((__gm__ uint32_t*)signals, pe_count, (__gm__ const int32_t *)mask,
        AIC_CMP_TYPE::EQ, done_value);
}

结果校验

int32_t status = 0;
// Allocate a host buffer for the mask, copy the device-side mask into it and print it.
int32_t *mask_host = nullptr;
status = aclrtMallocHost(reinterpret_cast<void **>(&mask_host), n_ranks * sizeof(int32_t));
CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status);
status = aclrtMemcpy(mask_host, n_ranks * sizeof(int32_t), mask, n_ranks * sizeof(int32_t),
    ACL_MEMCPY_DEVICE_TO_HOST);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
std::cout << "Rank-" << rank_id << " mask: [";
for (int32_t j = 0; j < n_ranks; j++) {
    std::cout << mask_host[j] << ", ";
}
std::cout << "]" << std::endl;
// The host copy of the mask is only needed for printing; release it here so it is not leaked.
status = aclrtFreeHost(mask_host);
CHECK_ACL(status, ERROR_LOG("aclrtFreeHost failed"), return status);

// Allocate a host buffer for the signals, copy the device-side signals into it and print them.
int32_t *signals_host = nullptr;
status = aclrtMallocHost(reinterpret_cast<void **>(&signals_host), n_ranks * sizeof(int32_t));
CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status);
status = aclrtMemcpy(signals_host, n_ranks * sizeof(int32_t), signals, n_ranks * sizeof(int32_t),
    ACL_MEMCPY_DEVICE_TO_HOST);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);
std::cout << "Rank-" << rank_id << " signals: [";
for (int32_t j = 0; j < n_ranks; j++) {
    std::cout << signals_host[j] << ", ";
}
std::cout << "]" << std::endl;
// Same as for the mask: the host copy of the signals is no longer needed after printing.
status = aclrtFreeHost(signals_host);
CHECK_ACL(status, ERROR_LOG("aclrtFreeHost failed"), return status);

// Allocate a host buffer for the gathered data and copy the device-side AllGather result into it.
// y_host stays allocated past this snippet; the cleanup step releases it.
int32_t *y_host = nullptr;
size_t input_size = n_ranks * TRANS_SIZE * sizeof(int32_t);
status = aclrtMallocHost(reinterpret_cast<void **>(&y_host), input_size);
CHECK_ACL(status, ERROR_LOG("aclrtMallocHost failed"), return status);
status = aclrtMemcpy(y_host, input_size, ptr, input_size, ACL_MEMCPY_DEVICE_TO_HOST);
CHECK_ACL(status, ERROR_LOG("aclrtMemcpy failed"), return status);

// Verify the AllGather result: every element of the segment received from PE i must equal i + NUM.
// All TRANS_SIZE elements are checked so that a partially transferred segment is detected as well.
bool correct_result = true;
for (int32_t i = 0; i < n_ranks; i++) {
    for (int32_t k = 0; k < TRANS_SIZE; k++) {
        const int32_t received = y_host[TRANS_SIZE * i + k];
        if (received != i + NUM) {
            std::cout << "Data of rank " << rank_id << " from " << i << ": " << received
                << " != " << i + NUM << std::endl;
            correct_result = false;
            break;  // one report per source PE is enough
        }
    }
}

// Print the gathered data.
std::cout << "Rank-" << rank_id << " data: [";
for (int32_t j = 0; j < TRANS_SIZE * n_ranks; j++) {
    std::cout << y_host[j] << ", ";
}
std::cout << "]" << std::endl;

// Report success (printed by rank 0 only).
if (rank_id == 0 && correct_result) {
    INFO_LOG("[SUCCESS] Result Correct!");
}

清理释放资源

status = aclrtFreeHost(y_host); // release the host-side result buffer
CHECK_ACL(status, ERROR_LOG("aclrtFreeHost failed"), return status);

// Release ACLSHMEM-related resources
aclshmem_free(ptr); // free the symmetric memory obtained from aclshmem_malloc
status = aclrtFree(context);
CHECK_ACL(status, ERROR_LOG("aclrtFree failed"), return status);
aclshmem_finalize(); // tear down aclshmem

// Release ACL-related resources: stream, then device, then the ACL runtime
status = aclrtDestroyStream(stream);
CHECK_ACL(status, ERROR_LOG("aclrtDestroyStream failed"), return status);
status = aclrtResetDevice(rank_id);
CHECK_ACL(status, ERROR_LOG("aclrtResetDevice failed"), return status);
status = aclFinalize();
CHECK_ACL(status, ERROR_LOG("aclFinalize failed"), return status);