Skip to content

Commit

Permalink
Merge pull request #3165 from telepresenceio/thallgren/kind-minikube-…
Browse files Browse the repository at this point in the history
…auto-discovery

Better discovery of local container-based clusters (kind or minikube).
  • Loading branch information
thallgren authored May 8, 2023
2 parents ff039d4 + 6fb5634 commit 6855291
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 61 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
now be configured in the `values.yaml` file using `timeouts.agentArrival`. The default
timeout is still 30 seconds.

- Bugfix: The automatic discovery of a local container based cluster (minikube or kind) used when the
Telepresence daemon runs in a container, now works on macOS and Windows, and with different profiles,
ports, and cluster names

- Bugfix: FTP Stability improvements. Multiple simultaneous intercepts can transfer large files in bidirectionally and in parallel.

- Bugfix: Pods using persistent volumes no longer causes timeouts when intercepted.
Expand Down
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ following Free and Open Source software:
github.com/Masterminds/semver/v3 v3.2.0 MIT license
github.com/Masterminds/sprig/v3 v3.2.3 MIT license
github.com/Masterminds/squirrel v1.5.3 MIT license
github.com/Microsoft/go-winio v0.6.0 MIT license
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d MIT license
github.com/beorn7/perks v1.0.1 MIT license
github.com/blang/semver v3.5.1+incompatible MIT license
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/datawire/go-fuseftp/rpc v0.4.0
github.com/datawire/k8sapi v0.1.2
github.com/datawire/metriton-go-client v0.1.1
github.com/docker/docker v20.10.24+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/godbus/dbus/v5 v5.1.0
github.com/golang/mock v1.6.0
Expand Down Expand Up @@ -71,6 +72,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.0 // indirect
github.com/Masterminds/sprig/v3 v3.2.3 // indirect
github.com/Masterminds/squirrel v1.5.3 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
Expand All @@ -81,7 +83,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/cli v20.10.22+incompatible // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.24+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBa
github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc=
github.com/Masterminds/squirrel v1.5.3/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Microsoft/hcsshim v0.9.6 h1:VwnDOgLeoi2du6dAznfmspNqTiwczvjv4K7NxuY9jsY=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d h1:UrqY+r/OJnIp5u0s1SbQ8dVfLCZJsnvazdBP5hS4iRs=
Expand Down
7 changes: 0 additions & 7 deletions pkg/client/cli/cmd/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/ann"
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/connect"
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/daemon"
"github.com/telepresenceio/telepresence/v2/pkg/client/docker"
"github.com/telepresenceio/telepresence/v2/pkg/errcat"
"github.com/telepresenceio/telepresence/v2/pkg/ioutil"
)
Expand Down Expand Up @@ -189,12 +188,6 @@ func (ha *HelmCommand) run(cmd *cobra.Command, _ []string) error {
NoHooks: ha.NoHooks,
}
ud := daemon.GetUserClient(ctx)
if ud.Remote && daemon.GetSession(ctx) == nil {
// This is needed here, because we never establish a session.
if err := docker.EnableK8SAuthenticator(ctx); err != nil {
return err
}
}
resp, err := ud.Helm(ctx, request)
if err != nil {
return err
Expand Down
13 changes: 7 additions & 6 deletions pkg/client/cli/connect/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
empty "google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -175,7 +176,12 @@ func ensureUserDaemon(ctx context.Context, required bool) (context.Context, erro
var ud *daemon.UserClient
if addr := client.GetEnv(ctx).UserDaemonAddress; addr != "" {
// Assume that the user daemon is running and connect to it using the given address instead of using a socket.
conn, err := docker.ConnectDaemon(ctx, addr)
// NOTE: The UserDaemonAddress does not imply that the daemon runs in Docker
conn, err := grpc.DialContext(ctx, addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithNoProxy(),
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true))
if err != nil {
return ctx, err
}
Expand Down Expand Up @@ -249,11 +255,6 @@ func connectSession(ctx context.Context, userD *daemon.UserClient, request *daem
if !required {
return nil, nil
}
if userD.Remote {
if err = docker.EnableK8SAuthenticator(ctx); err != nil {
return nil, err
}
}
if ci, err = userD.Connect(ctx, &request.ConnectRequest); err != nil {
return nil, err
}
Expand Down
202 changes: 155 additions & 47 deletions pkg/client/docker/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/netip"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
dockerClient "github.com/docker/docker/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
runtime2 "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"

Expand Down Expand Up @@ -137,17 +142,20 @@ func DiscoverDaemon(ctx context.Context, name string) (conn *grpc.ClientConn, er
// The host relies on that the daemon has exposed a port to localhost
addr = fmt.Sprintf(":%d", port)
}
return ConnectDaemon(ctx, addr)
return connectDaemon(ctx, addr)
}

// ConnectDaemon connects to a daemon at the given address.
func ConnectDaemon(ctx context.Context, address string) (conn *grpc.ClientConn, err error) {
// connectDaemon connects to a daemon at the given address.
func connectDaemon(ctx context.Context, address string) (conn *grpc.ClientConn, err error) {
if err = enableK8SAuthenticator(ctx); err != nil {
return nil, err
}
// Assume that the user daemon is running and connect to it using the given address instead of using a socket.
for i := 1; ; i++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn, err := grpc.DialContext(ctx, address,
conn, err = grpc.DialContext(ctx, address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithNoProxy(),
grpc.WithBlock(),
Expand Down Expand Up @@ -261,7 +269,7 @@ func ensureAuthenticatorService(ctx context.Context, kubeFlags map[string]string
return startAuthenticatorService(ctx, portFile, kubeFlags, configFiles)
}

func EnableK8SAuthenticator(ctx context.Context) error {
func enableK8SAuthenticator(ctx context.Context) error {
cr := daemon.GetRequest(ctx)
dlog.Debugf(ctx, "kubeflags = %v", cr.KubeFlags)
configFlags, err := client.ConfigFlags(cr.KubeFlags)
Expand Down Expand Up @@ -315,11 +323,9 @@ func EnableK8SAuthenticator(ctx context.Context) error {
if err = os.MkdirAll(kubeConfigDir, 0o700); err != nil {
return err
}

// Special hack for the "kind-kind" cluster, replacing its 127.0.0.1 access on the host network with
// the docker internal control plane address
if cl, ok := config.Clusters["kind-kind"]; ok && strings.HasPrefix(cl.Server, "https://127.0.0.1:") {
cl.Server = "https://kind-control-plane:6443"
err = handleLocalK8s(ctx, cc.Cluster, config.Clusters[cc.Cluster])
if err != nil {
dlog.Errorf(ctx, "unable to handle local K8s: %v", err)
}

if err = clientcmd.WriteToFile(config, filepath.Join(kubeConfigDir, kubeConfigFile)); err != nil {
Expand All @@ -331,16 +337,78 @@ func EnableK8SAuthenticator(ctx context.Context) error {
return nil
}

// handleLocalK8s checks if the cluster is using a well known provider (currently minikube or kind)
// and ensures that the service is modified to access the docker internal address instead of an
// address available on the host.
func handleLocalK8s(ctx context.Context, clusterName string, cl *api.Cluster) error {
isKind := strings.HasPrefix(clusterName, "kind-")
isMinikube := false
if !isKind {
if ex, ok := cl.Extensions["cluster_info"].(*runtime2.Unknown); ok {
var data map[string]any
isMinikube = json.Unmarshal(ex.Raw, &data) == nil && data["provider"] == "minikube.sigs.k8s.io"
}
}
if !(isKind || isMinikube) {
return nil
}

server, err := url.Parse(cl.Server)
if err != nil {
return err
}
host, portStr, err := net.SplitHostPort(server.Host)
if err != nil {
return err
}
addr, err := netip.ParseAddr(host)
if err != nil {
if host == "localhost" {
addr = netip.AddrFrom4([4]byte{127, 0, 0, 1})
}
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return err
}

addrPort := netip.AddrPortFrom(addr, uint16(port))

// Let's check if we have a container with port bindings for the
// given addrPort that is a known k8sapi provider
cli, err := dockerClient.NewClientWithOpts(dockerClient.FromEnv, dockerClient.WithAPIVersionNegotiation())
if err != nil {
return err
}
defer cli.Close()
cjs := runningContainers(ctx, cli)

var hostPort, network string
if isKind {
hostPort, network = detectKind(cjs, addrPort)
} else if isMinikube {
hostPort, network = detectMinikube(cjs, addrPort, clusterName)
}
if hostPort != "" {
server.Host = hostPort
cl.Server = server.String()
}
if network != "" {
dcName := SafeContainerName(containerNamePrefix + clusterName)
if err = cli.NetworkConnect(ctx, network, dcName, nil); err != nil {
dlog.Debugf(ctx, "failed to connect network %s to container %s", network, dcName)
}
}
return nil
}

// LaunchDaemon ensures that the image returned by ClientImage exists by calling PullImage. It then uses the
// options DaemonOptions and DaemonArgs to start the image, and finally ConnectDaemon to connect to it. A
// options DaemonOptions and DaemonArgs to start the image, and finally connectDaemon to connect to it. A
// successful start yields a cache.DaemonInfo entry in the cache.
func LaunchDaemon(ctx context.Context, name string) (conn *grpc.ClientConn, err error) {
if proc.RunningInContainer() {
return nil, errors.New("unable to start a docker container from within a container")
}
if err != nil {
return nil, fmt.Errorf("failed to create cidfile: %v", err)
}
image := ClientImage(ctx)
if err = PullImage(ctx, image); err != nil {
return nil, err
Expand All @@ -362,9 +430,8 @@ func LaunchDaemon(ctx context.Context, name string) (conn *grpc.ClientConn, err
allArgs = append(allArgs, opts...)
allArgs = append(allArgs, image)
allArgs = append(allArgs, args...)
var cid string
for i := 1; ; i++ {
cid, err = tryLaunch(ctx, addr.Port, name, allArgs)
_, err = tryLaunch(ctx, addr.Port, name, allArgs)
if err != nil {
if i < 6 && strings.Contains(err.Error(), "already in use by container") {
// This may happen if the daemon has died (and hence, we never discovered it), but
Expand All @@ -376,44 +443,85 @@ func LaunchDaemon(ctx context.Context, name string) (conn *grpc.ClientConn, err
}
break
}
detectAndAddLocalKubeNetwork(ctx, "minikube", cid)
detectAndAddLocalKubeNetwork(ctx, "kind", cid)
return connectDaemon(ctx, addr.String())
}

return ConnectDaemon(ctx, addr.String())
// containerPort returns the port that the container uses internally to expose the given
// addrPort on the host. An empty string is returned when the addrPort is not found among
// the container's port bindings.
func containerPort(addrPort netip.AddrPort, ns *types.NetworkSettings) string {
for port, bindings := range ns.Ports {
for _, binding := range bindings {
addr, err := netip.ParseAddr(binding.HostIP)
if err != nil {
continue
}
pn, err := strconv.ParseUint(binding.HostPort, 10, 16)
if err != nil {
continue
}
if netip.AddrPortFrom(addr, uint16(pn)) == addrPort {
return port.Port()
}
}
}
return ""
}

// detectAndAddLocalKubeNetwork checks for a network with the given name, and adds it to the
// running container if present.
//
// This takes care of the use-case when Minikube or Kind is installed with a docker driver and
// uses the default docker network name.
func detectAndAddLocalKubeNetwork(ctx context.Context, network, cid string) {
// Check that the network exists and that at least one container is connected to it
stdout := bytes.Buffer{}
cmd := proc.CommandContext(ctx, "docker", "network", "inspect", network, "-f", "{{len .Containers}}")
cmd.DisableLogging = true
cmd.Stderr = io.Discard
cmd.Stdout = &stdout
if cmd.Run() != nil {
return
// runningContainers returns the inspect data for all containers with status=running.
func runningContainers(ctx context.Context, cli dockerClient.APIClient) []types.ContainerJSON {
cl, err := cli.ContainerList(ctx, types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{Key: "status", Value: "running"}),
})
if err != nil {
dlog.Errorf(ctx, "failed to list containers: %v", err)
return nil
}
cjs := make([]types.ContainerJSON, 0, len(cl))
for _, cn := range cl {
cj, err := cli.ContainerInspect(ctx, cn.ID)
if err != nil {
dlog.Errorf(ctx, "container inspect on %v failed: %v", cn.Names, err)
} else {
cjs = append(cjs, cj)
}
}
cnt, err := strconv.Atoi(strings.TrimSpace(stdout.String()))
if err != nil || cnt == 0 {
return
return cjs
}

// detectMinikube returns the container IP:port for the given hostAddrPort for a container where the
// "name.minikube.sigs.k8s.io" label is equal to the given cluster name.
// Returns the internal IP:port for the given hostAddrPort and the name of a network that makes the
// IP available.
func detectMinikube(cns []types.ContainerJSON, hostAddrPort netip.AddrPort, clusterName string) (string, string) {
for _, cn := range cns {
if cfg, ns := cn.Config, cn.NetworkSettings; cfg != nil && ns != nil && cfg.Labels["name.minikube.sigs.k8s.io"] == clusterName {
if port := containerPort(hostAddrPort, ns); port != "" {
for networkName, network := range ns.Networks {
return net.JoinHostPort(network.IPAddress, port), networkName
}
}
}
}
return "", ""
}

// The network is present. Let's try and add it.
stderr := bytes.Buffer{}
cmd = proc.StdCommand(ctx, "docker", "network", "connect", network, cid)
cmd.Stderr = &stderr
cmd.Stdout = io.Discard
if err := cmd.Run(); err != nil {
es := stderr.String()
if es != "" {
dlog.Error(ctx, es)
// detectKind returns the container hostname:port for the given hostAddrPort for a container where the
// "io.x-k8s.kind.role" label is equal to "control-plane".
// Returns the internal hostname:port for the given hostAddrPort and the name of a network that makes the
// hostname available.
func detectKind(cns []types.ContainerJSON, hostAddrPort netip.AddrPort) (string, string) {
for _, cn := range cns {
if cfg, ns := cn.Config, cn.NetworkSettings; cfg != nil && ns != nil && cfg.Labels["io.x-k8s.kind.role"] == "control-plane" {
if port := containerPort(hostAddrPort, ns); port != "" {
hostPort := net.JoinHostPort(cfg.Hostname, port)
for networkName := range ns.Networks {
return hostPort, networkName
}
}
}
dlog.Errorf(ctx, "failed to add %s network: %v", network, err)
}
return "", ""
}

func tryLaunch(ctx context.Context, port int, name string, args []string) (string, error) {
Expand Down

0 comments on commit 6855291

Please sign in to comment.