Skip to content

Commit

Permalink
Bug fixes and test case fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
ssingudasu committed Jan 31, 2024
1 parent c425205 commit a82e229
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
2 changes: 2 additions & 0 deletions pkg/groups/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func GetMemberLags(
GroupId: groupID,
},
)
log.Debugf("Received consumerOffsets: %+v", offsets)

if err != nil {
return nil, err
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/groups/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,13 @@ func TestGetLags(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(lags))

// We create topic with 2 partitions and send 10 messages. first, last offset for all partitions is 0
// When we consume 4 messages, 5 is the latest/last/newest offset
// We consumer 2 messages for each partition. Hence member_offset(2) <= last_offset(5)
for l, lag := range lags {
assert.Equal(t, l, lag.Partition)
assert.Equal(t, int64(4), lag.NewestOffset)
assert.LessOrEqual(t, lag.MemberOffset, int64(4))
assert.Equal(t, int64(5), lag.NewestOffset)
assert.LessOrEqual(t, lag.MemberOffset, int64(5))
}
}

Expand Down Expand Up @@ -330,10 +333,12 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {

groupPartitions := groupDetails.Members[0].TopicPartitions[topicName]

// A topic with 2 partitions and produced 10 messages. first/earliest offset = 0, last/lastest offset = 5
// Consume 8 messages. first/earliest offset = 0, member offset = 4, last/lastest offset = 5
for _, partition := range groupPartitions {
offset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, partition)
require.NoError(t, err)
assert.Equal(t, int64(4), offset)
assert.Equal(t, int64(5), offset)

offset, err = GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, partition)
require.NoError(t, err)
Expand Down Expand Up @@ -390,10 +395,10 @@ func TestResetOffsets(t *testing.T) {
assert.Equal(t, int64(2), lags[0].MemberOffset)
assert.Equal(t, int64(1), lags[1].MemberOffset)

// latest offset of partition 0
// latest offset of partition 0 -> This should be 5. first offset = 0, last offset = 5 (total 10 messages)
latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, 0)
require.NoError(t, err)
// earliest offset of partition 1
// earliest offset of partition 1 -> This should be 0. first offset = 0, last offset = 5 (total 10 messages)
earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, 1)
require.NoError(t, err)

Expand All @@ -413,7 +418,10 @@ func TestResetOffsets(t *testing.T) {
require.NoError(t, err)

require.Equal(t, 2, len(lags))
assert.Equal(t, int64(4), lags[0].MemberOffset)
// partiton 0, we reset offset to latestoffset which is 5
assert.Equal(t, int64(5), lags[0].MemberOffset)

// partiton 1, we reset offset to earliestoffset which is 0
assert.Equal(t, int64(0), lags[1].MemberOffset)

}
Expand Down
20 changes: 15 additions & 5 deletions pkg/messages/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,21 @@ func GetPartitionBounds(
}

if minOffset > firstOffset {
log.Debugf(
"Moving first offset forward to match min offset (%d)",
minOffset,
)
firstOffset = minOffset - 1
// if minOffset is equal to lastOffset
// We read message (firstMessage) from minOffset+1 Which can lead to invalid reads
// Hence, We will not move first offset to match min offset
if minOffset >= lastOffset {
log.Debugf(
"Not Moving first offset forward to match min offset (%d) since minOffset is equal to lastOffset",
minOffset,
)
} else {
log.Debugf(
"Moving first offset forward to match min offset (%d)",
minOffset,
)
firstOffset = minOffset
}
}

var firstMessage kafka.Message
Expand Down
16 changes: 8 additions & 8 deletions pkg/messages/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ func TestGetAllPartitionBounds(t *testing.T) {
bounds, err := GetAllPartitionBounds(ctx, connector, topicName, nil)
assert.NoError(t, err)

// The first partition gets 3 messages
// The first partition gets 3 messages. (i.e) earliest/first offset is 0 and latest/last is 3
assert.Equal(t, 4, len(bounds))
assert.Equal(t, 0, bounds[0].Partition)
assert.Equal(t, int64(0), bounds[0].FirstOffset)
assert.Equal(t, int64(2), bounds[0].LastOffset)
assert.Equal(t, int64(3), bounds[0].LastOffset)

// The last partition gets only 2 messages
// The last partition gets only 2 messages. (i.e) earliest/first offset is 0 and latest/last is 2
assert.Equal(t, 3, bounds[3].Partition)
assert.Equal(t, int64(0), bounds[3].FirstOffset)
assert.Equal(t, int64(1), bounds[3].LastOffset)
assert.Equal(t, int64(2), bounds[3].LastOffset)

boundsWithOffsets, err := GetAllPartitionBounds(
ctx,
Expand All @@ -87,13 +87,13 @@ func TestGetAllPartitionBounds(t *testing.T) {

assert.Equal(t, 4, len(boundsWithOffsets))

// Start of first partition is moved forward
// Start of first partition is moved forward. First partition has earliest offset is 0 and latest is 3
assert.Equal(t, 0, boundsWithOffsets[0].Partition)
assert.Equal(t, int64(1), boundsWithOffsets[0].FirstOffset)
assert.Equal(t, int64(2), boundsWithOffsets[0].LastOffset)
assert.Equal(t, int64(3), boundsWithOffsets[0].LastOffset)

// Other partition bounds are unchanged
// Other partition bounds are unchanged. Last partition has earliest offset is 0 and latest is 2
assert.Equal(t, 3, boundsWithOffsets[3].Partition)
assert.Equal(t, int64(0), boundsWithOffsets[3].FirstOffset)
assert.Equal(t, int64(1), boundsWithOffsets[3].LastOffset)
assert.Equal(t, int64(2), boundsWithOffsets[3].LastOffset)
}

0 comments on commit a82e229

Please sign in to comment.