Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge queue: embarking release-v3.13 (aa15e64) and #5032 together #5035

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2480,7 +2480,7 @@ var _ = Describe(cephfsType, func() {

By("test volumeGroupSnapshot", func() {
scName := "csi-cephfs-sc"
snapshotter, err := newCephFSVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3)
snapshotter, err := newCephFSVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3, 0)
if err != nil {
framework.Failf("failed to create volumeGroupSnapshot Base: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion e2e/rbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4881,7 +4881,7 @@ var _ = Describe("RBD", func() {
}

scName := "csi-rbd-sc"
snapshotter, err := newRBDVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3)
snapshotter, err := newRBDVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3, 5)
if err != nil {
framework.Failf("failed to create RBDVolumeGroupSnapshot: %v", err)
}
Expand Down
10 changes: 6 additions & 4 deletions e2e/volumegroupsnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ var _ VolumeGroupSnapshotter = &cephFSVolumeGroupSnapshot{}
func newCephFSVolumeGroupSnapshot(f *framework.Framework, namespace,
storageClass string,
blockPVC bool,
timeout, totalPVCCount int,
timeout, totalPVCCount, additionalVGSnapshotCount int,
) (VolumeGroupSnapshotter, error) {
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC, timeout, totalPVCCount)
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC,
timeout, totalPVCCount, additionalVGSnapshotCount)
if err != nil {
return nil, fmt.Errorf("failed to create volumeGroupSnapshotterBase: %w", err)
}
Expand Down Expand Up @@ -144,9 +145,10 @@ var _ VolumeGroupSnapshotter = &rbdVolumeGroupSnapshot{}
func newRBDVolumeGroupSnapshot(f *framework.Framework, namespace,
storageClass string,
blockPVC bool,
timeout, totalPVCCount int,
timeout, totalPVCCount, additionalVGSnapshotCount int,
) (VolumeGroupSnapshotter, error) {
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC, timeout, totalPVCCount)
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC,
timeout, totalPVCCount, additionalVGSnapshotCount)
if err != nil {
return nil, fmt.Errorf("failed to create volumeGroupSnapshotterBase: %w", err)
}
Expand Down
52 changes: 35 additions & 17 deletions e2e/volumegroupsnapshot_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,21 @@ type VolumeGroupSnapshotter interface {
}

type volumeGroupSnapshotterBase struct {
timeout int
framework *framework.Framework
groupclient *groupsnapclient.GroupsnapshotV1beta1Client
snapClient *snapclient.SnapshotV1Client
storageClassName string
blockPVC bool
totalPVCCount int
namespace string
timeout int
framework *framework.Framework
groupclient *groupsnapclient.GroupsnapshotV1beta1Client
snapClient *snapclient.SnapshotV1Client
storageClassName string
blockPVC bool
totalPVCCount int
additionalVGSnapshotCount int
namespace string
}

