Skip to content

Commit

Permalink
add validation and set defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
petedannemann committed Nov 16, 2023
1 parent 32a9490 commit 39349ad
Show file tree
Hide file tree
Showing 5 changed files with 383 additions and 14 deletions.
46 changes: 44 additions & 2 deletions pkg/config/acl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package config

import (
"errors"

"github.com/hashicorp/go-multierror"
"github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -46,6 +49,45 @@ func (a ACLConfig) ToNewACLEntries() []kafka.ACLEntry {
return acls
}

// TODO: Set defaults for missing values
// SetDeaults sets the default host and permission for each ACL in an ACL config
// if these aren't set
func (a *ACLConfig) SetDefaults() {
for i, acl := range a.Spec.ACLs {
if acl.Resource.Host == "" {
a.Spec.ACLs[i].Resource.Host = "*"
}
if acl.Resource.Permission == kafka.ACLPermissionTypeUnknown {
a.Spec.ACLs[i].Resource.Permission = kafka.ACLPermissionTypeAllow
}
}
}

// Validate evaluates whether the ACL config is valid.
func (a *ACLConfig) Validate() error {
var err error

err = a.Meta.Validate()

for _, acl := range a.Spec.ACLs {
if acl.Resource.Type == kafka.ResourceTypeUnknown {
err = multierror.Append(err, errors.New("ACL resource type cannot be unknown"))
}
if acl.Resource.Name == "" {
err = multierror.Append(err, errors.New("ACL resource name cannot be empty"))
}
if acl.Resource.PatternType == kafka.PatternTypeUnknown {
err = multierror.Append(err, errors.New("ACL resource pattern type cannot be unknown"))
}
if acl.Resource.Principal == "" {
err = multierror.Append(err, errors.New("ACL resource principal cannot be empty"))
}

for _, operation := range acl.Operations {
if operation == kafka.ACLOperationTypeUnknown {
err = multierror.Append(err, errors.New("ACL operation cannot be unknown"))
}
}
}

// TODO: Validate fields cannot be empty
return err
}
267 changes: 267 additions & 0 deletions pkg/config/acl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package config

import (
"testing"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

func TestACLValidate(t *testing.T) {
type testCase struct {
description string
aclConfig ACLConfig
expError bool
}

testCases := []testCase{
{
description: "valid ACL config",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
Name: "test-topic",
PatternType: kafka.PatternTypeLiteral,
Principal: "User:Alice",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
expError: false,
},
{
description: "unknown resource type",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeUnknown,
Name: "test-topic",
PatternType: kafka.PatternTypeLiteral,
Principal: "User:Alice",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
expError: true,
},
{
description: "empty resource name",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
PatternType: kafka.PatternTypeLiteral,
Principal: "User:Alice",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
expError: true,
},
{
description: "unknown resource pattern type",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
PatternType: kafka.PatternTypeUnknown,
Principal: "User:Alice",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
expError: true,
},
{
description: "empty principal",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
PatternType: kafka.PatternTypeUnknown,
Principal: "",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
expError: true,
},
{
description: "unknown ACL operation type",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
PatternType: kafka.PatternTypeUnknown,
Principal: "",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeUnknown,
},
},
},
},
},
expError: true,
},
}

for _, testCase := range testCases {
testCase.aclConfig.SetDefaults()
err := testCase.aclConfig.Validate()
if testCase.expError {
assert.Error(t, err, testCase.description)
} else {
assert.NoError(t, err, testCase.description)
}
}
}

func TestACLSetDefaults(t *testing.T) {
type testCase struct {
description string
aclConfig ACLConfig
expConfig ACLConfig
}

testCases := []testCase{
{
description: "set defaults",
aclConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
Name: "test-topic",
PatternType: kafka.PatternTypeLiteral,
Principal: "User:Alice",
Permission: kafka.ACLPermissionTypeUnknown,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
expConfig: ACLConfig{
Meta: ResourceMeta{
Name: "acl-test",
Cluster: "test-cluster",
Region: "test-region",
Environment: "test-env",
},
Spec: ACLSpec{
ACLs: []ACL{
{
Resource: ACLResource{
Type: kafka.ResourceTypeTopic,
Name: "test-topic",
PatternType: kafka.PatternTypeLiteral,
Principal: "User:Alice",
Host: "*",
Permission: kafka.ACLPermissionTypeAllow,
},
Operations: []kafka.ACLOperationType{
kafka.ACLOperationTypeRead,
},
},
},
},
},
},
}

for _, testCase := range testCases {
testCase.aclConfig.SetDefaults()
assert.Equal(t, testCase.expConfig, testCase.aclConfig, testCase.description)
}
}
24 changes: 24 additions & 0 deletions pkg/config/meta.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package config

import (
"errors"

"github.com/hashicorp/go-multierror"
)

// ResourceMeta stores the (mostly immutable) metadata associated with a resource.
// Inspired by the meta structs in Kubernetes objects.
type ResourceMeta struct {
Expand All @@ -14,3 +20,21 @@ type ResourceMeta struct {
// topic.
Consumers []string `json:"consumers,omitempty"`
}

// Validate evalutes whether the ResourceMeta is valid.
func (rm *ResourceMeta) Validate() error {
var err error
if rm.Name == "" {
err = multierror.Append(err, errors.New("Name must be set"))
}
if rm.Cluster == "" {
err = multierror.Append(err, errors.New("Cluster must be set"))
}
if rm.Region == "" {
err = multierror.Append(err, errors.New("Region must be set"))
}
if rm.Environment == "" {
err = multierror.Append(err, errors.New("Environment must be set"))
}
return err
}
Loading

0 comments on commit 39349ad

Please sign in to comment.