numad详解(2)

经过上面各种各样的初始化之后,终于我们可以进入真正的业务流程了。

//你没有看错,又是死循环^_^。
for (;;) {
    int interval = max_interval;
    //线程已经启动,再更新node信息就需要加锁了
    pthread_mutex_lock(&node_info_mutex);
    int nodes = update_nodes();
    pthread_mutex_unlock(&node_info_mutex);
    if (nodes > 1) {
        //更新process信息
        update_processes();
        //numad真正的核心操作,在这里会根据当前状态更新进程的内存和cpu分布。
        interval = manage_loads();
        if (interval < max_interval) {
            //如果执行了一些内容更新,再更新一下node信息。
            nodes = update_nodes();
        }
    }
    //等待
    sleep(interval);
    //判断是否收到进程信号
    if (got_sigterm | got_sigquit) {
        //清理内核消息队列,删除pid文件,退出程序
        shut_down_numad();
    }
    if (got_sighup) {
        got_sighup = 0;
        close_log_file();
        open_log_file();
    }
}

下面我们就来介绍一下numad中最关键的几个处理逻辑update_nodes,update_processes,manage_loads以及-w参数需要的逻辑pick_numa_nodes

update_nodes这个函数用于更新物理节点上的node信息。需要初始化一个包含以下信息的node数组:

typedef struct node_data {
    uint64_t node_id; //node id
    uint64_t MBs_total; //node内存总量
    uint64_t MBs_free; //node剩余内存总量
    uint64_t CPUs_total; //node全部可用CPU的总计算能力
    uint64_t CPUs_free;  //node可用CPU的剩余计算能力
    uint64_t magnitude;  // hack: MBs * CPUs
    uint8_t *distance; //当前node与其他node的距离
    id_list_p cpu_list_p; //当前node的所有可用CPU列表
} node_data_t, *node_data_p;
node_data_p node = NULL;

int min_node_CPUs_free_ix = -1; //剩余CPU最少的node id
int min_node_MBs_free_ix = -1; //剩余内存最少的node id
long min_node_CPUs_free = MAXINT; //node上剩余最少的CPU量
long min_node_MBs_free = MAXINT; //node上剩余最少的内存量
long max_node_CPUs_free = 0; //剩余CPU最多的node id
long max_node_MBs_free = 0; //剩余内存最多的node id
long avg_node_CPUs_free = 0; //平均每个node剩余的CPU
long avg_node_MBs_free = 0; //平均每个node剩余的内存
double stddev_node_CPUs_free = 0.0; //每个node剩余CPU的方差
double stddev_node_MBs_free = 0.0; //每个node剩余内存的方差

完成以上信息的初始化之后,我们就拿到了一个全局的资源列表。在这个列表中,可以看到每个numa node的资源使用情况,和numa nodes之间的资源使用离散情况。下面来看看numad中是怎样一步一步的完成这个列表的初始化的。

