Skip to content

Commit

Permalink
minio multitenancy
Browse files Browse the repository at this point in the history
  • Loading branch information
SergioLangaritaBenitez committed Jan 9, 2025
1 parent c516e14 commit 963c1b5
Show file tree
Hide file tree
Showing 6 changed files with 549 additions and 237 deletions.
233 changes: 160 additions & 73 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,8 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand

if len(service.AllowedUsers) > 0 {
// If AllowedUsers is empty don't add uid
service.Labels["uid"] = full_uid[0:8]

// If the uid of the owner is not on the allowed_users list append it
ownerOnList := false
for _, user := range service.AllowedUsers {
if user == service.Owner {
ownerOnList = true
break
}
}
if !ownerOnList {
service.AllowedUsers = append(service.AllowedUsers, uid)
}
// Check if the uid's from allowed_users have and associated MinIO user
service.Labels["uid"] = full_uid[:10]
// Check if the uid's from allowed_users have and asociated MinIO user
// and create it if not
uids := mc.CheckUsersInCache(service.AllowedUsers)
if len(uids) > 0 {
Expand All @@ -133,6 +121,39 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
mc.CreateSecretForOIDC(uid, sk)
}
}

path := strings.Trim(service.Input[0].Path, "/")
splitPath := strings.SplitN(path, "/", 2)

ownerOnList := false
// Create service bucket list if isolation_level = user
if strings.ToUpper(service.IsolationLevel) == "USER" {
var userBucket string
for _, user := range service.AllowedUsers {

// Check the uid of the owner is on the allowed_users list
if user == service.Owner {
ownerOnList = true
}
// Fill the list of private buckets to create
userBucket = splitPath[0] + "-" + user[:10]
service.BucketList = append(service.BucketList, userBucket)
}
} else {
index := 0
for !ownerOnList {
// Check the uid of the owner is on the allowed_users list
// If isolation level is not user the list may not need to be go through fully
if service.AllowedUsers[index] == service.Owner {
ownerOnList = true
}
index++
}
}
if !ownerOnList {
service.AllowedUsers = append(service.AllowedUsers, uid)
}

}
}

Expand All @@ -155,7 +176,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
}

// Create buckets/folders based on the Input and Output and enable notifications
if err := createBuckets(&service, cfg, minIOAdminClient, service.AllowedUsers, false); err != nil {
if err := createBuckets(&service, cfg, minIOAdminClient, false); err != nil {
if err == errInput {
c.String(http.StatusBadRequest, err.Error())
} else {
Expand Down Expand Up @@ -232,12 +253,16 @@ func checkValues(service *types.Service, cfg *types.Config) {
service.Token = utils.GenerateToken()
}

func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *utils.MinIOAdminClient, allowed_users []string, isUpdate bool) error {
func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *utils.MinIOAdminClient, isUpdate bool) error {
var s3Client *s3.S3
var cdmiClient *cdmi.Client
var provName, provID string

// Create private buckets

// Create shared buckets (if defined)
// Create input buckets
createLogger.Printf("Creating input buckets ..")
for _, in := range service.Input {
provID, provName = getProviderInfo(in.Provider)

Expand Down Expand Up @@ -269,10 +294,11 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
path := strings.Trim(in.Path, " /")
// Split buckets and folders from path
splitPath := strings.SplitN(path, "/", 2)
// Create bucket

_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(splitPath[0]),
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Check if the error is caused because the bucket already exists
Expand All @@ -286,13 +312,66 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
}
}

// Create generic folder(s)
var folderKey string
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
folderKey = fmt.Sprintf("%s/", splitPath[1])
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(splitPath[0]),
Key: aws.String(folderKey),
})
if err != nil {
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
// Enable MinIO notifications based on the Input []StorageIOConfig
if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), splitPath[0], folderKey); err != nil {
return err
}
}

