diff --git a/pkg/handlers/create.go b/pkg/handlers/create.go index 5a5ad785..243d0353 100644 --- a/pkg/handlers/create.go +++ b/pkg/handlers/create.go @@ -110,20 +110,8 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand if len(service.AllowedUsers) > 0 { // If AllowedUsers is empty don't add uid - service.Labels["uid"] = full_uid[0:8] - - // If the uid of the owner is not on the allowed_users list append it - ownerOnList := false - for _, user := range service.AllowedUsers { - if user == service.Owner { - ownerOnList = true - break - } - } - if !ownerOnList { - service.AllowedUsers = append(service.AllowedUsers, uid) - } - // Check if the uid's from allowed_users have and associated MinIO user + service.Labels["uid"] = full_uid[:10] + // Check if the uid's from allowed_users have and asociated MinIO user // and create it if not uids := mc.CheckUsersInCache(service.AllowedUsers) if len(uids) > 0 { @@ -133,6 +121,39 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand mc.CreateSecretForOIDC(uid, sk) } } + + path := strings.Trim(service.Input[0].Path, "/") + splitPath := strings.SplitN(path, "/", 2) + + ownerOnList := false + // Create service bucket list if isolation_level = user + if strings.ToUpper(service.IsolationLevel) == "USER" { + var userBucket string + for _, user := range service.AllowedUsers { + + // Check the uid of the owner is on the allowed_users list + if user == service.Owner { + ownerOnList = true + } + // Fill the list of private buckets to create + userBucket = splitPath[0] + "-" + user[:10] + service.BucketList = append(service.BucketList, userBucket) + } + } else { + index := 0 + for !ownerOnList { + // Check the uid of the owner is on the allowed_users list + // If isolation level is not user the list may not need to be go through fully + if service.AllowedUsers[index] == service.Owner { + ownerOnList = true + } + index++ + } + } + if !ownerOnList { + service.AllowedUsers = append(service.AllowedUsers, uid) + } + } } @@ -155,7 +176,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand } // Create buckets/folders based on the Input and Output and enable notifications - if err := createBuckets(&service, cfg, minIOAdminClient, service.AllowedUsers, false); err != nil { + if err := createBuckets(&service, cfg, minIOAdminClient, false); err != nil { if err == errInput { c.String(http.StatusBadRequest, err.Error()) } else { @@ -232,12 +253,16 @@ func checkValues(service *types.Service, cfg *types.Config) { service.Token = utils.GenerateToken() } -func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *utils.MinIOAdminClient, allowed_users []string, isUpdate bool) error { +func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *utils.MinIOAdminClient, isUpdate bool) error { var s3Client *s3.S3 var cdmiClient *cdmi.Client var provName, provID string + // Create private buckets + + // Create shared buckets (if defined) // Create input buckets + createLogger.Printf("Creating input buckets ..") for _, in := range service.Input { provID, provName = getProviderInfo(in.Provider) @@ -269,10 +294,11 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * path := strings.Trim(in.Path, " /") // Split buckets and folders from path splitPath := strings.SplitN(path, "/", 2) - // Create bucket + _, err := s3Client.CreateBucket(&s3.CreateBucketInput{ Bucket: aws.String(splitPath[0]), }) + if err != nil { if aerr, ok := err.(awserr.Error); ok { // Check if the error is caused because the bucket already exists @@ -286,13 +312,66 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * } } + // Create generic folder(s) + var folderKey string + if len(splitPath) == 2 { + // Add "/" to the end of the key in order to create a folder + folderKey = fmt.Sprintf("%s/", splitPath[1]) + _, err := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(splitPath[0]), + Key: aws.String(folderKey), + }) + if err != nil { + return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err) + } + // Enable MinIO notifications based on the Input []StorageIOConfig + if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), splitPath[0], folderKey); err != nil { + return err + } + } + + if strings.ToUpper(service.IsolationLevel) == "USER" && len(service.BucketList) > 0 { + for i, b := range service.BucketList { + // Create a bucket for each allowed user if allowed_users is not empty + _, err = s3Client.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(b), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + // Check if the error is caused because the bucket already exists + if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou { + log.Printf("The bucket \"%s\" already exists\n", b) + } else { + return fmt.Errorf("error creating bucket %s: %v", b, err) + } + } else { + return fmt.Errorf("error creating bucket %s: %v", b, err) + } + } + _, err := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(b), + Key: aws.String(folderKey), + }) + if err != nil { + return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, b, err) + } + if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), b, folderKey); err != nil { + return err + } + + if !isAdminUser { + minIOAdminClient.CreateAddPolicy(b, service.AllowedUsers[i], false) + } + } + } + // Create group for the service and add users // Check if users in allowed_users have a MinIO associated user - // If new allowed users list is empty the service becomes public + // If new allowed users list is empty the service becames public if !isUpdate { if !isAdminUser { - if len(allowed_users) == 0 { - err = minIOAdminClient.AddServiceToAllUsersGroup(splitPath[0]) + if len(service.AllowedUsers) == 0 { + err = minIOAdminClient.CreateAddPolicy(splitPath[0], ALL_USERS_GROUP, true) if err != nil { return fmt.Errorf("error adding service %s to all users group: %v", splitPath[0], err) } @@ -302,38 +381,26 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * return fmt.Errorf("error creating service group for bucket %s: %v", splitPath[0], err) } - err = minIOAdminClient.UpdateUsersInGroup(allowed_users, splitPath[0], false) + err = minIOAdminClient.UpdateUsersInGroup(service.AllowedUsers, splitPath[0], false) + if err != nil { + return err + } + err = minIOAdminClient.CreateAddPolicy(splitPath[0], splitPath[0], true) if err != nil { return err } } } } - // Create folder(s) - if len(splitPath) == 2 { - // Add "/" to the end of the key in order to create a folder - folderKey := fmt.Sprintf("%s/", splitPath[1]) - _, err := s3Client.PutObject(&s3.PutObjectInput{ - Bucket: aws.String(splitPath[0]), - Key: aws.String(folderKey), - }) - if err != nil { - return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err) - } - } - - // Enable MinIO notifications based on the Input []StorageIOConfig - if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), in); err != nil { - return err - } } - + createLogger.Printf("Creating output buckets ..") // Create output buckets for _, out := range service.Output { provID, provName = getProviderInfo(out.Provider) // Check if the provider identifier is defined in StorageProviders if !isStorageProviderDefined(provName, provID, service.StorageProviders) { - disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) + // TODO fix + disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "") return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID) } @@ -350,36 +417,54 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * s3Client = service.StorageProviders.S3[provID].GetS3Client() } // Create bucket - _, err := s3Client.CreateBucket(&s3.CreateBucketInput{ - Bucket: aws.String(splitPath[0]), - }) - if err != nil { - if aerr, ok := err.(awserr.Error); ok { - // Check if the error is caused because the bucket already exists - if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou { - log.Printf("The bucket \"%s\" already exists\n", splitPath[0]) - } else { - disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) - return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err) - } - } else { - disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) - return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err) - } - } + // ==== TODO check if is necessary repeat the create bucket code ==== + // _, err := s3Client.CreateBucket(&s3.CreateBucketInput{ + // Bucket: aws.String(splitPath[0]), + // }) + + // // TODO Add disable notifications in case of an error for every bucket on service.BucketList + // if err != nil { + // if aerr, ok := err.(awserr.Error); ok { + // // Check if the error is caused because the bucket already exists + // if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou { + // log.Printf("The bucket \"%s\" already exists\n", splitPath[0]) + // } else { + // disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) + // return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err) + // } + // } else { + // disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) + // return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err) + // } + // } // Create folder(s) + var folderKey string if len(splitPath) == 2 { // Add "/" to the end of the key in order to create a folder - folderKey := fmt.Sprintf("%s/", splitPath[1]) + folderKey = fmt.Sprintf("%s/", splitPath[1]) _, err := s3Client.PutObject(&s3.PutObjectInput{ Bucket: aws.String(splitPath[0]), Key: aws.String(folderKey), }) if err != nil { - disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) + disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), splitPath[0]) return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err) } } + + if service.IsolationLevel == "USER" && len(service.BucketList) > 0 { + for _, b := range service.BucketList { + _, err := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(b), + Key: aws.String(folderKey), + }) + if err != nil { + return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, b, err) + } + + } + } + case types.OnedataName: cdmiClient = service.StorageProviders.Onedata[provID].GetCDMIClient() err := cdmiClient.CreateContainer(fmt.Sprintf("%s/%s", service.StorageProviders.Onedata[provID].Space, path), true) @@ -387,7 +472,8 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * if err == cdmi.ErrBadRequest { log.Printf("Error creating \"%s\" folder in Onedata. Error: %v\n", path, err) } else { - disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider) + // TODO fix + disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "") return fmt.Errorf("error connecting to Onedata's Oneprovider \"%s\". Error: %v", service.StorageProviders.Onedata[provID].OneproviderHost, err) } } @@ -431,7 +517,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * } if !isUpdate { if !isAdminUser { - if len(allowed_users) == 0 { + if len(service.AllowedUsers) == 0 { err = minIOAdminClient.AddServiceToAllUsersGroup(splitPath[0]) if err != nil { return fmt.Errorf("error adding service %s to all users group: %v", splitPath[0], err) @@ -442,7 +528,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient * return fmt.Errorf("error creating service group for bucket %s: %v", splitPath[0], err) } - err = minIOAdminClient.UpdateUsersInGroup(allowed_users, splitPath[0], false) + err = minIOAdminClient.UpdateUsersInGroup(service.AllowedUsers, splitPath[0], false) if err != nil { return err } @@ -530,18 +616,18 @@ func registerMinIOWebhook(name string, token string, minIO *types.MinIOProvider, return minIOAdminClient.RestartServer() } -func enableInputNotification(minIOClient *s3.S3, arnStr string, input types.StorageIOConfig) error { - path := strings.Trim(input.Path, " /") - // Split buckets and folders from path - splitPath := strings.SplitN(path, "/", 2) - +// TODO pass the user UID string +func enableInputNotification(minIOClient *s3.S3, arnStr string, bucket string, path string) error { + // path := strings.Trim(input.Path, " /") + // // Split buckets and folders from path + // splitPath := strings.SplitN(path, "/", 2) // Get current BucketNotificationConfiguration gbncRequest := &s3.GetBucketNotificationConfigurationRequest{ - Bucket: aws.String(splitPath[0]), + Bucket: aws.String(bucket), } nCfg, err := minIOClient.GetBucketNotificationConfiguration(gbncRequest) if err != nil { - return fmt.Errorf("error getting bucket \"%s\" notifications: %v", splitPath[0], err) + return fmt.Errorf("error getting bucket \"%s\" notifications: %v", bucket, err) } queueConfiguration := s3.QueueConfiguration{ QueueArn: aws.String(arnStr), @@ -549,13 +635,13 @@ func enableInputNotification(minIOClient *s3.S3, arnStr string, input types.Stor } // Add folder filter if required - if len(splitPath) == 2 { + if path != "" { queueConfiguration.Filter = &s3.NotificationConfigurationFilter{ Key: &s3.KeyFilter{ FilterRules: []*s3.FilterRule{ { Name: aws.String(s3.FilterRuleNamePrefix), - Value: aws.String(fmt.Sprintf("%s/", splitPath[1])), + Value: aws.String(path), }, }, }, @@ -565,13 +651,14 @@ func enableInputNotification(minIOClient *s3.S3, arnStr string, input types.Stor // Append the new queueConfiguration nCfg.QueueConfigurations = append(nCfg.QueueConfigurations, &queueConfiguration) pbncInput := &s3.PutBucketNotificationConfigurationInput{ - Bucket: aws.String(splitPath[0]), + Bucket: aws.String(bucket), NotificationConfiguration: nCfg, } // Enable the notification _, err = minIOClient.PutBucketNotificationConfiguration(pbncInput) - if err != nil { + + if err != nil && !strings.Contains(err.Error(), "An object key name filtering rule defined with overlapping prefixes") { return fmt.Errorf("error enabling bucket notification: %v", err) } diff --git a/pkg/handlers/delete.go b/pkg/handlers/delete.go index 19d0e126..241676b9 100644 --- a/pkg/handlers/delete.go +++ b/pkg/handlers/delete.go @@ -20,6 +20,8 @@ import ( "fmt" "log" "net/http" + "os" + "reflect" "strings" "github.com/aws/aws-sdk-go/aws" @@ -32,6 +34,9 @@ import ( "k8s.io/apimachinery/pkg/api/errors" ) +var ALL_USERS_GROUP = "all_users_group" +var deleteLogger = log.New(os.Stdout, "[DELETE-HANDLER] ", log.Flags()) + // MakeDeleteHandler makes a handler for deleting services func MakeDeleteHandler(cfg *types.Config, back types.ServerlessBackend) gin.HandlerFunc { return func(c *gin.Context) { @@ -65,33 +70,26 @@ func MakeDeleteHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand log.Printf("the provided MinIO configuration is not valid: %v", err) } - // Delete the group and policy - for _, in := range service.Input { - path := strings.Trim(in.Path, " /") - // Split buckets and folders from path - bucket := strings.SplitN(path, "/", 2) - var users []string - minIOAdminClient.UpdateUsersInGroup(users, bucket[0], true) - } - if service.Mount.Path != "" { path := strings.Trim(service.Mount.Path, " /") // Split buckets and folders from path bucket := strings.SplitN(path, "/", 2) var users []string + // Needed ? minIOAdminClient.UpdateUsersInGroup(users, bucket[0], true) } - // Disable input notifications - if err := disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, service.StorageProviders.MinIO[types.DefaultProvider]); err != nil { - log.Printf("Error disabling MinIO input notifications for service \"%s\": %v\n", service.Name, err) - } - // Remove the service's webhook in MinIO config and restart the server if err := removeMinIOWebhook(service.Name, minIOAdminClient); err != nil { log.Printf("Error removing MinIO webhook for service \"%s\": %v\n", service.Name, err) } + // Delete service buckets + err = deleteBuckets(service, cfg, minIOAdminClient) + if err != nil { + c.String(http.StatusInternalServerError, "Error deleting service buckets: ", err) + } + // Add Yunikorn queue if enabled if cfg.YunikornEnable { if err := utils.DeleteYunikornQueue(cfg, back.GetKubeClientset(), service); err != nil { @@ -112,44 +110,164 @@ func removeMinIOWebhook(name string, minIOAdminClient *utils.MinIOAdminClient) e return minIOAdminClient.RestartServer() } -func disableInputNotifications(arnStr string, input []types.StorageIOConfig, minIO *types.MinIOProvider) error { - parsedARN, _ := arn.Parse(arnStr) +func deleteBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *utils.MinIOAdminClient) error { + var s3Client *s3.S3 + var provName, provID string + + // Delete input buckets + for _, in := range service.Input { + provID, provName = getProviderInfo(in.Provider) + + // Only allow input from MinIO and dCache + if provName != types.MinIOName && provName != types.WebDavName { + return errInput + } - // Create S3 client for MinIO - minIOClient := minIO.GetS3Client() + // If the provider is WebDav (dCache) skip bucket creation + if provName == types.WebDavName { + continue + } + + // Check if the provider identifier is defined in StorageProviders + if !isStorageProviderDefined(provName, provID, service.StorageProviders) { + return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID) + } + + // Check if the input provider is the defined in the server config + if provID != types.DefaultProvider { + if !reflect.DeepEqual(*cfg.MinIOProvider, *service.StorageProviders.MinIO[provID]) { + return fmt.Errorf("the provided MinIO server \"%s\" is not the configured in OSCAR", service.StorageProviders.MinIO[provID].Endpoint) + } + } + + // Get client for the provider + s3Client = service.StorageProviders.MinIO[provID].GetS3Client() - for _, in := range input { path := strings.Trim(in.Path, " /") // Split buckets and folders from path splitPath := strings.SplitN(path, "/", 2) - updatedQueueConfigurations := []*s3.QueueConfiguration{} - // Get bucket notification - nCfg, err := minIOClient.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{Bucket: aws.String(splitPath[0])}) - if err != nil { - return fmt.Errorf("error getting bucket \"%s\" notifications: %v", splitPath[0], err) - } + // Delete policies from bucket + if len(service.AllowedUsers) == 0 { + deleteLogger.Println("Deleting public service bucket") + // Remove bucket resource from all users policy - // Filter elements that doesn't match with service's ARN - for _, q := range nCfg.QueueConfigurations { - queueARN, _ := arn.Parse(*q.QueueArn) - if queueARN.Resource == parsedARN.Resource && - queueARN.AccountID != parsedARN.AccountID { - updatedQueueConfigurations = append(updatedQueueConfigurations, q) + err := minIOAdminClient.RemoveFromPolicy(splitPath[0], ALL_USERS_GROUP, true) + if err != nil { + return fmt.Errorf("unable to remove bucket from policy %q, %v", ALL_USERS_GROUP, err) + } + } else { + // Empty users group + err := minIOAdminClient.UpdateUsersInGroup(service.AllowedUsers, splitPath[0], true) + if err != nil { + return fmt.Errorf("unable to delete users from group %q, %v", splitPath[0], err) + } + // Delete group + err = minIOAdminClient.UpdateUsersInGroup([]string{}, splitPath[0], true) + if err != nil { + return fmt.Errorf("unable delete group %q, %v", splitPath[0], err) + } + // Remove policy + err = minIOAdminClient.RemoveFromPolicy(splitPath[0], splitPath[0], true) + if err != nil { + return fmt.Errorf("unable to remove bucket from policy %q, %v", splitPath[0], err) + } + // Delete user's buckets if isolated spaces had been created + if strings.ToUpper(service.IsolationLevel) == "USER" && len(service.BucketList) > 0 { + // Delete all private buckets + deletePrivateBuckets(service, minIOAdminClient, s3Client) } } - // Put the updated bucket configuration - nCfg.QueueConfigurations = updatedQueueConfigurations - pbncInput := &s3.PutBucketNotificationConfigurationInput{ - Bucket: aws.String(splitPath[0]), - NotificationConfiguration: nCfg, + // Disable input notifications for service bucket + if err := disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), splitPath[0]); err != nil { + log.Printf("Error disabling MinIO input notifications for service \"%s\": %v\n", service.Name, err) + } + + } + + // Delete output buckets + for _, out := range service.Output { + provID, provName = getProviderInfo(out.Provider) + // Check if the provider identifier is defined in StorageProviders + if !isStorageProviderDefined(provName, provID, service.StorageProviders) { + // TODO fix + disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "") + return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID) } - _, err = minIOClient.PutBucketNotificationConfiguration(pbncInput) + + switch provName { + case types.MinIOName, types.S3Name: + // needed ? + + case types.OnedataName: + // TODO + } + } + + if service.Mount.Provider != "" { + // TODO check if some components of mount need to be deleted + + } + + return nil +} + +func deletePrivateBuckets(service *types.Service, minIOAdminClient *utils.MinIOAdminClient, s3Client *s3.S3) error { + for i, b := range service.BucketList { + // Disable input notifications for user bucket + if err := disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), b); err != nil { + log.Printf("Error disabling MinIO input notifications for service \"%s\": %v\n", service.Name, err) + } + //Delete bucket and unset the associated policy + err := minIOAdminClient.EmptyPolicy(service.AllowedUsers[i]) + if err != nil { + fmt.Println(err) + } + err = minIOAdminClient.RemoveFromPolicy(b, service.AllowedUsers[i], false) if err != nil { - return fmt.Errorf("error disabling bucket notification: %v", err) + return fmt.Errorf("unable to remove bucket from policy %q, %v", b, err) + } + /*if err := minIOAdminClient.DeleteBucket(s3Client, b, service.AllowedUsers[i]); err != nil { + return fmt.Errorf("unable to delete bucket %q, %v", b, err) + }*/ + } + return nil +} + +func disableInputNotifications(s3Client *s3.S3, arnStr string, bucket string) error { + parsedARN, _ := arn.Parse(arnStr) + + // path := strings.Trim(in.Path, " /") + // // Split buckets and folders from path + // splitPath := strings.SplitN(path, "/", 2) + + updatedQueueConfigurations := []*s3.QueueConfiguration{} + // Get bucket notification + nCfg, err := s3Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{Bucket: aws.String(bucket)}) + if err != nil { + return fmt.Errorf("error getting bucket \"%s\" notifications: %v", bucket, err) + } + + // Filter elements that doesn't match with service's ARN + for _, q := range nCfg.QueueConfigurations { + queueARN, _ := arn.Parse(*q.QueueArn) + if queueARN.Resource == parsedARN.Resource && + queueARN.AccountID != parsedARN.AccountID { + updatedQueueConfigurations = append(updatedQueueConfigurations, q) } } + // Put the updated bucket configuration + nCfg.QueueConfigurations = updatedQueueConfigurations + pbncInput := &s3.PutBucketNotificationConfigurationInput{ + Bucket: aws.String(bucket), + NotificationConfiguration: nCfg, + } + _, err = s3Client.PutBucketNotificationConfiguration(pbncInput) + if err != nil { + return fmt.Errorf("error disabling bucket notification: %v", err) + } + return nil } diff --git a/pkg/handlers/logs.go b/pkg/handlers/logs.go index cc809827..c26087a7 100644 --- a/pkg/handlers/logs.go +++ b/pkg/handlers/logs.go @@ -82,7 +82,12 @@ func MakeJobsInfoHandler(back types.ServerlessBackend, kubeClientset kubernetes. // Populate jobsInfo with status, start and finish times (from pods) for _, pod := range pods.Items { if jobName, ok := pod.Labels["job-name"]; ok { + fmt.Println(pod.Status) + //if pod.Status != nil && pod.Status.Phase != nil { jobsInfo[jobName].Status = string(pod.Status.Phase) + //} else { + // jobsInfo[jobName].Status = "unknown" + //} // Loop through job.Status.ContainerStatuses to find oscar-container for _, contStatus := range pod.Status.ContainerStatuses { if contStatus.Name == types.ContainerName { diff --git a/pkg/handlers/update.go b/pkg/handlers/update.go index e7dea13e..3e6fffb7 100644 --- a/pkg/handlers/update.go +++ b/pkg/handlers/update.go @@ -23,6 +23,7 @@ import ( "os" "strings" + "github.com/aws/aws-sdk-go/service/s3" "github.com/gin-gonic/gin" "github.com/grycap/oscar/v3/pkg/types" "github.com/grycap/oscar/v3/pkg/utils" @@ -36,7 +37,7 @@ var updateLogger = log.New(os.Stdout, "[CREATE-HANDLER] ", log.Flags()) // MakeUpdateHandler makes a handler for updating services func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.HandlerFunc { return func(c *gin.Context) { - var provName string + var provName, provID string var newService types.Service if err := c.ShouldBindJSON(&newService); err != nil { c.String(http.StatusBadRequest, fmt.Sprintf("The service specification is not valid: %v", err)) @@ -62,6 +63,7 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand } return } + if !isAdminUser { uid, err := auth.GetUIDFromContext(c) if err != nil { @@ -76,7 +78,7 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand // Set the owner on the new service definition newService.Owner = oldService.Owner - // If the service has changed VO check permission again + // If the service has changed VO check permisions again if newService.VO != "" && newService.VO != oldService.VO { for _, vo := range cfg.OIDCGroups { if vo == newService.VO { @@ -91,63 +93,44 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand } } minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg) - // Update the service - if err := back.UpdateService(newService); err != nil { - c.String(http.StatusInternalServerError, fmt.Sprintf("Error updating the service: %v", err)) - return - } for _, in := range oldService.Input { - // Split input provider - provSlice := strings.SplitN(strings.TrimSpace(in.Provider), types.ProviderSeparator, 2) - if len(provSlice) == 1 { - provName = strings.ToLower(provSlice[0]) - } else { - provName = strings.ToLower(provSlice[0]) - } + + provID, provName = getProviderInfo(in.Provider) + if provName == types.MinIOName { + s3Client := oldService.StorageProviders.MinIO[provID].GetS3Client() // Get bucket name path := strings.Trim(in.Path, " /") // Split buckets and folders from path splitPath := strings.SplitN(path, "/", 2) - oldAllowedLength := len(oldService.AllowedUsers) - newAllowedLength := len(newService.AllowedUsers) - if newAllowedLength != oldAllowedLength { - if newAllowedLength == 0 { - updateLogger.Printf("Updating service with public policies") - // If the new allowed users is empty make service public - err = minIOAdminClient.PrivateToPublicBucket(splitPath[0]) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - } else { - // If the service was public and now has a list of allowed users make its buckets private - if oldAllowedLength == 0 { - updateLogger.Printf("Updating service with private policies") - err = minIOAdminClient.PublicToPrivateBucket(splitPath[0], newService.AllowedUsers) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - } else { - // If allowed users list changed update policies on bucket - updateLogger.Printf("Updating service policies") - err = minIOAdminClient.UpdateUsersInGroup(oldService.AllowedUsers, splitPath[0], true) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - err = minIOAdminClient.UpdateUsersInGroup(newService.AllowedUsers, splitPath[0], false) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - } + // If isolation level was USER delete all private buckets + if oldService.IsolationLevel == "USER" { + err = deletePrivateBuckets(oldService, minIOAdminClient, s3Client) + if err != nil { + return + } + } + if newService.IsolationLevel == "USER" { + var newBucketList []string + var userBucket string + for _, user := range newService.AllowedUsers { + userBucket = splitPath[0] + "-" + user[:10] + newBucketList = append(newBucketList, userBucket) } + + newService.BucketList = newBucketList } + // Update the group with allowe users, it empthy and add them again + err = updateGroup(splitPath[0], oldService, &newService, minIOAdminClient, s3Client) + if err != nil { + return + } + + disableInputNotifications(s3Client, oldService.GetMinIOWebhookARN(), splitPath[0]) + // Register minio webhook and restart the server if err := registerMinIOWebhook(newService.Name, newService.Token, newService.StorageProviders.MinIO[types.DefaultProvider], cfg); err != nil { back.UpdateService(*oldService) @@ -155,8 +138,14 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand return } + // Update the service + if err := back.UpdateService(newService); err != nil { + c.String(http.StatusInternalServerError, fmt.Sprintf("Error updating the service: %v", err)) + return + } + // Update buckets - if err := updateBuckets(&newService, oldService, minIOAdminClient, cfg); err != nil { + if err := updateBuckets(&newService, &newService, minIOAdminClient, cfg); err != nil { if err == errInput { c.String(http.StatusBadRequest, err.Error()) } else { @@ -166,26 +155,44 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand back.UpdateService(*oldService) return } + } - } - // Add Yunikorn queue if enabled - if cfg.YunikornEnable { - if err := utils.AddYunikornQueue(cfg, back.GetKubeClientset(), &newService); err != nil { - log.Println(err.Error()) + // Add Yunikorn queue if enabled + if cfg.YunikornEnable { + if err := utils.AddYunikornQueue(cfg, back.GetKubeClientset(), &newService); err != nil { + log.Println(err.Error()) + } } + + c.Status(http.StatusNoContent) } - c.Status(http.StatusNoContent) } } +func updateGroup(group string, oldService *types.Service, newService *types.Service, minIOAdminClient *utils.MinIOAdminClient, s3Client *s3.S3) error { + //delete users in group + err := minIOAdminClient.UpdateUsersInGroup(oldService.AllowedUsers, group, true) + if err != nil { + return err + } + //add the new ones + err = minIOAdminClient.UpdateUsersInGroup(newService.AllowedUsers, group, false) + if err != nil { + return err + } + return nil +} + func updateBuckets(newService, oldService *types.Service, minIOAdminClient *utils.MinIOAdminClient, cfg *types.Config) error { // Disable notifications from oldService.Input - if err := disableInputNotifications(oldService.GetMinIOWebhookARN(), oldService.Input, oldService.StorageProviders.MinIO[types.DefaultProvider]); err != nil { - return fmt.Errorf("error disabling MinIO input notifications: %v", err) - } + + // TODO diable all old service notifications if needed + //if err := disableInputNotifications(oldService.GetMinIOWebhookARN(), oldService.Input); err != nil { + // return fmt.Errorf("error disabling MinIO input notifications: %v", err) + //} // Create the input and output buckets/folders from newService - return createBuckets(newService, cfg, minIOAdminClient, newService.AllowedUsers, true) + return createBuckets(newService, cfg, minIOAdminClient, true) } diff --git a/pkg/types/service.go b/pkg/types/service.go index 20098ab3..78c3c434 100644 --- a/pkg/types/service.go +++ b/pkg/types/service.go @@ -108,17 +108,6 @@ const ( // YAMLMarshal package-level yaml marshal function var YAMLMarshal = yaml.Marshal -type Expose struct { - MinScale int32 `json:"min_scale" default:"1"` - MaxScale int32 `json:"max_scale" default:"10"` - APIPort int `json:"api_port,omitempty" ` - CpuThreshold int32 `json:"cpu_threshold" default:"80" ` - RewriteTarget bool `json:"rewrite_target" default:"false" ` - NodePort int32 `json:"nodePort" default:"0" ` - DefaultCommand bool `json:"default_command" ` - SetAuth bool `json:"set_auth" ` -} - // Service represents an OSCAR service following the SCAR Function Definition Language type Service struct { // Name the name of the service @@ -224,7 +213,16 @@ type Service struct { // Optional ImagePullSecrets []string `json:"image_pull_secrets,omitempty"` - Expose Expose `json:"expose"` + Expose struct { + MinScale int32 `json:"min_scale" default:"1"` + MaxScale int32 `json:"max_scale" default:"10"` + APIPort int `json:"api_port,omitempty" ` + CpuThreshold int32 `json:"cpu_threshold" default:"80" ` + RewriteTarget bool `json:"rewrite_target" default:"false" ` + NodePort int32 `json:"nodePort" default:"0" ` + DefaultCommand bool `json:"default_command" ` + SetAuth bool `json:"set_auth" ` + } `json:"expose"` // The user-defined environment variables assigned to the service // Optional @@ -259,11 +257,18 @@ type Service struct { Owner string `json:"owner"` InterLinkNodeName string `json:"interlink_node_name"` - // List of EGI UID's identifying the users that will have visibility of the service and its MinIO storage provider - // Optional (If the list is empty we assume the visibility is public for all cluster users) + + // AllowedUsers list of EGI UID's identifying the users that will have visibility of the service and its MinIO storage provider + // Optional (If the list is empty we asume the visibility is public for all cluster users) AllowedUsers []string `json:"allowed_users"` - // Configuration to create a storage provider as a volume inside the service container + // IsolationLevel level of isolation for the buckets of the service (default:service) + IsolationLevel string `json:"isolation_level" default:"SERVICE"` + + // BucketList autogenerated list of private buckets based on the allowed_users of the service + BucketList []string `json:"bucket_list"` + + // Mount configuration to create a storage provider as a volume inside the service container // Optional Mount StorageIOConfig `json:"mount"` } diff --git a/pkg/utils/minio.go b/pkg/utils/minio.go index 8a71b6e7..36f47bae 100644 --- a/pkg/utils/minio.go +++ b/pkg/utils/minio.go @@ -27,6 +27,9 @@ import ( "os" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/grycap/oscar/v3/pkg/types" "github.com/minio/madmin-go" ) @@ -114,59 +117,37 @@ func (minIOAdminClient *MinIOAdminClient) CreateMinIOUser(ak string, sk string) return nil } -func (minIOAdminClient *MinIOAdminClient) PrivateToPublicBucket(bucketName string) error { - // Delete policy and group"" - var users []string - err := minIOAdminClient.UpdateUsersInGroup(users, bucketName, true) +func (minIOAdminClient *MinIOAdminClient) PrivateToPublicBucket(allowedUsers []string, bucketName string) error { + // Delete users in group + err := minIOAdminClient.UpdateUsersInGroup(allowedUsers, bucketName, true) if err != nil { return err } - // Add bucket to all_users_group policy - err = minIOAdminClient.AddServiceToAllUsersGroup(bucketName) + // Delete group + err = minIOAdminClient.UpdateUsersInGroup([]string{}, bucketName, true) if err != nil { return err } - return nil -} - -// TODO refactor to delete duplicated code -func (minIOAdminClient *MinIOAdminClient) PublicToPrivateBucket(bucketName string, allowedUsers []string) error { - // Delete bucket from all_users_group - rs := "arn:aws:s3:::" + bucketName + "/*" - groupName := ALL_USERS_GROUP - - policyInfo, errInfo := minIOAdminClient.adminClient.InfoCannedPolicyV2(context.TODO(), ALL_USERS_GROUP) - if errInfo != nil { - return errInfo - } - - actualPolicy := &Policy{} - json.Unmarshal(policyInfo.Policy, actualPolicy) - index := 0 - // Search for the resource index - resources := actualPolicy.Statement[0].Resource - for i, resource := range resources { - if resource == rs { - index = i - break - } - } - // Add new resource and create policy - actualPolicy.Statement[0].Resource = append(resources[:index], resources[index+1:]...) - policy, jsonErr := json.Marshal(actualPolicy) - if jsonErr != nil { - return jsonErr + // Remove policy + err = minIOAdminClient.RemoveFromPolicy(bucketName, bucketName, true) + if err != nil { + return err } - err := minIOAdminClient.adminClient.AddCannedPolicy(context.TODO(), groupName, []byte(policy)) + // Add bucket to all_users_group policy + err = minIOAdminClient.CreateAddPolicy(bucketName, ALL_USERS_GROUP, true) if err != nil { - return fmt.Errorf("error creating MinIO policy for group %s: %v", groupName, err) + return err } + return nil +} - err = minIOAdminClient.adminClient.SetPolicy(context.TODO(), groupName, groupName, true) +func (minIOAdminClient *MinIOAdminClient) PublicToPrivateBucket(bucketName string, allowedUsers []string) error { + // Delete bucket from all_users_group + err := minIOAdminClient.RemoveFromPolicy(bucketName, ALL_USERS_GROUP, true) if err != nil { - return fmt.Errorf("error setting MinIO policy for group %s: %v", groupName, err) + return err } err = minIOAdminClient.CreateServiceGroup(bucketName) @@ -215,7 +196,7 @@ func (minIOAdminClient *MinIOAdminClient) AddServiceToAllUsersGroup(bucketName s return nil } -// AddServiceToAllUsersGroup associates policy of all users to a service +// RemovedServiceFromAllUsersGroup associates policy of all users to a service func (minIOAdminClient *MinIOAdminClient) RemovedServiceFromAllUsersGroup(bucketName string) error { err := createPolicy(minIOAdminClient.adminClient, bucketName, true) if err != nil { @@ -225,21 +206,32 @@ func (minIOAdminClient *MinIOAdminClient) RemovedServiceFromAllUsersGroup(bucket return nil } +func (minIOAdminClient *MinIOAdminClient) DeleteBucket(s3Client *s3.S3, bucketName string, policyName string) error { + + iter := s3manager.NewDeleteListIterator(s3Client, &s3.ListObjectsInput{ + Bucket: aws.String(bucketName), + }) + + if err := s3manager.NewBatchDeleteWithClient(s3Client).Delete(aws.BackgroundContext(), iter); err != nil { + return fmt.Errorf("unable to delete objects from bucket %q, %v", bucketName, err) + } + + _, err := s3Client.DeleteBucket(&s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + return fmt.Errorf("unable to delete bucket %q, %v", bucketName, err) + } + + return nil +} + // UpdateUsersGroup func (minIOAdminClient *MinIOAdminClient) UpdateUsersInGroup(users []string, groupName string, remove bool) error { - var members []string - if len(users) < 1 { - description, err := minIOAdminClient.adminClient.GetGroupDescription(context.Background(), groupName) - if err != nil { - return err - } - members = description.Members - } else { - members = users - } + group := madmin.GroupAddRemove{ Group: groupName, - Members: members, + Members: users, Status: "enable", IsRemove: remove, } @@ -289,6 +281,56 @@ func (minIOAdminClient *MinIOAdminClient) RestartServer() error { return nil } +func (minIOAdminClient *MinIOAdminClient) CreateAddPolicy(bucketName string, policyName string, isGroup bool) error { + var jsonErr error + var policy []byte + + rs := "arn:aws:s3:::" + bucketName + "/*" + + policyInfo, errInfo := minIOAdminClient.adminClient.InfoCannedPolicyV2(context.TODO(), policyName) + if errInfo != nil { + // If the policy does not exist create it + p := `{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:*" + ], + "Resource": [ + "arn:aws:s3:::` + bucketName + `/*" + ] + } + ] + }` + policy = []byte(p) + } else { + actualPolicy := &Policy{} + json.Unmarshal(policyInfo.Policy, actualPolicy) + + // Add new resource and create policy + actualPolicy.Statement[0].Resource = append(actualPolicy.Statement[0].Resource, rs) + + policy, jsonErr = json.Marshal(actualPolicy) + if jsonErr != nil { + return jsonErr + } + } + + err := minIOAdminClient.adminClient.AddCannedPolicy(context.TODO(), policyName, []byte(policy)) + if err != nil { + return fmt.Errorf("error creating/adding MinIO policy for user/group %s: %v", policyName, err) + } + + err = minIOAdminClient.adminClient.SetPolicy(context.TODO(), policyName, policyName, isGroup) + if err != nil { + return fmt.Errorf("error setting MinIO policy for user/group %s: %v", policyName, err) + } + + return nil +} + func createPolicy(adminClient *madmin.AdminClient, bucketName string, allUsers bool) error { var groupName string var policy []byte @@ -345,6 +387,54 @@ func createPolicy(adminClient *madmin.AdminClient, bucketName string, allUsers b return nil } +func (minIOAdminClient *MinIOAdminClient) RemoveFromPolicy(bucketName string, policyName string, isGroup bool) error { + + rs := "arn:aws:s3:::" + bucketName + "/*" + policyInfo, errInfo := minIOAdminClient.adminClient.InfoCannedPolicyV2(context.TODO(), policyName) + if errInfo != nil { + return fmt.Errorf("policy '%s' does not exist: %v", policyName, errInfo) + } + actualPolicy := &Policy{} + json.Unmarshal(policyInfo.Policy, actualPolicy) + if len(actualPolicy.Statement[0].Resource) == 1 { + if err := minIOAdminClient.adminClient.RemoveCannedPolicy(context.TODO(), policyName); err != nil { + return fmt.Errorf("error removing canned policy: %v", err) + } + return nil + } else { + for i, r := range actualPolicy.Statement[0].Resource { + if r == rs { + actualPolicy.Statement[0].Resource = append(actualPolicy.Statement[0].Resource[:i], actualPolicy.Statement[0].Resource[i+1:]...) + break + } + } + } + + policy, jsonErr := json.Marshal(actualPolicy) + if jsonErr != nil { + return jsonErr + } + + err := minIOAdminClient.adminClient.AddCannedPolicy(context.TODO(), policyName, []byte(policy)) + if err != nil { + return fmt.Errorf("error creating MinIO policy for user %s: %v", policyName, err) + } + + err = minIOAdminClient.adminClient.SetPolicy(context.TODO(), policyName, policyName, isGroup) + if err != nil { + return fmt.Errorf("error setting MinIO policy for user %s: %v", policyName, err) + } + return nil +} + +func (minIOAdminClient *MinIOAdminClient) EmptyPolicy(policyName string) error { + err := minIOAdminClient.adminClient.SetPolicy(context.TODO(), "", policyName, false) + if err != nil { + return fmt.Errorf("error setting MinIO policy for group %s: %v", policyName, err) + } + return nil +} + func createGroup(adminClient *madmin.AdminClient, groupName string) error { group := madmin.GroupAddRemove{ Group: groupName,