Skip to content

Commit

Permalink
Use snapd REST api to change snap config (#138)
Browse files Browse the repository at this point in the history
Use the snapd REST API to change snap configurations instead of using the snap command.

This improves security in case a secret in command line arguments are logged.
  • Loading branch information
weiiwang01 authored Dec 1, 2024
1 parent a1aaa35 commit 8e034ef
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 21 deletions.
45 changes: 37 additions & 8 deletions lib/charms/operator_libs_linux/v2/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
Expand All @@ -83,7 +84,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 7
LIBPATCH = 8


# Regex to locate 7-bit C1 ANSI sequences
Expand Down Expand Up @@ -332,19 +333,17 @@ def get(self, key: Optional[str], *, typed: bool = False) -> Any:

return self._snap("get", [key]).strip()

def set(self, config: Dict[str, Any], *, typed: bool = False) -> str:
def set(self, config: Dict[str, Any], *, typed: bool = False) -> None:
"""Set a snap configuration value.
Args:
config: a dictionary containing keys and values specifying the config to set.
typed: set to True to convert all values in the config into typed values while
configuring the snap (set with typed=True). Default is not to convert.
"""
if typed:
kv = [f"{key}={json.dumps(val)}" for key, val in config.items()]
return self._snap("set", ["-t"] + kv)

return self._snap("set", [f"{key}={val}" for key, val in config.items()])
if not typed:
config = {k: str(v) for k, v in config.items()}
self._snap_client._put_snap_conf(self._name, config)

def unset(self, key) -> str:
"""Unset a snap configuration value.
Expand Down Expand Up @@ -770,7 +769,33 @@ def _request(
headers["Content-Type"] = "application/json"

response = self._request_raw(method, path, query, headers, data)
return json.loads(response.read().decode())["result"]
response = json.loads(response.read().decode())
if response["type"] == "async":
return self._wait(response["change"])
return response["result"]

def _wait(self, change_id: str, timeout=300) -> JSONType:
"""Wait for an async change to complete.
The poll time is 100 milliseconds, the same as in snap clients.
"""
deadline = time.time() + timeout
while True:
if time.time() > deadline:
raise TimeoutError(f"timeout waiting for snap change {change_id}")
response = self._request("GET", f"changes/{change_id}")
status = response["status"]
if status == "Done":
return response.get("data")
if status == "Doing":
time.sleep(0.1)
continue
if status == "Wait":
logger.warning("snap change %s succeeded with status 'Wait'", change_id)
return response.get("data")
raise SnapError(
f"snap change {response.get('kind')!r} id {change_id} failed with status {status}"
)

def _request_raw(
self,
Expand Down Expand Up @@ -818,6 +843,10 @@ def get_installed_snap_apps(self, name: str) -> List:
"""Query the snap server for apps belonging to a named, currently installed snap."""
return self._request("GET", "apps", {"names": name, "select": "service"})

def _put_snap_conf(self, name: str, conf: Dict[str, Any]):
"""Set the configuration details for an installed snap."""
return self._request("PUT", f"snaps/{name}/conf", body=conf)


class SnapCache(Mapping):
"""An abstraction to represent installed/available packages.
Expand Down
13 changes: 11 additions & 2 deletions tests/integration/test_snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import logging
import re
import time
from datetime import datetime, timedelta
from subprocess import CalledProcessError, check_output, run

Expand Down Expand Up @@ -65,7 +66,11 @@ def test_snap_refresh():
def test_snap_set_and_get_with_typed():
cache = snap.SnapCache()
lxd = cache["lxd"]
lxd.ensure(snap.SnapState.Latest, channel="latest")
try:
lxd.ensure(snap.SnapState.Latest, channel="latest")
except snap.SnapError:
time.sleep(60)
lxd.ensure(snap.SnapState.Latest, channel="latest")
configs = {
"true": True,
"false": False,
Expand Down Expand Up @@ -137,7 +142,11 @@ def test_snap_set_and_get_with_typed():
def test_snap_set_and_get_untyped():
cache = snap.SnapCache()
lxd = cache["lxd"]
lxd.ensure(snap.SnapState.Latest, channel="latest")
try:
lxd.ensure(snap.SnapState.Latest, channel="latest")
except snap.SnapError:
time.sleep(60)
lxd.ensure(snap.SnapState.Latest, channel="latest")

lxd.set({"foo": "true", "bar": True}, typed=False)
assert lxd.get("foo", typed=False) == "true"
Expand Down
156 changes: 145 additions & 11 deletions tests/unit/test_snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
# pyright: reportPrivateUsage=false

import datetime
import io
import json
import time
import typing
import unittest
from subprocess import CalledProcessError
from typing import Any, Dict, Iterable, Optional
Expand Down Expand Up @@ -697,6 +700,141 @@ def test_request_raw_bad_response_raises_snapapierror(self):
finally:
shutdown()

def test_wait_changes(self):
change_finished = False

def _request_raw(
method: str,
path: str,
query: Dict = None,
headers: Dict = None,
data: bytes = None,
) -> typing.IO[bytes]:
nonlocal change_finished
if method == "PUT" and path == "snaps/test/conf":
return io.BytesIO(
json.dumps(
{
"type": "async",
"status-code": 202,
"status": "Accepted",
"result": None,
"change": "97",
}
).encode("utf-8")
)
if method == "GET" and path == "changes/97" and not change_finished:
change_finished = True
return io.BytesIO(
json.dumps(
{
"type": "sync",
"status-code": 200,
"status": "OK",
"result": {
"id": "97",
"kind": "configure-snap",
"summary": 'Change configuration of "test" snap',
"status": "Doing",
"tasks": [
{
"id": "1029",
"kind": "run-hook",
"summary": 'Run configure hook of "test" snap',
"status": "Doing",
"progress": {"label": "", "done": 1, "total": 1},
"spawn-time": "2024-11-28T20:02:47.498399651+00:00",
"data": {"affected-snaps": ["test"]},
}
],
"ready": False,
"spawn-time": "2024-11-28T20:02:47.49842583+00:00",
},
}
).encode("utf-8")
)
if method == "GET" and path == "changes/97" and change_finished:
return io.BytesIO(
json.dumps(
{
"type": "sync",
"status-code": 200,
"status": "OK",
"result": {
"id": "98",
"kind": "configure-snap",
"summary": 'Change configuration of "test" snap',
"status": "Done",
"tasks": [
{
"id": "1030",
"kind": "run-hook",
"summary": 'Run configure hook of "test" snap',
"status": "Done",
"progress": {"label": "", "done": 1, "total": 1},
"spawn-time": "2024-11-28T20:06:41.415929854+00:00",
"ready-time": "2024-11-28T20:06:41.797437537+00:00",
"data": {"affected-snaps": ["test"]},
}
],
"ready": True,
"spawn-time": "2024-11-28T20:06:41.415955681+00:00",
"ready-time": "2024-11-28T20:06:41.797440022+00:00",
},
}
).encode("utf-8")
)
raise RuntimeError("unknown request")

client = snap.SnapClient()
with patch.object(client, "_request_raw", _request_raw), patch.object(time, "sleep"):
client._put_snap_conf("test", {"foo": "bar"})

def test_wait_failed(self):
def _request_raw(
method: str,
path: str,
query: Dict = None,
headers: Dict = None,
data: bytes = None,
) -> typing.IO[bytes]:
if method == "PUT" and path == "snaps/test/conf":
return io.BytesIO(
json.dumps(
{
"type": "async",
"status-code": 202,
"status": "Accepted",
"result": None,
"change": "97",
}
).encode("utf-8")
)
if method == "GET" and path == "changes/97":
return io.BytesIO(
json.dumps(
{
"type": "sync",
"status-code": 200,
"status": "OK",
"result": {
"id": "97",
"kind": "configure-snap",
"summary": 'Change configuration of "test" snap',
"status": "Error",
"ready": False,
"spawn-time": "2024-11-28T20:02:47.49842583+00:00",
},
}
).encode("utf-8")
)
raise RuntimeError("unknown request")

client = snap.SnapClient()
with patch.object(client, "_request_raw", _request_raw), patch.object(time, "sleep"):
with self.assertRaises(snap.SnapError):
client._put_snap_conf("test", {"foo": "bar"})


class TestSnapBareMethods(unittest.TestCase):
@patch("builtins.open", new_callable=mock_open, read_data="curl\n")
Expand Down Expand Up @@ -902,28 +1040,24 @@ def fake_snap(command: str, optargs: Optional[Iterable[str]] = None) -> str:
with self.assertRaises(TypeError):
foo.get(None) # pyright: ignore[reportArgumentType]

@patch("charms.operator_libs_linux.v2.snap.subprocess.check_output")
def test_snap_set_typed(self, mock_subprocess):
@patch("charms.operator_libs_linux.v2.snap.SnapClient._put_snap_conf")
def test_snap_set_typed(self, put_snap_conf):
foo = snap.Snap("foo", snap.SnapState.Present, "stable", "1", "classic")

config = {"n": 42, "s": "string", "d": {"nested": True}}

foo.set(config, typed=True)
mock_subprocess.assert_called_with(
["snap", "set", "foo", "-t", "n=42", 's="string"', 'd={"nested": true}'],
universal_newlines=True,
)
put_snap_conf.assert_called_with("foo", {"n": 42, "s": "string", "d": {"nested": True}})

@patch("charms.operator_libs_linux.v2.snap.subprocess.check_output")
def test_snap_set_untyped(self, mock_subprocess):
@patch("charms.operator_libs_linux.v2.snap.SnapClient._put_snap_conf")
def test_snap_set_untyped(self, put_snap_conf):
foo = snap.Snap("foo", snap.SnapState.Present, "stable", "1", "classic")

config = {"n": 42, "s": "string", "d": {"nested": True}}

foo.set(config, typed=False)
mock_subprocess.assert_called_with(
["snap", "set", "foo", "n=42", "s=string", "d={'nested': True}"],
universal_newlines=True,
put_snap_conf.assert_called_with(
"foo", {"n": "42", "s": "string", "d": "{'nested': True}"}
)

@patch(
Expand Down

0 comments on commit 8e034ef

Please sign in to comment.