if strings.ToUpper(service.IsolationLevel) == "USER" && len(service.BucketList) > 0 {
for i, b := range service.BucketList {
// Create a bucket for each allowed user if allowed_users is not empty
_, err = s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(b),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Check if the error is caused because the bucket already exists
if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou {
log.Printf("The bucket \"%s\" already exists\n", b)
} else {
return fmt.Errorf("error creating bucket %s: %v", b, err)
}
} else {
return fmt.Errorf("error creating bucket %s: %v", b, err)
}
}
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(b),
Key: aws.String(folderKey),
})
if err != nil {
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, b, err)
}
if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), b, folderKey); err != nil {
return err
}

if !isAdminUser {
minIOAdminClient.CreateAddPolicy(b, service.AllowedUsers[i], false)
}
}
}

// Create group for the service and add users
// Check if users in allowed_users have a MinIO associated user
// If new allowed users list is empty the service becomes public
// If new allowed users list is empty the service becames public
if !isUpdate {
if !isAdminUser {
if len(allowed_users) == 0 {
err = minIOAdminClient.AddServiceToAllUsersGroup(splitPath[0])
if len(service.AllowedUsers) == 0 {
err = minIOAdminClient.CreateAddPolicy(splitPath[0], ALL_USERS_GROUP, true)
if err != nil {
return fmt.Errorf("error adding service %s to all users group: %v", splitPath[0], err)
}
Expand All @@ -302,38 +381,26 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
return fmt.Errorf("error creating service group for bucket %s: %v", splitPath[0], err)
}

err = minIOAdminClient.UpdateUsersInGroup(allowed_users, splitPath[0], false)
err = minIOAdminClient.UpdateUsersInGroup(service.AllowedUsers, splitPath[0], false)
if err != nil {
return err
}
err = minIOAdminClient.CreateAddPolicy(splitPath[0], splitPath[0], true)
if err != nil {
return err
}
}
}
}
// Create folder(s)
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
folderKey := fmt.Sprintf("%s/", splitPath[1])
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(splitPath[0]),
Key: aws.String(folderKey),
})
if err != nil {
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
}

// Enable MinIO notifications based on the Input []StorageIOConfig
if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), in); err != nil {
return err
}
}

createLogger.Printf("Creating output buckets ..")
// Create output buckets
for _, out := range service.Output {
provID, provName = getProviderInfo(out.Provider)
// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
// TODO fix
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID)
}

Expand All @@ -350,44 +417,63 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
s3Client = service.StorageProviders.S3[provID].GetS3Client()
}
// Create bucket
_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(splitPath[0]),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Check if the error is caused because the bucket already exists
if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou {
log.Printf("The bucket \"%s\" already exists\n", splitPath[0])
} else {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
} else {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
}
// ==== TODO check if is necessary repeat the create bucket code ====
// _, err := s3Client.CreateBucket(&s3.CreateBucketInput{
// Bucket: aws.String(splitPath[0]),
// })

// // TODO Add disable notifications in case of an error for every bucket on service.BucketList
// if err != nil {
// if aerr, ok := err.(awserr.Error); ok {
// // Check if the error is caused because the bucket already exists
// if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou {
// log.Printf("The bucket \"%s\" already exists\n", splitPath[0])
// } else {
// disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
// return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
// }
// } else {
// disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
// return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
// }
// }
// Create folder(s)
var folderKey string
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
folderKey := fmt.Sprintf("%s/", splitPath[1])
folderKey = fmt.Sprintf("%s/", splitPath[1])
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(splitPath[0]),
Key: aws.String(folderKey),
})
if err != nil {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), splitPath[0])
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
}

if service.IsolationLevel == "USER" && len(service.BucketList) > 0 {
for _, b := range service.BucketList {
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(b),
Key: aws.String(folderKey),
})
if err != nil {
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, b, err)
}

}
}

