diff --git a/pkg/groups/groups.go b/pkg/groups/groups.go index b3b78439..6874cca2 100644 --- a/pkg/groups/groups.go +++ b/pkg/groups/groups.go @@ -160,6 +160,8 @@ func GetMemberLags( GroupId: groupID, }, ) + log.Debugf("Received consumerOffsets: %+v", offsets) + if err != nil { return nil, err } diff --git a/pkg/groups/groups_test.go b/pkg/groups/groups_test.go index a9198009..73780643 100644 --- a/pkg/groups/groups_test.go +++ b/pkg/groups/groups_test.go @@ -289,10 +289,13 @@ func TestGetLags(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(lags)) + // We create topic with 2 partitions and send 10 messages. first, last offset for all partitions is 0 + // When we consume 4 messages, 5 is the latest/last/newest offset + // We consume 2 messages for each partition. Hence member_offset(2) <= last_offset(5) for l, lag := range lags { assert.Equal(t, l, lag.Partition) - assert.Equal(t, int64(4), lag.NewestOffset) - assert.LessOrEqual(t, lag.MemberOffset, int64(4)) + assert.Equal(t, int64(5), lag.NewestOffset) + assert.LessOrEqual(t, lag.MemberOffset, int64(5)) } } @@ -330,10 +333,12 @@ func TestGetEarliestOrLatestOffset(t *testing.T) { groupPartitions := groupDetails.Members[0].TopicPartitions[topicName] + // A topic with 2 partitions and produced 10 messages. first/earliest offset = 0, last/lastest offset = 5 + // Consume 8 messages. first/earliest offset = 0, member offset = 4, last/lastest offset = 5 for _, partition := range groupPartitions { offset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, partition) require.NoError(t, err) - assert.Equal(t, int64(4), offset) + assert.Equal(t, int64(5), offset) offset, err = GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, partition) require.NoError(t, err) @@ -390,10 +395,10 @@ func TestResetOffsets(t *testing.T) { assert.Equal(t, int64(2), lags[0].MemberOffset) assert.Equal(t, int64(1), lags[1].MemberOffset) - // latest offset of partition 0 + // latest offset of partition 0 -> This should be 5. first offset = 0, last offset = 5 (total 10 messages) latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, 0) require.NoError(t, err) - // earliest offset of partition 1 + // earliest offset of partition 1 -> This should be 0. first offset = 0, last offset = 5 (total 10 messages) earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, 1) require.NoError(t, err) @@ -413,7 +418,10 @@ func TestResetOffsets(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(lags)) - assert.Equal(t, int64(4), lags[0].MemberOffset) + // partiton 0, we reset offset to latestoffset which is 5 + assert.Equal(t, int64(5), lags[0].MemberOffset) + + // partiton 1, we reset offset to earliestoffset which is 0 assert.Equal(t, int64(0), lags[1].MemberOffset) } diff --git a/pkg/messages/bounds.go b/pkg/messages/bounds.go index 8ad973f6..45c1a9e4 100644 --- a/pkg/messages/bounds.go +++ b/pkg/messages/bounds.go @@ -174,7 +174,10 @@ func GetPartitionBounds( }, nil } - if minOffset > firstOffset { + // if minOffset is equal to lastOffset + // We read message (firstMessage) from minOffset+1 Which can lead to invalid reads + // Hence, We will not move first offset to match min offset if minOffset >= lastOffset + if minOffset > firstOffset && minOffset < lastOffset { log.Debugf( "Moving first offset forward to match min offset (%d)", minOffset, @@ -233,11 +236,18 @@ func GetPartitionBounds( ) } + log.Debugf( + "Final offsets for %d: %d->%d", + partition, + firstMessage.Offset, + lastMessage.Offset, + ) + return Bounds{ Partition: partition, FirstOffset: firstMessage.Offset, FirstTime: firstMessage.Time, - LastOffset: lastMessage.Offset, + LastOffset: lastMessage.Offset + 1, LastTime: lastMessage.Time, }, nil } diff --git a/pkg/messages/bounds_test.go b/pkg/messages/bounds_test.go index 38bfeac3..a1dfe5c1 100644 --- a/pkg/messages/bounds_test.go +++ b/pkg/messages/bounds_test.go @@ -64,16 +64,16 @@ func TestGetAllPartitionBounds(t *testing.T) { bounds, err := GetAllPartitionBounds(ctx, connector, topicName, nil) assert.NoError(t, err) - // The first partition gets 3 messages + // The first partition gets 3 messages. (i.e) earliest/first offset is 0 and latest/last is 3 assert.Equal(t, 4, len(bounds)) assert.Equal(t, 0, bounds[0].Partition) assert.Equal(t, int64(0), bounds[0].FirstOffset) - assert.Equal(t, int64(2), bounds[0].LastOffset) + assert.Equal(t, int64(3), bounds[0].LastOffset) - // The last partition gets only 2 messages + // The last partition gets only 2 messages. (i.e) earliest/first offset is 0 and latest/last is 2 assert.Equal(t, 3, bounds[3].Partition) assert.Equal(t, int64(0), bounds[3].FirstOffset) - assert.Equal(t, int64(1), bounds[3].LastOffset) + assert.Equal(t, int64(2), bounds[3].LastOffset) boundsWithOffsets, err := GetAllPartitionBounds( ctx, @@ -87,13 +87,13 @@ func TestGetAllPartitionBounds(t *testing.T) { assert.Equal(t, 4, len(boundsWithOffsets)) - // Start of first partition is moved forward + // Start of first partition is moved forward. First partition has earliest offset is 0 and latest is 3 assert.Equal(t, 0, boundsWithOffsets[0].Partition) assert.Equal(t, int64(1), boundsWithOffsets[0].FirstOffset) - assert.Equal(t, int64(2), boundsWithOffsets[0].LastOffset) + assert.Equal(t, int64(3), boundsWithOffsets[0].LastOffset) - // Other partition bounds are unchanged + // Other partition bounds are unchanged. Last partition has earliest offset is 0 and latest is 2 assert.Equal(t, 3, boundsWithOffsets[3].Partition) assert.Equal(t, int64(0), boundsWithOffsets[3].FirstOffset) - assert.Equal(t, int64(1), boundsWithOffsets[3].LastOffset) + assert.Equal(t, int64(2), boundsWithOffsets[3].LastOffset) }