Skip to content

Commit

Permalink
First version of multitenancy implementation (testing)
Browse files Browse the repository at this point in the history
  • Loading branch information
catttam committed Jan 19, 2024
1 parent c5cd713 commit e730f20
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 28 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
r := gin.Default()

// Define system group with basic auth middleware
system := r.Group("/system", auth.GetAuthMiddleware(cfg))
system := r.Group("/system", auth.GetAuthMiddleware(cfg, kubeClientset))

// Config path
system.GET("/config", handlers.MakeConfigHandler(cfg))
Expand Down
55 changes: 36 additions & 19 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,18 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
checkValues(&service, cfg)

if service.VO != "" {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)

authHeader := c.GetHeader("Authorization")
rawToken := strings.TrimPrefix(authHeader, "Bearer ")
hasVO, err2 := oidcManager.UserHasVO(rawToken, service.VO)

if err2 != nil {
c.String(http.StatusInternalServerError, err2.Error())
return
}

if !hasVO {
c.String(http.StatusBadRequest, fmt.Sprintf("This user isn't enrrolled on the vo: %v", service.VO))
return
err := checkVOIdentity(&service, cfg, authHeader)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintf("%v"), err)
}
}

uid_origin, _ := c.Get("uid_origin")
uid := fmt.Sprintf("%v", uid_origin)
service.Labels["uid"] = uid
service.AllowedUsers = append(service.AllowedUsers, uid)

// Create the service
if err := back.CreateService(service); err != nil {
// Check if error is caused because the service name provided already exists
Expand All @@ -93,7 +88,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
}

