Skip to content

Commit

Permalink
Merge pull request #83 from guybe7/rdb_v8
Browse files Browse the repository at this point in the history
RDB v8 support
  • Loading branch information
oranagra authored Feb 21, 2017
2 parents 4a74574 + 3a01ef7 commit 41d9cda
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 38 deletions.
37 changes: 15 additions & 22 deletions rdbtools/memprofiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ def end_rdb(self):

def set(self, key, value, expiry, info):
self._current_encoding = info['encoding']
size = self.sizeof_string(key) + self.sizeof_string(value) + self.top_level_object_overhead()
size += 2*self.robj_overhead()
size += self.key_expiry_overhead(expiry)
size = self.top_level_object_overhead(key, expiry) + self.sizeof_string(value)

length = element_length(value)
self.emit_record("string", key, size, self._current_encoding, length, length)
Expand All @@ -193,10 +191,7 @@ def set(self, key, value, expiry, info):
def start_hash(self, key, length, expiry, info):
self._current_encoding = info['encoding']
self._current_length = length
size = self.sizeof_string(key)
size += 2*self.robj_overhead()
size += self.top_level_object_overhead()
size += self.key_expiry_overhead(expiry)
size = self.top_level_object_overhead(key, expiry)

if 'sizeof_value' in info:
size += info['sizeof_value']
Expand All @@ -216,7 +211,8 @@ def hset(self, key, field, value):
self._current_size += self.sizeof_string(field)
self._current_size += self.sizeof_string(value)
self._current_size += self.hashtable_entry_overhead()
self._current_size += 2*self.robj_overhead()
if self._redis_version < StrictVersion('4.0'):
self._current_size += 2*self.robj_overhead()

