FastDCS 分布式计算系统介绍(上)

网易云社区2018-06-11 15:13

1 背景简介

FastDCS是一个使用 C++ 开发的轻量级的分布式计算系统,使用它可以解决海量数据的计算和需要分布式服务方面的问题。2013年FastDCS进行了代码重构,大幅度提升了系统的性能和可靠性,并将代码和相关设计文档全部开源。[源码下载地址]

2 系统特性

  • FastDCS 是一个轻量级的分布式计算系统,开发者使用它可以快速的完成开发和部署工作,如同系统的名称Fast Distributed Computing System。
  • FastDCS 适合用于需要长期运行的计算处理业务,如瑞读网提供了长达4年的在线文档转换服务,它和MapReduce模式的批处理系统有很大的区别。
  • FastDCS 通过简洁而有效的设计,为开发者提供一套简洁而有效的分布式计算框架,开发者只需要定制由FastDCS 提供的三个用户自定义UDF函数(User defined function),就可以完成整个分布式系统的开发工作,FastDCS同样也是非常容易进行部署的,开发者可以轻松的将FastDCS系统部署在很多台Linux服务器中的,FastDCS会自动将这些Linux服务器建立起服务集群,以分布式运行的方式完成开发者的计算任务。

3 系统组成结构

从系统组成划分的角度来看,FastDCS包括了管理节点(Master)和工作节点(Worker) - 管理节点Master对整个计算集群的服务状态、任务分发、计算调度等服务进行管理。 - 工作节点Worker作为计算单元接受Master服务器的管理,完成整个计算集群的计算任务。

4 系统运行方式

  • 管理节点Master在多台服务器中以主从模式运行
    • 至少需要提供一台Master节点,建议同时运行三台Master节点会达到最佳的可靠性和性能。
    • 多台Master节点会自动选举出主节点(Primary master)和从节点(Secondary master)。
    • 每台Master节点分别管理一部分Worker节点,达到了系统的负载均衡的同时也提升了系统的整体性能。
  • 工作节点Worker在多台服务器中以并行模式运行
    • 至少需要提供一台Worker节点,目前可以支持上百台的Woker节点进行并行工作。
    • Worker节点会主动寻找负载较小的Master节点,接受计算任务调度,进行计算任务运算。

5 计算任务调度方式

  • Primary Master节点管理了服务集群中的所有计算任务
  • Primary Master节点将一部分计算任务副本分发给Secondary master节点进行管理,当一段时间内该任务未完成计算,Primary Master节点将会注销这个任务副本,并将这个副本重新分发给其他的Secondary master节点。
  • Primary Master和Secondary master节点都会分别处理各自管辖的Worker节点的服务请求,将计算任务进行下发,当一段时间内该任务未完成计算,Master[注1]节点将会注销这个任务副本,并将这个副本重新分发给其他的Woker节点。
  • 当服务集群中空闲时,Primary Master节点将会通过调用开发者自定义ImportTaskUDF函数,将新的计算任务导入到计算集群中等待处理。
  • 当Worker节点接收到Master节点分发的计算任务时,将会通过调用开发者自定义ComputingUDF函数,完成计算任务的运算处理。
  • 当服务集群中的计算任务计算完成后,Primary Master节点将会通过调用开发者自定义ExportTaskUDF函数,将计算结果导出到外部存储系统中。

[注1]:此处的Master服务器是指Primary Master或Secondary master服务器。

6 系统容错处理

  • 当Primary master节点发生异常停止服务时,Secondary master节点会重新选举出新的Primary master节点接替工作,避免了服务集群的单点故障的问题,当故障服务器恢复后可以重新参加到集群的工作中。
  • Worker节点可以根据环境的需要进行动态的增减,服务集群的计算能力和计算周期也随之发生线性的变化。
  • 在Master和Worker节点发生动态调整的时候,只会影响极小部分的计算任务,不会影响服务集群的正常运行,受影响的小部分计算结果也会由系统判断是否采纳或重新进行计算。
  • 当发生网络延时或Worker节点异常造成计算任务超时的情况下,Master节点会将计算任务重新分发给其他Worker节点,即使多台Worker节点运算同一个任务时,Master也只会采纳其中有效的计算结果。

