需求背景

在多线程编程中,经常会用到线程同步,如某一个完整任务被切分为多个子任务,在多线程中并行处理时,需要同步等待所有线程将子任务都处理完成,然后只有其中一个线程通过屏障进行后续的任务首尾工作,这里的同步操作可称为线程同步屏障,详情参考同步屏障 线程同步屏障示意图

2种常见实现

  1. pthread_barrier_wait 缺点是pthread_barrier_wait会导致条件未满足时线程进入休眠状态,导致线程被阻塞
  2. std::condition_variable&mutex 缺点是依赖于互斥量锁,存在锁竞争,效率低下

高效实现之CAS

可以基于CAS操作实现高性能的线程同步屏障

最近项目中存在2个线程同步的需求,即需要对两个线程的处理结果进行合并,再返回给上游服务模块。

基于CAS的定义,我很快给出了以下实现:

struct SearchRequestPack {
    search_core::proto::Packet packet;
    search_core::proto::Packet thin_text_packet;
    int normal_id{1};
    int thin_text_id{2};
    std::atomic_int m_mergeBarrier;
}

void thread1_func(void* arg) {
    if (SearchIndexdServer::passBarrier(arg, pkt, search_request_pack->normal_id)) {
        mergeResultAndSendResponseBack(arg);
    }
}

void thread2_func(void* arg) {
    if (SearchIndexdServer::passBarrier(arg, pkt, search_request_pack->thin_text_id)) {
       	mergeResultAndSendResponseBack(arg);
    }
}

void mergeResultAndSendResponseBack(void* arg) {
    auto* search_request_pack = reinterpret_cast<SearchRequestPack*>(arg);
    // 1. Merge the results from normal packet and thin_text_packet.
    
    // 2. Send packet to upstream module.
    
    // 3. Recycle the resouce.
    delete(search_request_pack);
}

    
bool passBarrier(void* arg, search_core::proto::Packet* pkt, int value) {
    auto* search_request_pack = reinterpret_cast<SearchRequestPack*>(arg);

    int default_barrier = k_defaultBarrierValue; // k_defaultBarrierValue = -1
    search_request_pack->m_mergeBarrier.compare_exchange_strong(default_barrier, value, std::memory_order_seq_cst);
    return search_request_pack->m_mergeBarrier != value;
}
// search_request_pack 对应着一次服务请求,每来一个请求,就new一个该对象
// 该设计的思路是,第一个到达的线程返回false,即不通过该barrier;而第二个到达的线程会返回true,即通过该barrier;
// 这样对于2线程的场景,总是最后到达的线程通过该Barrier,去处理剩余的首尾工作(合并结果,网络回包,回收search_request_pack)

BUG出现

  1. 以上实现版本在线上环境业务A运行了长达半年之久,服务稳定正常,该服务的单机部署瓶颈在于磁盘IO,其CPU没有压满;

  2. 然而同样一份代码在另外一个业务B中进行压测时,确出现了core异常;

  3. 当时定位这个bug时,非常困惑,因为根据core信息能定位到程序core在了mergeResultAndSendResponseBack里面某一行对search_request_pack的访问处;

  4. 根据core信息我猜测是该Merge函数进入了2次,导致了search_request_pack的线程不安全;

  5. 这样就意味着上述的passBarrier函数通过了2次,但这是不可能的?

代码分析

  1. 通过review以上passBarrier代码,发现在通过了CAS这个原子操作之后,还有一行代码return search_request_pack->m_mergeBarrier != value;
  2. 该行代码会去访问search_request_pack这个对象,也就意味着,存在同一个对象被2个线程同时操作的情况
  3. 假设存在如下情况:
    1. 线程1先通过CAS,准备运行return search_request_pack->m_mergeBarrier != value;之前,该线程的执行CPU被调度走,线程重新进入就绪状态等待下次CPU调度;
    2. 线程2在线程1通过CAS之后再通过CAS,passBarrier函数返回true,执行后续merge函数且delete(search_request_pack)
    3. 线程1在线程2全部执行完成之后,再重新被调度执行return search_request_pack->m_mergeBarrier != value;
    4. 这个时候search_request_pack已经被回收掉了,因此程序会core掉;
    5. 这样分析core因该在return search_request_pack->m_mergeBarrier != value;这一行才对,为什么会core在Merge函数里面呢?

为什么core的位置不符预期

  1. 首先,C/C++语言中的new/delete,本身并不具备对象完整性的检查;
  2. 当一个对象被delete时,该对象对应的堆内存可能立即被回收然后重新分配给其他同类型对象B;
  3. 由于指针仅仅是一个地址,该地址指向的内存,可能会经历先被delete再重新被new出来的情况;
  4. 因此,当线程1调用return search_request_pack->m_mergeBarrier != value;这一行时,可能不会立马core掉,因为其search_request_pack的对象已经被delete并重新在下一个请求中new出来,导致其对象取属性操作是合法的;
  5. 由于线程1实际上指向的是下一个请求的内存,因此控制逻辑已经完全紊乱,导致Merge函数进入两遍,最终导致core掉

为什么在业务上运行半年之久都正常呢?

  1. 业务A其单机部署瓶颈在磁盘IO,压测时,其磁盘IO先打满到达瓶颈,导致其CPU负载还有余量;
  2. 业务B的单机部署瓶颈在于CPU,在压测时,CPU负载接近满载;
  3. 当CPU满载时,其线程的CPU调度会更加频繁,更容易出现换入换出的情况,以至于线程1在剩下一行代码就结束时恰恰好被换出;
  4. 而当CPU负载不高时,以上情况出现的概率就大大减小,以至于不会出现上述BUG

解决方法

bool passBarrier(void* arg, search_core::proto::Packet* pkt, int value) {
    auto* search_request_pack = reinterpret_cast<SearchRequestPack*>(arg);

    search_request_pack->finish_count++;
    int size = static_cast<int>(search_request_pack->search_packet_finished_flags.size());
    if (search_request_pack->finish_count != size) {
        search_request_pack->setFinished(pkt);
        return false;
    }
    int default_barrier = k_defaultBarrierValue;
    search_request_pack->m_mergeBarrier.compare_exchange_strong(default_barrier, value, std::memory_order_seq_cst);
    if (search_request_pack->m_mergeBarrier != value) {
        search_request_pack->setFinished(pkt);
        return false;
    }

    // Wait all other task finish.
    int current_index = search_request_pack->getIndex(pkt);
    bool all_other_target_returned;
    do {
        all_other_target_returned = true;
        for (int i = 0; i < size; i++) {
            if (current_index == i) { continue; }
            if (!search_request_pack->search_packet_finished_flags[i]) { all_other_target_returned = false; }
        }
    } while (!all_other_target_returned);
    return true;
}
// 以上代码的思路是,第一个通过CAS的线程通过,第二个通过CAS的线程立即返回;
// 在第一个线程中会做一个循环检查,确保除了本线程外的其它线程都已经正常结束本次请求的处理流程,以保证不存在再访问本请求的search_request_pack对象的情况,从而保证了线程安全

结论

  1. 开发多线程程序时,其压测尽可能使其单机CPU达到满载或较高负载情况,以验证多线程程序在极致情况下的表现;
  2. 在调试定位BUG时,需要摆脱直觉,一份正常运行长达半年之久的代码不出问题,不意味着其一定是正确的,可能和其运行的环境有关,比如这里的CPU负载情况;
  3. 所谓的线程安全,就是对于一个对象,不要同时在多个线程中对其读写,如果要实现无锁编程,对其线程的写操作期间只有一个线程有其操作权限