Databus架构分析与初步实践(for mysql)(下篇)

阿凡达2018-07-06 13:13

6. Databus for Mysql实践

6.1 相关解释

  • 实现原理:通过解析mysql的binlog日志来获取变更事件,解析过程利用Java开源工具OpenReplicator,Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用,所有的Event实现了BinlogEventV4接口。

  • binlog 格式:Databus设计为针对Row格式日志进行解析

    • Statement:基于SQL语句的复制(statement-based replication,SBR)
    • Row:基于行的复制(row-based replication,RBR)
    • Mixed:混合模式复制(mixed-based replication,MBR)
  • SCN的确定:64bits组成,高32位表示binlog的文件序号,低32位代表event在binlog文件的offset,例如在 mysql-bin.000001文件中 offset为 4的scn表示为(1 << 32) | 4 = 4294967300

6.2 数据库环境配置

  • 安装mysql数据库,本次使用mysql-5.5.56版本。

  • 查看数据库是否开启binlog,如果binlog没有开启,可以通过set sql_log_bin=1命令来启用;如果想停用binlog,可以使用set sql_log_bin=0。                                                                                                            

  • 配置数据库binlog_format=ROW, show variables like ‘binlog_format‘可查看日志格式, set globle binlog_format=ROW’可设置,通过修改my.cnf文件也可以,增加或修改行binlog_format=ROW即可。            

  • binlog_checksum设置为空,show global variables like ‘binlog_checksum’命令可查看,set binlog_checksum=none可设置。

  • 在mysql上创建名为or_test的数据库,or_test上创建表名为person的表,定义如下:

6.3 Demo配置与运行

6.3.1 下载源码

  • Databus官网下载源码,下载地址https://github.com/linkedin/databus.git,我们需要用到databus目录下的databus2-example文件夹,在此基础上改造并运行,目录结构及介绍如下:                                                              

    • database:数据库模拟相关的脚本和工具
    • databus2-example-bst-producer-pkg:bootstrap producer的属性配置文件夹,包括bootstrap producer和log4j属性文件,build脚本以及bootstrap producer的启动和停止脚本。
    • databus2-example-client-pkg:client的属性配置文件夹,包括各种属性文件和启动和停止脚本。
    • databus2-example-client:client源代码,包含启动主类和消费者代码逻辑。
    • databus2-example-relay-pkg:relay的属性配置文件夹,包含监控的表的source信息和Avro schema。
    • databus2-example-relay:relay的启动主类。
    • schemas_registry:存放表的avsc文件。

6.3.2 Relay端的操作

  • 配置Relay属性文件:databus2-example-relay-pkg/conf/relay-or-person.properties的内容如下配置,包括端口号,buffer存储策略,maxScn存放地址等信息:

      databus.relay.container.httpPort=11115
      databus.relay.container.jmx.rmiEnabled=false
      databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
      databus.relay.eventBuffer.queuePolicy=OVERWRITE_ON_WRITE
      databus.relay.eventLogReader.enabled=false
      databus.relay.eventLogWriter.enabled=false
      databus.relay.schemaRegistry.type=FILE_SYSTEM
      databus.relay.schemaRegistry.fileSystem.schemaDir=./schemas_registry
      databus.relay.eventBuffer.maxSize=1024000000
      databus.relay.eventBuffer.readBufferSize=10240
      databus.relay.eventBuffer.scnIndexSize=10240000
      databus.relay.physicalSourcesConfigsPattern=../../databus2-example/databus2-example-relay-pkg/conf/sources-or-person.json
      databus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScn
      databus.relay.startDbPuller=true
    
  • 配置被监控表的source信息:databus2-example-relay-pkg/conf/sources-or-person.json的内容如下配置,其中URI format:mysql://username/password@mysql_host[:mysql_port]/mysql_serverid/binlog_prefix,注意%2F为转义字符,用户名为root,数据库密码为123。

      {
          "name" : "person",
          "id"  : 1,
          "uri" : "mysql://root%2F123@localhost:3306/1/mysql-bin",
          "slowSourceQueryThreshold" : 2000,
          "sources" :
          [
              {
              "id" : 40,
              "name" : "com.linkedin.events.example.or_test.Person",
              "uri": "or_test.person",
              "partitionFunction" : "constant:1"
               }
          ]
      }
    
  • databus2-example-relay-pkg/schemas_registry/下定义person的Avro schema文件 com.linkedin.events.example.or_test.Person.1.avsc,其中1表示版本(Databus目前没有针对mysql提供生成Avro schema文件的工具,所以只能手工编写)具体内容如下所示:

      {
        "name" : "Person_V1",
        "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
        "type" : "record",
        "meta" : "dbFieldName=person;pk=id;",
        "namespace" : "com.linkedin.events.example.or_test",
        "fields" : [ {
          "name" : "id",
          "type" : [ "long", "null" ],
          "meta" : "dbFieldName=ID;dbFieldPosition=0;"
        }, {
          "name" : "firstName",
          "type" : [ "string", "null" ],
          "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"
        }, {
          "name" : "lastName",
          "type" : [ "string", "null" ],
          "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"
        }, {
          "name" : "birthDate",
          "type" : [ "long", "null" ],
          "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"
        }, {
          "name" : "deleted",
          "type" : [ "string", "null" ],
          "meta" : "dbFieldName=DELETED;dbFieldPosition=4;"
        } ]
      }
    
  • 注册Avro schema到index.schemas_registry文件,databus2-example-relay-pkg/schemas_registry/index.schemas_registry文件中添加行com.linkedin.events.example.or_test.Person.1.avsc ,每定义一个Avro schema都需要添加进去,relay运行时会到此文件中查找表对应的定义的Avro schema。