def end_hash(self, key):
self.emit_record("hash", key, self._current_size, self._current_encoding, self._current_length,
Expand All @@ -234,7 +230,8 @@ def sadd(self, key, member):
if self._current_encoding == 'hashtable':
self._current_size += self.sizeof_string(member)
self._current_size += self.hashtable_entry_overhead()
self._current_size += self.robj_overhead()
if self._redis_version < StrictVersion('4.0'):
self._current_size += self.robj_overhead()

def end_set(self, key):
self.emit_record("set", key, self._current_size, self._current_encoding, self._current_length,
Expand All @@ -246,10 +243,7 @@ def start_list(self, key, expiry, info):
self._list_items_size = 0
self._list_items_zipped_size = 0
self._current_encoding = info['encoding']
size = self.sizeof_string(key)
size += 2*self.robj_overhead()
size += self.top_level_object_overhead()
size += self.key_expiry_overhead(expiry)
size = self.top_level_object_overhead(key, expiry)

# ignore the encoding in the rdb, and predict the encoding that will be used at the target redis version
if self._redis_version >= StrictVersion('3.2'):
Expand Down Expand Up @@ -298,7 +292,8 @@ def end_list(self, key, info):
else: # linkedlist
self._current_size += self.linkedlist_entry_overhead() * self._current_length
self._current_size += self.linkedlist_overhead()
self._current_size += self.robj_overhead() * self._current_length
if self._redis_version < StrictVersion('4.0'):
self._current_size += self.robj_overhead() * self._current_length
self._current_size += self._list_items_size
self.emit_record("list", key, self._current_size, self._current_encoding, self._current_length,
self._len_largest_element)
Expand All @@ -307,10 +302,7 @@ def end_list(self, key, info):
def start_sorted_set(self, key, length, expiry, info):
self._current_length = length
self._current_encoding = info['encoding']
size = self.sizeof_string(key)
size += 2*self.robj_overhead()
size += self.top_level_object_overhead()
size += self.key_expiry_overhead(expiry)
size = self.top_level_object_overhead(key, expiry)

if 'sizeof_value' in info:
size += info['sizeof_value']
Expand All @@ -325,9 +317,10 @@ def zadd(self, key, score, member):
self._len_largest_element = element_length(member)

if self._current_encoding == 'skiplist':
self._current_size += 8 # self.sizeof_string(score)
self._current_size += 8 # score (double)
self._current_size += self.sizeof_string(member)
self._current_size += 2*self.robj_overhead()
if self._redis_version < StrictVersion('4.0'):
self._current_size += self.robj_overhead()
self._current_size += self.skiplist_entry_overhead()

def end_sorted_set(self, key):
Expand Down Expand Up @@ -364,10 +357,10 @@ def sizeof_string(self, string):
return self.malloc_overhead(l + 1 + 8 + 1)
return self.malloc_overhead(l + 1 + 16 + 1)

def top_level_object_overhead(self):
def top_level_object_overhead(self, key, expiry):
# Each top level object is an entry in a dictionary, and so we have to include
# the overhead of a dictionary entry
return self.hashtable_entry_overhead()
return self.hashtable_entry_overhead() + self.sizeof_string(key) + self.robj_overhead() + self.key_expiry_overhead(expiry)

def key_expiry_overhead(self, expiry):
# If there is no expiry, there isn't any overhead
Expand Down
63 changes: 47 additions & 16 deletions rdbtools/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

REDIS_RDB_6BITLEN = 0
REDIS_RDB_14BITLEN = 1
REDIS_RDB_32BITLEN = 2
REDIS_RDB_32BITLEN = 0x80
REDIS_RDB_64BITLEN = 0x81
REDIS_RDB_ENCVAL = 3

REDIS_RDB_OPCODE_AUX = 250
Expand All @@ -38,6 +39,8 @@
REDIS_RDB_TYPE_SET = 2
REDIS_RDB_TYPE_ZSET = 3
REDIS_RDB_TYPE_HASH = 4
REDIS_RDB_TYPE_ZSET_2 = 5 # ZSET version 2 with doubles stored in binary.
REDIS_RDB_TYPE_MODULE = 6
REDIS_RDB_TYPE_HASH_ZIPMAP = 9
REDIS_RDB_TYPE_LIST_ZIPLIST = 10
REDIS_RDB_TYPE_SET_INTSET = 11
Expand All @@ -51,7 +54,7 @@
REDIS_RDB_ENC_LZF = 3

DATA_TYPE_MAPPING = {
0 : "string", 1 : "list", 2 : "set", 3 : "sortedset", 4 : "hash",
0 : "string", 1 : "list", 2 : "set", 3 : "sortedset", 4 : "hash", 5 : "sortedset", 6 : "module",
9 : "hash", 10 : "list", 11 : "set", 12 : "sortedset", 13 : "hash", 14 : "list"}

class RdbCallback(object):
Expand Down Expand Up @@ -393,8 +396,12 @@ def read_length_with_encoding(self, f) :
elif enc_type == REDIS_RDB_14BITLEN :
bytes.append(read_unsigned_char(f))
length = ((bytes[0]&0x3F)<<8)|bytes[1]
else :
elif bytes[0] == REDIS_RDB_32BITLEN:
length = ntohl(f)
elif bytes[0] == REDIS_RDB_64BITLEN:
length = ntohu64(f)
else:
raise Exception('read_length_with_encoding', "Invalid string encoding %s (encoding byte 0x%X)" % (enc_type, bytes[0]))
return (length, is_encoded)

def read_length(self, f) :
Expand Down Expand Up @@ -463,12 +470,12 @@ def read_object(self, f, enc_type) :
val = self.read_string(f)
self._callback.sadd(self._key, val)
self._callback.end_set(self._key)
elif enc_type == REDIS_RDB_TYPE_ZSET :
elif enc_type == REDIS_RDB_TYPE_ZSET or enc_type == REDIS_RDB_TYPE_ZSET_2 :
length = self.read_length(f)
self._callback.start_sorted_set(self._key, length, self._expiry, info={'encoding':'skiplist'})
for count in range(0, length) :
val = self.read_string(f)
score = self.read_float(f)
score = read_binary_double(f) if enc_type == REDIS_RDB_TYPE_ZSET_2 else self.read_float(f)
self._callback.zadd(self._key, score, val)
self._callback.end_sorted_set(self._key)
elif enc_type == REDIS_RDB_TYPE_HASH :
Expand All @@ -491,6 +498,8 @@ def read_object(self, f, enc_type) :
self.read_hash_from_ziplist(f)
elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST:
self.read_list_from_quicklist(f)
elif enc_type == REDIS_RDB_TYPE_MODULE :
raise Exception('read_object', 'Unable to read Redis Modules RDB objects (key %s)' % (enc_type, self._key))
else :
raise Exception('read_object', 'Invalid object type %d for key %s' % (enc_type, self._key))

Expand Down Expand Up @@ -518,6 +527,14 @@ def skip_string(self, f):
bytes_to_skip = length

skip(f, bytes_to_skip)

def skip_float(self, f):
dbl_length = read_unsigned_char(f)
if dbl_length < 253:
skip(f, dbl_length)

def skip_binary_double(self, f):
skip(f, 8)

def skip_object(self, f, enc_type):
skip_strings = 0
Expand All @@ -527,8 +544,11 @@ def skip_object(self, f, enc_type):
skip_strings = self.read_length(f)
elif enc_type == REDIS_RDB_TYPE_SET :
skip_strings = self.read_length(f)
elif enc_type == REDIS_RDB_TYPE_ZSET :
skip_strings = self.read_length(f) * 2
elif enc_type == REDIS_RDB_TYPE_ZSET or enc_type == REDIS_RDB_TYPE_ZSET_2 :
length = self.read_length(f)
for x in range(length):
skip_string(f)
skip_binary_double(f) if enc_type == REDIS_RDB_TYPE_ZSET_2 else skip_float(f)
elif enc_type == REDIS_RDB_TYPE_HASH :
skip_strings = self.read_length(f) * 2
elif enc_type == REDIS_RDB_TYPE_HASH_ZIPMAP :
Expand All @@ -543,6 +563,8 @@ def skip_object(self, f, enc_type):
skip_strings = 1
elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST:
skip_strings = self.read_length(f)
elif enc_type == REDIS_RDB_TYPE_MODULE:
raise Exception('skip_object', 'Unable to skip Redis Modules RDB objects (key %s)' % (enc_type, self._key))
else :
raise Exception('skip_object', 'Invalid object type %d for key %s' % (enc_type, self._key))
for x in range(0, skip_strings):
Expand Down Expand Up @@ -712,7 +734,7 @@ def verify_magic_string(self, magic_string) :

def verify_version(self, version_str) :
version = int(version_str)
if version < 1 or version > 7:
if version < 1 or version > 8:
raise Exception('verify_version', 'Invalid RDB version number %d' % version)
self._rdb_version = version

Expand Down Expand Up @@ -741,7 +763,7 @@ def init_filter(self, filters):
self._filters['not_keys'] = str2regexp(filters['not_keys'])

if not 'types' in filters:
self._filters['types'] = ('set', 'hash', 'sortedset', 'string', 'list')
self._filters['types'] = ('set', 'hash', 'sortedset', 'module', 'string', 'list')
elif isinstance(filters['types'], bytes):
self._filters['types'] = (filters['types'], )
elif isinstance(filters['types'], list):
Expand Down Expand Up @@ -813,14 +835,20 @@ def skip(f, free):
if free :
f.read(free)

def memrev(arr):
l = len(arr)
new_arr = bytearray(l)
for i in range(l):
new_arr[-i-1] = arr[i]
return str(new_arr)

def ntohl(f) :
val = read_unsigned_int(f)
new_val = 0
new_val = new_val | ((val & 0x000000ff) << 24)
new_val = new_val | ((val & 0xff000000) >> 24)
new_val = new_val | ((val & 0x0000ff00) << 8)
new_val = new_val | ((val & 0x00ff0000) >> 8)
return new_val
val = memrev(f.read(4))
return struct.unpack('I', val)[0]

def ntohu64(f) :
val = memrev(f.read(8))
return struct.unpack('Q', val)[0]

def to_datetime(usecs_since_epoch):
seconds_since_epoch = usecs_since_epoch // 1000000
Expand Down Expand Up @@ -862,6 +890,9 @@ def read_signed_long(f) :

def read_unsigned_long(f) :
return struct.unpack('Q', f.read(8))[0]

def read_binary_double(f) :
return struct.unpack('d', f.read(8))[0]

def string_as_hexcode(string) :
for s in string :
Expand Down
Binary file not shown.
7 changes: 7 additions & 0 deletions tests/parser_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ def test_rdb_version_5_with_checksum(self):
self.assertEquals(r.databases[0][b'abcdef'], b'abcdef')
self.assertEquals(r.databases[0][b'longerstring'], b'thisisalongerstring.idontknowwhatitmeans')

def test_rdb_version_8_with_64b_length_and_scores(self):
r = load_rdb('rdb_version_8_with_64b_length_and_scores.rdb')
self.assertEquals(r.databases[0][b'foo'], b'bar')
zset = r.databases[0][b"bigset"]
self.assertEquals(len(zset), 1000)
self.assert_(floateq(zset[b'finalfield'], 2.718))

def test_multiple_databases_stream(self):
r = load_rdb_stream('multiple_databases.rdb')
self.assert_(len(r.databases), 2)
Expand Down

0 comments on commit 41d9cda

Please sign in to comment.