From 08b921f092baa0e29ec2a24ba2750872ee85fc3e Mon Sep 17 00:00:00 2001 From: xuyi Date: Mon, 10 Apr 2017 23:34:00 +0800 Subject: [PATCH 1/3] add binlog file parser --- binlog2sql/bin2sql.py | 116 ++++++++++++++ binlog2sql/bin2sql_util.py | 61 ++++++++ binlog2sql/binlogfile.py | 313 +++++++++++++++++++++++++++++++++++++ 3 files changed, 490 insertions(+) create mode 100755 binlog2sql/bin2sql.py create mode 100644 binlog2sql/bin2sql_util.py create mode 100644 binlog2sql/binlogfile.py diff --git a/binlog2sql/bin2sql.py b/binlog2sql/bin2sql.py new file mode 100755 index 0000000..9e099cc --- /dev/null +++ b/binlog2sql/bin2sql.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os, sys, datetime +import pymysql +from binlogfile import BinLogFileReader +from pymysqlreplication.row_event import ( + WriteRowsEvent, + UpdateRowsEvent, + DeleteRowsEvent, +) +from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent +from binlog2sql_util import concat_sql_from_binlogevent, create_unique_file, reversed_lines +from bin2sql_util import command_line_args + +class Bin2sql(object): + def __init__(self, filePath, connectionSettings, startPos=None, endPos=None, startTime=None, + stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False): + ''' + connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave} + ''' + #if not startFile: + # raise ValueError('lack of parameter,startFile.') + + self.filePath = filePath + self.connectionSettings = connectionSettings + self.startPos = startPos if startPos else 4 # use binlog v4 + self.endPos = endPos + self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") + self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") + + self.only_schemas = only_schemas if only_schemas else None + self.only_tables = only_tables if only_tables else None + self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever) + + self.binlogList = [] + self.connection = pymysql.connect(**self.connectionSettings) + + + def process_binlog(self): + stream = BinLogFileReader(self.filePath, ctl_connection_settings=self.connectionSettings, + log_pos=self.startPos, only_schemas=self.only_schemas, + only_tables=self.only_tables, resume_stream=True) + + cur = self.connection.cursor() + tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile. + ftmp = open(tmpFile ,"w") + flagLastEvent = False + eStartPos, lastPos = stream.log_pos, stream.log_pos + try: + for binlogevent in stream: + if not self.stopnever: + if datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime: + if not ( + isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): + lastPos = binlogevent.packet.log_pos + continue + elif datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime: + break + else: + pass + + if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN': + eStartPos = lastPos + + if isinstance(binlogevent, QueryEvent): + sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk) + if sql: + print sql + elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent): + for row in binlogevent.rows: + sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos) + if self.flashback: + ftmp.write(sql + '\n') + else: + print sql + + if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): + lastPos = binlogevent.packet.log_pos + if flagLastEvent: + break + ftmp.close() + + if self.flashback: + self.print_rollback_sql(tmpFile) + finally: + os.remove(tmpFile) + cur.close() + stream.close() + return True + + def print_rollback_sql(self, fin): + '''print rollback sql from tmpfile''' + with open(fin) as ftmp: + sleepInterval = 1000 + i = 0 + for line in reversed_lines(ftmp): + print line.rstrip() + if i >= sleepInterval: + print 'SELECT SLEEP(1);' + i = 0 + else: + i += 1 + + def __del__(self): + pass + + +if __name__ == '__main__': + args = command_line_args(sys.argv[1:]) + connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password} + bin2sql = Bin2sql(filePath=args.file_path[0], connectionSettings=connectionSettings, + startPos=args.startPos, endPos=args.endPos, + startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases, + only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever) + bin2sql.process_binlog() \ No newline at end of file diff --git a/binlog2sql/bin2sql_util.py b/binlog2sql/bin2sql_util.py new file mode 100644 index 0000000..79a0f0f --- /dev/null +++ b/binlog2sql/bin2sql_util.py @@ -0,0 +1,61 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import argparse +import sys +from binlog2sql_util import is_valid_datetime + +def parse_args(args): + """parse args for binlog2sql""" + parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL you want', add_help=False) + connect_setting = parser.add_argument_group('connect setting') + connect_setting.add_argument('-h','--host', dest='host', type=str, + help='Host the MySQL database server located', default='127.0.0.1') + connect_setting.add_argument('-u', '--user', dest='user', type=str, + help='MySQL Username to log in as', default='root') + connect_setting.add_argument('-p', '--password', dest='password', type=str, + help='MySQL Password to use', default='') + connect_setting.add_argument('-P', '--port', dest='port', type=int, + help='MySQL port to use', default=3306) + range = parser.add_argument_group('range filter') + range.add_argument('--start-position', '--start-pos', dest='startPos', type=int, + help='Start position of the --start-file', default=4) + range.add_argument('--stop-position', '--end-pos', dest='endPos', type=int, + help="Stop position of --stop-file. default: latest position of '--stop-file'", default=0) + range.add_argument('--start-datetime', dest='startTime', type=str, + help="Start reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='') + range.add_argument('--stop-datetime', dest='stopTime', type=str, + help="Stop reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='') + parser.add_argument('--stop-never', dest='stopnever', action='store_true', + help='Wait for more data from the server. default: stop replicate at the last binlog when you start binlog2sql', default=False) + + parser.add_argument('--help', dest='help', action='store_true', help='help infomation', default=False) + + schema = parser.add_argument_group('schema filter') + schema.add_argument('-d', '--databases', dest='databases', type=str, nargs='*', + help='dbs you want to process', default='') + schema.add_argument('-t', '--tables', dest='tables', type=str, nargs='*', + help='tables you want to process', default='') + parser.add_argument('-f', '--file-path', dest='file_path', type=str, nargs='*', help='binlog file path', default='') + + # exclusive = parser.add_mutually_exclusive_group() + parser.add_argument('-K', '--no-primary-key', dest='nopk', action='store_true', + help='Generate insert sql without primary key if exists', default=False) + parser.add_argument('-B', '--flashback', dest='flashback', action='store_true', + help='Flashback data to start_postition of start_file', default=False) + return parser + +def command_line_args(args): + needPrintHelp = False if args else True + parser = parse_args(args) + args = parser.parse_args(args) + if args.help or needPrintHelp: + parser.print_help() + sys.exit(1) + if args.flashback and args.stopnever: + raise ValueError('Only one of flashback or stop-never can be True') + if args.flashback and args.nopk: + raise ValueError('Only one of flashback or nopk can be True') + if (args.startTime and not is_valid_datetime(args.startTime)) or (args.stopTime and not is_valid_datetime(args.stopTime)): + raise ValueError('Incorrect datetime argument') + return args \ No newline at end of file diff --git a/binlog2sql/binlogfile.py b/binlog2sql/binlogfile.py new file mode 100644 index 0000000..ebc6636 --- /dev/null +++ b/binlog2sql/binlogfile.py @@ -0,0 +1,313 @@ +# -*- coding: utf-8 -*- + +import pymysql +import struct + +from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE +from pymysql.cursors import DictCursor +from pymysql.util import int2byte + +from pymysqlreplication.packet import BinLogPacketWrapper +from pymysqlreplication.constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT +from pymysqlreplication.gtid import GtidSet +from pymysqlreplication.event import ( + QueryEvent, RotateEvent, FormatDescriptionEvent, + XidEvent, GtidEvent, StopEvent, + BeginLoadQueryEvent, ExecuteLoadQueryEvent, + HeartbeatLogEvent, NotImplementedEvent) +from pymysqlreplication.row_event import ( + UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) + +try: + from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID +except ImportError: + # Handle old pymysql versions + # See: https://github.com/PyMySQL/PyMySQL/pull/261 + COM_BINLOG_DUMP_GTID = 0x1e + +from StringIO import StringIO +from pymysql.util import byte2int + +# 2013 Connection Lost +# 2006 MySQL server has gone away +MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] + + +class StringIOAdvance(StringIO): + def advance(self, length): + self.seek(self.tell() + length) + + +class BinLogFileReader(object): + + """Connect to replication stream and read event + """ + report_slave = None + _expected_magic = b'\xfebin' + + def __init__(self, file_path, ctl_connection_settings=None, resume_stream=False, + blocking=False, only_events=None, log_file=None, log_pos=None, + filter_non_implemented_events=True, + ignored_events=None, auto_position=None, + only_tables=None, ignored_tables=None, + only_schemas=None, ignored_schemas=None, + freeze_schema=False, skip_to_timestamp=None, + report_slave=None, slave_uuid=None, + pymysql_wrapper=None, + fail_on_table_metadata_unavailable=False, + slave_heartbeat=None): + + # open file + self._file = None + self._file_path = file_path + self._pos = None + + self.__connected_ctl = False + self._ctl_connection = None + self._ctl_connection_settings = ctl_connection_settings + if ctl_connection_settings: + self._ctl_connection_settings.setdefault("charset", "utf8") + + self.__only_tables = only_tables + self.__ignored_tables = ignored_tables + self.__only_schemas = only_schemas + self.__ignored_schemas = ignored_schemas + self.__freeze_schema = freeze_schema + self.__allowed_events = self._allowed_event_list( + only_events, ignored_events, filter_non_implemented_events) + self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable + + # We can't filter on packet level TABLE_MAP and rotate event because + # we need them for handling other operations + self.__allowed_events_in_packet = frozenset( + [TableMapEvent, RotateEvent]).union(self.__allowed_events) + + self.__use_checksum = self.__checksum_enabled() + + # Store table meta information + self.table_map = {} + self.log_pos = log_pos + self.log_file = log_file + self.auto_position = auto_position + self.skip_to_timestamp = skip_to_timestamp + + self.report_slave = None + self.slave_uuid = slave_uuid + self.slave_heartbeat = slave_heartbeat + + if pymysql_wrapper: + self.pymysql_wrapper = pymysql_wrapper + else: + self.pymysql_wrapper = pymysql.connect + + def close(self): + if self._file: + self._file.close() + self._file_path = None + if self.__connected_ctl: + self._ctl_connection._get_table_information = None + self._ctl_connection.close() + self.__connected_ctl = False + + def __connect_to_ctl(self): + self._ctl_connection_settings["db"] = "information_schema" + self._ctl_connection_settings["cursorclass"] = DictCursor + self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) + self._ctl_connection._get_table_information = self.__get_table_information + self.__connected_ctl = True + + def __checksum_enabled(self): + """Return True if binlog-checksum = CRC32. Only for MySQL > 5.6""" + try: + self._ctl_connection.execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'") + result = self._ctl_connection.fetchone() + + if result is None: + return False + var, value = result[:2] + if value == 'NONE': + return False + return True + except Exception: + return True + + def __connect_to_stream(self): + if self._file is None: + self._file = open(self._file_path, 'rb+') + self._pos = self._file.tell() + assert self._pos == 0 + # read magic + if self._pos == 0: + magic = self._file.read(4) + if magic == self._expected_magic: + self._pos += len(magic) + else: + messagefmt = 'Magic bytes {0!r} did not match expected {1!r}' + message = messagefmt.format(magic, self._expected_magic) + raise BadMagicBytesError(message) + + def fetchone(self): + while True: + if not self._file: + self.__connect_to_stream() + + if not self.__connected_ctl and self._ctl_connection_settings: + self.__connect_to_ctl() + + # read pkt + pkt = StringIOAdvance() + # headerlength 19 + header = self._file.read(19) + if not header: + break + + unpacked = struct.unpack(' Date: Tue, 11 Apr 2017 00:01:02 +0800 Subject: [PATCH 2/3] fix __checksum_enabled bug --- binlog2sql/binlogfile.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/binlog2sql/binlogfile.py b/binlog2sql/binlogfile.py index ebc6636..6a28f41 100644 --- a/binlog2sql/binlogfile.py +++ b/binlog2sql/binlogfile.py @@ -82,8 +82,6 @@ def __init__(self, file_path, ctl_connection_settings=None, resume_stream=False, self.__allowed_events_in_packet = frozenset( [TableMapEvent, RotateEvent]).union(self.__allowed_events) - self.__use_checksum = self.__checksum_enabled() - # Store table meta information self.table_map = {} self.log_pos = log_pos @@ -100,6 +98,9 @@ def __init__(self, file_path, ctl_connection_settings=None, resume_stream=False, else: self.pymysql_wrapper = pymysql.connect + # checksum with database + self.__use_checksum = self.__checksum_enabled() + def close(self): if self._file: self._file.close() @@ -119,17 +120,21 @@ def __connect_to_ctl(self): def __checksum_enabled(self): """Return True if binlog-checksum = CRC32. Only for MySQL > 5.6""" try: - self._ctl_connection.execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'") - result = self._ctl_connection.fetchone() + if not self.__connected_ctl and self._ctl_connection_settings: + self.__connect_to_ctl() - if result is None: + cur = self._ctl_connection.cursor() + cur.execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'") + _result = cur.fetchone() + cur.close() + if _result is None: return False - var, value = result[:2] + value = _result.get('Value', 'NONE') if value == 'NONE': return False return True except Exception: - return True + return False def __connect_to_stream(self): if self._file is None: From 072edfd39c587e7af32f3da554e564869fad844a Mon Sep 17 00:00:00 2001 From: xuyi Date: Tue, 11 Apr 2017 00:14:48 +0800 Subject: [PATCH 3/3] add bin2sql.py usage --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index d67c257..4f70dab 100644 --- a/README.md +++ b/README.md @@ -215,3 +215,13 @@ INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', 欢迎提问题提需求,欢迎pull requests! + +bin2sql +======================== + +Usage: + +```bash +python bin2sql.py -h 127.0.0.1 --start-datetime='2017-04-05 21:44:54' --stop-datetime='2017-04-05 21:45:00' -f mysql-bin.000001 +``` +