Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topicctl bug fix for offsets #171

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/groups/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func GetMemberLags(
GroupId: groupID,
},
)
log.Debugf("Received consumerOffsets: %+v", offsets)

if err != nil {
return nil, err
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/groups/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,13 @@ func TestGetLags(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(lags))

// We create topic with 2 partitions and send 10 messages. first, last offset for all partitions is 0
// When we consume 4 messages, 5 is the latest/last/newest offset
// We consumer 2 messages for each partition. Hence member_offset(2) <= last_offset(5)
for l, lag := range lags {
assert.Equal(t, l, lag.Partition)
assert.Equal(t, int64(4), lag.NewestOffset)
assert.LessOrEqual(t, lag.MemberOffset, int64(4))
assert.Equal(t, int64(5), lag.NewestOffset)
assert.LessOrEqual(t, lag.MemberOffset, int64(5))
}
}

Expand Down Expand Up @@ -330,10 +333,12 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {

groupPartitions := groupDetails.Members[0].TopicPartitions[topicName]

// A topic with 2 partitions and produced 10 messages. first/earliest offset = 0, last/lastest offset = 5
// Consume 8 messages. first/earliest offset = 0, member offset = 4, last/lastest offset = 5
for _, partition := range groupPartitions {
offset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, partition)
require.NoError(t, err)
assert.Equal(t, int64(4), offset)
assert.Equal(t, int64(5), offset)

offset, err = GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, partition)
require.NoError(t, err)
Expand Down Expand Up @@ -390,10 +395,10 @@ func TestResetOffsets(t *testing.T) {
assert.Equal(t, int64(2), lags[0].MemberOffset)
assert.Equal(t, int64(1), lags[1].MemberOffset)

// latest offset of partition 0
// latest offset of partition 0 -> This should be 5. first offset = 0, last offset = 5 (total 10 messages)
latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, 0)
require.NoError(t, err)
// earliest offset of partition 1
// earliest offset of partition 1 -> This should be 0. first offset = 0, last offset = 5 (total 10 messages)
earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, 1)
require.NoError(t, err)

Expand All @@ -413,7 +418,10 @@ func TestResetOffsets(t *testing.T) {
require.NoError(t, err)

require.Equal(t, 2, len(lags))
assert.Equal(t, int64(4), lags[0].MemberOffset)
// partiton 0, we reset offset to latestoffset which is 5
assert.Equal(t, int64(5), lags[0].MemberOffset)

// partiton 1, we reset offset to earliestoffset which is 0
assert.Equal(t, int64(0), lags[1].MemberOffset)

}
Expand Down
29 changes: 23 additions & 6 deletions pkg/messages/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,21 @@ func GetPartitionBounds(
}

if minOffset > firstOffset {
log.Debugf(
"Moving first offset forward to match min offset (%d)",
minOffset,
)
firstOffset = minOffset
// if minOffset is equal to lastOffset
// We read message (firstMessage) from minOffset+1 Which can lead to invalid reads
// Hence, We will not move first offset to match min offset
if minOffset >= lastOffset {
log.Debugf(
"Not Moving first offset forward to match min offset (%d) since minOffset is equal to lastOffset",
minOffset,
)
} else {
log.Debugf(
"Moving first offset forward to match min offset (%d)",
minOffset,
)
firstOffset = minOffset
}
}

var firstMessage kafka.Message
Expand Down Expand Up @@ -233,11 +243,18 @@ func GetPartitionBounds(
)
}

log.Debugf(
"Final offsets for %d: %d->%d",
partition,
firstMessage.Offset,
lastMessage.Offset,
)

return Bounds{
Partition: partition,
FirstOffset: firstMessage.Offset,
FirstTime: firstMessage.Time,
LastOffset: lastMessage.Offset,
LastOffset: lastMessage.Offset + 1,
LastTime: lastMessage.Time,
}, nil
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/messages/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ func TestGetAllPartitionBounds(t *testing.T) {
bounds, err := GetAllPartitionBounds(ctx, connector, topicName, nil)
assert.NoError(t, err)

// The first partition gets 3 messages
// The first partition gets 3 messages. (i.e) earliest/first offset is 0 and latest/last is 3
assert.Equal(t, 4, len(bounds))
assert.Equal(t, 0, bounds[0].Partition)
assert.Equal(t, int64(0), bounds[0].FirstOffset)
assert.Equal(t, int64(2), bounds[0].LastOffset)
assert.Equal(t, int64(3), bounds[0].LastOffset)

// The last partition gets only 2 messages
// The last partition gets only 2 messages. (i.e) earliest/first offset is 0 and latest/last is 2
assert.Equal(t, 3, bounds[3].Partition)
assert.Equal(t, int64(0), bounds[3].FirstOffset)
assert.Equal(t, int64(1), bounds[3].LastOffset)
assert.Equal(t, int64(2), bounds[3].LastOffset)

boundsWithOffsets, err := GetAllPartitionBounds(
ctx,
Expand All @@ -87,13 +87,13 @@ func TestGetAllPartitionBounds(t *testing.T) {

assert.Equal(t, 4, len(boundsWithOffsets))

// Start of first partition is moved forward
// Start of first partition is moved forward. First partition has earliest offset is 0 and latest is 3
assert.Equal(t, 0, boundsWithOffsets[0].Partition)
assert.Equal(t, int64(1), boundsWithOffsets[0].FirstOffset)
assert.Equal(t, int64(2), boundsWithOffsets[0].LastOffset)
assert.Equal(t, int64(3), boundsWithOffsets[0].LastOffset)

// Other partition bounds are unchanged
// Other partition bounds are unchanged. Last partition has earliest offset is 0 and latest is 2
assert.Equal(t, 3, boundsWithOffsets[3].Partition)
assert.Equal(t, int64(0), boundsWithOffsets[3].FirstOffset)
assert.Equal(t, int64(1), boundsWithOffsets[3].LastOffset)
assert.Equal(t, int64(2), boundsWithOffsets[3].LastOffset)
}
Loading