func newVolumeGroupSnapshotBase(f *framework.Framework, namespace,
storageClass string,
blockPVC bool,
timeout, totalPVCCount int,
timeout, totalPVCCount, additionalVGSnapshotCount int,
) (*volumeGroupSnapshotterBase, error) {
config, err := framework.LoadConfig()
if err != nil {
Expand All @@ -105,14 +106,15 @@ func newVolumeGroupSnapshotBase(f *framework.Framework, namespace,
}

return &volumeGroupSnapshotterBase{
framework: f,
groupclient: c,
snapClient: s,
namespace: namespace,
storageClassName: storageClass,
blockPVC: blockPVC,
timeout: timeout,
totalPVCCount: totalPVCCount,
framework: f,
groupclient: c,
snapClient: s,
namespace: namespace,
storageClassName: storageClass,
blockPVC: blockPVC,
timeout: timeout,
totalPVCCount: totalPVCCount,
additionalVGSnapshotCount: additionalVGSnapshotCount,
}, err
}

Expand Down Expand Up @@ -478,6 +480,22 @@ func (v *volumeGroupSnapshotterBase) testVolumeGroupSnapshot(vol VolumeGroupSnap
return fmt.Errorf("failed to create volume group snapshot: %w", err)
}

// Create and delete additional group snapshots.
for i := range v.additionalVGSnapshotCount {
newVGSName := fmt.Sprintf("%s-%d", vgsName, i)
_, err = v.CreateVolumeGroupSnapshot(newVGSName, vgscName, pvcLabels)
if err != nil {
return fmt.Errorf("failed to create volume group snapshot %q: %w", newVGSName, err)
}
}
for i := range v.additionalVGSnapshotCount {
newVGSName := fmt.Sprintf("%s-%d", vgsName, i)
err = v.DeleteVolumeGroupSnapshot(newVGSName)
if err != nil {
return fmt.Errorf("failed to delete volume group snapshot %q: %w", newVGSName, err)
}
}

clonePVCs, err := v.CreatePVCClones(volumeGroupSnapshot)
if err != nil {
return fmt.Errorf("failed to create clones: %w", err)
Expand Down
37 changes: 26 additions & 11 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
// are more than the `minSnapshotOnImage` Add a task to flatten all the
// temporary cloned images.
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, err := rbdVol.listSnapshots()
snaps, children, err := rbdVol.listSnapAndChildren()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -589,9 +589,19 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
len(snaps),
rbdVol,
maxSnapshotsOnImage)

if len(children) == 0 {
// if none of the child images(are in trash) exist, we can't flatten them.
// return ResourceExhausted error message as we have reached the hard limit.
log.ErrorLog(ctx, "child images of image %q cannot be flatten", rbdVol)

return status.Errorf(codes.ResourceExhausted,
"rbd image %q has %d snapshots but child images cannot be flattened",
rbdVol, len(snaps))
}
err = flattenClonedRbdImages(
ctx,
snaps,
children,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
Expand All @@ -610,13 +620,23 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
len(snaps),
rbdVol,
minSnapshotsOnImageToStartFlatten)
if len(children) == 0 {
// if none of the child images(are in trash) exist, we can't flatten them.
// return nil since we have only reach the soft limit.
log.DebugLog(ctx, "child images of image %q cannot be flatten", rbdVol)

return nil
}
// If we start flattening all the snapshots at one shot the volume
// creation time will be affected,so we will flatten only the extra
// snapshots.
snaps = snaps[minSnapshotsOnImageToStartFlatten-1:]
// snapshots. Use the min of the extra snapshots and the number of children
// to avoid scenario where number of children are less than the extra snapshots.
// This occurs when the child images are in trash and not yet deleted.
extraSnapshots := min((len(snaps) - int(minSnapshotsOnImageToStartFlatten)), len(children))
children = children[:extraSnapshots]
err = flattenClonedRbdImages(
ctx,
snaps,
children,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
Expand Down Expand Up @@ -1157,7 +1177,7 @@ func (cs *ControllerServer) CreateSnapshot(
return cloneFromSnapshot(ctx, rbdVol, rbdSnap, cr, req.GetParameters())
}

err = flattenTemporaryClonedImages(ctx, rbdVol, cr)
err = rbdVol.PrepareVolumeForSnapshot(ctx, cr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1376,11 +1396,6 @@ func (cs *ControllerServer) doSnapshotClone(
return cloneRbd, err
}

err = cloneRbd.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return cloneRbd, err
}

return cloneRbd, nil
}

Expand Down
23 changes: 23 additions & 0 deletions internal/rbd/group_controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
// group snapshot and remove all images from the group again. This leaves the
// group and its snapshot around, the group snapshot can be inspected to list
// the snapshots of the images.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity.
func (cs *ControllerServer) CreateVolumeGroupSnapshot(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
Expand Down Expand Up @@ -130,6 +132,27 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(
"failed to get existing one with name %q: %v", vgsName, err)
}

creds, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer creds.DeleteCredentials()

errList := make([]error, 0)
for _, volume := range volumes {
err = volume.PrepareVolumeForSnapshot(ctx, creds)
if err != nil {
errList = append(errList, err)
}
}
if len(errList) > 0 {
// FIXME: we should probably choose a error code that has more priority.
return nil, status.Errorf(
status.Code(errList[0]),
"failed to prepare volumes for snapshot: %v",
errList)
}

// create a temporary VolumeGroup with a different name
vg, err = mgr.CreateVolumeGroup(ctx, vgName)
if err != nil {
Expand Down
101 changes: 37 additions & 64 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,13 +785,9 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
}
}

type trashSnapInfo struct {
origSnapName string
}