// Create buckets/folders based on the Input and Output and enable notifications
if err := createBuckets(&service, cfg); err != nil {
if err := createBuckets(&service, cfg, service.AllowedUsers); err != nil {
if err == errInput {
c.String(http.StatusBadRequest, err.Error())
} else {
Expand Down Expand Up @@ -140,10 +135,6 @@ func checkValues(service *types.Service, cfg *types.Config) {
service.Labels[types.YunikornApplicationIDLabel] = service.Name
service.Labels[types.YunikornQueueLabel] = fmt.Sprintf("%s.%s.%s", types.YunikornRootQueue, types.YunikornOscarQueue, service.Name)

if service.VO != "" {
service.Labels["vo"] = service.VO
}

// Create default annotations map
if service.Annotations == nil {
service.Annotations = make(map[string]string)
Expand Down Expand Up @@ -171,7 +162,7 @@ func checkValues(service *types.Service, cfg *types.Config) {
service.Token = utils.GenerateToken()
}

func createBuckets(service *types.Service, cfg *types.Config) error {
func createBuckets(service *types.Service, cfg *types.Config, allowed_users []string) error {
var s3Client *s3.S3
var cdmiClient *cdmi.Client
var provName, provID string
Expand Down Expand Up @@ -233,6 +224,14 @@ func createBuckets(service *types.Service, cfg *types.Config) error {
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
}

// Create group for the service and add users
// TODO error control

minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg)
minIOAdminClient.CreateServiceGroup(splitPath[0])
minIOAdminClient.AddUserToGroup(allowed_users, splitPath[0])

// Create folder(s)
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
Expand Down Expand Up @@ -347,6 +346,24 @@ func isStorageProviderDefined(storageName string, storageID string, providers *t
return ok
}

func checkVOIdentity(service *types.Service, cfg *types.Config, authHeader string) error {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
rawToken := strings.TrimPrefix(authHeader, "Bearer ")
hasVO, err := oidcManager.UserHasVO(rawToken, service.VO)

if err != nil {
return err
}

if !hasVO {
return fmt.Errorf("This user isn't enrrolled on the vo: %v", service.VO)
}

service.Labels["vo"] = service.VO

return nil
}

func registerMinIOWebhook(name string, token string, minIO *types.MinIOProvider, cfg *types.Config) error {
minIOAdminClient, err := utils.MakeMinIOAdminClient(cfg)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion pkg/handlers/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
)

// TODO comprobar VO como en createService
// MakeUpdateHandler makes a handler for updating services
func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.HandlerFunc {
return func(c *gin.Context) {
Expand All @@ -53,6 +54,14 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
return
}

if newService.VO != "" && newService.VO != oldService.VO {
authHeader := c.GetHeader("Authorization")
err := checkVOIdentity(&newService, cfg, authHeader)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintf("%v"), err)
}
}

// Update the service
if err := back.UpdateService(newService); err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintf("Error updating the service: %v", err))
Expand All @@ -68,6 +77,7 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
provName = strings.ToLower(provSlice[0])
}
if provName == types.MinIOName {

// Register minio webhook and restart the server
if err := registerMinIOWebhook(newService.Name, newService.Token, newService.StorageProviders.MinIO[types.DefaultProvider], cfg); err != nil {
back.UpdateService(*oldService)
Expand Down Expand Up @@ -107,5 +117,6 @@ func updateBuckets(newService, oldService *types.Service, cfg *types.Config) err
}

// Create the input and output buckets/folders from newService
return createBuckets(newService, cfg)
// TODO fix
return createBuckets(newService, cfg, newService.AllowedUsers)
}
4 changes: 4 additions & 0 deletions pkg/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ type Service struct {
// Clusters configuration for the OSCAR clusters that can be used as service's replicas
// Optional
Clusters map[string]Cluster `json:"clusters,omitempty"`

// List of EGI UID's identifying the users that will have visibility of the service and its MinIO storage provider
// Optional (If the list is empty we asume the visibility is public for all cluster users)
AllowedUsers []string `json:"allowed_users"`
}

// ToPodSpec returns a k8s podSpec from the Service
Expand Down
17 changes: 13 additions & 4 deletions pkg/utils/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,37 @@ import (

"github.com/gin-gonic/gin"
"github.com/grycap/oscar/v2/pkg/types"
"github.com/grycap/oscar/v2/pkg/utils"
"k8s.io/client-go/kubernetes"
)

// GetAuthMiddleware returns the appropriate gin auth middleware
func GetAuthMiddleware(cfg *types.Config) gin.HandlerFunc {
func GetAuthMiddleware(cfg *types.Config, kubeClientset *kubernetes.Clientset) gin.HandlerFunc {
if !cfg.OIDCEnable {
return gin.BasicAuth(gin.Accounts{
// Use the config's username and password for basic auth
cfg.Username: cfg.Password,
})
}
return CustomAuth(cfg)
return CustomAuth(cfg, kubeClientset)
}

// CustomAuth returns a custom auth handler (gin middleware)
func CustomAuth(cfg *types.Config) gin.HandlerFunc {
func CustomAuth(cfg *types.Config, kubeClientset *kubernetes.Clientset) gin.HandlerFunc {
basicAuthHandler := gin.BasicAuth(gin.Accounts{
// Use the config's username and password for basic auth
cfg.Username: cfg.Password,
})

oidcHandler := getOIDCMiddleware(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
//TODO Initialize MinIO client and create all_users_group
minIOAdminClient, err := utils.MakeMinIOAdminClient(cfg)
if err != nil {
// TODO manage error
}

minIOAdminClient.CreateAllUsersGroup()

oidcHandler := getOIDCMiddleware(kubeClientset, minIOAdminClient, cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
return func(c *gin.Context) {
authHeader := c.GetHeader("Authorization")
if strings.HasPrefix(authHeader, "Bearer ") {
Expand Down
92 changes: 92 additions & 0 deletions pkg/utils/auth/multitenancy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright (C) GRyCAP - I3M - UPV
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package auth

import (
"context"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

const ServicesNamespace = "oscar-svc"

type multitenancyConfig struct {
kubeClientset *kubernetes.Clientset
owner_uid string
usersCache []string
}

func NewMultitenancyConfig(kubeClientset *kubernetes.Clientset, uid string) *multitenancyConfig {
return &multitenancyConfig{
kubeClientset: kubeClientset,
owner_uid: uid,
usersCache: []string{uid},
}
}

// TODO periodically check that the users stored on cache still exist on MinIO (cronjob)
func (mc *multitenancyConfig) UpdateCacheStatus() {
// 1. List users on MinIO
// 2. List secrets
// 3. Compare both lists and delete from secrets the missmatchs
// 4. updateCache
}

func (mc *multitenancyConfig) UpdateCache(uid string) {
mc.usersCache = append(mc.usersCache, uid)
}

func (mc *multitenancyConfig) ClearCache() {
mc.usersCache = nil
}

func (mc *multitenancyConfig) UserExists(uid string) bool {
for _, id := range mc.usersCache {
if id == uid {
return true
}
}
return false
}

func (mc *multitenancyConfig) CreateSecretForOIDC(uid string, sk string) error {

secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: uid,
Namespace: ServicesNamespace,
},
Immutable: new(bool),
StringData: map[string]string{
"oidc_uid": uid,
"accessKey": uid,
"secretKey": sk,
},
}

_, err := mc.kubeClientset.CoreV1().Secrets(ServicesNamespace).Create(context.TODO(), secret, metav1.CreateOptions{})

if err != nil {
return err
}

mc.UpdateCache(uid)

return nil
}
42 changes: 39 additions & 3 deletions pkg/utils/auth/oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,24 @@ package auth

import (
"context"

"crypto/rand"
"encoding/base64"
"net/http"
"strings"

"github.com/coreos/go-oidc/v3/oidc"
"github.com/gin-gonic/gin"
"github.com/grycap/oscar/v2/pkg/utils"
"golang.org/x/oauth2"
"k8s.io/client-go/kubernetes"
)

// EGIGroupsURNPrefix prefix to identify EGI group URNs
const EGIGroupsURNPrefix = "urn:mace:egi.eu:group"
const (
// EGIGroupsURNPrefix prefix to identify EGI group URNs
EGIGroupsURNPrefix = "urn:mace:egi.eu:group"
SecretKeyLength = 10
)

// oidcManager struct to represent a OIDC manager, including a cache of tokens
type oidcManager struct {
Expand Down Expand Up @@ -65,14 +73,16 @@ func NewOIDCManager(issuer string, subject string, groups []string) (*oidcManage
}

// getIODCMiddleware returns the Gin's handler middleware to validate OIDC-based auth
func getOIDCMiddleware(issuer string, subject string, groups []string) gin.HandlerFunc {
func getOIDCMiddleware(kubeClientset *kubernetes.Clientset, minIOAdminClient *utils.MinIOAdminClient, issuer string, subject string, groups []string) gin.HandlerFunc {
oidcManager, err := NewOIDCManager(issuer, subject, groups)
if err != nil {
return func(c *gin.Context) {
c.AbortWithStatus(http.StatusUnauthorized)
}
}

mc := NewMultitenancyConfig(kubeClientset, subject)

return func(c *gin.Context) {
// Get token from headers
authHeader := c.GetHeader("Authorization")
Expand All @@ -87,6 +97,23 @@ func getOIDCMiddleware(issuer string, subject string, groups []string) gin.Handl
c.AbortWithStatus(http.StatusUnauthorized)
return
}

ui, _ := oidcManager.getUserInfo(rawToken)
uid := ui.subject

// Check if exist MinIO user in cached users list
exists := mc.UserExists(uid)
if !exists {
sk, err := generateRandomKey(SecretKeyLength)
if err != nil {
//TODO manage errr
}
// Create MinIO user and k8s secret with credentials
mc.CreateSecretForOIDC(uid, sk)
minIOAdminClient.CreateMinIOUser(uid, sk)

c.Set("uid_origin", uid)
}
}
}

Expand Down Expand Up @@ -194,3 +221,12 @@ func (om *oidcManager) isAuthorised(rawToken string) bool {

return false
}

func generateRandomKey(length int) (string, error) {
key := make([]byte, length)
_, err := rand.Read(key)
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(key), nil
}
Loading

0 comments on commit e730f20

Please sign in to comment.