|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/unittests/config/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import logging
import os
import re
from io import StringIO
import pytest
from cloudinit import helpers, util
from cloudinit.config.cc_snap import add_assertions, handle, run_commands
from cloudinit.config.schema import (
SchemaValidationError,
get_schema,
validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema
from tests.unittests.util import get_cloud
M_PATH = "cloudinit.config.cc_snap."
ASSERTIONS_FILE = "/var/lib/cloud/instance/snapd.assertions"
SYSTEM_USER_ASSERTION = """\
type: system-user
authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
email: foo@bar.com
password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
series:
- 16
since: 2016-09-10T16:34:00+03:00
until: 2017-11-10T16:34:00+03:00
username: baz
sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj
AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
+to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q=="""
ACCOUNT_ASSERTION = """\
type: account-key
authority-id: canonical
revision: 2
public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
account-id: canonical
name: store
since: 2016-04-01T00:00:00.0Z
body-length: 717
sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH
AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
vUvV7RjVzv17ut0AEQEAAQ==
AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k"""
@pytest.fixture()
def fake_cloud(tmpdir):
paths = helpers.Paths(
{
"cloud_dir": tmpdir.join("cloud"),
"run_dir": tmpdir.join("cloud-init"),
"templates_dir": tmpdir.join("templates"),
}
)
cloud = get_cloud(paths=paths)
yield cloud
class TestAddAssertions:
@mock.patch("cloudinit.config.cc_snap.subp.subp")
def test_add_assertions_on_empty_list(self, m_subp, caplog, tmpdir):
"""When provided with an empty list, add_assertions does nothing."""
assert_file = tmpdir.join("snapd.assertions")
add_assertions([], assert_file)
assert not caplog.text
assert 0 == m_subp.call_count
def test_add_assertions_on_non_list_or_dict(self, tmpdir):
"""When provided an invalid type, add_assertions raises an error."""
assert_file = tmpdir.join("snapd.assertions")
with pytest.raises(
TypeError,
match="assertion parameter was not a list or dict: I'm Not Valid",
):
add_assertions("I'm Not Valid", assert_file)
@mock.patch("cloudinit.config.cc_snap.subp.subp")
def test_add_assertions_adds_assertions_as_list(
self, m_subp, caplog, tmpdir
):
"""When provided with a list, add_assertions adds all assertions."""
assert_file = tmpdir.join("snapd.assertions")
assertions = [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]
add_assertions(assertions, assert_file)
assert "Importing user-provided snap assertions" in caplog.text
assert "sertions" in caplog.text
assert [
mock.call(["snap", "ack", assert_file], capture=True)
] == m_subp.call_args_list
compare_file = tmpdir.join("comparison")
util.write_file(compare_file, "\n".join(assertions).encode("utf-8"))
assert util.load_file(compare_file) == util.load_file(assert_file)
@mock.patch("cloudinit.config.cc_snap.subp.subp")
def test_add_assertions_adds_assertions_as_dict(
self, m_subp, caplog, tmpdir
):
"""When provided with a dict, add_assertions adds all assertions."""
assert_file = tmpdir.join("snapd.assertions")
assertions = {"00": SYSTEM_USER_ASSERTION, "01": ACCOUNT_ASSERTION}
add_assertions(assertions, assert_file)
assert "Importing user-provided snap assertions" in caplog.text
assert (
M_PATH[:-1],
logging.DEBUG,
"Snap acking: ['type: system-user', 'authority-id: "
"LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp']",
) in caplog.record_tuples
assert (
M_PATH[:-1],
logging.DEBUG,
"Snap acking: ['type: account-key', 'authority-id: canonical']",
) in caplog.record_tuples
assert [
mock.call(["snap", "ack", assert_file], capture=True)
] == m_subp.call_args_list
compare_file = tmpdir.join("comparison")
combined = "\n".join(assertions.values())
util.write_file(compare_file, combined.encode("utf-8"))
assert util.load_file(compare_file) == util.load_file(assert_file)
class TestRunCommands(CiTestCase):
with_logs = True
allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
super(TestRunCommands, self).setUp()
self.tmp = self.tmp_dir()
@mock.patch("cloudinit.config.cc_snap.subp.subp")
def test_run_commands_on_empty_list(self, m_subp):
"""When provided with an empty list, run_commands does nothing."""
run_commands([])
self.assertEqual("", self.logs.getvalue())
m_subp.assert_not_called()
def test_run_commands_on_non_list_or_dict(self):
"""When provided an invalid type, run_commands raises an error."""
with self.assertRaises(TypeError) as context_manager:
run_commands(commands="I'm Not Valid")
self.assertEqual(
"commands parameter was not a list or dict: I'm Not Valid",
str(context_manager.exception),
)
def test_run_command_logs_commands_and_exit_codes_to_stderr(self):
"""All exit codes are logged to stderr."""
outfile = self.tmp_path("output.log", dir=self.tmp)
cmd1 = 'echo "HI" >> %s' % outfile
cmd2 = "bogus command"
cmd3 = 'echo "MOM" >> %s' % outfile
commands = [cmd1, cmd2, cmd3]
mock_path = "cloudinit.config.cc_snap.sys.stderr"
with mock.patch(mock_path, new_callable=StringIO) as m_stderr:
with self.assertRaises(RuntimeError) as context_manager:
run_commands(commands=commands)
self.assertIsNotNone(
re.search(
r"bogus: (command )?not found", str(context_manager.exception)
),
msg="Expected bogus command not found",
)
expected_stderr_log = "\n".join(
[
"Begin run command: {cmd}".format(cmd=cmd1),
"End run command: exit(0)",
"Begin run command: {cmd}".format(cmd=cmd2),
"ERROR: End run command: exit(127)",
"Begin run command: {cmd}".format(cmd=cmd3),
"End run command: exit(0)\n",
]
)
self.assertEqual(expected_stderr_log, m_stderr.getvalue())
def test_run_command_as_lists(self):
"""When commands are specified as a list, run them in order."""
outfile = self.tmp_path("output.log", dir=self.tmp)
cmd1 = 'echo "HI" >> %s' % outfile
cmd2 = 'echo "MOM" >> %s' % outfile
commands = [cmd1, cmd2]
mock_path = "cloudinit.config.cc_snap.sys.stderr"
with mock.patch(mock_path, new_callable=StringIO):
run_commands(commands=commands)
self.assertIn(
"DEBUG: Running user-provided snap commands", self.logs.getvalue()
)
self.assertEqual("HI\nMOM\n", util.load_file(outfile))
self.assertIn(
"WARNING: Non-snap commands in snap config:", self.logs.getvalue()
)
def test_run_command_dict_sorted_as_command_script(self):
"""When commands are a dict, sort them and run."""
outfile = self.tmp_path("output.log", dir=self.tmp)
cmd1 = 'echo "HI" >> %s' % outfile
cmd2 = 'echo "MOM" >> %s' % outfile
commands = {"02": cmd1, "01": cmd2}
mock_path = "cloudinit.config.cc_snap.sys.stderr"
with mock.patch(mock_path, new_callable=StringIO):
run_commands(commands=commands)
expected_messages = ["DEBUG: Running user-provided snap commands"]
for message in expected_messages:
self.assertIn(message, self.logs.getvalue())
self.assertEqual("MOM\nHI\n", util.load_file(outfile))
@skipUnlessJsonSchema()
class TestSnapSchema:
@pytest.mark.parametrize(
"config, error_msg",
[
# Valid
({"snap": {"commands": ["valid"]}}, None),
({"snap": {"commands": {"01": "also valid"}}}, None),
({"snap": {"assertions": ["valid"]}}, None),
({"snap": {"assertions": {"01": "also valid"}}}, None),
({"commands": [["echo", "bye"], ["echo", "bye"]]}, None),
({"commands": ["echo bye", "echo bye"]}, None),
(
{"commands": {"00": ["echo", "bye"], "01": ["echo", "bye"]}},
None,
),
({"commands": {"00": "echo bye", "01": "echo bye"}}, None),
# Invalid
({"snap": "wrong type"}, "'wrong type' is not of type 'object'"),
(
{"snap": {"commands": ["ls"], "invalid-key": ""}},
"Additional properties are not allowed",
),
({"snap": {}}, "{} does not have enough properties"),
(
{"snap": {"commands": "broken"}},
"'broken' is not of type 'object', 'array'",
),
({"snap": {"commands": []}}, r"snap.commands: \[\] is too short"),
(
{"snap": {"commands": {}}},
r"snap.commands: {} does not have enough properties",
),
({"snap": {"commands": [123]}}, ""),
({"snap": {"commands": {"01": 123}}}, ""),
({"snap": {"commands": [["snap", "install", 123]]}}, ""),
({"snap": {"commands": {"01": ["snap", "install", 123]}}}, ""),
({"snap": {"assertions": [123]}}, "123 is not of type 'string'"),
(
{"snap": {"assertions": {"01": 123}}},
"123 is not of type 'string'",
),
(
{"snap": {"assertions": "broken"}},
"'broken' is not of type 'object', 'array'",
),
({"snap": {"assertions": []}}, r"\[\] is too short"),
(
{"snap": {"assertions": {}}},
r"\{} does not have enough properties",
),
],
)
@skipUnlessJsonSchema()
def test_schema_validation(self, config, error_msg):
if error_msg is None:
validate_cloudconfig_schema(config, get_schema(), strict=True)
else:
with pytest.raises(SchemaValidationError, match=error_msg):
validate_cloudconfig_schema(config, get_schema(), strict=True)
class TestHandle:
@mock.patch("cloudinit.config.cc_snap.subp.subp")
def test_handle_adds_assertions(self, m_subp, fake_cloud, tmpdir):
"""Any configured snap assertions are provided to add_assertions."""
assert_file = os.path.join(
fake_cloud.paths.get_ipath_cur(), "snapd.assertions"
)
compare_file = tmpdir.join("comparison")
cfg = {
"snap": {"assertions": [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]}
}
handle("snap", cfg=cfg, cloud=fake_cloud, args=None)
content = "\n".join(cfg["snap"]["assertions"])
util.write_file(compare_file, content.encode("utf-8"))
assert util.load_file(compare_file) == util.load_file(assert_file)
# vi: ts=4 expandtab