From 15a2ece98be4d46f473c90950fcda3666c37aa0c Mon Sep 17 00:00:00 2001 From: Aaron Jeyaraj Date: Fri, 12 Jul 2024 23:25:19 +0000 Subject: [PATCH] Initialize CSI Driver --- .github/workflows/build-publish-image.yml | 46 +++ .github/workflows/golangci-lint.yml | 24 ++ .github/workflows/tag-release.yml | 43 ++ .gitignore | 15 + .gitlab-ci.yml | 42 ++ .gitlab/CODEOWNERS | 12 + .golangci.yml | 118 ++++++ Dockerfile | 30 ++ Makefile | 81 ++++ README.md | 40 +- charts/crusoe-csi-driver/Chart.yaml | 18 + .../templates/controller.yaml | 85 ++++ .../templates/csidriver.yaml | 7 + charts/crusoe-csi-driver/templates/node.yaml | 85 ++++ charts/crusoe-csi-driver/templates/rbac.yaml | 165 ++++++++ charts/crusoe-csi-driver/values.yaml | 14 + cmd/crusoe-csi-driver/main.go | 26 ++ examples/web_server.yaml | 38 ++ go.mod | 29 ++ go.sum | 66 ++++ internal/config/config.go | 21 + internal/crusoe-csi-driver.go | 217 +++++++++++ internal/driver/auth.go | 197 ++++++++++ internal/driver/controller.go | 313 +++++++++++++++ internal/driver/controller_util.go | 366 ++++++++++++++++++ internal/driver/driver.go | 57 +++ internal/driver/identity.go | 81 ++++ internal/driver/node.go | 192 +++++++++ internal/driver/node_util.go | 54 +++ internal/driver/secrets.go | 45 +++ internal/driver/util.go | 224 +++++++++++ internal/locker/locker.go | 56 +++ scripts/tag_semver.sh | 25 ++ versions.env | 2 + 34 files changed, 2833 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/build-publish-image.yml create mode 100644 .github/workflows/golangci-lint.yml create mode 100644 .github/workflows/tag-release.yml create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 .gitlab/CODEOWNERS create mode 100644 .golangci.yml create mode 100644 Dockerfile create mode 100644 Makefile create mode 100644 charts/crusoe-csi-driver/Chart.yaml create mode 100644 charts/crusoe-csi-driver/templates/controller.yaml create mode 100644 charts/crusoe-csi-driver/templates/csidriver.yaml create mode 100644 charts/crusoe-csi-driver/templates/node.yaml create mode 100644 charts/crusoe-csi-driver/templates/rbac.yaml create mode 100644 charts/crusoe-csi-driver/values.yaml create mode 100644 cmd/crusoe-csi-driver/main.go create mode 100644 examples/web_server.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/config/config.go create mode 100644 internal/crusoe-csi-driver.go create mode 100644 internal/driver/auth.go create mode 100644 internal/driver/controller.go create mode 100644 internal/driver/controller_util.go create mode 100644 internal/driver/driver.go create mode 100644 internal/driver/identity.go create mode 100644 internal/driver/node.go create mode 100644 internal/driver/node_util.go create mode 100644 internal/driver/secrets.go create mode 100644 internal/driver/util.go create mode 100644 internal/locker/locker.go create mode 100644 scripts/tag_semver.sh create mode 100644 versions.env diff --git a/.github/workflows/build-publish-image.yml b/.github/workflows/build-publish-image.yml new file mode 100644 index 0000000..32e0542 --- /dev/null +++ b/.github/workflows/build-publish-image.yml @@ -0,0 +1,46 @@ +name: build and publish + +on: + push: + tags: + - v* + +permissions: + contents: read + packages: write + attestations: write + id-token: write + +jobs: + docker: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Log into the Container registry + uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build and push + uses: docker/build-push-action@v6 + with: + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ steps.meta.outputs.tags }} + + - name: Generate artifact attestation + uses: actions/attest-build-provenance@v1 + with: + subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}} + subject-digest: ${{ steps.push.outputs.digest }} + push-to-registry: true diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml new file mode 100644 index 0000000..02ab945 --- /dev/null +++ b/.github/workflows/golangci-lint.yml @@ -0,0 +1,24 @@ +name: golangci-lint +on: + push: + tags: + - v* + branches: + - main + pull_request: +permissions: + contents: read +jobs: + golangci: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v4 + with: + go-version: "1.22" + cache: false + - uses: actions/checkout@v3 + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.55.2 diff --git a/.github/workflows/tag-release.yml b/.github/workflows/tag-release.yml new file mode 100644 index 0000000..d16a814 --- /dev/null +++ b/.github/workflows/tag-release.yml @@ -0,0 +1,43 @@ +name: semver tag + +on: + push: + branches: + - main + +permissions: + contents: write + +jobs: + semver-tag: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Set up Git + run: | + git config user.name "crusoe-cloud" + git config user.email "support@crusoecloud.com" + + - name: Load versions from text file + run: | + source versions.env + echo "MAJOR_VERSION=${MAJOR_VERSION}" >> $GITHUB_ENV + echo "MINOR_VERSION=${MINOR_VERSION}" >> $GITHUB_ENV + + - name: Calculate and set new version + run: | + chmod +x ./scripts/tag_semver.sh + ./scripts/tag_semver.sh $MAJOR_VERSION $MINOR_VERSION "" + shell: bash + + - name: Read version and push tag + run: | + source variables.env + echo "Calculated version: $RELEASE_VERSION" + source variables.env + git tag $RELEASE_VERSION + git push origin $RELEASE_VERSION \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2263116 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +*.jar +*.swp +*tmp/ +.idea/ +.DS_Store +.vscode/* +aliaslint.so +coverage.out +dist/ +gokart_result.json +golangci-lint.json +node_modules/ +package*.json +tests.xml +coverage.xml diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..ef453b2 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,42 @@ +include: + - project: 'crusoeenergy/tools' + file: '/templates/go.gitlab-ci.yml' + +variables: + CI_IMAGE: registry.gitlab.com/crusoeenergy/tools/go-ci-1.22 + +test_and_lint: + rules: + - if: '$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH || $CI_COMMIT_BRANCH == "release"' + changes: !reference [.code-changes, changes] + - if: '$CI_MERGE_REQUEST_ID' + changes: !reference [.code-changes, changes] + # don't run CI for semver tags, do run it for custom tags + - if: '$CI_COMMIT_TAG && $CI_COMMIT_TAG !~ /^(.+\/)?v[0-9]+\.[0-9]+\.[0-9]+$/' + +build_and_push: + script: + - |- + if [ "${BUILD_IN_SUBPATH:-true}" == "true" ]; then + cd ${SUBPROJECT_REL_PATH} + fi + - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN registry.gitlab.com + # Build an image based on the commit ref, this will be available + # internally for testing feature branches and testing pre-release versions + - docker build --no-cache --pull -f "${DOCKERFILE:-./Dockerfile}" --build-arg CI_SERVER_HOST="$CI_SERVER_HOST" --build-arg CI_JOB_TOKEN="$CI_JOB_TOKEN" --build-arg CRUSOE_CSI_DRIVER_NAME="$CRUSOE_CSI_DRIVER_NAME" --build-arg CRUSOE_CSI_DRIVER_VERSION="$CI_COMMIT_REF_NAME" -t $RELEASE_IMAGE:"${CI_COMMIT_REF_NAME##*/}" . + - docker push $RELEASE_IMAGE:"${CI_COMMIT_REF_NAME##*/}" + +# Remove the tag_semver and pages jobs from merges into main. +# The tag_semver job will be run using a GitHub action instead +# and new versions will be pushed then. +tag_semver: + rules: + - when: never + +code_intelligence: + rules: + - when: never + +pages: + rules: + - when: never \ No newline at end of file diff --git a/.gitlab/CODEOWNERS b/.gitlab/CODEOWNERS new file mode 100644 index 0000000..cc17421 --- /dev/null +++ b/.gitlab/CODEOWNERS @@ -0,0 +1,12 @@ +# default = everyone is codeowner +* @crusoeenergy/island + +.gitlab/CODEOWNERS @lanwall12 @nitper @bsherrycrusoe @ksosnowski3 @dpattishall + +# more restrictive ownership for adding dependencies and changing ci/build +.gitlab-ci.yml @lanwall12 @nitper @crusoeenergy/island/ccx +.golangci-lint.yml @lanwall12 @nitper @crusoeenergy/island/ccx +go.mod @lanwall12 @nitper @crusoeenergy/island/ccx +Makefile @lanwall12 @nitper @crusoeenergy/island/ccx +Dockerfile @lanwall12 @nitper @crusoeenergy/island/ccx +Dockerfile.* @lanwall12 @nitper @crusoeenergy/island/ccx diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..8b5b3c8 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,118 @@ +# see https://golangci-lint.run/usage/configuration/ +linters-settings: + errcheck: + # report about not checking of errors in type assertions: `a := b.(MyStruct)`; + # default is false: such cases aren't reported by default. + check-type-assertions: true + + # report about assignment of errors to blank identifier: `num, _ := strconv.Atoi(numStr)`; + # default is false: such cases aren't reported by default. + check-blank: true + govet: + check-shadowing: true + gci: + sections: + - standard + - default + - prefix(github.com/crusoecloud) + gocritic: + enabled-tags: + - diagnostic + - experimental + - opinionated + - performance + - style + disabled-checks: + - commentedOutCode + whitespace: + multi-if: true # Enforces newlines (or comments) after every multi-line if statement + gosec: + global: + audit: enabled # Run extra checks that might be "nosy" + gomoddirectives: + replace-allow-list: + - github.com/crusoecloud/client-go + +linters: + disable-all: true + enable: + - asciicheck + - bodyclose + - cyclop + - dogsled + - dupl + - durationcheck + - errcheck + - errorlint + - exhaustive + - exportloopref + - forbidigo + - forcetypeassert + - funlen + - gci + - gochecknoinits + - gochecknoglobals + - gocognit + - goconst + - gocritic + - gocyclo + - godot + - goerr113 + - gofmt + - gofumpt + - goheader + - goimports + - gomnd + - gomoddirectives + - gomodguard + - goprintffuncname + - gosec + - gosimple + - govet + - ineffassign + - lll + - makezero + - misspell + - nakedret + - nestif + - nilerr + - nlreturn + - noctx + - nolintlint + - paralleltest + - prealloc + - predeclared + - promlinter + - revive + - rowserrcheck + - sqlclosecheck + - staticcheck + - stylecheck + - tagliatelle + - testpackage + - thelper + - tparallel + - typecheck + - unconvert + - unparam + - unused + - wastedassign + - whitespace + - wrapcheck + +issues: + # Excluding configuration per-path, per-linter, per-text and per-source + exclude-rules: + # Exclude lll issues for long lines with go:generate + - linters: + - lll + source: "^//go:generate " + +run: + # include test files or not + tests: true + +# golangci.com configuration +# https://github.com/golangci/golangci/wiki/Configuration +service: + golangci-lint-version: 1.50.1 # use a fixed version for consistent results diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8ce3893 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +########################################## +# STEP 1: build crusoe-csi-driver binary # +########################################## + +FROM golang:1.22 as builder + +ARG CRUSOE_CSI_DRIVER_NAME +ENV CRUSOE_CSI_DRIVER_NAME=$CRUSOE_CSI_DRIVER_NAME +ARG CRUSOE_CSI_DRIVER_VERSION +ENV CRUSOE_CSI_DRIVER_VERSION=$CRUSOE_CSI_DRIVER_VERSION + +WORKDIR /build +COPY . . + +RUN make cross + +################################################################ +# STEP 2: build a small image and run crusoe-csi-driver binary # +################################################################ +FROM alpine + +# Need to get these updates for k8s mount-utils library to work properly +RUN apk update && \ + apk add --no-cache e2fsprogs && \ + apk add --no-cache blkid && \ + rm -rf /var/cache/apk/* + +COPY --from=builder /build/dist/crusoe-csi-driver /usr/local/go/bin/crusoe-csi-driver + +ENTRYPOINT ["/usr/local/go/bin/crusoe-csi-driver"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..781b4be --- /dev/null +++ b/Makefile @@ -0,0 +1,81 @@ +PREFIX?=$(shell pwd) + +NAME := crusoe-csi-driver +PKG := github.com/crusoecloud/crusoe-csi-driver/cmd/$(NAME) + +BUILDDIR := ${PREFIX}/dist +# Set any default go build tags +BUILDTAGS := + +GOLANGCI_VERSION = v1.55.2 +GO_ACC_VERSION = latest +GOTESTSUM_VERSION = latest +GOCOVER_VERSION = latest + +GO_LDFLAGS=-ldflags "-X 'github.com/crusoecloud/crusoe-csi-driver/internal/driver.version=$$CRUSOE_CSI_DRIVER_VERSION' -X 'github.com/crusoecloud/crusoe-csi-driver/internal/driver.name=$$CRUSOE_CSI_DRIVER_NAME'" + +.PHONY: run +run: + go run cmd/crusoe-csi-driver/main.go + +.PHONY: dev +dev: test build-deps lint ## Runs a build-deps, test, lint + +.PHONY: ci +ci: test-ci build-deps lint-ci ## Runs test, build-deps, lint + +.PHONY: build-deps +build-deps: ## Install build dependencies + @echo "==> $@" + @go install github.com/golangci/golangci-lint/cmd/golangci-lint@${GOLANGCI_VERSION} + +.PHONY: precommit +precommit: ## runs various formatters that will be checked by linter (but can/should be automatic in your editor) + @echo "==> $@" + @go mod tidy + @golangci-lint run --fix ./... + +.PHONY: test +test: ## Runs the go tests. + @echo "==> $@" + @go test -tags "$(BUILDTAGS)" -cover -race -v ./... + +.PHONY: test-ci +test-ci: ## Runs the go tests with additional options for a CI environment + @echo "==> $@" + @go mod tidy + @git diff --exit-code go.mod go.sum # fail if go.mod is not tidy + @go install github.com/ory/go-acc@${GO_ACC_VERSION} + @go install gotest.tools/gotestsum@${GOTESTSUM_VERSION} + @go install github.com/boumenot/gocover-cobertura@${GOCOVER_VERSION} + @gotestsum --junitfile tests.xml --raw-command -- go-acc -o coverage.out ./... -- -json -tags "$(BUILDTAGS)" -race -v + @go tool cover -func=coverage.out + @gocover-cobertura < coverage.out > coverage.xml + +.PHONY: lint +lint: ## Verifies `golangci-lint` passes + @echo "==> $@" + @golangci-lint version + @golangci-lint run ./... + +.PHONY: lint-ci +lint-ci: ## Verifies `golangci-lint` passes and outputs in CI-friendly format + @echo "==> $@" + @golangci-lint version + @golangci-lint run ./... --out-format code-climate > golangci-lint.json + +.PHONY: build +build: ## Builds the executable and places it in the build dir + @go build -o ${BUILDDIR}/${NAME} ${PKG} + +.PHONY: cross +cross: ## Builds the cross compiled executable for use within a container + @GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o ${BUILDDIR}/${NAME} ${GO_LDFLAGS} ${PKG} + +.PHONY: install +install: ## Builds and installs the executable on PATH + @go install ${PKG} + +.PHONY: help +help: + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' diff --git a/README.md b/README.md index ec383dd..c260123 100644 --- a/README.md +++ b/README.md @@ -1 +1,39 @@ -# crusoe-csi-driver \ No newline at end of file +# Crusoe Container Storage Interface (CSI) Driver + +This repository defines the official Container Storage Interface (CSI) Driver for use with [Crusoe Cloud](https://crusoecloud.com/), the world's first carbon-reducing, low-cost GPU cloud platform. + +## Getting Started + +### Prerequisites + +This guide assumes that the user has already set up a Container Orchestrator (CO) on Crusoe Cloud compute. +If you have not, we would recommend beginning by deploying one of our existing solutions – +the [Crusoe Cloud RKE2 solution](https://github.com/crusoecloud/crusoe-ml-rke2) is a great place to start. + +### Setting up credentials + +As the CSI Driver will communicate with the Crusoe Cloud API to orchestrate storage operations, you will have to set up +credentials in your Kubernetes cluster which the driver can then use to communicate with the API. Here is a `.yaml` file +which can be modified with your credentials and applied to your cluster (using `kubectl apply -f credentials.yaml`). + +```yaml +apiVersion: v1 +data: + crusoe-csi-accesskey: +kind: Secret +type: Opaque +metadata: + name: crusoe-csi-accesskey +--- +apiVersion: v1 +data: + crusoe-csi-secretkey: +kind: Secret +type: Opaque +metadata: + name: crusoe-csi-secretkey +``` + +### Installing the Driver + +We recommend using Helm to install the CSI driver. \ No newline at end of file diff --git a/charts/crusoe-csi-driver/Chart.yaml b/charts/crusoe-csi-driver/Chart.yaml new file mode 100644 index 0000000..089167c --- /dev/null +++ b/charts/crusoe-csi-driver/Chart.yaml @@ -0,0 +1,18 @@ +apiVersion: v2 +name: crusoe-csi-driver +description: A Helm chart for the Crusoe Cloud CSI Driver + +type: application +version: 0.0.1 +appVersion: 0.0.1 + +kubeVersion: ">=1.17.0-0" +home: https://github.com/crusoecloud/crusoe-csi-driver +sources: + - https://github.com/crusoecloud/crusoe-csi-driver +keywords: + - crusoe + - csi +maintainers: + - name: Crusoe Cloud + url: https://github.com/crusoecloud/crusoe-csi-driver/ diff --git a/charts/crusoe-csi-driver/templates/controller.yaml b/charts/crusoe-csi-driver/templates/controller.yaml new file mode 100644 index 0000000..89f57c4 --- /dev/null +++ b/charts/crusoe-csi-driver/templates/controller.yaml @@ -0,0 +1,85 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.Release.Name }}-controller +spec: + replicas: {{.Values.replicaCount }} + selector: + matchLabels: + app: {{.Release.Name }} + template: + metadata: + labels: + app: {{.Release.Name }} + spec: + imagePullSecrets: + - name: registry-credentials + serviceAccountName: crusoe-csi-driver-controller-sa + containers: + - name: crusoe-csi-driver + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + args: ["--services", "controller,identity", "--socket-address", "unix:/csi/csi-controller.sock"] + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + envFrom: + - secretRef: + name: crusoe-csi-accesskey + optional: false + - secretRef: + name: crusoe-csi-secretkey + optional: false + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: mountpoint-dir + mountPath: /var/lib/kubelet + mountPropagation: "Bidirectional" + securityContext: + privileged: true + - name: csi-attacher + image: registry.k8s.io/sig-storage/csi-attacher:v4.4.0 + args: + - "--v=5" + - "--csi-address=/csi/csi-controller.sock" + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: csi-provisioner + image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.0 + args: + - "--v=5" + - "--csi-address=/csi/csi-controller.sock" + - "--feature-gates=Topology=true" + - "--extra-create-metadata" + - "--default-fstype=ext4" + - "--controller-publish-readonly=true" + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: csi-resizer + image: registry.k8s.io/sig-storage/csi-resizer:v1.9.0 + args: + - "--v=5" + - "--csi-address=/csi/csi-controller.sock" + - "--handle-volume-inuse-error=false" + volumeMounts: + - name: socket-dir + mountPath: /csi + volumes: + # This volume is where the socket for kubelet->driver communication is done + - name: socket-dir + hostPath: + path: /var/lib/kubelet/plugins/{{.Release.Name }} + type: DirectoryOrCreate + # This volume is where the driver mounts volumes + - name: mountpoint-dir + hostPath: + path: /var/lib/kubelet + type: Directory \ No newline at end of file diff --git a/charts/crusoe-csi-driver/templates/csidriver.yaml b/charts/crusoe-csi-driver/templates/csidriver.yaml new file mode 100644 index 0000000..1da5655 --- /dev/null +++ b/charts/crusoe-csi-driver/templates/csidriver.yaml @@ -0,0 +1,7 @@ +apiVersion: storage.k8s.io/v1 +kind: CSIDriver +metadata: + name: {{ .Values.driverName }} +spec: + attachRequired: true + podInfoOnMount: false \ No newline at end of file diff --git a/charts/crusoe-csi-driver/templates/node.yaml b/charts/crusoe-csi-driver/templates/node.yaml new file mode 100644 index 0000000..4b1db08 --- /dev/null +++ b/charts/crusoe-csi-driver/templates/node.yaml @@ -0,0 +1,85 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: {{.Release.Name }}-node +spec: + selector: + matchLabels: + app: {{.Release.Name }} + template: + metadata: + labels: + app: {{.Release.Name }} + spec: + imagePullSecrets: + - name: registry-credentials + containers: + - name: crusoe-csi-driver + image: "registry.gitlab.com/crusoeenergy/island/external/crusoe-csi-driver:latest" + args: + - "--services=node,identity" + - "--socket-address=unix:/csi/csi-node.sock" + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + envFrom: + - secretRef: + name: crusoe-csi-accesskey + optional: false + - secretRef: + name: crusoe-csi-secretkey + optional: false + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: mountpoint-dir + mountPath: /var/lib/kubelet + mountPropagation: "Bidirectional" + - name: device-dir + mountPath: /dev + securityContext: + privileged: true + - name: csi-driver-registrar + image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.9.0 + args: + - "--v=5" + - "--csi-address=/csi/csi-node.sock" + - "--kubelet-registration-path=/var/lib/kubelet/plugins/{{.Release.Name }}/csi-node.sock" + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: registration-dir + mountPath: /registration + volumes: + # This volume is where the socket for kubelet->driver communication is done + - name: socket-dir + hostPath: + path: /var/lib/kubelet/plugins/{{.Release.Name }} + type: DirectoryOrCreate + # This volume is where the driver mounts volumes + - name: mountpoint-dir + hostPath: + path: /var/lib/kubelet + type: Directory + # This volume is where the node-driver-registrar registers the plugin + # with kubelet + - name: registration-dir + hostPath: + path: /var/lib/kubelet/plugins_registry + type: Directory + # This volume is where devices are located on the underlying instance + - name: device-dir + hostPath: + path: /dev + type: Directory \ No newline at end of file diff --git a/charts/crusoe-csi-driver/templates/rbac.yaml b/charts/crusoe-csi-driver/templates/rbac.yaml new file mode 100644 index 0000000..696974b --- /dev/null +++ b/charts/crusoe-csi-driver/templates/rbac.yaml @@ -0,0 +1,165 @@ +# This file contains the objects representing the Service +# Accounts and RBAC tied to that account that need to be +# made for the CSI controller deployment. This is so the +# CSI controller sidecars have permissions to watch the +# K8s API to see if PVCs are created. +apiVersion: v1 +kind: ServiceAccount +metadata: + name: crusoe-csi-driver-controller-sa +namespace: {{ .Release.Namespace }} +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: external-provisioner-runner +rules: + - apiGroups: [""] + resources: ["persistentvolumes"] + verbs: ["get", "list", "watch", "create", "patch", "delete"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "watch", "update"] + - apiGroups: ["storage.k8s.io"] + resources: ["storageclasses"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["events"] + verbs: ["list", "watch", "create", "update", "patch"] + - apiGroups: ["snapshot.storage.k8s.io"] + resources: ["volumesnapshots"] + verbs: ["get", "list"] + - apiGroups: ["snapshot.storage.k8s.io"] + resources: ["volumesnapshotcontents"] + verbs: ["get", "list"] + - apiGroups: ["storage.k8s.io"] + resources: ["csinodes"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list", "watch"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattachments"] + verbs: ["get", "list", "watch"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-provisioner-role +subjects: + - kind: ServiceAccount + name: crusoe-csi-driver-controller-sa + namespace: {{ .Release.Namespace }} +roleRef: + kind: ClusterRole + name: external-provisioner-runner + apiGroup: rbac.authorization.k8s.io +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-provisioner-role-cfg +namespace: {{ .Release.Namespace }} +subjects: + - kind: ServiceAccount + name: crusoe-csi-driver-controller-sa + namespace: {{ .Release.Namespace }} +roleRef: + kind: Role + name: external-provisioner-cfg + apiGroup: rbac.authorization.k8s.io +--- +# Attacher must be able to work with PVs, CSINodes and VolumeAttachments +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: external-attacher-runner +rules: + - apiGroups: [""] + resources: ["persistentvolumes"] + verbs: ["get", "list", "watch", "patch"] + - apiGroups: ["storage.k8s.io"] + resources: ["csinodes"] + verbs: ["get", "list", "watch"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattachments"] + verbs: ["get", "list", "watch", "patch"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattachments/status"] + verbs: ["patch"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-attacher-role +subjects: + - kind: ServiceAccount + name: crusoe-csi-driver-controller-sa + namespace: {{ .Release.Namespace }} +roleRef: + kind: ClusterRole + name: external-attacher-runner + apiGroup: rbac.authorization.k8s.io +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-attacher-role-cfg +namespace: {{ .Release.Namespace }} +subjects: + - kind: ServiceAccount + name: crusoe-csi-driver-controller-sa + namespace: {{ .Release.Namespace }} +roleRef: + kind: Role + name: external-attacher-cfg + apiGroup: rbac.authorization.k8s.io +# Resizer must be able to work with PVCs, PVs, SCs. +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: external-resizer-runner +rules: + - apiGroups: [""] + resources: ["persistentvolumes"] + verbs: ["get", "list", "watch", "patch"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["persistentvolumeclaims/status"] + verbs: ["patch"] + - apiGroups: [""] + resources: ["events"] + verbs: ["list", "watch", "create", "update", "patch"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-resizer-role +subjects: + - kind: ServiceAccount + name: crusoe-csi-driver-controller-sa + namespace: {{ .Release.Namespace }} +roleRef: + kind: ClusterRole + name: external-resizer-runner + apiGroup: rbac.authorization.k8s.io +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-resizer-role-cfg + namespace: {{ .Release.Namespace }} +subjects: + - kind: ServiceAccount + name: crusoe-csi-driver-controller-sa + namespace: {{ .Release.Namespace }} +roleRef: + kind: Role + name: external-resizer-cfg + apiGroup: rbac.authorization.k8s.io \ No newline at end of file diff --git a/charts/crusoe-csi-driver/values.yaml b/charts/crusoe-csi-driver/values.yaml new file mode 100644 index 0000000..53d2f6d --- /dev/null +++ b/charts/crusoe-csi-driver/values.yaml @@ -0,0 +1,14 @@ +# Default values for crusoe-csi-driver. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. +replicaCount: 1 + +image: + repository: ghcr.io/crusoecloud/crusoe-csi-driver + tag: "main" + +driverName: "csi.crusoe.ai" + +resources: {} + +ingress: {} diff --git a/cmd/crusoe-csi-driver/main.go b/cmd/crusoe-csi-driver/main.go new file mode 100644 index 0000000..924dc0d --- /dev/null +++ b/cmd/crusoe-csi-driver/main.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/crusoecloud/crusoe-csi-driver/internal" + "github.com/crusoecloud/crusoe-csi-driver/internal/config" +) + +// Start executing the Crusoe CSI driver. +func main() { + rootCmd := &cobra.Command{ + Use: "crusoe-csi-driver", + Short: "Crusoe Container Storage Interface (CSI) driver", + Args: cobra.NoArgs, + RunE: internal.RunDriver, + } + config.AddFlags(rootCmd) + if err := rootCmd.Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/examples/web_server.yaml b/examples/web_server.yaml new file mode 100644 index 0000000..e98078a --- /dev/null +++ b/examples/web_server.yaml @@ -0,0 +1,38 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: crusoe-csi-driver-sc +provisioner: csi.crusoe.ai +parameters: + type: pd-standard +volumeBindingMode: WaitForFirstConsumer +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: podpvc +spec: + accessModes: + - ReadWriteOnce + storageClassName: crusoe-csi-driver-sc + resources: + requests: + storage: 10Gi + volumeMode: Block +--- +apiVersion: v1 +kind: Pod +metadata: + name: web-server +spec: + containers: + - name: web-server + image: nginx + volumeDevices: + - devicePath: /var/lib/www/html + name: mypvc + volumes: + - name: mypvc + persistentVolumeClaim: + claimName: podpvc + readOnly: false diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8041665 --- /dev/null +++ b/go.mod @@ -0,0 +1,29 @@ +module github.com/crusoecloud/crusoe-csi-driver + +go 1.22.0 + +require ( + github.com/antihax/optional v1.0.0 + github.com/container-storage-interface/spec v1.9.0 + github.com/crusoecloud/client-go v0.1.47 + github.com/spf13/cobra v1.8.0 + google.golang.org/grpc v1.57.0 + k8s.io/klog/v2 v2.120.1 + k8s.io/mount-utils v0.30.1 + k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 +) + +require ( + github.com/go-logr/logr v1.4.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/moby/sys/mountinfo v0.6.2 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/oauth2 v0.7.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6567d82 --- /dev/null +++ b/go.sum @@ -0,0 +1,66 @@ +github.com/antihax/optional v1.0.0 h1:xK2lYat7ZLaVVcIuj82J8kIro4V6kDe0AUDFboUCwcg= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/container-storage-interface/spec v1.9.0 h1:zKtX4STsq31Knz3gciCYCi1SXtO2HJDecIjDVboYavY= +github.com/container-storage-interface/spec v1.9.0/go.mod h1:ZfDu+3ZRyeVqxZM0Ds19MVLkN2d1XJ5MAfi1L3VjlT0= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/crusoecloud/client-go v0.1.47 h1:wyKZyRkcpNI140n+QyYPKutynuLAJxJLQdXvfjXKLsA= +github.com/crusoecloud/client-go v0.1.47/go.mod h1:k1FgpUllEJtE53osEwsF+JfbFKILn5t3UuBdHYBVpdY= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= +golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 h1:eSaPbMR4T7WfH9FvABk36NBMacoTUKdWCvV0dx+KfOg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5/go.mod h1:zBEcrKX2ZOcEkHWxBPAIvYUWOKKMIhYcmNiUIu2ji3I= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/mount-utils v0.30.1 h1:4HEFqo2bzRjCHHXRu7yQh6tvpMnplwWaqhuU7oE3710= +k8s.io/mount-utils v0.30.1/go.mod h1:9sCVmwGLcV1MPvbZ+rToMDnl1QcGozy+jBPd0MsQLIo= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..31ec3fe --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,21 @@ +package config + +import "github.com/spf13/cobra" + +const ( + APIEndpointFlag = "api-endpoint" + APIEndpointDefault = "https://api.crusoecloud.com/v1alpha5" + SocketAddressFlag = "socket-address" + SocketAddressDefault = "unix:/tmp/csi.sock" + ServicesFlag = "services" +) + +// AddFlags attaches the CLI flags the CSI Driver needs to the provided command. +func AddFlags(cmd *cobra.Command) { + cmd.Flags().String(APIEndpointFlag, APIEndpointDefault, + "Crusoe API Endpoint") + cmd.Flags().String(SocketAddressFlag, SocketAddressDefault, + "Socket which the gRPC server will listen on") + cmd.Flags().StringSlice(ServicesFlag, []string{}, + "CSI Driver services to return") +} diff --git a/internal/crusoe-csi-driver.go b/internal/crusoe-csi-driver.go new file mode 100644 index 0000000..abec38e --- /dev/null +++ b/internal/crusoe-csi-driver.go @@ -0,0 +1,217 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "net" + "net/url" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "github.com/spf13/cobra" + "google.golang.org/grpc" + "k8s.io/klog/v2" + + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" + "github.com/crusoecloud/crusoe-csi-driver/internal/config" + "github.com/crusoecloud/crusoe-csi-driver/internal/driver" +) + +const ( + maxRetries = 10 + retryIntervalSeconds = 5 + identityArg = "identity" + controllerArg = "controller" + nodeArg = "node" +) + +var ( + errAccessKeyEmpty = errors.New("access key is empty") + errSecretKeyEmpty = errors.New("secret key is empty") + errNoServicesProvided = errors.New("cannot initialize CSI driver with no services") +) + +type service interface { + Init(apiClient *crusoeapi.APIClient, driver *driver.Config, services []driver.Service) error + RegisterServer(srv *grpc.Server) error +} + +// RunDriver starts up and runs the Crusoe Cloud CSI Driver. +// +//nolint:funlen,cyclop // a lot statements here because all set up is done here, already factored +func RunDriver(cmd *cobra.Command, _ /*args*/ []string) error { + // Listen for interrupt signals. + interrupt := make(chan os.Signal, 1) + // Ctrl-C + signal.Notify(interrupt, os.Interrupt) + // this is what docker sends when shutting down a container + signal.Notify(interrupt, syscall.SIGTERM) + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-ctx.Done(): + return + + case <-interrupt: + wg.Done() + cancel() + } + }() + + requestedServices, accessKey, secretKey, socketAddress, apiEndpoint, parseErr := parseAndValidateArguments(cmd) + if parseErr != nil { + return fmt.Errorf("failed to parse arguments: %w", parseErr) + } + + // get endpoint from flags + endpointURL, err := url.Parse(socketAddress) + if err != nil { + return fmt.Errorf("failed to parse socket address (%s): %w", endpointURL, err) + } + + listener, listenErr := startListener(endpointURL) + if listenErr != nil { + return fmt.Errorf("failed to start listener on provided socket url: %w", listenErr) + } + + klog.Infof("Started listener on: %s (scheme: %s)", endpointURL.Path, endpointURL.Scheme) + + srv := grpc.NewServer() + + grpcServers := []service{} + for _, grpcSrvc := range requestedServices { + switch grpcSrvc { + case driver.ControllerService: + grpcServers = append(grpcServers, driver.NewControllerServer()) + case driver.NodeService: + grpcServers = append(grpcServers, driver.NewNodeServer()) + case driver.IdentityService: + grpcServers = append(grpcServers, driver.NewIdentityServer()) + } + } + + if len(grpcServers) == 0 { + return errNoServicesProvided + } + + apiClient := driver.NewAPIClient(apiEndpoint, accessKey, secretKey, + fmt.Sprintf("%s/%s", driver.GetVendorName(), driver.GetVendorVersion())) + + instanceID, projectID, location, err := driver.GetInstanceID(ctx, apiClient) + if err != nil { + return fmt.Errorf("failed to get instance id of nodeL %w", err) + } + + crusoeDriver := &driver.Config{ + VendorName: driver.GetVendorName(), + VendorVersion: driver.GetVendorVersion(), + NodeID: instanceID, + NodeProject: projectID, + NodeLocation: location, + } + + // Initialize gRPC services and register with the gRPC servers + for _, server := range grpcServers { + initErr := server.Init(apiClient, crusoeDriver, requestedServices) + if initErr != nil { + return fmt.Errorf("failed to initialize server: %w", initErr) + } + + registerErr := server.RegisterServer(srv) + if registerErr != nil { + return fmt.Errorf("failed to register server: %w", registerErr) + } + } + + go func() { + err = srv.Serve(listener) + }() + + wg.Wait() + + srv.GracefulStop() + + return nil +} + +//nolint:gocritic,cyclop // there are a lot of returned variables here because we parse all args here +func parseAndValidateArguments(cmd *cobra.Command) ( + requestedServices []driver.Service, + accessKey, secretKey, socketAddress, apiEndpoint string, + err error, +) { + accessKey = driver.GetCrusoeAccessKey() + if accessKey == "" { + return nil, "", "", "", "", errAccessKeyEmpty + } + secretKey = driver.GetCrusoeSecretKey() + if secretKey == "" { + return nil, "", "", "", "", errSecretKeyEmpty + } + + services, err := cmd.Flags().GetStringSlice(config.ServicesFlag) + if err != nil { + return nil, "", "", "", "", + fmt.Errorf("failed to get services flag: %w", err) + } + requestedServices = []driver.Service{} + for _, reqService := range services { + switch reqService { + case identityArg: + requestedServices = append(requestedServices, driver.IdentityService) + case controllerArg: + requestedServices = append(requestedServices, driver.ControllerService) + case nodeArg: + requestedServices = append(requestedServices, driver.NodeService) + default: + //nolint:goerr113 // use dynamic errors for more informative error handling + return nil, "", "", "", "", + fmt.Errorf("received unknown service type: %s", reqService) + } + } + socketAddress, err = cmd.Flags().GetString(config.SocketAddressFlag) + if err != nil { + return nil, "", "", "", "", + fmt.Errorf("failed to get socket address flag: %w", err) + } + apiEndpoint, err = cmd.Flags().GetString(config.APIEndpointFlag) + if err != nil { + return nil, "", "", "", "", + fmt.Errorf("failed to get api endpoint flag: %w", err) + } + + return requestedServices, accessKey, secretKey, socketAddress, apiEndpoint, nil +} + +func startListener(endpointURL *url.URL) (net.Listener, error) { + tryCount := 0 + var listener net.Listener + for { + tryListener, listenErr := net.Listen(endpointURL.Scheme, endpointURL.Path) + if listenErr != nil { + // if old pods are being terminated, they might not have closed the gRPC server listening on the socket + // let's wait and try again + if strings.Contains(listenErr.Error(), "bind: address already in use") && tryCount < maxRetries { + klog.Infof("Address (%s//%s) already in use, retrying...", endpointURL.Scheme, endpointURL.Path) + time.Sleep(retryIntervalSeconds * time.Second) + tryCount++ + + continue + } + + return nil, fmt.Errorf("failed to listen on provided socket: %w", listenErr) + } + listener = tryListener + + break + } + + return listener, nil +} diff --git a/internal/driver/auth.go b/internal/driver/auth.go new file mode 100644 index 0000000..a4d0e0a --- /dev/null +++ b/internal/driver/auth.go @@ -0,0 +1,197 @@ +package driver + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + "net/http" + "net/url" + "sort" + "strings" + "time" + + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" +) + +// AuthenticatingTransport is a struct implementing http.Roundtripper +// that authenticates a request to Crusoe Cloud before sending it out. +type AuthenticatingTransport struct { + keyID string + secretKey string + http.RoundTripper +} + +func NewAuthenticatingTransport(r http.RoundTripper, keyID, secretKey string) AuthenticatingTransport { + if r == nil { + r = http.DefaultTransport + } + + return AuthenticatingTransport{ + RoundTripper: r, + keyID: keyID, + secretKey: secretKey, + } +} + +func (t AuthenticatingTransport) RoundTrip(r *http.Request) (*http.Response, error) { + if err := addSignature(r, t.keyID, t.secretKey); err != nil { + return nil, err + } + + //nolint:wrapcheck // error should be forwarded here. + return t.RoundTripper.RoundTrip(r) +} + +const ( + timestampHeader = "X-Crusoe-Timestamp" + authHeader = "Authorization" + authVersion = "1.0" +) + +// Verifies if the token signature is valid for a given request. +func addSignature(req *http.Request, encodedKeyID, encodedKey string) error { + req.Header.Set(timestampHeader, time.Now().UTC().Format(time.RFC3339)) + + message, err := generateMessageV1_0(req) + if err != nil { + return err + } + signature, err := signMessageV1_0(message, encodedKey) + if err != nil { + return err + } + + req.Header.Set(authHeader, + "Bearer "+fmt.Sprintf("%s:%s:%s", authVersion, encodedKeyID, base64.RawURLEncoding.EncodeToString(signature))) + + return nil +} + +// Generates a sha256/hmac checksum of a given message. +func signMessageV1_0(message []byte, encodedKey string) ([]byte, error) { + // Key is b64 encoded. + expectedKey, err := base64.RawURLEncoding.DecodeString(encodedKey) + if err != nil { + return nil, fmt.Errorf("failed to decode key: %w", err) + } + + mac := hmac.New(sha256.New, expectedKey) + mac.Write(message) + + return mac.Sum(nil), nil +} + +// Per RFC, the message consists of: +// --start-- +// http_path\n +// canonicalized_request_params\n +// http_verb\n +// timestamp_header_value\n +// --end--. +func generateMessageV1_0(req *http.Request) ([]byte, error) { + messageString := strings.Builder{} + + // http_path\n + messageString.WriteString(req.URL.Path + "\n") + // canonicalized_request_params\n + canonicalQuery, err := canonicalizeQuery(req.URL.RawQuery) + if err != nil { + return nil, err + } + messageString.WriteString(canonicalQuery + "\n") + // http_verb\n + messageString.WriteString(req.Method + "\n") + // timestamp_header_value\n + messageString.WriteString(req.Header.Get(timestampHeader) + "\n") + + return []byte(messageString.String()), nil +} + +var errSemicolonSeparator = errors.New("invalid semicolon separator in query") + +// Canonicalizes the query into a deterministic string. +// see https://cs.opensource.google/go/go/+/refs/tags/go1.18.8:src/net/url/url.go;l=921 +func canonicalizeQuery(query string) (canonicalQuery string, err error) { + values := make(map[string][]string) + for query != "" { + key := query + if i := strings.IndexAny(key, "&"); i >= 0 { + key, query = key[:i], key[i+1:] + } else { + query = "" + } + if strings.Contains(key, ";") { + err = errSemicolonSeparator + + continue + } + if key == "" { + continue + } + + key, value, _ := strings.Cut(key, "=") + key, err1 := url.QueryUnescape(key) + if err1 != nil { + if err == nil { + err = err1 + } + + continue + } + value, err1 = url.QueryUnescape(value) + if err1 != nil { + if err == nil { + err = err1 + } + + continue + } + values[key] = append(values[key], value) + } + + return encodeQuery(values), err +} + +// encodeQuery encodes a key-value map representing the query into a deterministic string. +// see https://cs.opensource.google/go/go/+/refs/tags/go1.17.6:src/net/url/url.go;l=974 +func encodeQuery(values map[string][]string) string { + if values == nil { + return "" + } + var buf strings.Builder + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + vs := values[k] + keyEscaped := url.QueryEscape(k) + for _, v := range vs { + if buf.Len() > 0 { + buf.WriteByte('&') + } + buf.WriteString(keyEscaped) + buf.WriteByte('=') + buf.WriteString(url.QueryEscape(v)) + } + } + + return buf.String() +} + +// NewAPIClient initializes a new Crusoe API client with the given configuration. +func NewAPIClient(host, key, secret, userAgent string) *crusoeapi.APIClient { + cfg := crusoeapi.NewConfiguration() + cfg.UserAgent = userAgent + cfg.BasePath = host + if cfg.HTTPClient == nil { + cfg.HTTPClient = http.DefaultClient + } + + cfg.HTTPClient.Transport = NewAuthenticatingTransport(cfg.HTTPClient.Transport, key, secret) + + return crusoeapi.NewAPIClient(cfg) +} diff --git a/internal/driver/controller.go b/internal/driver/controller.go new file mode 100644 index 0000000..589f059 --- /dev/null +++ b/internal/driver/controller.go @@ -0,0 +1,313 @@ +package driver + +import ( + "context" + "errors" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" + + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" +) + +//nolint:gochecknoglobals // we will use this slice to determine what the controller service supports +var controllerServerCapabilities = []csi.ControllerServiceCapability_RPC_Type{ + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, + csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, +} + +const diskUnsatisfactoryMsg = "disk does not satisfied the required capability" + +var ( + errRPCUnimplemented = errors.New("this RPC is currently not implemented") + errExpandVolumeWithSmallerSize = errors.New("disk currently has larger size than expand volume request") +) + +type ControllerServer struct { + apiClient *crusoeapi.APIClient + driver *Config +} + +func NewControllerServer() *ControllerServer { + return &ControllerServer{} +} + +func (c *ControllerServer) Init(apiClient *crusoeapi.APIClient, driver *Config, _ []Service) error { + c.driver = driver + c.apiClient = apiClient + + return nil +} + +func (c *ControllerServer) RegisterServer(srv *grpc.Server) error { + csi.RegisterControllerServer(srv, c) + + return nil +} + +func (c *ControllerServer) CreateVolume(ctx context.Context, + request *csi.CreateVolumeRequest, +) (*csi.CreateVolumeResponse, error) { + klog.Infof("Received request to create volume: %+v", request) + + capabilities := request.GetVolumeCapabilities() + if capErr := validateVolumeCapabilities(capabilities); capErr != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid volume capabilities: %s", capErr.Error()) + } + + reqCapacity := parseCapacity(request.GetCapacityRange()) + createReq, err := getCreateDiskRequest(request.GetName(), reqCapacity, c.driver.GetNodeLocation(), + capabilities, request.GetParameters()) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid arguments to create volume: %s", err.Error()) + } + + // We will check if a disk already exists with the provided name + foundDisk, findErr := findDisk(ctx, c.apiClient, c.driver.GetNodeProject(), request.GetName()) + if findErr != nil { + return nil, status.Errorf(codes.FailedPrecondition, + "failed to validate disk if disk already exists: %s", findErr.Error()) + } + var disk *crusoeapi.DiskV1Alpha5 + // If a disk already exists, make sure that it lines up with what we want + if foundDisk != nil { + verifyErr := verifyExistingDisk(foundDisk, createReq) + if verifyErr != nil { + return nil, status.Errorf(codes.AlreadyExists, + "failed to validate disk if disk already exists: %s", verifyErr.Error()) + } + disk = foundDisk + } else { + // Create the disk if it does not already exist + createdDisk, createErr := createDisk(ctx, c.apiClient, c.driver.GetNodeProject(), createReq) + if createErr != nil { + return nil, status.Errorf(codes.Internal, "failed to create disk: %s", createErr.Error()) + } + disk = createdDisk + } + + volume, parseErr := getVolumeFromDisk(disk) + if parseErr != nil { + return nil, status.Errorf(codes.Internal, + "failed to convert crusoe disk to csi volume: %s", parseErr.Error()) + } + + klog.Infof("Successfully created volume with name: %s and capacity: %s", request.GetName(), reqCapacity) + + return &csi.CreateVolumeResponse{ + Volume: volume, + }, nil +} + +func (c *ControllerServer) ControllerExpandVolume(ctx context.Context, + request *csi.ControllerExpandVolumeRequest, +) (*csi.ControllerExpandVolumeResponse, error) { + klog.Infof("Received request to expand volume: %+v", request) + capacityRange := request.GetCapacityRange() + + reqCapacity := parseCapacity(capacityRange) + + disk, getErr := getDisk(ctx, c.apiClient, c.driver.GetNodeProject(), request.GetVolumeId()) + if getErr != nil { + return nil, status.Errorf(codes.FailedPrecondition, "failed to get existing disk: %s", + getErr.Error()) + } + + if disk.Size > reqCapacity { + return nil, status.Errorf(codes.InvalidArgument, "invalid expand volume request: %s", + errExpandVolumeWithSmallerSize.Error()) + } + + patchReq := &crusoeapi.DisksPatchRequest{ + Size: reqCapacity, + } + + volumeID := request.GetVolumeId() + + updatedDisk, updateErr := updateDisk(ctx, c.apiClient, c.driver.GetNodeProject(), volumeID, patchReq) + if updateErr != nil { + return nil, updateErr + } + + newBytes, err := convertStorageUnitToBytes(updatedDisk.Size) + if err != nil { + return nil, err + } + + klog.Infof("Successfully expanded volume with ID: %s", request.GetVolumeId()) + + return &csi.ControllerExpandVolumeResponse{ + CapacityBytes: newBytes, + NodeExpansionRequired: false, + }, nil +} + +func (c *ControllerServer) DeleteVolume(ctx context.Context, + request *csi.DeleteVolumeRequest, +) (*csi.DeleteVolumeResponse, error) { + err := deleteDisk(ctx, c.apiClient, c.driver.GetNodeProject(), request.GetVolumeId()) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to delete disk: %s", err.Error()) + } + + return &csi.DeleteVolumeResponse{}, nil +} + +func (c *ControllerServer) ControllerPublishVolume(ctx context.Context, + request *csi.ControllerPublishVolumeRequest, +) (*csi.ControllerPublishVolumeResponse, error) { + klog.Infof("Received request to publish volume: %+v", request) + diskID := request.GetVolumeId() + instanceID := request.GetNodeId() + attachmentMode, err := getAttachmentTypeFromVolumeCapability(request.GetVolumeCapability()) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "received unexpected capability: %s", err.Error()) + } + + attachment := crusoeapi.DiskAttachment{ + AttachmentType: dataDiskAttachmentType, + DiskId: diskID, + Mode: attachmentMode, + } + + attachReq := &crusoeapi.InstancesAttachDiskPostRequestV1Alpha5{ + AttachDisks: []crusoeapi.DiskAttachment{attachment}, + } + + attachErr := attachDisk(ctx, c.apiClient, c.driver.GetNodeProject(), instanceID, attachReq) + if attachErr != nil { + return nil, status.Errorf(codes.Internal, "failed to attach disk to node: %s", attachErr.Error()) + } + + klog.Infof("Successfully published volume with ID: %s to node: %s", + request.GetVolumeId(), request.GetNodeId()) + + return &csi.ControllerPublishVolumeResponse{ + PublishContext: nil, + }, nil +} + +func (c *ControllerServer) ControllerUnpublishVolume(ctx context.Context, + request *csi.ControllerUnpublishVolumeRequest, +) (*csi.ControllerUnpublishVolumeResponse, error) { + klog.Infof("Received request to unpublish volume: %+v", request) + diskID := request.GetVolumeId() + instanceID := request.GetNodeId() + + detachReq := &crusoeapi.InstancesDetachDiskPostRequest{ + DetachDisks: []string{diskID}, + } + + detachErr := detachDisk(ctx, c.apiClient, c.driver.GetNodeProject(), instanceID, detachReq) + if detachErr != nil { + return nil, status.Errorf(codes.Internal, "failed to detach disk from vm: %s", detachErr.Error()) + } + + return &csi.ControllerUnpublishVolumeResponse{}, nil +} + +func (c *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, + request *csi.ValidateVolumeCapabilitiesRequest, +) (*csi.ValidateVolumeCapabilitiesResponse, error) { + klog.Infof("Received request to validate volume capabilities: %+v", request) + capabilities := request.GetVolumeCapabilities() + if capErr := validateVolumeCapabilities(capabilities); capErr != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid volume capabilities: %s", capErr.Error()) + } + + disk, getErr := getDisk(ctx, c.apiClient, c.driver.GetNodeProject(), request.GetVolumeId()) + if getErr != nil { + return nil, status.Errorf(codes.FailedPrecondition, "failed to get existing disk %s", getErr.Error()) + } + + desiredType := getDiskTypeFromVolumeType(capabilities) + // as part of the CSI specification, if the set of capabilities is not supported, the Confirmed field of the + // response should be empty – when Confirmed is empty, we can optionally include a message for K8s to report + // why the capabilities are unsupported + if desiredType != disk.Type_ { + return &csi.ValidateVolumeCapabilitiesResponse{ + Message: diskUnsatisfactoryMsg, + }, nil + } + + return &csi.ValidateVolumeCapabilitiesResponse{ + Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{ + VolumeContext: request.GetVolumeContext(), + VolumeCapabilities: request.GetVolumeCapabilities(), + Parameters: request.GetParameters(), + }, + }, nil +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) ListVolumes(_ context.Context, + _ *csi.ListVolumesRequest, +) (*csi.ListVolumesResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) ControllerGetVolume(_ context.Context, + _ *csi.ControllerGetVolumeRequest, +) (*csi.ControllerGetVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) GetCapacity(_ context.Context, + _ *csi.GetCapacityRequest, +) (*csi.GetCapacityResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) CreateSnapshot(_ context.Context, + _ *csi.CreateSnapshotRequest, +) (*csi.CreateSnapshotResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) DeleteSnapshot(_ context.Context, + _ *csi.DeleteSnapshotRequest, +) (*csi.DeleteSnapshotResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) ListSnapshots(_ context.Context, + _ *csi.ListSnapshotsRequest, +) (*csi.ListSnapshotsResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (c *ControllerServer) ControllerModifyVolume(_ context.Context, + _ *csi.ControllerModifyVolumeRequest, +) (*csi.ControllerModifyVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +func (c *ControllerServer) ControllerGetCapabilities(_ context.Context, + _ *csi.ControllerGetCapabilitiesRequest, +) (*csi.ControllerGetCapabilitiesResponse, error) { + controllerCapabilities := make([]*csi.ControllerServiceCapability, 0, len(controllerServerCapabilities)) + + for _, capability := range controllerServerCapabilities { + controllerCapabilities = append(controllerCapabilities, &csi.ControllerServiceCapability{ + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: capability, + }, + }, + }) + } + + return &csi.ControllerGetCapabilitiesResponse{ + Capabilities: controllerCapabilities, + }, nil +} diff --git a/internal/driver/controller_util.go b/internal/driver/controller_util.go new file mode 100644 index 0000000..d19b3ea --- /dev/null +++ b/internal/driver/controller_util.go @@ -0,0 +1,366 @@ +package driver + +import ( + "context" + "errors" + "fmt" + "strconv" + + "github.com/container-storage-interface/spec/lib/go/csi" + + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" +) + +const ( + BytesInGiB = 1024 * 1024 * 1024 + BytesInTiB = 1024 * 1024 * 1024 * 1024 + blockVolumeDiskType = "persistent-ssd" + mountVolumeDiskType = "shared-volume" + dataDiskAttachmentType = "data" + readOnlyDiskMode = "read-only" + readWriteDiskMode = "read-write" + BlockSizeParam = "csi.crusoe.ai/block-size" +) + +var ( + errUnsupportedVolumeAccessMode = errors.New("unsupported access mode for volume") + + errUnexpectedVolumeCapability = errors.New("unknown volume capability") + errDiskDifferentSize = errors.New("disk has different size") + errDiskDifferentName = errors.New("disk has different name") + errDiskDifferentLocation = errors.New("disk has different location") + errDiskDifferentBlockSize = errors.New("disk has different block size") + errDiskDifferentType = errors.New("disk has different type") + errUnsupportedMountAccessMode = errors.New("unsupported access mode for mount volume") + errUnsupportedBlockAccessMode = errors.New("unsupported access mode for block volume") + errNoCapabilitiesSpecified = errors.New("neither block nor mount capability specified") + errBlockAndMountSpecified = errors.New("both block and mount capabilities specified") + errInvalidBlockSize = errors.New("invalid block size specified: must be 512 or 4096") + + //nolint:gochecknoglobals // use this map to determine what capabilities are supported + supportedBlockVolumeAccessMode = map[csi.VolumeCapability_AccessMode_Mode]struct{}{ + csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER: {}, + csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY: {}, + csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY: {}, + csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER: {}, + csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER: {}, + } + //nolint:gochecknoglobals // use this map to determine what capabilities are supported + supportedMountVolumeAccessMode = map[csi.VolumeCapability_AccessMode_Mode]struct{}{ + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER: {}, + csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER: {}, + } +) + +func createDisk(ctx context.Context, apiClient *crusoeapi.APIClient, + projectID string, createReq *crusoeapi.DisksPostRequestV1Alpha5, +) (*crusoeapi.DiskV1Alpha5, error) { + dataResp, httpResp, err := apiClient.DisksApi.CreateDisk(ctx, *createReq, projectID) + if err != nil { + return nil, fmt.Errorf("failed to start a create disk operation: %w", err) + } + defer httpResp.Body.Close() + + disk, _, err := awaitOperationAndResolve[crusoeapi.DiskV1Alpha5](ctx, dataResp.Operation, projectID, + apiClient.DiskOperationsApi.GetStorageDisksOperation) + if err != nil { + return nil, fmt.Errorf("failed to create disk: %w", err) + } + + return disk, nil +} + +func attachDisk(ctx context.Context, apiClient *crusoeapi.APIClient, projectID, vmID string, + attachReq *crusoeapi.InstancesAttachDiskPostRequestV1Alpha5, +) error { + dataResp, httpResp, err := apiClient.VMsApi.UpdateInstanceAttachDisks(ctx, *attachReq, projectID, vmID) + if err != nil { + return fmt.Errorf("failed to start an attach disk operation: %w", err) + } + defer httpResp.Body.Close() + + _, err = awaitOperation(ctx, dataResp.Operation, projectID, + apiClient.VMOperationsApi.GetComputeVMsInstancesOperation) + if err != nil { + return fmt.Errorf("failed to attach disk: %w", err) + } + + return nil +} + +func detachDisk(ctx context.Context, apiClient *crusoeapi.APIClient, projectID, vmID string, + detachReq *crusoeapi.InstancesDetachDiskPostRequest, +) error { + dataResp, httpResp, err := apiClient.VMsApi.UpdateInstanceDetachDisks(ctx, *detachReq, projectID, vmID) + if err != nil { + return fmt.Errorf("failed to start a detach disk operation: %w", err) + } + defer httpResp.Body.Close() + + _, err = awaitOperation(ctx, dataResp.Operation, projectID, + apiClient.VMOperationsApi.GetComputeVMsInstancesOperation) + if err != nil { + return fmt.Errorf("failed to detach disk: %w", err) + } + + return nil +} + +func updateDisk(ctx context.Context, apiClient *crusoeapi.APIClient, + projectID, diskID string, updateReq *crusoeapi.DisksPatchRequest, +) (*crusoeapi.DiskV1Alpha5, error) { + dataResp, httpResp, err := apiClient.DisksApi.ResizeDisk(ctx, *updateReq, projectID, diskID) + if err != nil { + return nil, fmt.Errorf("failed to start a create disk operation: %w", err) + } + defer httpResp.Body.Close() + + disk, _, err := awaitOperationAndResolve[crusoeapi.DiskV1Alpha5](ctx, dataResp.Operation, projectID, + apiClient.DiskOperationsApi.GetStorageDisksOperation) + if err != nil { + return nil, fmt.Errorf("failed to create disk: %w", err) + } + + return disk, nil +} + +func deleteDisk(ctx context.Context, apiClient *crusoeapi.APIClient, projectID, diskID string) error { + dataResp, httpResp, err := apiClient.DisksApi.DeleteDisk(ctx, projectID, diskID) + if err != nil { + return fmt.Errorf("failed to start a delete disk operation: %w", err) + } + defer httpResp.Body.Close() + + _, err = awaitOperation(ctx, dataResp.Operation, projectID, apiClient.DiskOperationsApi.GetStorageDisksOperation) + if err != nil { + return fmt.Errorf("failed to delete disk: %w", err) + } + + return nil +} + +func findDisk(ctx context.Context, apiClient *crusoeapi.APIClient, + projectID, name string, +) (*crusoeapi.DiskV1Alpha5, error) { + disks, httpResp, listErr := apiClient.DisksApi.ListDisks(ctx, projectID) + if listErr != nil { + return nil, fmt.Errorf("error checking if volume exists: %w", listErr) + } + defer httpResp.Body.Close() + var foundDisk *crusoeapi.DiskV1Alpha5 + for i := range disks.Items { + currDisk := disks.Items[i] + if currDisk.Name == name { + foundDisk = &currDisk + + break + } + } + + return foundDisk, nil +} + +func getDisk(ctx context.Context, apiClient *crusoeapi.APIClient, + projectID, diskID string, +) (*crusoeapi.DiskV1Alpha5, error) { + disk, httpResp, listErr := apiClient.DisksApi.GetDisk(ctx, projectID, diskID) + if listErr != nil { + return nil, fmt.Errorf("error checking if volume exists: %w", listErr) + } + defer httpResp.Body.Close() + + return &disk, nil +} + +func convertStorageUnitToBytes(storageStr string) (int64, error) { + valueStr := storageStr[:len(storageStr)-3] + unit := storageStr[len(storageStr)-3:] + + value, err := strconv.Atoi(valueStr) + if err != nil { + return 0, fmt.Errorf("invalid numeric value: %w", err) + } + + var totalBytes int64 + switch unit { + case "GiB": + totalBytes = int64(value * BytesInGiB) + case "TiB": + totalBytes = int64(value * BytesInTiB) + default: + //nolint:goerr113 // use dynamic errors for more informative error handling + return 0, fmt.Errorf("received invalid unit: %s", unit) + } + + return totalBytes, nil +} + +// convertBytesToStorageUnit converts bytes to a specified unit (GiB or TiB) and returns the result as a string. +func convertBytesToStorageUnit(bytes int64) string { + var size int64 + var unit string + + if unitsTiB := bytes / BytesInTiB; unitsTiB > 1 { + size = unitsTiB + unit = "TiB" + } else { + size = bytes / BytesInGiB + unit = "GiB" + } + + return fmt.Sprintf("%d%s", size, unit) +} + +func getVolumeFromDisk(disk *crusoeapi.DiskV1Alpha5) (*csi.Volume, error) { + volBytes, err := convertStorageUnitToBytes(disk.Size) + if err != nil { + return nil, fmt.Errorf("failed to parse disk storage: %w", err) + } + + // The disk is only attachable to instances in its location + accessibleTopology := &csi.Topology{ + Segments: map[string]string{ + TopologyLocationKey: disk.Location, + }, + } + + volumeContext := map[string]string{ + VolumeContextDiskTypeKey: disk.Type_, + VolumeContextDiskSerialNumberKey: disk.SerialNumber, + } + + return &csi.Volume{ + CapacityBytes: volBytes, + VolumeId: disk.Id, + VolumeContext: volumeContext, + ContentSource: nil, + AccessibleTopology: []*csi.Topology{accessibleTopology}, + }, nil +} + +//nolint:cyclop // complexity comes from argument validation +func validateVolumeCapabilities(capabilities []*csi.VolumeCapability) error { + for _, capability := range capabilities { + if capability.GetBlock() != nil && capability.GetMount() != nil { + return errBlockAndMountSpecified + } + if capability.GetBlock() == nil && capability.GetMount() == nil { + return errNoCapabilitiesSpecified + } + + accessMode := capability.GetAccessMode().GetMode() + if capability.GetBlock() != nil { + if _, ok := supportedBlockVolumeAccessMode[accessMode]; !ok { + return fmt.Errorf("%w: %s", errUnsupportedBlockAccessMode, accessMode) + } + } + if capability.GetMount() != nil { + _, mountOk := supportedMountVolumeAccessMode[accessMode] + _, blockOk := supportedBlockVolumeAccessMode[accessMode] + + // mount volumes can do everything block can too + if !blockOk && !mountOk { + return fmt.Errorf("%w: %s", errUnsupportedMountAccessMode, accessMode) + } + } + } + + return nil +} + +func getDiskTypeFromVolumeType(capabilities []*csi.VolumeCapability) string { + for _, capability := range capabilities { + accessMode := capability.GetAccessMode().GetMode() + if _, mountOk := supportedMountVolumeAccessMode[accessMode]; mountOk { + return mountVolumeDiskType + } else if _, blockOk := supportedBlockVolumeAccessMode[accessMode]; blockOk { + return blockVolumeDiskType + } + } + + return "" +} + +func parseAndValidateBlockSize(strBlockSize string) (int64, error) { + parsedBlockSize, err := strconv.Atoi(strBlockSize) + if err != nil { + return 0, fmt.Errorf("invalid block size argument: %w", err) + } + if parsedBlockSize != 512 && parsedBlockSize != 4096 { + return 0, errInvalidBlockSize + } + + return int64(parsedBlockSize), nil +} + +func getCreateDiskRequest(name, capacity, location string, + capabilities []*csi.VolumeCapability, optionalParameters map[string]string, +) (*crusoeapi.DisksPostRequestV1Alpha5, error) { + params := &crusoeapi.DisksPostRequestV1Alpha5{ + Name: name, + Size: capacity, + Location: location, + } + if blockSize, ok := optionalParameters[BlockSizeParam]; ok { + parsedBlockSize, err := parseAndValidateBlockSize(blockSize) + if err != nil { + return nil, fmt.Errorf("failed to validate block size: %w", err) + } + params.BlockSize = parsedBlockSize + } + + params.Type_ = getDiskTypeFromVolumeType(capabilities) + + return params, nil +} + +func verifyExistingDisk(currentDisk *crusoeapi.DiskV1Alpha5, createReq *crusoeapi.DisksPostRequestV1Alpha5) error { + if currentDisk.Size != createReq.Size { + return errDiskDifferentSize + } + if currentDisk.Name != createReq.Name { + return errDiskDifferentName + } + if currentDisk.Location != createReq.Location { + return errDiskDifferentLocation + } + if currentDisk.BlockSize != createReq.BlockSize { + return errDiskDifferentBlockSize + } + if currentDisk.Type_ != createReq.Type_ { + return errDiskDifferentType + } + + return nil +} + +func parseCapacity(capacityRange *csi.CapacityRange) string { + // Note: both RequiredBytes and LimitBytes SHOULD be set to the same value, + // however, it is only guaranteed that one of them is set. + reqBytes := capacityRange.GetRequiredBytes() + if reqBytes == 0 { + reqBytes = capacityRange.GetLimitBytes() + } + reqCapacity := convertBytesToStorageUnit(reqBytes) + + return reqCapacity +} + +func getAttachmentTypeFromVolumeCapability(capability *csi.VolumeCapability) (string, error) { + accessMode := capability.GetAccessMode().GetMode() + switch accessMode { + case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER: + return readWriteDiskMode, nil + case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY, + csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY: + return readOnlyDiskMode, nil + case csi.VolumeCapability_AccessMode_UNKNOWN: + return "", errUnexpectedVolumeCapability + } + + return "", fmt.Errorf("%w: %s", errUnsupportedVolumeAccessMode, accessMode.String()) +} diff --git a/internal/driver/driver.go b/internal/driver/driver.go new file mode 100644 index 0000000..4bc2d25 --- /dev/null +++ b/internal/driver/driver.go @@ -0,0 +1,57 @@ +package driver + +type Config struct { + // These should be consistent regardless of which node the driver is running on. + VendorName string + VendorVersion string + // These are initialized on a per-node unique basis + NodeID string + NodeLocation string + NodeProject string +} + +type Service int + +const ( + NodeService Service = iota + IdentityService + ControllerService +) + +// Note: these are injected during build +// This name MUST correspond with the name provided to the storage class +// This is how Kubernetes knows to invoke our CSI. +// +//nolint:gochecknoglobals // we will use these global vars to identify the name and version of the CSI +var ( + name string + version string +) + +func GetVendorName() string { + return name +} + +func GetVendorVersion() string { + return version +} + +func (d *Config) GetName() string { + return d.VendorName +} + +func (d *Config) GetVendorVersion() string { + return d.VendorVersion +} + +func (d *Config) GetNodeID() string { + return d.NodeID +} + +func (d *Config) GetNodeProject() string { + return d.NodeProject +} + +func (d *Config) GetNodeLocation() string { + return d.NodeLocation +} diff --git a/internal/driver/identity.go b/internal/driver/identity.go new file mode 100644 index 0000000..780da43 --- /dev/null +++ b/internal/driver/identity.go @@ -0,0 +1,81 @@ +package driver + +import ( + "context" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" +) + +type IdentityServer struct { + apiClient *crusoeapi.APIClient + driver *Config + capabilities []*csi.PluginCapability +} + +func NewIdentityServer() *IdentityServer { + return &IdentityServer{} +} + +func (i *IdentityServer) Init(apiClient *crusoeapi.APIClient, driver *Config, services []Service) error { + i.driver = driver + i.apiClient = apiClient + i.capabilities = []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_VolumeExpansion_{ + VolumeExpansion: &csi.PluginCapability_VolumeExpansion{ + Type: csi.PluginCapability_VolumeExpansion_OFFLINE, + }, + }, + }, + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, + }, + }, + }, + } + for _, service := range services { + if service == ControllerService { + i.capabilities = append(i.capabilities, &csi.PluginCapability{ + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, + }, + }, + }) + } + } + + return nil +} + +func (i *IdentityServer) RegisterServer(srv *grpc.Server) error { + csi.RegisterIdentityServer(srv, i) + + return nil +} + +func (i *IdentityServer) GetPluginInfo(_ context.Context, + _ *csi.GetPluginInfoRequest, +) (*csi.GetPluginInfoResponse, error) { + return &csi.GetPluginInfoResponse{ + Name: i.driver.GetName(), + VendorVersion: i.driver.GetVendorVersion(), + }, nil +} + +func (i *IdentityServer) GetPluginCapabilities(_ context.Context, + _ *csi.GetPluginCapabilitiesRequest, +) (*csi.GetPluginCapabilitiesResponse, error) { + return &csi.GetPluginCapabilitiesResponse{ + Capabilities: i.capabilities, + }, nil +} + +func (i *IdentityServer) Probe(_ context.Context, _ *csi.ProbeRequest) (*csi.ProbeResponse, error) { + return &csi.ProbeResponse{}, nil +} diff --git a/internal/driver/node.go b/internal/driver/node.go new file mode 100644 index 0000000..1fa5b73 --- /dev/null +++ b/internal/driver/node.go @@ -0,0 +1,192 @@ +package driver + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" + "k8s.io/mount-utils" + "k8s.io/utils/exec" + + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" +) + +const ( + // MaxVolumesPerNode refers to the maximum number of disks that can be attached to a VM + // ref: https://docs.crusoecloud.com/storage/disks/overview#persistent-disks + MaxVolumesPerNode = 16 + TopologyLocationKey = "topology.csi.crusoe.ai/location" + TopologyProjectKey = "topology.csi.crusoe.ai/project-id" + VolumeContextDiskSerialNumberKey = "serial-number" + VolumeContextDiskTypeKey = "disk-type" + ReadOnlyMountOption = "ro" + newDirPerms = 0o755 // this represents: rwxr-xr-x + newFilePerms = 0o644 // this represents: rw-r--r-- +) + +var errVolumeMissingSerialNumber = errors.New("volume missing serial number context key") + +//nolint:gochecknoglobals // we will use this slice to determine what the node service supports +var NodeServerCapabilities = []csi.NodeServiceCapability_RPC_Type{ + csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + csi.NodeServiceCapability_RPC_EXPAND_VOLUME, +} + +type NodeServer struct { + apiClient *crusoeapi.APIClient + driver *Config + mounter *mount.SafeFormatAndMount +} + +func NewNodeServer() *NodeServer { + return &NodeServer{} +} + +func (n *NodeServer) Init(apiClient *crusoeapi.APIClient, driver *Config, _ []Service) error { + n.driver = driver + n.apiClient = apiClient + n.mounter = mount.NewSafeFormatAndMount(mount.New(""), exec.New()) + + return nil +} + +func (n *NodeServer) RegisterServer(srv *grpc.Server) error { + csi.RegisterNodeServer(srv, n) + + return nil +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (n *NodeServer) NodeStageVolume(_ context.Context, + _ *csi.NodeStageVolumeRequest, +) (*csi.NodeStageVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (n *NodeServer) NodeUnstageVolume(_ context.Context, + _ *csi.NodeUnstageVolumeRequest, +) (*csi.NodeUnstageVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +func (n *NodeServer) NodePublishVolume(_ context.Context, + req *csi.NodePublishVolumeRequest, +) (*csi.NodePublishVolumeResponse, error) { + klog.Infof("Received request to publish volume: %+v", req) + targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() + readOnly := req.GetReadonly() + + volumeCapability := req.GetVolumeCapability() + + mountOpts := []string{"bind"} + if readOnly { + mountOpts = append(mountOpts, ReadOnlyMountOption) + } + + // Check if volume is already mounted, if it is return success + mounted, err := n.mounter.IsMountPoint(targetPath) + if err == nil && mounted { + return &csi.NodePublishVolumeResponse{}, nil + } + + if volumeCapability.GetBlock() != nil { + mountErr := publishBlockVolume(req, targetPath, n.mounter, mountOpts) + if mountErr != nil { + return nil, status.Errorf(codes.Internal, "failed to mount volume: %s", mountErr.Error()) + } + } else if volumeCapability.GetMount() != nil { + var sourcePath string + fsType := volumeCapability.GetMount().GetFsType() + sourcePath = stagingTargetPath + mountOpts = append(mountOpts, volumeCapability.GetMount().GetMountFlags()...) + err := os.MkdirAll(sourcePath, newDirPerms) + if err != nil { + return nil, status.Errorf(codes.Internal, + fmt.Sprintf("failed to make directory at target path: %s", err.Error())) + } + err = n.mounter.Mount(sourcePath, targetPath, fsType, mountOpts) + if err != nil { + return nil, status.Errorf(codes.Internal, + fmt.Sprintf("failed to mount volume at target path: %s", err.Error())) + } + } + + klog.Infof("Successfully published volume: %s", req.GetVolumeId()) + + return &csi.NodePublishVolumeResponse{}, nil +} + +func (n *NodeServer) NodeUnpublishVolume(_ context.Context, + req *csi.NodeUnpublishVolumeRequest, +) (*csi.NodeUnpublishVolumeResponse, error) { + klog.Infof("Received request to unpublish volume: %+v", req) + + targetPath := req.GetTargetPath() + err := mount.CleanupMountPoint(targetPath, n.mounter, false) + if err != nil { + return nil, status.Errorf(codes.Internal, fmt.Sprintf("failed to cleanup mount point %s", err.Error())) + } + + klog.Infof("Successfully unpublished volume: %s", req.GetVolumeId()) + + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (n *NodeServer) NodeGetVolumeStats(_ context.Context, + _ *csi.NodeGetVolumeStatsRequest, +) (*csi.NodeGetVolumeStatsResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +//nolint:wrapcheck // we want to return gRPC Status errors +func (n *NodeServer) NodeExpandVolume(_ context.Context, + _ *csi.NodeExpandVolumeRequest, +) (*csi.NodeExpandVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, errRPCUnimplemented.Error()) +} + +func (n *NodeServer) NodeGetCapabilities(_ context.Context, + _ *csi.NodeGetCapabilitiesRequest, +) (*csi.NodeGetCapabilitiesResponse, error) { + nodeCapabilities := make([]*csi.NodeServiceCapability, 0, len(NodeServerCapabilities)) + + for _, capability := range NodeServerCapabilities { + nodeCapabilities = append(nodeCapabilities, &csi.NodeServiceCapability{ + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: capability, + }, + }, + }) + } + + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: nodeCapabilities, + }, nil +} + +func (n *NodeServer) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + // We want to provide useful topological hints to the container orchestrator + // We can only stage/publish volumes in the same location as a node + accessibleTopology := &csi.Topology{ + Segments: map[string]string{ + TopologyLocationKey: n.driver.GetNodeLocation(), + TopologyProjectKey: n.driver.GetNodeProject(), + }, + } + + return &csi.NodeGetInfoResponse{ + NodeId: n.driver.GetNodeID(), + MaxVolumesPerNode: MaxVolumesPerNode, + AccessibleTopology: accessibleTopology, + }, nil +} diff --git a/internal/driver/node_util.go b/internal/driver/node_util.go new file mode 100644 index 0000000..5877098 --- /dev/null +++ b/internal/driver/node_util.go @@ -0,0 +1,54 @@ +package driver + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/container-storage-interface/spec/lib/go/csi" + "k8s.io/mount-utils" +) + +func getPersistentSSDDevicePath(serialNumber string) string { + // symlink: /dev/disk/by-id/virtio- + return fmt.Sprintf("/dev/disk/by-id/virtio-%s", serialNumber) +} + +func publishBlockVolume(req *csi.NodePublishVolumeRequest, targetPath string, + mounter *mount.SafeFormatAndMount, mountOpts []string, +) error { + volumeContext := req.GetVolumeContext() + serialNumber, ok := volumeContext[VolumeContextDiskSerialNumberKey] + if !ok { + return errVolumeMissingSerialNumber + } + + devicePath := getPersistentSSDDevicePath(serialNumber) + dirPath := filepath.Dir(targetPath) + // Check if the directory exists + if _, err := os.Stat(dirPath); errors.Is(err, os.ErrNotExist) { + // Directory does not exist, create it + if err := os.MkdirAll(dirPath, newDirPerms); err != nil { + return fmt.Errorf("failed to make directory for target path: %w", err) + } + } + + // expose the block volume as a file + f, err := os.OpenFile(targetPath, os.O_CREATE, os.FileMode(newFilePerms)) + if err != nil { + if !os.IsExist(err) { + return fmt.Errorf("failed to make file for target path: %w", err) + } + } + if err = f.Close(); err != nil { + return fmt.Errorf("failed to close file after making target path: %w", err) + } + + err = mounter.FormatAndMount(devicePath, targetPath, "", mountOpts) + if err != nil { + return fmt.Errorf("failed to mount volume at target path: %w", err) + } + + return nil +} diff --git a/internal/driver/secrets.go b/internal/driver/secrets.go new file mode 100644 index 0000000..60cabec --- /dev/null +++ b/internal/driver/secrets.go @@ -0,0 +1,45 @@ +package driver + +import ( + "fmt" + "os" +) + +const ( + SecretPath = "/etc/secrets" + AccessKeyName = "crusoe-csi-accesskey" + //nolint:gosec // we are not hardcoding credentials, just the env var to get them + SecretKeyName = "crusoe-csi-secretkey" +) + +// Kubernetes provides two main ways of injecting secrets into pods: +// 1) Injecting them into environment variables which can be retrieved by the application +// 2) Creating a file '/etc/secrets' which the application can then retrieve + +func ReadSecretFromFile(secretName string) (string, error) { + // Attempt to open the file corresponding to the secret key + file, err := os.Open(fmt.Sprintf("%s/%s", SecretPath, secretName)) + if err != nil { + return "", fmt.Errorf("error opening secret file: %w", err) + } + defer file.Close() + + // Read the entire file into a byte slice + data := make([]byte, 0) + _, err = file.Read(data) + if err != nil { + return "", fmt.Errorf("error reading secret file: %w", err) + } + + secretValue := string(data) + + return secretValue, nil +} + +func GetCrusoeAccessKey() string { + return ReadEnvVar(AccessKeyName) +} + +func GetCrusoeSecretKey() string { + return ReadEnvVar(SecretKeyName) +} diff --git a/internal/driver/util.go b/internal/driver/util.go new file mode 100644 index 0000000..3e5d334 --- /dev/null +++ b/internal/driver/util.go @@ -0,0 +1,224 @@ +package driver + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "strings" + "time" + + "github.com/antihax/optional" + + swagger "github.com/crusoecloud/client-go/swagger/v1alpha4" + crusoeapi "github.com/crusoecloud/client-go/swagger/v1alpha5" +) + +const ( + pollInterval = 2 * time.Second + OpSucceeded opStatus = "SUCCEEDED" + OpInProgress opStatus = "IN_PROGRESS" + OpFailed opStatus = "FAILED" +) + +// apiError models the error format returned by the Crusoe API go client. +type apiError struct { + Code string `json:"code"` + Message string `json:"message"` +} + +type opStatus string + +type opResultError struct { + Code string `json:"code"` + Message string `json:"message"` +} + +var ( + errUnableToGetOpRes = errors.New("failed to get result of operation") + // fallback error presented to the user in unexpected situations. + errUnexpected = errors.New("an unexpected error occurred, please try again, and if the problem persists, " + + "contact support@crusoecloud.com") + errBadFQDN = errors.New("fqdn in unexpected format") + errInstanceNotFound = errors.New("instance not found") +) + +// UnpackAPIError takes a swagger API error and safely attempts to extract any additional information +// present in the response. The original error is returned unchanged if it cannot be unpacked. +func UnpackAPIError(original error) error { + apiErr := &swagger.GenericSwaggerError{} + if ok := errors.As(original, apiErr); !ok { + return original + } + + var model apiError + err := json.Unmarshal(apiErr.Body(), &model) + if err != nil { + return original + } + + // some error messages are of the format "rpc code = ... desc = ..." + // in those cases, we extract the description and return it + const two = 2 + components := strings.Split(model.Message, " desc = ") + if len(components) == two { + //nolint:goerr113 // error is dynamic + return fmt.Errorf("%s", components[1]) + } + + //nolint:goerr113 // error is dynamic + return fmt.Errorf("%s", model.Message) +} + +func opResultToError(res interface{}) (expectedErr, unexpectedErr error) { + b, err := json.Marshal(res) + if err != nil { + return nil, fmt.Errorf("unable to marshal operation error: %w", err) + } + resultError := opResultError{} + err = json.Unmarshal(b, &resultError) + if err != nil { + return nil, fmt.Errorf("op result type not error as expected: %w", err) + } + + //nolint:goerr113 //This function is designed to return dynamic errors + return fmt.Errorf("%s", resultError.Message), nil +} + +func parseOpResult[T any](opResult interface{}) (*T, error) { + b, err := json.Marshal(opResult) + if err != nil { + return nil, errUnableToGetOpRes + } + + var result T + err = json.Unmarshal(b, &result) + if err != nil { + return nil, errUnableToGetOpRes + } + + return &result, nil +} + +// awaitOperation polls an async API operation until it resolves into a success or failure state. +func awaitOperation(ctx context.Context, op *crusoeapi.Operation, projectID string, + getFunc func(context.Context, string, string) (crusoeapi.Operation, *http.Response, error)) ( + *crusoeapi.Operation, error, +) { + for op.State == string(OpInProgress) { + updatedOps, httpResp, err := getFunc(ctx, projectID, op.OperationId) + if err != nil { + return nil, fmt.Errorf("error getting operation with id %s: %w", op.OperationId, err) + } + httpResp.Body.Close() + + op = &updatedOps + + time.Sleep(pollInterval) + } + + switch op.State { + case string(OpSucceeded): + return op, nil + case string(OpFailed): + opError, err := opResultToError(op.Result) + if err != nil { + return op, err + } + + return op, opError + default: + + return op, errUnexpected + } +} + +// AwaitOperationAndResolve awaits an async API operation and attempts to parse the response as an instance of T, +// if the operation was successful. +func awaitOperationAndResolve[T any](ctx context.Context, op *crusoeapi.Operation, projectID string, + getFunc func(context.Context, string, string) (crusoeapi.Operation, *http.Response, error), +) (*T, *crusoeapi.Operation, error) { + op, err := awaitOperation(ctx, op, projectID, getFunc) + if err != nil { + return nil, op, err + } + + result, err := parseOpResult[T](op.Result) + if err != nil { + return nil, op, err + } + + return result, op, nil +} + +func GetInstanceID(ctx context.Context, client *crusoeapi.APIClient) ( + instanceID string, + projectID string, + location string, + err error, +) { + // FQDN is of the form: ..compute.internal + fqdn := GetNodeFQDN() + + fqdnSlice := strings.Split(fqdn, ".") + if len(fqdnSlice) < 1 { + return "", "", "", errBadFQDN + } + + vmName := fqdnSlice[0] + + instance, err := findInstance(ctx, client, vmName) + if err != nil { + return "", "", "", fmt.Errorf("could not find instance (%s): %w", vmName, err) + } + + return instance.Id, instance.ProjectId, instance.Location, nil +} + +func findInstance(ctx context.Context, + client *crusoeapi.APIClient, instanceName string, +) (*crusoeapi.InstanceV1Alpha5, error) { + opts := &crusoeapi.ProjectsApiListProjectsOpts{ + OrgId: optional.EmptyString(), + } + + projectsResp, projectHTTPResp, err := client.ProjectsApi.ListProjects(ctx, opts) + + defer projectHTTPResp.Body.Close() + if err != nil { + return nil, fmt.Errorf("failed to query for projects: %w", err) + } + + for _, project := range projectsResp.Items { + listVMOpts := &crusoeapi.VMsApiListInstancesOpts{ + Names: optional.NewString(instanceName), + } + instances, instancesHTTPResp, instancesErr := client.VMsApi.ListInstances(ctx, project.Id, listVMOpts) + if instancesErr != nil { + return nil, fmt.Errorf("failed to list instances: %w", instancesErr) + } + instancesHTTPResp.Body.Close() + + if len(instances.Items) == 0 { + continue + } + + for i := range instances.Items { + if instances.Items[i].Name == instanceName { + return &instances.Items[i], nil + } + } + } + + return nil, errInstanceNotFound +} + +func ReadEnvVar(secretName string) string { + return os.Getenv(secretName) +} + +func GetNodeFQDN() string { + return ReadEnvVar("NODE_NAME") +} diff --git a/internal/locker/locker.go b/internal/locker/locker.go new file mode 100644 index 0000000..8eb66f8 --- /dev/null +++ b/internal/locker/locker.go @@ -0,0 +1,56 @@ +package locker + +import ( + "sync" +) + +type Locker struct { + locks map[string]*sync.RWMutex // Maps ID to its mutex for locking. + mx *sync.Mutex +} + +func NewLocker() *Locker { + return &Locker{ + locks: make(map[string]*sync.RWMutex), + mx: &sync.Mutex{}, + } +} + +func (l *Locker) TryAcquireReadLock(id string) bool { + l.mx.Lock() + rwLock, ok := l.locks[id] + if !ok { + rwLock = &sync.RWMutex{} + l.locks[id] = rwLock + } + l.mx.Unlock() + + return rwLock.TryRLock() +} + +func (l *Locker) ReleaseReadLock(id string) { + rwLock, ok := l.locks[id] + if ok { + rwLock.RUnlock() + } +} + +func (l *Locker) TryAcquireWriteLock(id string) bool { + l.mx.Lock() + rwLock, ok := l.locks[id] + if !ok { + rwLock = &sync.RWMutex{} + l.locks[id] = rwLock + } + l.mx.Unlock() + + return rwLock.TryLock() +} + +// ReleaseWriteLock releases a previously acquired write lock for the given ID. +func (l *Locker) ReleaseWriteLock(id string) { + rwLock, ok := l.locks[id] + if ok { + rwLock.Unlock() + } +} diff --git a/scripts/tag_semver.sh b/scripts/tag_semver.sh new file mode 100644 index 0000000..793e589 --- /dev/null +++ b/scripts/tag_semver.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +set -e + +MAJOR_VERSION=$1 +MINOR_VERSION=$2 +TAG_PREFIX=$3 + +# find the latest tag +NEW_VERSION="${TAG_PREFIX}v${MAJOR_VERSION}.${MINOR_VERSION}.0" +git fetch -q --tags --prune --prune-tags +tags=$(git tag -l ${TAG_PREFIX}v${MAJOR_VERSION}.${MINOR_VERSION}.* --sort=-version:refname) +if [[ ! -z "$tags" ]]; then + arr=(${tags}) + for val in ${arr[@]}; do + if [[ "$val" =~ ^${TAG_PREFIX}v${MAJOR_VERSION}+\.${MINOR_VERSION}\.[0-9]+$ ]]; then + prev_build=$(echo ${val} | cut -d. -f3) + new_build=$((prev_build+1)) + NEW_VERSION="${TAG_PREFIX}v${MAJOR_VERSION}.${MINOR_VERSION}.${new_build}" + break + fi + done +fi + +echo "Version for this commit: ${NEW_VERSION}" +echo "RELEASE_VERSION=${NEW_VERSION}" >> variables.env diff --git a/versions.env b/versions.env new file mode 100644 index 0000000..94a9141 --- /dev/null +++ b/versions.env @@ -0,0 +1,2 @@ +export MAJOR_VERSION=0 +export MINOR_VERSION=0