int update_nodes() {
    char fname[FNAME_SIZE];
    char buf[BIG_BUF_SIZE];
    //获取当前时间戳,以1%s为单位(我也不知道为什么搞这么奇葩的一个单位。。。)
    uint64_t time_stamp = get_time_stamp();
#define STATIC_NODE_INFO_DELAY (600 * ONE_HUNDRED)
    //node_info_time_stamp初始化为0,在第一次启动的时候会强制刷新nodeinfo
    if ((num_nodes == 0) || (node_info_time_stamp + STATIC_NODE_INFO_DELAY < time_stamp)) {
        //更新node_info刷新时间戳
        node_info_time_stamp = time_stamp;
        //从/sys/devices/system/node/目录下获取node数量
        struct dirent **namelist;
        int num_files = scandir ("/sys/devices/system/node", &namelist, node_and_digits, NULL);
        if (num_files < 1) {
            numad_log(LOG_CRIT, "Could not get NUMA node info\n");
            exit(EXIT_FAILURE);
        }
        //在运行过程中数量一般不会变。第一次运行时num_nodes为0,需要进入初始化。
        int need_to_realloc = (num_files != num_nodes);
        if (need_to_realloc) {
            for (int ix = num_files;  (ix < num_nodes);  ix++) {
                //如果node数减少了,需要释放对应的统计信息
                free(node[ix].distance);
                FREE_LIST(node[ix].cpu_list_p);
            }
            //重新申请存储node信息的内存空间
            node = realloc(node, (num_files * sizeof(node_data_t)));
            if (node == NULL) {
                numad_log(LOG_CRIT, "node realloc failed\n");
                exit(EXIT_FAILURE);
            }
            //初始化新的node结构
            for (int ix = num_nodes;  (ix < num_files);  ix++) {
                node[ix].distance = NULL;
                node[ix].cpu_list_p = NULL;
            }
            //更新node数量
            num_nodes = num_files;
        }
        sum_CPUs_total = 0;
        //初始化下面两个list
        CLEAR_CPU_LIST(all_cpus_list_p);
        CLEAR_NODE_LIST(all_nodes_list_p);
        //获取每个cpu core的超线程数量(一般为2)
        threads_per_core = count_set_bits_in_hex_list_file("/sys/devices/system/cpu/cpu0/topology/thread_siblings");
        if (threads_per_core < 1) {
            numad_log(LOG_CRIT, "Could not count threads per core\n");
            exit(EXIT_FAILURE);
        }
        for (int node_ix = 0;  (node_ix < num_nodes);  node_ix++) {
            int node_id;
            //获取node id
            char *p = &namelist[node_ix]->d_name[4];
            CONVERT_DIGITS_TO_NUM(p, node_id);
            free(namelist[node_ix]);
            //在全局变量node中添加当前node的信息
            node[node_ix].node_id = node_id;
            //将node id添加到all_nodes_list_p
            ADD_ID_TO_LIST(node_id, all_nodes_list_p);
            //获取当前node的cpulist
            snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/cpulist", node_id);
            int fd = open(fname, O_RDONLY, 0);
            if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) {
                buf[BIG_BUF_SIZE - 1] = '\0';
                //初始化node中的cpu list
                CLEAR_CPU_LIST(node[node_ix].cpu_list_p);
                int n = add_ids_to_list_from_str(node[node_ix].cpu_list_p, buf);
                //如果入参中指定了reserve cpu,需要将这部分CPU在cpu_list_p中标记出来
                if (reserved_cpu_str != NULL) {
                    //reserved_cpu_mask_list_p这个列表在初始化的时候已经处理过了,其中包含的是未预留部分的CPU mask
                    //两个列表and处理之后是所有的预留cpu mask
                    AND_LISTS(node[node_ix].cpu_list_p, node[node_ix].cpu_list_p, reserved_cpu_mask_list_p);
                    //除去预留之外的CPU数量
                    n = NUM_IDS_IN_LIST(node[node_ix].cpu_list_p);
                }
                //上面初始化的all_cpus_list_p为空,在此初始化为除去预留部分之后的CPU mask
                OR_LISTS(all_cpus_list_p, all_cpus_list_p, node[node_ix].cpu_list_p);
                //计算CPU数量,并保存到全局变量node数组中。计算规则如下:
                //如果没有打开超线程或者超线程计算能力设置为物理核至少100%,则直接计算为100*n(n为可用的CPU数量)
                //如果打开了超线程,总CPU=物理核数*100+超线程数*超线程计算能力(默认为20%)
                if ((threads_per_core == 1) || (htt_percent >= 100)) {
                    node[node_ix].CPUs_total = n * ONE_HUNDRED;
                } else {
                    n /= threads_per_core;
                    node[node_ix].CPUs_total = n * ONE_HUNDRED;
                    node[node_ix].CPUs_total += n * (threads_per_core - 1) * htt_percent;
                }
                //统计当前节点的总CPU计算能力
                sum_CPUs_total += node[node_ix].CPUs_total;
                close(fd);
            } else {
                numad_log(LOG_CRIT, "Could not get node cpu list\n");
                exit(EXIT_FAILURE);
            }
            //如果需要刷新,则重新获取numa node之间的distance,并保存到node组中。
            if (need_to_realloc) {
                node[node_ix].distance = realloc(node[node_ix].distance, (num_nodes * sizeof(uint8_t)));
                if (node[node_ix].distance == NULL) {
                    numad_log(LOG_CRIT, "node distance realloc failed\n");
                    exit(EXIT_FAILURE);
                }
            }
            snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/distance", node_id);
            fd = open(fname, O_RDONLY, 0);
            if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) {
                int rnode = 0;
                for (char *p = buf;  (*p != '\n'); ) {
                    int lat;
                    CONVERT_DIGITS_TO_NUM(p, lat);
                    node[node_ix].distance[rnode++] = lat;
                    while (*p == ' ') { p++; }
                }
                close(fd);
            } else {
                numad_log(LOG_CRIT, "Could not get node distance data\n");
                exit(EXIT_FAILURE);
            }
        }
        //namelist中的信息已经保存到node中,包括:
        //node id
        //每个node的可用cpu列表
        //每个node的cpu计算能力
        //每个node与其他node的距离
        free(namelist);
    }
    //更新每个node的内存和CPU计算能力信息
    //前面初始化了两个全局变量用于保存CPU数据的信息
    //typedef struct cpu_data {
    //uint64_t time_stamp;
    //uint64_t *idle;  
    //} cpu_data_t, *cpu_data_p;
    //cpu_data_t cpu_data_buf[2];仅保留两次,用于计算差值
    //int cur_cpu_data_buf = 0;
    //确保两次之间至少相差0.07s(原谅我还是忍不住吐槽一下这个单位设置。。。)
    while (cpu_data_buf[cur_cpu_data_buf].time_stamp + 7 >= time_stamp) {
        //如果两次数据之间相差不到0.07s,则sleep 0.1s
        struct timespec ts = { 0, 100000000 }; 
        nanosleep(&ts, &ts);
        time_stamp = get_time_stamp();
    }
    //更新CPU data,原理是从/proc/stat文件中读取每个CPU的idle数据,并保存在cpu_data_buf中。
    update_cpu_data();
    //初始化下面的几个全局变量
    //node上剩余的内存总量
    max_node_MBs_free = 0;
    //node上剩余的cpu计算能力
    max_node_CPUs_free = 0;
    min_node_MBs_free = MAXINT;
    min_node_CPUs_free = MAXINT;
    uint64_t sum_of_node_MBs_free = 0;
    uint64_t sum_of_node_CPUs_free = 0;
    for (int node_ix = 0;  (node_ix < num_nodes);  node_ix++) {
        int node_id = node[node_ix].node_id;
        //获取当前node上的内存信息
        snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/meminfo", node_id);
        int fd = open(fname, O_RDONLY, 0);
        if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) {
            close(fd);
            uint64_t KB;
            buf[BIG_BUF_SIZE - 1] = '\0';
            char *p = strstr(buf, "MemTotal:");
            if (p != NULL) {
                p += 9;
            } else {
                numad_log(LOG_CRIT, "Could not get node MemTotal\n");
                exit(EXIT_FAILURE);
            }
            while (!isdigit(*p)) { p++; }
            CONVERT_DIGITS_TO_NUM(p, KB);
            //获取内存总量,以MB为单位,保存在全局变量node中
            node[node_ix].MBs_total = (KB / KILOBYTE);
            //如果一个node没有内存,则将该node从all_nodes_list_p中移除
            if (node[node_ix].MBs_total < 1) {
                CLR_ID_IN_LIST(node_id, all_nodes_list_p);
            }
            p = strstr(p, "MemFree:");
            if (p != NULL) {
                p += 8;
            } else {
                numad_log(LOG_CRIT, "Could not get node MemFree\n");
                exit(EXIT_FAILURE);
            }
            while (!isdigit(*p)) { p++; }
            CONVERT_DIGITS_TO_NUM(p, KB);
            //获取剩余内存,以MB为单位,保存在全局变量node中
            node[node_ix].MBs_free = (KB / KILOBYTE);
            //如果指定了使用cache作为可用内存的参数,将这部分内存加入可用内存中
            if (use_inactive_file_cache) {
                // Add inactive file cache quantity to "free" memory
                p = strstr(p, "Inactive(file):");
                if (p != NULL) {
                    p += 15;
                } else {
                    numad_log(LOG_CRIT, "Could not get node Inactive(file)\n");
                    exit(EXIT_FAILURE);
                }
                while (!isdigit(*p)) { p++; }
                CONVERT_DIGITS_TO_NUM(p, KB);
                node[node_ix].MBs_free += (KB / KILOBYTE);
            }
            //计算总的内存剩余数量
            sum_of_node_MBs_free += node[node_ix].MBs_free;
            //记录最小及最大剩余内存的nodeid,及对应的剩余内存量。
            if (min_node_MBs_free > node[node_ix].MBs_free) {
                min_node_MBs_free = node[node_ix].MBs_free;
                min_node_MBs_free_ix = node[node_ix].node_id;
            }
            if (max_node_MBs_free < node[node_ix].MBs_free) {
                max_node_MBs_free = node[node_ix].MBs_free;
            }
        } else {
            numad_log(LOG_CRIT, "Could not get node meminfo\n");
            exit(EXIT_FAILURE);
        }
        //如果cpu_data_buf已经保存了两份数据,则可以计算当前node在两个时间点之间的CPU剩余计算能力
        int old_cpu_data_buf = 1 - cur_cpu_data_buf;
        if (cpu_data_buf[old_cpu_data_buf].time_stamp > 0) {
            uint64_t idle_ticks = 0;
            int cpu = 0;
            //当前节点可用的cpu数量
            int num_lcpus = NUM_IDS_IN_LIST(node[node_ix].cpu_list_p);
            int num_cpus_to_process = num_lcpus;
            //计算可用cpu的idle ticks数量之和idle_ticks
            while (num_cpus_to_process) {
                if (ID_IS_IN_LIST(cpu, node[node_ix].cpu_list_p)) {
                    idle_ticks += cpu_data_buf[cur_cpu_data_buf].idle[cpu]
                                - cpu_data_buf[old_cpu_data_buf].idle[cpu];
                    num_cpus_to_process -= 1;
                }
                cpu += 1;
            }
            uint64_t time_diff = cpu_data_buf[cur_cpu_data_buf].time_stamp
                               - cpu_data_buf[old_cpu_data_buf].time_stamp;
            //计算当前node的平均剩余计算能力
            node[node_ix].CPUs_free = (idle_ticks * ONE_HUNDRED) / time_diff;
            //如果打开了超线程,并且设置的超线程计算能力不足一个物理核,则需要将多计算出的部分减去
            if ((threads_per_core > 1) && (htt_percent < 100)) {
                uint64_t htt_discount = (num_lcpus - (num_lcpus / threads_per_core)) * (100 - htt_percent);
                if (node[node_ix].CPUs_free > htt_discount) {
                    node[node_ix].CPUs_free -= htt_discount;
                } else {
                    node[node_ix].CPUs_free = 0;
                }
            }
            if (node[node_ix].CPUs_free > node[node_ix].CPUs_total) {
                node[node_ix].CPUs_free = node[node_ix].CPUs_total;
            }
            //统计总计剩余的CPU计算能力
            sum_of_node_CPUs_free += node[node_ix].CPUs_free;
            //记录最小及最大剩余CPU计算能力的nodeid,及对应的剩余CPU计算能力值。
            if (min_node_CPUs_free > node[node_ix].CPUs_free) {
                min_node_CPUs_free = node[node_ix].CPUs_free;
                min_node_CPUs_free_ix = node[node_ix].node_id;
            }
            if (max_node_CPUs_free < node[node_ix].CPUs_free) {
                max_node_CPUs_free = node[node_ix].CPUs_free;
            }
            //保存mem free * cpu free的数据
            node[node_ix].magnitude = node[node_ix].CPUs_free * node[node_ix].MBs_free;
        } else {
            node[node_ix].CPUs_free = 0;
            node[node_ix].magnitude = 0;
        }
    }
    //记录平均每个node的内存及CPU剩余数量
    avg_node_MBs_free = sum_of_node_MBs_free / num_nodes;
    avg_node_CPUs_free = sum_of_node_CPUs_free / num_nodes;
    //记录所有node剩余内存和剩余CPU的方差和
    double MBs_variance_sum = 0.0;
    double CPUs_variance_sum = 0.0;
    for (int node_ix = 0;  (node_ix < num_nodes);  node_ix++) {
        double MBs_diff = (double)node[node_ix].MBs_free - (double)avg_node_MBs_free;
        double CPUs_diff = (double)node[node_ix].CPUs_free - (double)avg_node_CPUs_free;
        MBs_variance_sum += MBs_diff * MBs_diff;
        CPUs_variance_sum += CPUs_diff * CPUs_diff;
    }
    //记录所有node剩余内存和剩余CPU的方差,用于描述每个node资源余量的离散程度
    double MBs_variance = MBs_variance_sum / (num_nodes);
    double CPUs_variance = CPUs_variance_sum / (num_nodes);
    stddev_node_MBs_free = sqrt(MBs_variance);
    stddev_node_CPUs_free = sqrt(CPUs_variance);
    //如果log level为INFO或以上,输出整理的node信息。
    if (log_level >= LOG_INFO) {
        show_nodes();
    }
    return num_nodes;
}