case types.OnedataName:
cdmiClient = service.StorageProviders.Onedata[provID].GetCDMIClient()
err := cdmiClient.CreateContainer(fmt.Sprintf("%s/%s", service.StorageProviders.Onedata[provID].Space, path), true)
if err != nil {
if err == cdmi.ErrBadRequest {
log.Printf("Error creating \"%s\" folder in Onedata. Error: %v\n", path, err)
} else {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
// TODO fix
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
return fmt.Errorf("error connecting to Onedata's Oneprovider \"%s\". Error: %v", service.StorageProviders.Onedata[provID].OneproviderHost, err)
}
}
Expand Down Expand Up @@ -431,7 +517,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
}
if !isUpdate {
if !isAdminUser {
if len(allowed_users) == 0 {
if len(service.AllowedUsers) == 0 {
err = minIOAdminClient.AddServiceToAllUsersGroup(splitPath[0])
if err != nil {
return fmt.Errorf("error adding service %s to all users group: %v", splitPath[0], err)
Expand All @@ -442,7 +528,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
return fmt.Errorf("error creating service group for bucket %s: %v", splitPath[0], err)
}

err = minIOAdminClient.UpdateUsersInGroup(allowed_users, splitPath[0], false)
err = minIOAdminClient.UpdateUsersInGroup(service.AllowedUsers, splitPath[0], false)
if err != nil {
return err
}
Expand Down Expand Up @@ -530,32 +616,32 @@ func registerMinIOWebhook(name string, token string, minIO *types.MinIOProvider,
return minIOAdminClient.RestartServer()
}

func enableInputNotification(minIOClient *s3.S3, arnStr string, input types.StorageIOConfig) error {
path := strings.Trim(input.Path, " /")
// Split buckets and folders from path
splitPath := strings.SplitN(path, "/", 2)

// TODO pass the user UID string
func enableInputNotification(minIOClient *s3.S3, arnStr string, bucket string, path string) error {
// path := strings.Trim(input.Path, " /")
// // Split buckets and folders from path
// splitPath := strings.SplitN(path, "/", 2)
// Get current BucketNotificationConfiguration
gbncRequest := &s3.GetBucketNotificationConfigurationRequest{
Bucket: aws.String(splitPath[0]),
Bucket: aws.String(bucket),
}
nCfg, err := minIOClient.GetBucketNotificationConfiguration(gbncRequest)
if err != nil {
return fmt.Errorf("error getting bucket \"%s\" notifications: %v", splitPath[0], err)
return fmt.Errorf("error getting bucket \"%s\" notifications: %v", bucket, err)
}
queueConfiguration := s3.QueueConfiguration{
QueueArn: aws.String(arnStr),
Events: []*string{aws.String(s3.EventS3ObjectCreated)},
}

// Add folder filter if required
if len(splitPath) == 2 {
if path != "" {
queueConfiguration.Filter = &s3.NotificationConfigurationFilter{
Key: &s3.KeyFilter{
FilterRules: []*s3.FilterRule{
{
Name: aws.String(s3.FilterRuleNamePrefix),
Value: aws.String(fmt.Sprintf("%s/", splitPath[1])),
Value: aws.String(path),
},
},
},
Expand All @@ -565,13 +651,14 @@ func enableInputNotification(minIOClient *s3.S3, arnStr string, input types.Stor
// Append the new queueConfiguration
nCfg.QueueConfigurations = append(nCfg.QueueConfigurations, &queueConfiguration)
pbncInput := &s3.PutBucketNotificationConfigurationInput{
Bucket: aws.String(splitPath[0]),
Bucket: aws.String(bucket),
NotificationConfiguration: nCfg,
}

// Enable the notification
_, err = minIOClient.PutBucketNotificationConfiguration(pbncInput)
if err != nil {

if err != nil && !strings.Contains(err.Error(), "An object key name filtering rule defined with overlapping prefixes") {
return fmt.Errorf("error enabling bucket notification: %v", err)
}

Expand Down
Loading

0 comments on commit 963c1b5

Please sign in to comment.