Reading the MySQL binlog with Python mysql-replication

From Linux78|wiki

I. Requirements

Python 2.7, or Python 3.4, 3.5, or 3.6;
MySQL 5.5, 5.6, or 5.7.

II. MySQL configuration

1. Enable the binlog in MySQL

Check whether the binlog is enabled with show variables like 'log_bin'; if Value is OFF, binary logging is not enabled.

Find the [mysqld] section in my.cnf and add the following:

[mysqld]
# binlog configuration
server-id = 1
# the directory must be writable by the user mysqld runs as
log-bin = /var/lib/mysql-binlog/mysql-bin.log
expire-logs-days = 10
max-binlog-size = 100M
binlog-format = row
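The [mysqld] options above can also be checked before restarting. The following is a minimal Python 3 sketch using the standard configparser module; it assumes the option file parses as plain INI (real my.cnf files may use !include directives, which it does not follow), and check_mycnf is a hypothetical helper name. Because MySQL treats dashes and underscores in option names as equivalent, names are normalized to underscores before comparison.

```python
import configparser

# Options this article sets in [mysqld]; None means "must be present, any value".
REQUIRED = {"server_id": None, "log_bin": None, "binlog_format": "row"}


def check_mycnf(text):
    """Return a list of problems found in the [mysqld] section of a my.cnf string."""
    parser = configparser.ConfigParser(
        allow_no_value=True,            # bare flags such as "log-bin" have no value
        inline_comment_prefixes=("#",),
        interpolation=None,             # treat "%" literally
        strict=False,                   # tolerate repeated options; the last one wins
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL accepts both "-" and "_" in option names; normalize to "_".
    options = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    for name, expected in REQUIRED.items():
        if name not in options:
            problems.append("missing " + name)
        elif expected is not None and (options[name] or "").lower() != expected:
            problems.append("%s should be %s, got %s" % (name, expected, options[name]))
    return problems


if __name__ == "__main__":
    sample = """
[mysqld]
server-id = 1
log-bin = /var/lib/mysql-binlog/mysql-bin.log
binlog-format = row
"""
    print(check_mycnf(sample))  # prints []
```

This only inspects the file; the running server is the final authority, which is why the SHOW VARIABLES checks below still apply after the restart.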

After restarting MySQL, run show variables like 'log_bin' again; Value should now be ON.

To list the events recorded in the binlog: show binlog events;

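The server must satisfy the following conditions; show master status then reports the current binlog file and position: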

MySQL> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

MySQL> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

MySQL> show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| binlog_row_image | FULL  |
+------------------+-------+
1 row in set (0.00 sec)

MySQL> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      431 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
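The three variable checks above can be automated. A minimal Python 3 sketch, assuming the rows have already been fetched with a DB-API driver as (Variable_name, Value) pairs; check_binlog_variables is a hypothetical helper name. The binlog_row_image variable only exists on MySQL 5.6 and later (5.5 always logs full row images), so its absence is tolerated.

```python
# Required values for the binlog variables shown above (compared case-insensitively).
REQUIRED_VARIABLES = [
    ("log_bin", "ON"),
    ("binlog_format", "ROW"),
    ("binlog_row_image", "FULL"),
]


def check_binlog_variables(rows):
    """rows: iterable of (Variable_name, Value) pairs, e.g. from SHOW VARIABLES.

    Returns a list of human-readable problems; an empty list means the server is ready.
    """
    variables = {name.lower(): str(value).upper() for name, value in rows}
    problems = []
    for name, expected in REQUIRED_VARIABLES:
        actual = variables.get(name)
        if actual is None:
            # binlog_row_image does not exist before MySQL 5.6; 5.5 logs full rows anyway.
            if name != "binlog_row_image":
                problems.append("%s is not set" % name)
        elif actual != expected:
            problems.append("%s is %s, expected %s" % (name, actual, expected))
    return problems
```

With pymysql, for example, the rows could come from cursor.execute("SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')") followed by cursor.fetchall(); that wiring is an illustration, not part of mysql-replication.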


2. Create the replication account

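The client masquerades as a slave (replica), continuously fetching the binlog from the MySQL server and parsing it, so it needs an account with replication privileges: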

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';
# reload the privilege tables
flush privileges;
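To make "masquerading as a slave" concrete: after logging in with the account above, a replica sends a COM_BINLOG_DUMP command naming the binlog file, the start position and its own server-id, and the server then streams events back. The Python 3 sketch below only builds that request packet with struct, following the layout in the MySQL client/server protocol documentation (1-byte command 0x12, 4-byte position, 2-byte flags, 4-byte server-id, then the file name, all little-endian); build_binlog_dump_packet is a hypothetical name and nothing is sent over the network. mysql-replication builds an equivalent packet internally, so this is never needed in practice.

```python
import struct

COM_BINLOG_DUMP = 0x12
BINLOG_DUMP_NON_BLOCK = 0x01  # server sends EOF at the end of the binlog instead of waiting


def build_binlog_dump_packet(log_file, log_pos, server_id, blocking=True, sequence_id=0):
    """Build a COM_BINLOG_DUMP packet (header + payload) as bytes."""
    flags = 0 if blocking else BINLOG_DUMP_NON_BLOCK
    payload = (
        struct.pack("<BIHI", COM_BINLOG_DUMP, log_pos, flags, server_id)
        + log_file.encode("utf-8")
    )
    # MySQL packet header: 3-byte little-endian payload length + 1-byte sequence id.
    header = struct.pack("<I", len(payload))[:3] + struct.pack("<B", sequence_id)
    return header + payload
```

For example, build_binlog_dump_packet("mysql-bin.000001", 4, 101) yields a 31-byte packet whose header is 1b 00 00 00 (payload length 27). The server-id in this packet is why each binlog reader needs an id distinct from the server and from other replicas.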

III. Python

Install the mysql-replication package:

pip install mysql-replication

Example code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def main():
    mysql_settings = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'replicator',
        'passwd': '123456',
    }
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=101,          # must differ from the server-id of the MySQL server
        blocking=True,          # keep waiting for new events instead of stopping at the end
        only_schemas=['zow'],   # databases (schemas) to monitor, not table names
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        resume_stream=True,     # start from the log_file / log_pos given below
        log_file='mysql-bin.000001',
        log_pos=4)              # 4 = first event in the file; or the Position from SHOW MASTER STATUS
    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                event = {"schema": binlogevent.schema,
                         "table": binlogevent.table,
                         "log_pos": binlogevent.packet.log_pos}
                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["values"] = dict(row["values"])
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["before_values"] = dict(row["before_values"])
                    event["after_values"] = dict(row["after_values"])
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["values"] = dict(row["values"])
                # default=str turns values JSON cannot encode (e.g. datetime, Decimal) into strings
                print(json.dumps(event, default=str))
                sys.stdout.flush()
    finally:
        stream.close()


if __name__ == "__main__":
    main()

Sample output:

# python mysql-replication.py 
{"table": "test4", "log_pos": 884, "action": "insert", "schema": "test", "values": {"id": 1, "data2": "World", "data": "Hello"}}
{"table": "test4", "after_values": {"id": 1, "data2": "Hello", "data": "World"}, "log_pos": 1111, "action": "update", "schema": "test", "before_values": {"id": 1, "data2": "World", "data": "Hello"}}
{"table": "test4", "log_pos": 1320, "action": "delete", "schema": "test", "values": {"id": 1, "data2": "Hello", "data": "World"}}
{"table": "test4", "log_pos": 1529, "action": "insert", "schema": "test", "values": {"id": 2, "data2": "World", "data": "Hello"}}
{"table": "test4", "log_pos": 3010, "action": "delete", "schema": "test", "values": {"id": 2, "data2": "Hello", "data": "World"}}
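Each output line is a self-contained JSON object, so a downstream consumer can replay the stream. A minimal Python 3 sketch that mirrors a single table in a dictionary keyed by primary key, assuming the key column is named id (true for the test4 table above, but an assumption in general) and that events for other tables have already been filtered out on the table field; apply_event and the file name mirror.py are hypothetical.

```python
import json
import sys


def apply_event(table, event, key="id"):
    """Apply one event dict (as printed by the script above) to an in-memory table.

    table maps primary-key value -> row dict and is modified in place.
    """
    action = event["action"]
    if action == "insert":
        row = event["values"]
        table[row[key]] = row
    elif action == "update":
        # The key itself may change in an update, so remove the old entry first.
        table.pop(event["before_values"][key], None)
        row = event["after_values"]
        table[row[key]] = row
    elif action == "delete":
        table.pop(event["values"][key], None)
    else:
        raise ValueError("unknown action: %r" % (action,))
    return table


if __name__ == "__main__":
    # Usage: python mysql-replication.py | python mirror.py
    mirror = {}
    for line in sys.stdin:
        if line.strip():
            apply_event(mirror, json.loads(line))
            print(mirror)
            sys.stdout.flush()
```

Replaying the five sample lines above in order ends with an empty table, since both inserted rows are later deleted.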


References:

https://github.com/noplay/python-mysql-replication

Other languages:

Java: https://github.com/shyiko/mysql-binlog-connector-java

Go: https://github.com/siddontang/go-mysql

PHP (based on this project): https://github.com/krowinski/php-mysql-replication and https://github.com/fengxiangyun/mysql-replication