7 发行日志

  • 2013/06/06,整理提交了V0.1.1代码和相关技术文档。[下载地址]

8 版本特性

  • 通过环形选举算法从多个管理服务中选举出 Master;
  • 使用 epoll 完成网络通讯,实现更高的网络处理能力;
  • 使用 Protobuf 完成数据结构的定义和数据的编解码;
  • 提供计算任务的导入、自定义计算和计算结果的导出这三个 UDF 函数;

9 版权说明

FastDCS源代码通过GNU General Public License V3进行发布。

FastDCS 架构介绍

FastDCS是一个使用C++开发的轻量级的分布式计算系统,使用它可以解决海量数据的实时计算和需要分布式服务方面的问题。

1.1 系统特性

  • FastDCS是一个轻量级的分布式计算系统,开发者使用它可以快速的完成开发和部署工作,如同系统的名称Fast Distributed Computing System。
  • FastDCS适合用于需要长期运行的计算处理业务,如瑞读网提供了长达4年的在线文档转换服务,它和MapReduce模式的批处理系统有很大的区别。

1.2 系统说明

从系统组成划分的角度来看,FastDCS包括了管理节点(Master)和工作节点(Worker)

  • 管理节点Master对整个计算集群的服务状态、任务分发、计算调度等服务进行管理。
  • 工作节点Worker作为计算单元接受Master服务器的管理,完成整个计算集群的计算任务。

1.2.1 系统处理流程说明

FastDCS系统整体架构FastDCS系统整体架构

如图1-3-1,系统处理流程有如下几个部分组成:

  1. 数据流将需要计算的数据保存在外部的存储系统中,如数据库或分布式存储系统中;
  2. Primary master节点从外部存储系统中不断获取新的计算任务,保存在Primary master节点中;
  3. Secondary master节点主动从Primary master节点获取一部分计算任务副本;
  4. Worker节点主动从Secondary master节点获取计算任务进行计算处理;
  5. Worker节点将计算结果提交给Secondeary master节点,所有的Secondary master节点将计算结果汇总到Primary master节点;
  6. Primary master节点将计算结果保存到外部的存储系统。

1.2.2 计算任务处理流程说明

计算任务处理流程

计算任务处理流程

如图1-3-2,计算任务处理流程有如下几个部分组成:

  1. Primary master节点保存了整个服务集群的所有计算任务;
  2. 一台Master节点加入Master服务集群后,通过Lease机制从Primary master节点租赁一部分计算任务,将副本数据保存在本节点(副本有效期10秒);
  3. 每个Worker节点分别从所属的Secondary master节点获取一份计算任务进行计算;
  4. 当Secondary master节点退出或异常时,它下属的Worker节点直接从Primary master节点获取计算任务;
  5. Worker节点完成计算任务后,将计算结果提交给所属的Secondary master节点,由Secondary master节点将计算结果汇总提交给Primary master节点保存到外部的存储系统;

1.3 架构剖析

1.3.1 系统可靠性

FastDCS通过同时运行多个管理节点Master来避免服务集群的单点故障问题,多个Master节点之间通过环形算法选举出Primary master节点对服务集群进行主控,其它的Secondary master节点承担一部分辅助管理,确保了服务集群的可靠性的同时也避免了单点过载的问题。

Master选举算法

Master选举算法

如图1-4-1,FastDCS系统中所有Master节点按照前后顺序组成一个环新形结构,每个Master节点都保存了这个环形结构的排序表,因此每个Master节点都知道自己的所有后继Master节点。

Master选举算法说明

假设当Master(1)节点发现到Primary master不再工作时,它将启动一个召集选举的过程:

  1. Master(1)构造一个包含自己进程编号的 ELECTION 给后继进程,如果直接后继进程没有响应,Master(1)就将消息发送给下一个Master节点,直到找到一个正常运行的Master节点;
  2. Master(2)接收到 ELECTION 消息的进程将自己的编号增加到 ELECTION 消息中,然后按照同样的方式将消息发送给后继Master节点。这样,消息在环上的传递将构造一个包含所有正常运行的Master的编号表;
  3. 当 ELECTION 消息最后回到召集选举的Master节点时,消息中最大编号的Master节点即成为选举的胜利者。召集选举的进程将消息类型改为 COORDINATOR,然后将消息沿着环重新发送一次,将选举结果通知所有的Master节点;
  4. 当 COORDINATOR 消息重新回到召集选举的Master节点时,算法终止;