update_processes函数会根据传入的参数整理出来一个当前进程信息列表放入process_hash_table,信息表格式如下:

typedef struct process_data {
    int pid; //进程pid
    unsigned int flags; //标记是否跨node分配了内存
    uint64_t data_time_stamp; //进程信息更新时间戳
    uint64_t bind_time_stamp;
    uint64_t num_threads; //进程包含的线程数
    uint64_t MBs_size; //进程占用的内存vsize
    uint64_t MBs_used; //进程实际使用的内存rss
    uint64_t cpu_util; //进程使用的CPU时间
    uint64_t CPUs_used; //
    uint64_t CPUs_used_ring_buf[RING_BUF_SIZE];
    int ring_buf_ix;
    char *comm; //进程的命令行
    id_list_p node_list_p; //进程使用node列表
    uint64_t *process_MBs; //进程在每个node上的内存
} process_data_t, *process_data_p;

完成这个列表的初始化之后,我们就获取了当前物理节点上我们需要关注的所有进程的CPU和内存信息。下面来看看numad中是如何一步一步获取这些信息的。

int update_processes() {
    //获取本次更新时间戳
    uint64_t this_update_time = get_time_stamp();
    int new_candidates = 0;
    int files = 0;
    //如果传入的参数指定扫描所有进程,则在/proc目录下获取所有的进程号,并读取这些进程目录下的stat文件,更新process_hash_table。
    if (scan_all_processes) {
        struct dirent **namelist;
        //记录纳入numad管理的进程数量
        files = scandir("/proc", &namelist, name_starts_with_digit, NULL);
        if (files < 0) {
            numad_log(LOG_CRIT, "Could not open /proc\n");
            exit(EXIT_FAILURE);
        }
        for (int ix = 0;  (ix < files);  ix++) {
            process_data_p data_p;
            //用进程的stat文件内容初始化一个process_data_p结构
            if ((data_p = get_stat_data_for_pid(-1, namelist[ix]->d_name)) != NULL) {
                //numad限制至少要有300MB内存才能纳入管理(可以通过参数改变)
                //每次增加到hash表中的内容不能超过当前hash表大小的三分之一(优化性能????)
                if ((data_p->MBs_used > MEMORY_THRESHOLD)
                  && (new_candidates < process_hash_table_size / 3)) {
                    data_p->data_time_stamp = get_time_stamp();
                    //把进程信息更新到hash表中。如果已经存在,new_candidates值不增加。
                    new_candidates += process_hash_update(data_p);
                }
            }
            free(namelist[ix]);
        }
        free(namelist);
    }
    //接下来要处理参数传入的inclusion和exclusion两个队列,需要加上线程锁防止子线程正在处理这个队列。
    pthread_mutex_lock(&pid_list_mutex);
    //inclusion队列,相当于白名单,应当纳入numad管理
    pid_list_p pid_ptr = include_pid_list;
    //仍然限制新增加到hash表中的内容不能超过hash表大小的三分之一
    while ((pid_ptr != NULL) && (new_candidates < process_hash_table_size / 3)) {
        int hash_ix = process_hash_lookup(pid_ptr->pid);
        //如果最近已经更新过一次了(指定了scan_all_processes的情况下),直接扫描列表中的下一个进程
        if ( (hash_ix >= 0) && (process_hash_table[hash_ix].data_time_stamp > this_update_time)) {
            pid_ptr = pid_ptr->next;
            continue;
        }
        process_data_p data_p;
        if ((data_p = get_stat_data_for_pid(pid_ptr->pid, NULL)) != NULL) {
            data_p->data_time_stamp = get_time_stamp();
            new_candidates += process_hash_update(data_p);
            if (!scan_all_processes) {
                //如果没有设置scan_all_processes,需要记录纳入numad管理的进程数量。
                files += 1;
            }
            pid_ptr = pid_ptr->next;
        } else {
            //没有获取到进程信息,说明该进程已经不存在了,从inclusion list中移除该进程
            include_pid_list = remove_pid_from_pid_list(include_pid_list, pid_ptr->pid);
            pid_ptr = include_pid_list;
            continue;
        }
    }
    //处理exclusion list中的进程,这些进程相当于黑名单。如果现有的hash表中已经包含了这部分进程,需要重置他们的CPU使用量为0
    pid_ptr = exclude_pid_list;
    while (pid_ptr != NULL) {
        int hash_ix = process_hash_lookup(pid_ptr->pid);
        if (hash_ix >= 0) {
            process_hash_table[hash_ix].CPUs_used = 0;
        }
        pid_ptr = pid_ptr->next;
    }
    //队列处理完毕,释放队列的线程锁
    pthread_mutex_unlock(&pid_list_mutex);
    if (log_level >= LOG_INFO) {
        numad_log(LOG_INFO, "Processes: %d\n", files);
    }
    //清理hash表中的过时数据,即仍然保留在hash表中的此次更新之前的数据。
    //另外,如果hash表内容已经超过hash表大小的二分之一,需要将此hash表扩大到原来的2倍(性能????)
    process_hash_table_cleanup(this_update_time);
    return files;
}

相关阅读:

numad详解(1)

numad详解(3)

numad详解(4)

本文来自网易实践者社区,经作者岳文远授权发布。