6.3.3 Client端的操作

  • 配置Client属性文件:databus2-example-client-pkg/conf/client-person.properties的内容如下配置,包括端口号,buffer存储策略,checkpoint持久化等信息:

      databus.relay.container.httpPort=11125
      databus.relay.container.jmx.rmiEnabled=false
      databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
      databus.relay.eventBuffer.queuePolicy=BLOCK_ON_WRITE
      databus.relay.schemaRegistry.type=FILE_SYSTEM
      databus.relay.eventBuffer.maxSize=10240000
      databus.relay.eventBuffer.readBufferSize=1024000
      databus.relay.eventBuffer.scnIndexSize=1024000
      databus.client.connectionDefaults.pullerRetries.initSleep=1
      databus.client.checkpointPersistence.fileSystem.rootDirectory=./personclient-checkpoints
      databus.client.checkpointPersistence.clearBeforeUse=false
      databus.client.connectionDefaults.enablePullerMessageQueueLogging=true
    
  • databus2-example-client/src/main/java下的PersonConsumer类是消费逻辑回调代码,主要是取出每一个event后依次打印每个字段的名值对,主要代码如下:

      private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder)
      {
          GenericRecord decodedEvent = eventDecoder.getGenericRecord(event, null);
          try {
            Utf8 firstName = (Utf8)decodedEvent.get("firstName");
            Utf8 lastName = (Utf8)decodedEvent.get("lastName");
            Long birthDate = (Long)decodedEvent.get("birthDate");
            Utf8 deleted = (Utf8)decodedEvent.get("deleted");
    
            LOG.info("firstName: " + firstName.toString() +
                     ", lastName: " + lastName.toString() +
                     ", birthDate: " + birthDate +
                     ", deleted: " + deleted.toString());
          } catch (Exception e) {
            LOG.error("error decoding event ", e);
            return ConsumerCallbackResult.ERROR;
          }
    
          return ConsumerCallbackResult.SUCCESS;
      }
    
  • databus2-example-client/src/main/java下的PersonClient类是relay的启动主类,主要是设置启动Client的配置信息,将消费者实例注册到监听器中,后续可对其进行回调,主要代码如下:

      public static void main(String[] args) throws Exception
      {
          DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();
    
          //Try to connect to a relay on localhost
          configBuilder.getRuntime().getRelay("1").setHost("localhost");
          configBuilder.getRuntime().getRelay("1").setPort(11115);
          configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);
    
          //Instantiate a client using command-line parameters if any
          DatabusHttpClientImpl client = DatabusHttpClientImpl.createFromCli(args, configBuilder);
    
          //register callbacks
          PersonConsumer personConsumer = new PersonConsumer();
          client.registerDatabusStreamListener(personConsumer, null, PERSON_SOURCE);
          client.registerDatabusBootstrapListener(personConsumer, null, PERSON_SOURCE);
    
          //fire off the Databus client
          client.startAndBlock();
      }
    

6.3.4 build-启动-测试

  • Build:Databus采用gradle进行编译,所以需要安装gradle环境,安装安成后进入databus根目录执行命令gradle -Dopen_source=true assemble即可完成build,成功后在databus根目录下生成名为build的文件夹

  • 启动Relay:

    1. cd build/databus2-example-relay-pkg/distributions
    2. tar -zxvf databus2-example-relay-pkg.tar.gz解压
    3. 执行启动脚本 ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json
    4. 执行命令 curl -s http://localhost:11115/sources返回如下内容说明启动成功:
  • 启动Client:
    1. cd build/databus2-example-client-pkg/distributions
    2. tar -zxvf databus2-example-client-pkg.tar.gz解压
    3. 执行启动脚本 ./bin/start-example-client.sh person
    4. 执行命令 curl http://localhost:11115/relayStats/outbound/http/clients返回如下内容说明启动成功:
  • 测试:

    Relay和Client启动成功后,就已经开始对person表进行数据变更捕获了,现在向person表插入一条如下记录:

    databus2-example-relay-pkg/distributions/logs下的relay.log记录如下:

    databus2-example-client-pkg/distributions/logs下的client.log记录如下:

    可以看到已经可以抓取到改变的数据了!

7. 总结

遇到的问题

  • 主要是属性文件的配置问题,包括source-or-person.json, schemas_registry的文件缺失或配置错误。
  • 脚本方式启动时JVM无法创建,由于脚本启动时包含了自定义的JVM参数,与系统环境不符导致启动失败,去掉相关参数后正常启动。
  • Relay可以获取增删改查的Event,但Client只能解析到更新操作的Event,主要原因是Mysql默认的binlog_format=MIXED,而databus的设计是针对ROW格式的binlog,修改格式后可正常解析。
  • Windows平台无法使用,启动方式是用脚本启动,脚本启动时包含命令行参数较多,启动后无法进行调试,只能通过对日志观察的方式来进行。

需要进一步实验

  • 使用bootstrap produces和bootstrap servers模式来进行大批量事件的获取
  • 配置多个relay进行事件抓取
  • 结合zookeeper来配置客户端集群进行消费

参考资料

相关阅读:Databus架构分析与初步实践(for mysql)(上篇)

本文来自网易实践者社区,经作者徐和东授权发布。