同样,在环选举算法中,也可能同时存在多个召集选举的过程。当在这个时刻环结构不变时,只是消息数量多一些,最后的结果也是一致的。

1.3.2 系统可用性

primary-secondary协议

primary-secondary协议

如图1-4-2,FastDCS采用primary-secondary(也称 primary-backup)的中心化副本控制协议。 在FastDCS系统中,数据被分为由Primary master节点管理着的元数据和由Secondary master节点管理着的数据副本。

Primary master节点负责维护数据的更新、并发控制、协调副本的一致性

  1. 数据更新都由Primary节点协调完成;
  2. primary master节点从外部存储系统获取新数据;
  3. Primary master节点进行并发控制即确定并发更新操作的先后顺序;
  4. Primary master节点将更新操作发送给secondary 节点;
  5. Primary master根据secondary master节点的完成情况决定更新是否成功并将结果返回外部存储系统;

Secondary master节点管理一部分数据副本

  1. Primary master节点将数据分为数据段,以数据段为副本的基本单位;
  2. 将数据副本分散到集群中个,每个Secondary master节点上都有一部分元数据的副本;
  3. Secondary master节点使用数据副本完成服务集群的一部分管理工作,来分担系统的整体压力;

1.3.3 数据一致性

Lease机制

Lease机制

如图1-4-3,FastDCS采用Lease机制能够确保在服务器或网络异常等情况下,仍然保持分散在服务集群中的数据具有很强的一致性。

  • Primary master节点在向各个Secondary master节点发送数据的同时向节点颁发一个lease;
  • Primary master节点在lease有效期内,保证不对已经颁发的数据进行修改;
  • Primary master节点在lease有效期失效后,才会对颁发的数据进行重新下发或修改;
  • 只有Primary master节点才能修改元数据,所以在任何时间Primary master节点中的数据都是最新的;

1.3.4 数据结构说明

FastDCS中的数据结构定义文件在/src/server/tracker_protocol.proto文件中。

其中只有FdcsTask、KeyValuesPair和KeyValuePair三个数据结构需要开发者了解掌握,其他的数据结构用于FastDCS系统内部的处理,与开发者无关。

数据结构FdcsTask用于计算任务的调度和保存计算结果,为了让FdcsTask能够以一种通用的结构满足各种应用场景中数据传输的需要, FastDCS使用了可以包含多个KV键值对的变量来满足需求,同时使用Google Protocol Buffers对系统内部的数据结构进行定义以及序列化操作。

1.3.4.1 FdcsTask:计算任务数据结构

