ogg #
ogg全名Oracle GoldenGate,是Oracle公司用于同步数据库的工具。
1. 需求 #
监听源数据库MySQL/Oracle,将DML事件发送到Kafka中。
2. 方案 #
2.1. 使用ogg(推荐) #
ogg支持MySQL和Oracle,将DML同步到kafka中,提供给后端使用。
2.2. 使用canal #
canal是阿里开源的数据库同步/迁移工具,目前仅支持MySQL作为源数据库;对于Oracle,可以使用yugong同步到MySQL中,然后再基于canal进行binlog解析,最后同步到kafka中。
3. ogg的安装部署(MySQL) #
3.1. 准备材料 #
- 下载ogg二进制包
- Oracle GoldenGate for Big Data 19.1.0.0.1 on Linux x86-64
- Oracle GoldenGate 19.1.0.0.3 for MySQL on Linux x86-64
- centos7
- ogg-mysql(10.41.253.211)
- ogg-target(10.41.253.212)
- JDK 1.8
- /usr/local/jdk
3.2. 安装测试MySQL #
ubuntu
# 安装
$ apt install mysql-server-5.7 -y
# 简单配置
$ mysql_secure_installation
# 添加binlog配置
$ grep -E "^(server-id|log_bin|binlog_format)" /etc/mysql/mysql.conf.d/mysqld.cnf
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = row
$ mkdir -p /var/log/mysql && \
chown mysql:mysql -R /var/log/mysql
$ systemctl restart mysql
centos7
# 配置源
$ cd /tmp && wget https://dev.mysql.com/get/mysql57-community-release-el7-9.noarch.rpm && \
rpm -i mysql57-community-release-el7-9.noarch.rpm
# 安装mysql
$ yum install mysql-server -y
# 启动mysql
$ systemctl start mysqld
# 获取root密码
$ grep 'temporary password' /var/log/mysqld.log
# 初始化配置
$ mysql_secure_installation
建立测试库表
CREATE DATABASE data CHARSET utf8;
CREATE TABLE `data`.`t1` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(60) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
# ogg
create user 'ogg'@'%' identified by 'Ogg.123456';
grant select on data.* to 'ogg'@'%';
# mng
create user 'root'@'%' identified by 'Ogg.123456';
grant all privileges on *.* to 'root'@'%';
flush privileges;
3.3. 安装zookeeper以及kafka #
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
3.4. 安装ogg #
# base /opt/ogg
# /etc/profile
export OGG_HOME=/opt/ogg
export PATH=$OGG_HOME:$PATH
export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH
3.5. 配置ogg-mysql #
3.5.1. 初始化目录 #
oggsci> create subdirs
3.5.2. 配置启动mgr进程 #
oggsci> edit param mgr
port 7809
dynamicportlist 7840-7939
autorestart er *, retries 5, waitminutes 3
purgeoldextracts /opt/ogg/dirdat/*,usecheckpoints, minkeepdays 7
oggsci> start mgr
oggsci> info mgr
- port: 进程的默认监听端口
- dynamicportlist:动态端口列表
- autorestart:自动重启参数,`er *`表示自动重启所有extract和replicat进程;retries 5表示最多重试5次,waitminutes 3表示每次重试间隔3分钟
- purgeoldextracts:TRAIL文件的定期清理策略
3.5.3. 配置启动extract进程 #
oggsci> edit param extmysql
extract extmysql
sourcedb data@ogg-mysql:3306 userid ogg password Ogg.123456
exttrail /opt/ogg/dirdat/om
TranLogOptions AltLogDest /var/log/mysql/mysql-bin.index
table data.t1;
# 添加extract
oggsci> add extract extmysql,tranlog,begin now
# 绑定extract和trail
oggsci> add exttrail /opt/ogg/dirdat/om,extract extmysql
add exttrail /opt/ogg/dirdat/m2,extract extmysql
3.5.4. 配置传输pump进程 #
oggsci> edit param pumpcp
extract pumpcp
passthru
sourcedb data@ogg-mysql:3306 userid ogg password Ogg.123456
rmthost ogg-target,mgrport 7809,compress
rmttrail /opt/ogg/dirdat/om
dynamicresolution
numfiles 3000
table data.t1;
# 分别添加本地trail文件和目标端trail文件绑定到pumpcp进程
oggsci> add extract pumpcp,exttrailsource /opt/ogg/dirdat/om
oggsci> add rmttrail /opt/ogg/dirdat/om,extract pumpcp
add rmttrail /opt/ogg/dirdat/m2,extract pumpcp
3.5.5. 创建定义文件 #
ggsci> edit param defgen
defsfile ./dirdef/datat1.def
sourcedb data@ogg-mysql:3306 userid ogg password Ogg.123456
table data.t1;
$ defgen paramfile ./dirprm/defgen.prm
$ scp ./dirdef/datat1.def ogg-target:/opt/ogg/dirdef/
3.6. 配置ogg-target #
3.6.1. 初始化目录 #
oggsci> create subdirs
3.6.2. 配置启动mgr进程 #
oggsci> edit param mgr
port 7809
dynamicportlist 7840-7939
autorestart er *, retries 5, waitminutes 3
purgeoldextracts /opt/ogg/dirdat/*,usecheckpoints, minkeepdays 7
oggsci> start mgr
oggsci> info mgr
3.6.3. 配置replicat进程 #
oggsci> edit param repkfk
REPLICAT repkfk
sourcedefs /opt/ogg/dirdef/datat1.def
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP data.t1, TARGET data.t1;
oggsci> add replicat repkfk exttrail /opt/ogg/dirdat/om,checkpointtable ogg-mapping-tmpl.checkpoint
oggsci> add replicat repkfk exttrail /opt/ogg/dirdat/m2,checkpointtable ogg-mapping-tmpl.checkpoint
kafka.props
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.SchemaTopicName=ogg-schema
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=/usr/local/kafka/libs/*
custom_kafka_producer.properties
bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=200
3.7. 启动并测试 #
- ogg-mysql -> start mgr
- ogg-mysql -> start extract
- ogg-target -> start mgr
- ogg-mysql -> start pump
- ogg-target -> start replicat
监听kafka队列
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1 --from-beginning
{"table":"data.t1","op_type":"U","op_ts":"2020-07-08 20:18:21.000390","current_ts":"2020-07-08T20:18:27.673000","pos":"00000000000000003260","before":{},"after":{"ID":3,"NAME":"luoshuai1"}}