ogg

ogg #

ogg is short for Oracle GoldenGate, Oracle's tool for replicating data between databases.

1. Requirements #

Capture changes from a MySQL or Oracle source database and publish the DML events to Kafka.

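2. Options #

2.1. Use ogg (recommended) #

ogg supports both MySQL and Oracle as sources and delivers DML changes to Kafka, where backend services can consume them.

2.2. Use canal #

canal is Alibaba's open-source database synchronization/migration tool. It currently supports only MySQL as a source database. For Oracle, yugong can first sync the data into MySQL; canal then parses the MySQL binlog and publishes the changes to Kafka.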

3. Installing and deploying ogg (MySQL) #

3.1. Prerequisites #

  • Download the ogg binary packages
    • Oracle GoldenGate for Big Data 19.1.0.0.1 on Linux x86-64
    • Oracle GoldenGate 19.1.0.0.3 for MySQL on Linux x86-64
  • centos7
    • ogg-mysql(10.41.253.211)
    • ogg-target(10.41.253.212)
  • JDK 1.8
    • /usr/local/jdk

3.2. Install a test MySQL instance #

ubuntu

# Install
$ apt install mysql-server-5.7 -y
# Basic hardening
$ mysql_secure_installation
# Add the binlog settings, then confirm they are present
$ grep -E "^(server-id|log_bin|binlog_format)" /etc/mysql/mysql.conf.d/mysqld.cnf
server-id		= 1
log_bin			= /var/log/mysql/mysql-bin.log
binlog_format	=	row
$ mkdir -p /var/log/mysql && \
  chown mysql:mysql -R /var/log/mysql
$ systemctl restart mysql
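
The grep check above can also be scripted. The following is a minimal sketch, assuming the settings live in a [mysqld] section and that the file parses as plain INI (a file that begins with directives such as !includedir before any section header would need pre-processing). The function name check_binlog_settings and the SAMPLE text are illustrative, not part of MySQL or ogg.

```python
import configparser

# The three settings shown in the grep output above.
REQUIRED = {"server-id", "log_bin", "binlog_format"}


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section; empty means OK."""
    parser = configparser.ConfigParser(
        allow_no_value=True,   # my.cnf allows bare flags without "= value"
        interpolation=None,    # do not treat "%" in values specially
        strict=False,          # tolerate repeated options
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    section = parser["mysqld"]
    problems = [f"missing {key}" for key in sorted(REQUIRED) if key not in section]
    if "binlog_format" in section:
        fmt = (section.get("binlog_format") or "").strip().lower()
        if fmt != "row":
            problems.append(f"binlog_format is {fmt!r}, expected 'row'")
    return problems


# Hypothetical file content mirroring the settings above.
SAMPLE = """[mysqld]
server-id\t\t= 1
log_bin\t\t\t= /var/log/mysql/mysql-bin.log
binlog_format\t=\trow
"""

if __name__ == "__main__":
    print(check_binlog_settings(SAMPLE))
```

An empty list means all three settings are present and binlog_format is row.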

centos7

# Configure the yum repository
$ cd /tmp && wget https://dev.mysql.com/get/mysql57-community-release-el7-9.noarch.rpm && \
  rpm -i mysql57-community-release-el7-9.noarch.rpm
# Install MySQL
$ yum install mysql-server -y
# Start MySQL
$ systemctl start mysqld
# Get the temporary root password
$ grep 'temporary password' /var/log/mysqld.log
# Run the initial hardening
$ mysql_secure_installation

Create the test database and table

CREATE DATABASE data CHARSET utf8;

CREATE TABLE `data`.`t1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(60) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

# User that ogg connects as
create user 'ogg'@'%' identified by 'Ogg.123456';
grant select on data.* to 'ogg'@'%';
# Management user
create user 'root'@'%' identified by 'Ogg.123456';
grant all privileges on *.* to 'root'@'%';
flush privileges;

3.3. Install ZooKeeper and Kafka #

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

3.4. Install ogg #

# base /opt/ogg

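# /etc/profile
# JAVA_HOME must be set for LD_LIBRARY_PATH below; the JDK path is the one listed in 3.1
export JAVA_HOME=/usr/local/jdk
export OGG_HOME=/opt/ogg
export PATH=$OGG_HOME:$PATH
export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH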

3.5. Configure ogg-mysql #

3.5.1. Initialize the directories #

ggsci> create subdirs

3.5.2. Configure and start the mgr process #

ggsci> edit param mgr

port 7809
dynamicportlist 7840-7939
autorestart er *, retries 5, waitminutes 3
purgeoldextracts /opt/ogg/dirdat/*,usecheckpoints, minkeepdays 7

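ggsci> start mgr
ggsci> info mgr

  • port: the default listening port of the Manager process
  • dynamicportlist: the range of ports that Manager can allocate dynamically
  • autorestart: restart policy; "er *" covers all Extract and Replicat processes, here with up to 5 retries, 3 minutes apart
  • purgeoldextracts: periodic purge policy for trail files; here files are kept for at least 7 days and purged only once checkpoints show they have been fully processed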

3.5.3. Configure and start the extract process #

ggsci> edit param extmysql

extract extmysql
sourcedb data@ogg-mysql:3306 userid ogg password Ogg.123456
exttrail /opt/ogg/dirdat/om
TranLogOptions AltLogDest /var/log/mysql/mysql-bin.index
table data.t1;

# Add the extract
ggsci> add extract extmysql,tranlog,begin now
# Bind the trail to the extract
ggsci> add exttrail /opt/ogg/dirdat/om,extract extmysql

add exttrail /opt/ogg/dirdat/m2,extract extmysql

3.5.4. Configure the pump (data transfer) process #

ggsci> edit param pumpcp

extract pumpcp
passthru
sourcedb data@ogg-mysql:3306 userid ogg password Ogg.123456
rmthost  ogg-target,mgrport 7809,compress
rmttrail  /opt/ogg/dirdat/om
dynamicresolution
numfiles 3000
table data.t1;

# Bind the local trail file and the target-side trail file to the pumpcp process
ggsci> add extract pumpcp,exttrailsource /opt/ogg/dirdat/om
ggsci> add rmttrail /opt/ogg/dirdat/om,extract pumpcp

add rmttrail /opt/ogg/dirdat/m2,extract pumpcp

3.5.5. Create the definitions file #

ggsci> edit param defgen

defsfile ./dirdef/datat1.def
sourcedb data@ogg-mysql:3306 userid ogg password Ogg.123456
table data.t1;
$ defgen paramfile ./dirprm/defgen.prm
$ scp ./dirdef/datat1.def ogg-target:/opt/ogg/dirdef/

3.6. Configure ogg-target #

3.6.1. Initialize the directories #

ggsci> create subdirs

3.6.2. Configure and start the mgr process #

ggsci> edit param mgr

port 7809
dynamicportlist 7840-7939
autorestart er *, retries 5, waitminutes 3
purgeoldextracts /opt/ogg/dirdat/*,usecheckpoints, minkeepdays 7

ggsci> start mgr
ggsci> info mgr

3.6.3. Configure the replicat process #

ggsci> edit param repkfk

REPLICAT repkfk
sourcedefs /opt/ogg/dirdef/datat1.def
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP data.t1, TARGET data.t1;

ggsci> add replicat repkfk,exttrail /opt/ogg/dirdat/om,checkpointtable ogg-mapping-tmpl.checkpoint

ggsci> add replicat repkfk,exttrail /opt/ogg/dirdat/m2,checkpointtable ogg-mapping-tmpl.checkpoint

kafka.props

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.SchemaTopicName=ogg-schema
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=/usr/local/kafka/libs/*
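
The two mapping templates in kafka.props decide which topic and which message key each change record gets. The sketch below only illustrates the expected substitution and is not the handler's actual implementation. It assumes that ${tableName} resolves to the short table name and that ${primaryKeys} resolves to the primary key values joined with underscores; both assumptions should be checked against the GoldenGate for Big Data documentation for your version. The function resolve_template is hypothetical.

```python
def resolve_template(template, table, primary_key_values):
    """Substitute a few template keywords the way the Kafka handler is assumed to."""
    short_name = table.split(".")[-1]  # "data.t1" -> "t1"
    keywords = {
        "${tableName}": short_name,
        "${fullyQualifiedTableName}": table,
        # Assumption: key values are concatenated with "_" as the delimiter.
        "${primaryKeys}": "_".join(str(v) for v in primary_key_values),
    }
    for keyword, value in keywords.items():
        template = template.replace(keyword, value)
    return template


if __name__ == "__main__":
    # A change to the row with id=3 in data.t1
    print(resolve_template("${tableName}", "data.t1", [3]))
    print(resolve_template("${primaryKeys}", "data.t1", [3]))
```

Under these assumptions, changes to data.t1 land on a topic named t1, which matches the topic consumed in section 3.7.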

custom_kafka_producer.properties

bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

batch.size=16384
linger.ms=200

3.7. Start and test #

  1. ogg-mysql -> start mgr
  2. ogg-mysql -> start extract extmysql
  3. ogg-target -> start mgr
  4. ogg-mysql -> start extract pumpcp
  5. ogg-target -> start replicat repkfk

Consume from the Kafka topic

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1 --from-beginning

{"table":"data.t1","op_type":"U","op_ts":"2020-07-08 20:18:21.000390","current_ts":"2020-07-08T20:18:27.673000","pos":"00000000000000003260","before":{},"after":{"ID":3,"NAME":"luoshuai1"}}
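
A backend consumer has to decode these JSON records. The following is a minimal sketch using only the Python standard library. It assumes the field layout of the sample message above and the op_type codes I, U and D for insert, update and delete; the ChangeEvent class and parse_change_event function are illustrative names, and reading from Kafka itself is left to whichever client library you use.

```python
import json
from dataclasses import dataclass

# Assumed op_type codes; unknown codes are passed through unchanged.
OP_NAMES = {"I": "insert", "U": "update", "D": "delete"}


@dataclass
class ChangeEvent:
    table: str
    operation: str
    op_ts: str
    before: dict
    after: dict


def parse_change_event(raw):
    """Decode one JSON change record (str or bytes) into a ChangeEvent."""
    record = json.loads(raw)
    op_type = record["op_type"]
    return ChangeEvent(
        table=record["table"],
        operation=OP_NAMES.get(op_type, op_type),
        op_ts=record["op_ts"],
        before=record.get("before") or {},  # may be empty or absent
        after=record.get("after") or {},
    )


# The sample message shown above.
SAMPLE = (
    '{"table":"data.t1","op_type":"U","op_ts":"2020-07-08 20:18:21.000390",'
    '"current_ts":"2020-07-08T20:18:27.673000","pos":"00000000000000003260",'
    '"before":{},"after":{"ID":3,"NAME":"luoshuai1"}}'
)

if __name__ == "__main__":
    event = parse_change_event(SAMPLE)
    print(event.table, event.operation, event.after)
```

Mapping op_type to a readable name in one place keeps downstream handlers independent of the single-letter codes.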