diff --git a/README.md b/README.md index 09dfe02..275344f 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,8 @@ $ docker plugin install datawire/telemount:arm64 --alias telemount Create an intercept. Use `--local-mount-port 1234` to set up a bridge instead of mounting, and `--detailed-ouput --output yaml` so that the command outputs the environment in a readable form: ```console -$ telepresence intercept --local-mount-port 1234 --port 8080 --http-header who=me --detailed-output --output yaml echo-easy +$ telepresence connect +$ telepresence intercept --local-mount-port 1234 --port 8080 --detailed-output --output yaml echo-easy ... TELEPRESENCE_CONTAINER: echo-easy TELEPRESENCE_MOUNTS: /var/run/secrets/kubernetes.io @@ -51,6 +52,21 @@ namespace token ``` +## Debugging + +Start by building the plugin for debugging. This command both builds and enables the plugin: +```console +$ make debug +``` + +Figure out the ID of the plugin: +```console +$ PLUGIN_ID=`docker plugin inspect -f='{{json .Id}}' datawire/telemount:amd64 | xargs` +``` +and start viewing what it prints on stderr. All logging goes to stderr: +``` +$ sudo cat /run/docker/plugins/$PLUGIN_ID/$PLUGIN_ID-stderr +``` ## Credits To the [Rclone project](https://github.com/rclone/rclone) project and [PR 5668](https://github.com/rclone/rclone/pull/5668) specifically for showing a good way to create multi-arch plugins. diff --git a/go.mod b/go.mod index 3c6c344..430fc5a 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,15 @@ module github.com/datawire/docker-volume-telemount go 1.21 -require github.com/docker/go-plugins-helpers v0.0.0-20211224144127-6eecb7beb651 +require ( + github.com/docker/go-plugins-helpers v0.0.0-20211224144127-6eecb7beb651 + golang.org/x/sys v0.17.0 +) require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/docker/go-connections v0.5.0 // indirect - golang.org/x/mod v0.14.0 // indirect - golang.org/x/sys v0.16.0 // indirect - golang.org/x/tools v0.17.0 // indirect + golang.org/x/mod v0.15.0 // indirect + golang.org/x/tools v0.18.0 // indirect ) diff --git a/go.sum b/go.sum index fc2f49c..be03a34 100644 --- a/go.sum +++ b/go.sum @@ -6,11 +6,11 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-plugins-helpers v0.0.0-20211224144127-6eecb7beb651 h1:YcvzLmdrP/b8kLAGJ8GT7bdncgCAiWxJZIlt84D+RJg= github.com/docker/go-plugins-helpers v0.0.0-20211224144127-6eecb7beb651/go.mod h1:LFyLie6XcDbyKGeVK6bHe+9aJTYCxWLBg5IrJZOaXKA= -golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= -golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8= +golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= -golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ= +golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg= diff --git a/pkg/log/log.go b/pkg/log/log.go index 7b6c2bb..f84d73c 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -49,11 +49,11 @@ func Fatal(v any) { } func Info(v any) { - _, _ = fmt.Fprintln(os.Stdout, v) + _, _ = fmt.Fprintln(os.Stderr, v) } func Infof(format string, args ...any) { - fprintfln(os.Stdout, format, args...) + fprintfln(os.Stderr, format, args...) } func Debug(v any) { diff --git a/pkg/sftp/driver.go b/pkg/sftp/driver.go index 722a8cf..da47147 100644 --- a/pkg/sftp/driver.go +++ b/pkg/sftp/driver.go @@ -17,7 +17,7 @@ import ( type driver struct { // All access to the driver is synchronized using this lock - sync.RWMutex + lock sync.RWMutex volumePath string remoteMounts map[string]*mount } @@ -86,9 +86,13 @@ func (d *driver) Create(r *volume.CreateRequest) (err error) { } else { dir = filepath.Join(container, strings.TrimPrefix(dir, "/")) } - d.Lock() - d.getRemoteMount(host, port).addVolume(r.Name, dir) - d.Unlock() + d.lock.Lock() + defer d.lock.Unlock() + m, err := d.getRemoteMount(host, port) + if err != nil { + return err + } + m.addVolume(r.Name, dir) return nil } @@ -97,8 +101,8 @@ func (d *driver) Remove(r *volume.RemoveRequest) (err error) { defer func() { logResponse(err, "Remove %s return", r.Name) }() - d.Lock() - defer d.Unlock() + d.lock.Lock() + defer d.lock.Unlock() var v *volumeDir if v, err = d.getVolume(r.Name); err != nil { @@ -118,8 +122,8 @@ func (d *driver) Mount(r *volume.MountRequest) (mr *volume.MountResponse, err er defer func() { logResponse(err, "Mount %s return %s", r.Name, mr.Mountpoint) }() - d.Lock() - defer d.Unlock() + d.lock.Lock() + defer d.lock.Unlock() var v *volumeDir if v, err = d.getMountedVolume(r.Name); err != nil { @@ -146,9 +150,9 @@ func (d *driver) Mount(r *volume.MountRequest) (mr *volume.MountResponse, err er func (d *driver) Path(r *volume.PathRequest) (*volume.PathResponse, error) { log.Debugf("Path %s", r.Name) - d.RLock() + d.lock.RLock() v, err := d.getVolume(r.Name) - d.RUnlock() + d.lock.RUnlock() pr := &volume.PathResponse{} if err == nil { pr.Mountpoint = v.logicalMountPoint() @@ -162,8 +166,8 @@ func (d *driver) Unmount(r *volume.UnmountRequest) (err error) { defer func() { logResponse(err, "Unmount %s return", r.Name) }() - d.Lock() - defer d.Unlock() + d.lock.Lock() + defer d.lock.Unlock() var v *volumeDir v, err = d.getVolume(r.Name) @@ -199,24 +203,24 @@ func (d *driver) Unmount(r *volume.UnmountRequest) (err error) { func (d *driver) Get(r *volume.GetRequest) (gr *volume.GetResponse, err error) { log.Debugf("Get %s", r.Name) gr = &volume.GetResponse{} - d.RLock() + d.lock.RLock() v, err := d.getVolume(r.Name) if err == nil { gr.Volume = v.asVolume(r.Name) } - d.RUnlock() + d.lock.RUnlock() logResponse(err, "Get %s return %v", r.Name, gr.Volume) return gr, err } func (d *driver) List() (*volume.ListResponse, error) { log.Debug("List") - d.RLock() + d.lock.RLock() var vols = make([]*volume.Volume, 0, 32) for _, m := range d.remoteMounts { vols = m.appendVolumes(vols) } - d.RUnlock() + d.lock.RUnlock() sort.Slice(vols, func(i, j int) bool { return vols[i].Name < vols[j].Name }) @@ -228,15 +232,18 @@ func (d *driver) Capabilities() *volume.CapabilitiesResponse { return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "local"}} } -func (d *driver) getRemoteMount(host string, port uint16) *mount { +func (d *driver) getRemoteMount(host string, port uint16) (*mount, error) { ps := strconv.Itoa(int(port)) key := net.JoinHostPort(host, ps) if m, ok := d.remoteMounts[key]; ok { - return m + return m, nil } m := newMount(filepath.Join(d.volumePath, host, ps), host, port) + if err := m.mountVolume(); err != nil { + return nil, err + } d.remoteMounts[key] = m - return m + return m, nil } func (d *driver) getVolume(n string) (*volumeDir, error) { diff --git a/pkg/sftp/mount.go b/pkg/sftp/mount.go index f425d4a..390a679 100644 --- a/pkg/sftp/mount.go +++ b/pkg/sftp/mount.go @@ -1,6 +1,8 @@ package sftp import ( + "context" + "errors" "fmt" "os" "os/exec" @@ -9,6 +11,7 @@ import ( "time" "github.com/docker/go-plugins-helpers/volume" + "golang.org/x/sys/unix" "github.com/datawire/docker-volume-telemount/pkg/log" ) @@ -30,7 +33,6 @@ func newMount(mountPoint, host string, port uint16) *mount { host: host, port: port, volumes: make(map[string]*volumeDir), - done: make(chan error, 1), } } @@ -89,6 +91,7 @@ func (m *mount) mountVolume() error { // connection settings "-C", // compression "-o", "ConnectTimeout=10", + "-o", "ServerAliveInterval=5", "-o", fmt.Sprintf("directport=%d", m.port), // mount directives @@ -100,10 +103,14 @@ func (m *mount) mountVolume() error { } exe := "sshfs" cmd := exec.Command(exe, sshfsArgs...) - done := make(chan error, 1) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr + ctx := context.Background() + + // Get the current Ino and Dev of the mountPoint directory + st, err := statWithTimeout(ctx, m.mountPoint, 10*time.Millisecond) + // Die if this process dies cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -113,29 +120,106 @@ func (m *mount) mountVolume() error { } m.proc = cmd.Process m.mounted.Store(true) - m.done = done - go func() { - // The Wait here will always exit with an error status, because that's what happens - // when sshfs gets interrupted. - err = cmd.Wait() - // Restore to unmounted state - m.mounted.Store(false) - close(done) - if err == nil { - log.Debug("sshfs exited normally") - } else { - log.Errorf("sshfs exited with %v", err) + + m.done = make(chan error, 2) + starting := atomic.Bool{} + starting.Store(true) + go m.sshfsWait(cmd, &starting) + + err = m.detectSshfsStarted(ctx, st) + if starting.Swap(false) { + if err != nil { + m.done <- err + _ = m.proc.Kill() + } + } + return err +} + +func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) { + defer close(m.done) + err := cmd.Wait() + if err != nil { + var ex *exec.ExitError + if errors.As(err, &ex) { + if len(ex.Stderr) > 0 { + err = fmt.Errorf("%s: exit status %d", string(ex.Stderr), ex.ExitCode()) + } + } + log.Errorf("sshfs exited with %v", err) + } else { + log.Debug("sshfs exited normally") + } + + // Restore to unmounted state + m.mounted.Store(false) + if starting.Swap(false) { + if err != nil { + m.done <- err } + } + m.mounted.Store(false) + + // sshfs sometimes leave the mount point in a bad state. This will clean it up + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + go func() { + defer cancel() + _ = exec.Command("fusermount", "-uz", m.mountPoint).Run() }() + <-ctx.Done() +} + +func (m *mount) detectSshfsStarted(ctx context.Context, st *unix.Stat_t) error { + ctx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case err := <-m.done: + // sshfs command failed + return err + case <-ctx.Done(): + err := ctx.Err() + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + err = fmt.Errorf("timeout trying to stat mount point %q", m.mountPoint) + } + return err + case <-ticker.C: + if mountSt, err := statWithTimeout(ctx, m.mountPoint, 400*time.Millisecond); err == nil { + if st.Ino != mountSt.Ino || st.Dev != mountSt.Dev { + // Mount point changed, so we're done here + log.Debug("mountpoint inode or dev changed") + return nil + } + } else { + // we don't consider a failure to stat fatal here, just a cause for a retry. + if !errors.Is(err, context.DeadlineExceeded) { + log.Errorf("unable to stat mount point %q: %v", m.mountPoint, err) + } + } + } + } +} - // Let's wait a short while to check if the command errors. +// statWithTimeout performs a normal unix.Stat but will not allow that it hangs for +// more than the given timeout. +func statWithTimeout(ctx context.Context, path string, timeout time.Duration) (*unix.Stat_t, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + errCh := make(chan error, 1) + mountSt := new(unix.Stat_t) + go func() { + errCh <- unix.Stat(path, mountSt) + }() select { - case <-time.After(1 * time.Second): - // No errors so far. We're probably good. - log.Debugf("mount successful") - return nil - case err := <-done: - return err + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-errCh: + if err != nil { + return nil, err + } + return mountSt, nil } } @@ -145,21 +229,17 @@ func (m *mount) unmountVolume() (err error) { log.Errorf("failed to remove mountpoint %s: %v", m.mountPoint, err) } }() - if err := exec.Command("umount", m.mountPoint).Run(); err != nil { - log.Errorf("failed to unmount volumeDir %s: %v", m.mountPoint, err) - } if m.mounted.Load() { log.Debug("kindly asking sshfs to stop") - // _ = cmd.Process.Signal(os.Interrupt) + _ = m.proc.Signal(os.Interrupt) select { - case <-m.done: + case err = <-m.done: case <-time.After(5 * time.Second): log.Debug("forcing sshfs to stop") _ = m.proc.Kill() } - return nil } - return nil + return err } func (m *mount) appendVolumes(vols []*volume.Volume) []*volume.Volume {