message FdcsTask {
  // 任务ID
  required string task_id = 1 [default = ""];
  // 任务租约有效时间
  optional int64 lease_time = 2 [default = 0];
  // 多个KeyValuesPair数据结构
  repeated KeyValuesPair key_values_pairs = 3;
}
// 为方便理解FdcsTask数据结构
// 下面列出了在Demo程序'单词排序应用实例'中的一个计算任务FdcsTask变量内部的数据内容如下:
FdcsTask = {
  task_id = id1, 
  key_values_pairs = [
    {key='A', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
    {key='B', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
    {key='C', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
    ...
  ]
}

FastDCS中的计算任务数据结构FdcsTask有只有3个变量组成,分别是任务ID、任务租约有效时间、一对多KV键值对三个部分组成:

  • task_id(任务ID):用于在整个工作任务中进行唯一标示某一项计算任务;
  • lease_time(任务租约有效时间):FastDCS系统内部控制数据有效性的变量,开发者可以忽略该变量;
  • key_values_pairs(一对多KV键值):每个计算任务通过该变量能够保存多个一对多的K-V键值对数据,可以用来保存自定义的多个变量;

1.3.4.2 KeyValuesPair:一对多KV键值对数据结构

message KeyValuesPair {
  optional bytes key = 1;
  // value中以字节的方式保存了多个KeyValuePair数据结构
  repeated bytes value = 2;
}
// 为方便理解KeyValuesPair数据结构
// 下面列出了在Demo程序'单词排序应用实例'中的一个计算任务KeyValuesPair变量内部的数据内容如下:
key_values_pairs = {key='A', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
  • KeyValuesPair其中的Key用来保存自定义变量的名称,value用来保存该自定义变量的数据;
  • value中的数据可以使用多个一对一KV键值对KeyValuePair数据结构进行填充。

1.3.4.3 KeyValuePair:一对一KV键值对

message KeyValuePair {
  optional bytes key = 1;
  optional bytes value = 2;
}
// 为方便理解KeyValuesPair数据结构
// 下面列出了在Demo程序'单词排序应用实例'中的一个计算任务KeyValuePair变量内部的数据内容如下:
{key=单词1, value=count1}
  • KeyValuePair其中的Key用来保存自定义变量的名称,value用来保存该自定义变量的数据;

  • value中的数据可以使用具体的数据进行填充;

  • 注意:KeyValuesPair 和 KeyValuePair数据结构名称只差一个表示是复数的字母s;

FastDCS 开发说明

1.4 开发说明

FastDCS通过简洁而有效的设计,为开发者提供一套简洁而有效的分布式计算框架,开发者只需要定制由FastDCS提供的几个用户自定义UDF函数(User defined function),就可以完成整个分布式系统的开发工作。

1.4.2 管理节点Master开发说明

开发者可以通过自定义Master节点类实现自定义的FastDCS管理节点功能,需要处理的有如下几个部分:

  1. 自定义 Master 类名 DemoMaster,继承 Master 类后即可拥有FastDCS服务集群的管理功能;
  2. 如果需要初始化 DemoMaster中的自定义变量,可以在虚函数 InitialTracker 中实现;
  3. 如果需要释放 DemoMaster中的自定义变量,可以在虚函数 FinalizeTracker 中实现;
  4. 必须在虚函数 ImportTaskUDF 中实现 DemoMaster 中的计算任务导入到服务集群;
  5. 必须在虚函数 ExportTaskUDF 中实现 DemoMaster 中的计算结果保存到外部存储系统;
  6. 必须通过REGISTER_FASTDCS_TRACKER宏,将自定义类名 DemoMaster 注册到FastDCS中;

管理节点Master开发框架

// 1.自定义Master类名`DemoMaster`派生在Master之上
class DemoMaster : public Master {
    // 2.Master节点启动后,会调用这个方法
    void InitialTracker(struct settings_s settings) {
        // 初始化在 DemoMaster 中使用的自定义变量
        ......

        Master::InitialTracker(settings);
    };

    // 3.Master节点退出时,会调用这个方法
    void FinalizeTracker() {
        // 释放在 DemoMaster 中使用的自定义变量
        ......

        Master::FinalizeTracker();
    };

    // 4.FastDCS空闲的时候会主动调用这个方法,将新的计算任务导入到服务集群
    // 该方法只会由选举为Primary master的节点进行调用
    bool ImportTaskUDF(vector<FdcsTask> &tasks) {
        // 你需要将外部存储系统中的计算任务导入到服务集群中
        ......

        return true;
    };

    // 5.FastDCS会主动调用这个方法,将任务的计算结果导出到外部存储系统
    // 该方法只会由选举为Primary master的节点进行调用
    bool ExportTaskUDF(vector<FdcsTask> tasks) {
        // 你需要将计算保存到外部存储系统中
        ......

        return true;
    };
    private:
        自定义变量;
};
// 6.注册宏必须填写正确,`DemoMaster`是你自定义的Master类名称
REGISTER_FASTDCS_TRACKER(DemoMaster);

1.4.3 工作节点Worker开发说明

开发者可以通过自定义Worker节点类实现自定义的FastDCS工作节点功能,需要处理的有如下几个部分:

  1. 自定义 Worker 类名 DemoWorker,继承 Worker 类后即可响应FastDCS服务集群的计算任务调度、并行计算等功能;
  2. 如果需要初始化 DemoWorker中的自定义变量,可以在虚函数 InitialTracker 中实现;
  3. 如果需要释放 DemoWorker中的自定义变量,可以在虚函数 FinalizeTracker 中实现;
  4. 必须在虚函数 ComputingUDF 中实现 DemoWorker 中的计算处理;
  5. 必须通过REGISTER_FASTDCS_TRACKER宏,将自定义类名 DemoWorker 注册到FastDCS中;

工作节点Worker开发框架

// 1.自定义Worker类名`DemoWorker`派生在Master之上
class DemoWorker : public Worker {
    // 2.Worker节点启动后,会调用这个方法
    void InitialTracker(struct settings_s settings) {
        // 初始化在 DemoWorker 中使用的自定义变量
        ......

        Worker::InitialTracker(settings);
    };

    // 3.Worker节点退出时,会调用这个方法
    void FinalizeTracker() {
        // 释放在 DemoWorker 中使用的自定义变量
        ......

        Worker::FinalizeTracker();
    };

    // 4.FastDCS中有需要计算的任务的时候会主动调用这个方法,
    // 开发者自行实现自定义的计算方法
    bool ComputingUDF(FdcsTask &task) {
        // 开发者自行实现自定义的计算方法
        ......

        return true;
    };
    private:
        自定义变量;
};
// 5.注册宏必须填写正确,`DemoWorker`是你自定义的Worker类名称
REGISTER_FASTDCS_TRACKER(DemoWorker);

1.5 编译和部署说明

FastDCS同样也是非常容易进行部署的,开发者可以轻松的将FastDCS系统部署在很多台Linux服务器中的, FastDCS会自动将这些Linux服务器建立起服务集群,以分布式运行的方式完成开发者的计算任务。

1.5.1 编译和安装说明

FastDCS只能运行在Linux系统中(需要内核版本高于 2.6 ),目前FastDCS-v0.1.1版本需要Google Protocol Buffers库的支持。

FastDCS系统运行是不需要依赖mysql,但由于FastDCS的demo演示程序是使用mysql作为外部存储系统进行代码编写的, 所以你的系统中需要有mysql数据库,或者有能够提供远程连接的mysql数据库。

编译和安装依赖环境

  1. 安装GCC
    • 目前FastDCS开发环境使用的是 GCC 4.4.4 版本。
  2. 安装CMake
    • CMake用于管理和编译FastDCS工程,由于FastDCS系统需要使用Protobuf,所以CMake必须安装 2.8.0 以上版本。
    • 你可以通过 http://www.cmake.org/cmake/resources/software.html 下载和安装CMake的二进制版本。
    • CMake安装教程 http://liuxun.org/blog/2012/10/20/linux-xia-an-zhuang-zui-xin-ban-ben-de-cmake/
  3. 安装Protobuf
    • Google Protocol Buffers用于FastDCS服务集群之间的通讯协议。
    • 你可以通过 http://code.google.com/p/protobuf 下载Protobuf的源代码(目前FastDCS开发环境使用的是 Protobuf-2.4.1 版本 )。
    • $ cd protobuf-2.4.1
    • $ ./configure prefix=/usr/local
    • $ make
    • $ sudo make install
    • 添加环境变量 export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig/~/.bash_profile 文件中
    • 添加加载库路径 /usr/local/lib//etc/ld.so.conf文件中
    • $ ldconfig
  4. 编译FastDCS
    • 从 https://github.com/liuxunorg/FastDCS 中下载FastDCS的最新版本源代码。
    • 使用编辑器打开 FastDCS/CMakeLists.txt 文件
    • FastDCS系统自身运行是不需要依赖mysql的,但FastDCS的demo程序是使用mysql作为外部存储系统进行代码编写的, 所以需要配置mysql库的路径,你需要修改 SET(MYSQL_DIR "/usr/local/mysql"),将你的mysql安装路径替换 /usr/local/mysql
    • 如果你需要通过 make install 命令将FastDCS安装到其他路径,你需要修改 set(CMAKE_INSTALL_PREFIX "/home/liuxun/FastDCS"), 将你自己安装路径替换 /home/liuxun/FastDCS
    • $ cd FastDCS-v0.1.1
    • $ mkdir build
    • $ cd build
    • $ cmake …
    • $ make
    • $ make install

1.5.2 系统配置

FastDCS一共有2个配置文件,分别位于 /src/conf/master.conf 和 /src/conf/worker.conf ,这2个配置文件中绝大部分的配置项是通用的。

系统配置参数说明

# Copyright 2013-02-05
# Author: Liu Xun (my@liuxun.org)
#
# FastDCS master config file

# 设置成 true,系统将以服务的方式在后台运行
# 设置成 false,系统将以应用的方式在前台运行
run_by_daemon = true

# 设置成 true,当系统异常崩溃的时候,将产生core文件
# 设置成 false,当系统异常崩溃的时候,不会产生core文件
max_core_file = false

# 三种日志类型的输出文件路径
# 如果不需要输出到文件,可以设置成/dev/null,如:
# info_log = /dev/null
# 如果需要见日志输出到控制台,可以不设置任何值,如:
# info_log = 
info_log = /home/liuxun/FastDCS/demo/info.log
warn_log = /home/liuxun/FastDCS/demo/warn.log
err_log  = /home/liuxun/FastDCS/demo/err.log

# 日志文件最大容量,默认值10M,支持的单位如下:
# G or g for gigabyte(GB)
# M or m for megabyte(MB)
# K or k for kilobyte(KB)
log_max_size = 10MB

# 指定以哪个用户运行这个程序,如果没有设置,以当前用户执行
# 该设置项只有在 run_by_daemon = true 的时候才有效
run_by_user = liuxun

# 保存PID的文件名称
pid_file = /tmp/fdcs_master.pid

# 系统支持的最大连接数,默认值256
# 这个参数的建议值是服务集群所运行的节点数的5倍
max_connections = 256

# 用于分配给每个Socket链接的缓冲区大小,默认64KB
# 这个参数建议的范围是 [8KB, 512KB]
socket_buff_size = 128KB

# 在Master 或 Worker 类的基础上派生的自定义类名称
# 例如FastDCS提供的demo程序中的Master节点需要配置成 
# class_factory = DictMaster
# 例如FastDCS提供的demo程序中的Worker节点需要配置成 
# class_factory = DictWorker
class_factory = "please input you master class name"

# 整个master节点集群的服务器IP和端口队列
# 格式为 host1:port1;host2:port2;...
tracker_group = 127.0.0.1:32301;127.0.0.1:32302;127.0.0.1:32303

# 当前节点的IP和端口
tracker_server = 127.0.0.1:32301

# Socket连接超时时间(单位:秒)
# default value is 30s
connect_timeout = 30

# 网络连接超时时间(单位:秒)
# default value is 30s
network_timeout = 60

# 节点发送心跳包的时间间隔(单位:秒)
# 该设置不能小于 lease_timeout 的设置值
heart_beat_interval = 30

# 节点状态报告的时间间隔(单位:秒)
stat_report_interval = 60

# 数据副本有效期时间(单位:秒)
# 该设置不能大于 heart_beat_interval 的设置值
lease_timeout = 10

# Primary master节点
# 每次从外部存储系统加载的计算任务数目(单位:个)
preload_tasks = 100

# Second master节点每次从Primary master节点
# 同步计算任务副本的数目(单位:个)
task_duplicate = 10

# Worker节点中的任务计算线程数
# 仅使用与Worker节点配置
computing_threads = 2

# FastDCS自带mysql的访问封装类的配置项
# FastDCS自身运行是不需要依赖mysql的
# 如果你需要在自定义函数中使用可以在这里进行配置
mysql_database = FastDCS
mysql_host = localhost
mysql_user = FastDCS
mysql_passwd = fastdcs
mysql_port = 3306

FastDCS 开发实例

2.1 单词排序开发实例

在FastDCS的源代码中的/src/demo/目录下有一个完整的开发样例,该样例演示了如何使用FastDCS对/src/demo/dict.txt文件中的英文单词按照出现的次数进行排序,该样例仅用于FastDCS开发讲解并无实际应用价值;

2.1.1 开发思路

  • 首先,我们需要按照任务分解的思想,将dict.txt文件中按照一行(或几行)作为一个计算任务单元,将整个工作拆分成N份计算任务,将所有的计算任务存储到一个外部存储系统中,Demo中使用了mysql作为外部存储系统;
  • 你需要按照/src/demo/dict.sql文件中提供的SQL脚本,在mysql中建立用于保存每个计算任务的dict_task表以及用于保存单词排序结果的dict_word表;
  • Demo中提供了dict_mock程序帮助你将计算任务单元保存到了mysql中的dict_task表。
  • 我们需要开发管理节点Master程序,我们将它命名为 DictMaster,用来从dict_task表中获取未处理的计算任务单元导入到FastDCS服务集群中,还负责将计算结果保存到dict_word表中;
  • 我们需要开发工作节点Worker程序,我们将它命名为 DictWorker,用来接受DictMaster节点分配的计算任务单元,根据计算任务单元的ID从dict_task表中获取对应的文字内容,然后按照单词进行拆分计算,计算的结果由FastDCS自动发送给DictMaster节点进行保存;
  • 通过运行一段时间完成计算后,dict_word 表中将保存了 dict.txt 文本中所有单词排序后的结果;

2.1.2 DictMaster开发说明

// 1.将自定义类'DictMaster'派生在'Master'之上
class DictMaster : public Master {
  public:
  // 2.Master节点程序启动后,会调用这个方法
  void InitialTracker(struct settings_s settings) {
    // 初始化自定义变量 mysql_connect_,连接mysql数据库
    mysql_connect_.Connect(settings.mysql_database, 
                             settings.mysql_host, 
                             settings.mysql_user, 
                             settings.mysql_passwd, 
                             settings.mysql_port);

    // 必须调用基类中的InitialTracker函数,初始化Master类
    Master::InitialTracker(settings);
  };

  // 3.Master节点程序退出时,会调用这个方法
  void FinalizeTracker() {
    // 由于自定义变量 mysql_connect_ 是自动释放资源的,所以没有自定义变量需要在这里释放

    // 必须调用基类中的FinalizeTracker函数,释放Master类
    Master::FinalizeTracker();
  };
// 4.FastDCS空闲的时候会主动调用这个方法,将新的计算任务导入到服务集群
  // 该方法只会由选举为'Primary master'的节点进行调用
  // 开发思路:批量获取'dict_task'表中的任务状态字段'task_status'等于'1'未处理的计算任务
  // 将它们导入到FastDCS,同时将这批数据的任务状态字段'task_status'设置为'2'(已处理)
  bool ImportTaskUDF(vector<FdcsTask> &tasks) {
    LOG(INFO) << "DictMaster::ImportTaskUDF()";

    std::string select_sql, update_sql, update_where;
    update_sql = "UPDATE dict_task SET task_status = 2 WHERE ";

    // 根据配置文件master.conf中的'preload_tasks'所配置的参数批量获取 
    // task_status等于'1'(未处理)的的计算任务
    std::string format = "SELECT task_id FROM dict_task WHERE task_status = 1 LIMIT %d;";
    SStringPrintf(&select_sql, format.data(), fdcs_env_.PreloadTasks());
    LOG(INFO) << select_sql;

    MysqlQuery query(&mysql_connect_);
    if (!query.Execute(select_sql.data())) {
      LOG(ERROR) << "select task from mysql failduer!";
      return false;
    }

    MysqlResult *sql_result = query.Store();
    int rows_size = sql_result->RowsSize();
    LOG(INFO) << "sql_result->RowsSize() = " << sql_result->RowsSize();
    if (0 == rows_size) return false;

    // 获取到新的计算任务后,为了避免重复处理
    // 将数据库中本次获取的计算任务的'task_status'设置为'2'(正在处理状态)
    FdcsTask task;
    for (int i = 0; i < rows_size; ++i) {
      MysqlRow row = sql_result->FetchRow();

      std::string task_id = (char*)row[0];
      LOG(INFO) << "task_id = " << task_id;
      task.set_task_id(task_id);
      tasks.push_back(task);

      if (update_where.empty()) {
        update_where = "task_id = '" + task_id + "'";
      } else {
        update_where = update_where + " or task_id = '" + task_id + "'";
      }
    }
    query.FreeResult(sql_result);

    update_sql = update_sql + update_where + ";";

    // 执行更新task_status状态的SQL语句
    if (!query.TryExecute(update_sql.data())) {
      LOG(ERROR) << "update task status faildure!";
      return false;
    }

    return true;
  };

关于作者

FastDCS 的作者刘勋,有着 17 年的软件行业和互联网行业开发和架构经验,2008年-2012年曾创办瑞读网,是国内最早期的数字出版云服务提供商,在创业期间开发的 FastDCS 分布式计算系统为瑞读网的客户提供了长期稳定的服务。

FastDCS 分布式计算系统介绍(下)

本文已由作者授权网易云社区发布,未经允许不得转载。