func flattenClonedRbdImages(
ctx context.Context,
snaps []librbd.SnapInfo,
children []string,
pool, monitors, rbdImageName string,
cr *util.Credentials,
) error {
Expand All @@ -807,26 +803,9 @@ func flattenClonedRbdImages(

return err
}
var origNameList []trashSnapInfo
for _, snapInfo := range snaps {
// check if the snapshot belongs to trash namespace.
isTrash, retErr := rv.isTrashSnap(snapInfo.Id)
if retErr != nil {
return retErr
}

if isTrash {
// get original snap name for the snapshot in trash namespace
origSnapName, retErr := rv.getOrigSnapName(snapInfo.Id)
if retErr != nil {
return retErr
}
origNameList = append(origNameList, trashSnapInfo{origSnapName})
}
}

for _, snapName := range origNameList {
rv.RbdImageName = snapName.origSnapName
for _, childName := range children {
rv.RbdImageName = childName
err = rv.flattenRbdImage(ctx, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
log.ErrorLog(ctx, "failed to flatten %s; err %v", rv, err)
Expand Down Expand Up @@ -2052,57 +2031,26 @@ func (ri *rbdImage) DisableDeepFlatten() error {
return image.UpdateFeatures(librbd.FeatureDeepFlatten, false)
}

func (ri *rbdImage) listSnapshots() ([]librbd.SnapInfo, error) {
// listSnapAndChildren returns list of names of snapshots and child images.
func (ri *rbdImage) listSnapAndChildren() ([]librbd.SnapInfo, []string, error) {
image, err := ri.open()
if err != nil {
return nil, err
return nil, nil, err
}
defer image.Close()

snapInfoList, err := image.GetSnapshotNames()
if err != nil {
return nil, err
}

return snapInfoList, nil
}

// isTrashSnap returns true if the snapshot belongs to trash namespace.
func (ri *rbdImage) isTrashSnap(snapID uint64) (bool, error) {
image, err := ri.open()
if err != nil {
return false, err
}
defer image.Close()

// Get namespace type for the snapshot
nsType, err := image.GetSnapNamespaceType(snapID)
if err != nil {
return false, err
}

if nsType == librbd.SnapNamespaceTypeTrash {
return true, nil
}

return false, nil
}

// getOrigSnapName returns the original snap name for
// the snapshots in Trash Namespace.
func (ri *rbdImage) getOrigSnapName(snapID uint64) (string, error) {
image, err := ri.open()
snaps, err := image.GetSnapshotNames()
if err != nil {
return "", err
return nil, nil, err
}
defer image.Close()

origSnapName, err := image.GetSnapTrashNamespace(snapID)
// ListChildren() returns pools, images, err.
_, children, err := image.ListChildren()
if err != nil {
return "", err
return nil, nil, err
}

return origSnapName, nil
return snaps, children, nil
}

func (ri *rbdImage) isCompatibleEncryption(dst *rbdImage) error {
Expand Down Expand Up @@ -2277,3 +2225,28 @@ func (ri *rbdImage) GetClusterID(ctx context.Context) (string, error) {

return ri.ClusterID, nil
}

func (rv *rbdVolume) PrepareVolumeForSnapshot(ctx context.Context, cr *util.Credentials) error {
hardLimit := rbdHardMaxCloneDepth
softLimit := rbdSoftMaxCloneDepth
err := flattenTemporaryClonedImages(ctx, rv, cr)
if err != nil {
return err
}

// choosing 2, since snapshot adds one depth and we'll be flattening the parent.
const depthToAvoidFlatten = 2
if rbdHardMaxCloneDepth > depthToAvoidFlatten {
hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten
}
if rbdSoftMaxCloneDepth > depthToAvoidFlatten {
softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten
}

err = rv.flattenParent(ctx, hardLimit, softLimit)
if err != nil {
return getGRPCErrorForCreateVolume(err)
}

return nil
}
4 changes: 4 additions & 0 deletions internal/rbd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ func (rv *rbdVolume) NewSnapshotByID(
return nil, err
}

// set the features for the clone image.
f := []string{librbd.FeatureNameLayering, librbd.FeatureNameDeepFlatten}
rv.ImageFeatureSet = librbd.FeatureSetFromNames(f)

options, err := rv.constructImageOptions(ctx)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions internal/rbd/types/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Volume interface {
// if the parent image is in trash, it returns an error.
// if the parent image exists and is not enabled for mirroring, it returns an error.
HandleParentImageExistence(ctx context.Context, flattenMode FlattenMode) error
// PrepareVolumeForSnapshot prepares the volume for snapshot by
// checking snapshots limit and clone depth limit and flatten it
// if required.
PrepareVolumeForSnapshot(ctx context.Context, cr *util.Credentials) error

// ToMirror converts the Volume to a Mirror.
ToMirror() (Mirror, error)
Expand Down
Loading