Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加本地binlog解析 #13

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,13 @@ INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33',

欢迎提问题提需求,欢迎pull requests!


bin2sql
========================

Usage:

```bash
python bin2sql.py -h 127.0.0.1 --start-datetime='2017-04-05 21:44:54' --stop-datetime='2017-04-05 21:45:00' -f mysql-bin.000001
```

116 changes: 116 additions & 0 deletions binlog2sql/bin2sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os, sys, datetime
import pymysql
from binlogfile import BinLogFileReader
from pymysqlreplication.row_event import (
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent,
)
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import concat_sql_from_binlogevent, create_unique_file, reversed_lines
from bin2sql_util import command_line_args

class Bin2sql(object):
def __init__(self, filePath, connectionSettings, startPos=None, endPos=None, startTime=None,
stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False):
'''
connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
'''
#if not startFile:
# raise ValueError('lack of parameter,startFile.')

self.filePath = filePath
self.connectionSettings = connectionSettings
self.startPos = startPos if startPos else 4 # use binlog v4
self.endPos = endPos
self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")

self.only_schemas = only_schemas if only_schemas else None
self.only_tables = only_tables if only_tables else None
self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever)

self.binlogList = []
self.connection = pymysql.connect(**self.connectionSettings)


def process_binlog(self):
stream = BinLogFileReader(self.filePath, ctl_connection_settings=self.connectionSettings,
log_pos=self.startPos, only_schemas=self.only_schemas,
only_tables=self.only_tables, resume_stream=True)

cur = self.connection.cursor()
tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile.
ftmp = open(tmpFile ,"w")
flagLastEvent = False
eStartPos, lastPos = stream.log_pos, stream.log_pos
try:
for binlogevent in stream:
if not self.stopnever:
if datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime:
if not (
isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)):
lastPos = binlogevent.packet.log_pos
continue
elif datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime:
break
else:
pass

if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN':
eStartPos = lastPos

if isinstance(binlogevent, QueryEvent):
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk)
if sql:
print sql
elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent):
for row in binlogevent.rows:
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos)
if self.flashback:
ftmp.write(sql + '\n')
else:
print sql

if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)):
lastPos = binlogevent.packet.log_pos
if flagLastEvent:
break
ftmp.close()

if self.flashback:
self.print_rollback_sql(tmpFile)
finally:
os.remove(tmpFile)
cur.close()
stream.close()
return True

def print_rollback_sql(self, fin):
'''print rollback sql from tmpfile'''
with open(fin) as ftmp:
sleepInterval = 1000
i = 0
for line in reversed_lines(ftmp):
print line.rstrip()
if i >= sleepInterval:
print 'SELECT SLEEP(1);'
i = 0
else:
i += 1

def __del__(self):
pass


if __name__ == '__main__':
args = command_line_args(sys.argv[1:])
connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password}
bin2sql = Bin2sql(filePath=args.file_path[0], connectionSettings=connectionSettings,
startPos=args.startPos, endPos=args.endPos,
startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases,
only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever)
bin2sql.process_binlog()
61 changes: 61 additions & 0 deletions binlog2sql/bin2sql_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

import argparse
import sys
from binlog2sql_util import is_valid_datetime

def parse_args(args):
"""parse args for binlog2sql"""
parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL you want', add_help=False)
connect_setting = parser.add_argument_group('connect setting')
connect_setting.add_argument('-h','--host', dest='host', type=str,
help='Host the MySQL database server located', default='127.0.0.1')
connect_setting.add_argument('-u', '--user', dest='user', type=str,
help='MySQL Username to log in as', default='root')
connect_setting.add_argument('-p', '--password', dest='password', type=str,
help='MySQL Password to use', default='')
connect_setting.add_argument('-P', '--port', dest='port', type=int,
help='MySQL port to use', default=3306)
range = parser.add_argument_group('range filter')
range.add_argument('--start-position', '--start-pos', dest='startPos', type=int,
help='Start position of the --start-file', default=4)
range.add_argument('--stop-position', '--end-pos', dest='endPos', type=int,
help="Stop position of --stop-file. default: latest position of '--stop-file'", default=0)
range.add_argument('--start-datetime', dest='startTime', type=str,
help="Start reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='')
range.add_argument('--stop-datetime', dest='stopTime', type=str,
help="Stop reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='')
parser.add_argument('--stop-never', dest='stopnever', action='store_true',
help='Wait for more data from the server. default: stop replicate at the last binlog when you start binlog2sql', default=False)

parser.add_argument('--help', dest='help', action='store_true', help='help infomation', default=False)

schema = parser.add_argument_group('schema filter')
schema.add_argument('-d', '--databases', dest='databases', type=str, nargs='*',
help='dbs you want to process', default='')
schema.add_argument('-t', '--tables', dest='tables', type=str, nargs='*',
help='tables you want to process', default='')
parser.add_argument('-f', '--file-path', dest='file_path', type=str, nargs='*', help='binlog file path', default='')

# exclusive = parser.add_mutually_exclusive_group()
parser.add_argument('-K', '--no-primary-key', dest='nopk', action='store_true',
help='Generate insert sql without primary key if exists', default=False)
parser.add_argument('-B', '--flashback', dest='flashback', action='store_true',
help='Flashback data to start_postition of start_file', default=False)
return parser

def command_line_args(args):
needPrintHelp = False if args else True
parser = parse_args(args)
args = parser.parse_args(args)
if args.help or needPrintHelp:
parser.print_help()
sys.exit(1)
if args.flashback and args.stopnever:
raise ValueError('Only one of flashback or stop-never can be True')
if args.flashback and args.nopk:
raise ValueError('Only one of flashback or nopk can be True')
if (args.startTime and not is_valid_datetime(args.startTime)) or (args.stopTime and not is_valid_datetime(args.stopTime)):
raise ValueError('Incorrect datetime argument')
return args
Loading