Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/config/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/config/test_cc_snap.py
# This file is part of cloud-init. See LICENSE file for license information.

import logging
import os
import re
from io import StringIO

import pytest

from cloudinit import helpers, util
from cloudinit.config.cc_snap import add_assertions, handle, run_commands
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema
from tests.unittests.util import get_cloud

M_PATH = "cloudinit.config.cc_snap."
ASSERTIONS_FILE = "/var/lib/cloud/instance/snapd.assertions"

SYSTEM_USER_ASSERTION = """\
type: system-user
authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
email: foo@bar.com
password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
series:
- 16
since: 2016-09-10T16:34:00+03:00
until: 2017-11-10T16:34:00+03:00
username: baz
sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj

AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
+to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q=="""

ACCOUNT_ASSERTION = """\
type: account-key
authority-id: canonical
revision: 2
public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
account-id: canonical
name: store
since: 2016-04-01T00:00:00.0Z
body-length: 717
sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH

AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
vUvV7RjVzv17ut0AEQEAAQ==

AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k"""


@pytest.fixture()
def fake_cloud(tmpdir):
    paths = helpers.Paths(
        {
            "cloud_dir": tmpdir.join("cloud"),
            "run_dir": tmpdir.join("cloud-init"),
            "templates_dir": tmpdir.join("templates"),
        }
    )
    cloud = get_cloud(paths=paths)
    yield cloud


class TestAddAssertions:
    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_add_assertions_on_empty_list(self, m_subp, caplog, tmpdir):
        """When provided with an empty list, add_assertions does nothing."""
        assert_file = tmpdir.join("snapd.assertions")
        add_assertions([], assert_file)
        assert not caplog.text
        assert 0 == m_subp.call_count

    def test_add_assertions_on_non_list_or_dict(self, tmpdir):
        """When provided an invalid type, add_assertions raises an error."""
        assert_file = tmpdir.join("snapd.assertions")
        with pytest.raises(
            TypeError,
            match="assertion parameter was not a list or dict: I'm Not Valid",
        ):
            add_assertions("I'm Not Valid", assert_file)

    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_add_assertions_adds_assertions_as_list(
        self, m_subp, caplog, tmpdir
    ):
        """When provided with a list, add_assertions adds all assertions."""
        assert_file = tmpdir.join("snapd.assertions")
        assertions = [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]
        add_assertions(assertions, assert_file)
        assert "Importing user-provided snap assertions" in caplog.text
        assert "sertions" in caplog.text
        assert [
            mock.call(["snap", "ack", assert_file], capture=True)
        ] == m_subp.call_args_list
        compare_file = tmpdir.join("comparison")
        util.write_file(compare_file, "\n".join(assertions).encode("utf-8"))
        assert util.load_file(compare_file) == util.load_file(assert_file)

    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_add_assertions_adds_assertions_as_dict(
        self, m_subp, caplog, tmpdir
    ):
        """When provided with a dict, add_assertions adds all assertions."""
        assert_file = tmpdir.join("snapd.assertions")
        assertions = {"00": SYSTEM_USER_ASSERTION, "01": ACCOUNT_ASSERTION}
        add_assertions(assertions, assert_file)
        assert "Importing user-provided snap assertions" in caplog.text
        assert (
            M_PATH[:-1],
            logging.DEBUG,
            "Snap acking: ['type: system-user', 'authority-id: "
            "LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp']",
        ) in caplog.record_tuples
        assert (
            M_PATH[:-1],
            logging.DEBUG,
            "Snap acking: ['type: account-key', 'authority-id: canonical']",
        ) in caplog.record_tuples
        assert [
            mock.call(["snap", "ack", assert_file], capture=True)
        ] == m_subp.call_args_list
        compare_file = tmpdir.join("comparison")
        combined = "\n".join(assertions.values())
        util.write_file(compare_file, combined.encode("utf-8"))
        assert util.load_file(compare_file) == util.load_file(assert_file)


class TestRunCommands(CiTestCase):

    with_logs = True
    allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]

    def setUp(self):
        super(TestRunCommands, self).setUp()
        self.tmp = self.tmp_dir()

    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_run_commands_on_empty_list(self, m_subp):
        """When provided with an empty list, run_commands does nothing."""
        run_commands([])
        self.assertEqual("", self.logs.getvalue())
        m_subp.assert_not_called()

    def test_run_commands_on_non_list_or_dict(self):
        """When provided an invalid type, run_commands raises an error."""
        with self.assertRaises(TypeError) as context_manager:
            run_commands(commands="I'm Not Valid")
        self.assertEqual(
            "commands parameter was not a list or dict: I'm Not Valid",
            str(context_manager.exception),
        )

    def test_run_command_logs_commands_and_exit_codes_to_stderr(self):
        """All exit codes are logged to stderr."""
        outfile = self.tmp_path("output.log", dir=self.tmp)

        cmd1 = 'echo "HI" >> %s' % outfile
        cmd2 = "bogus command"
        cmd3 = 'echo "MOM" >> %s' % outfile
        commands = [cmd1, cmd2, cmd3]

        mock_path = "cloudinit.config.cc_snap.sys.stderr"
        with mock.patch(mock_path, new_callable=StringIO) as m_stderr:
            with self.assertRaises(RuntimeError) as context_manager:
                run_commands(commands=commands)

        self.assertIsNotNone(
            re.search(
                r"bogus: (command )?not found", str(context_manager.exception)
            ),
            msg="Expected bogus command not found",
        )
        expected_stderr_log = "\n".join(
            [
                "Begin run command: {cmd}".format(cmd=cmd1),
                "End run command: exit(0)",
                "Begin run command: {cmd}".format(cmd=cmd2),
                "ERROR: End run command: exit(127)",
                "Begin run command: {cmd}".format(cmd=cmd3),
                "End run command: exit(0)\n",
            ]
        )
        self.assertEqual(expected_stderr_log, m_stderr.getvalue())

    def test_run_command_as_lists(self):
        """When commands are specified as a list, run them in order."""
        outfile = self.tmp_path("output.log", dir=self.tmp)

        cmd1 = 'echo "HI" >> %s' % outfile
        cmd2 = 'echo "MOM" >> %s' % outfile
        commands = [cmd1, cmd2]
        mock_path = "cloudinit.config.cc_snap.sys.stderr"
        with mock.patch(mock_path, new_callable=StringIO):
            run_commands(commands=commands)

        self.assertIn(
            "DEBUG: Running user-provided snap commands", self.logs.getvalue()
        )
        self.assertEqual("HI\nMOM\n", util.load_file(outfile))
        self.assertIn(
            "WARNING: Non-snap commands in snap config:", self.logs.getvalue()
        )

    def test_run_command_dict_sorted_as_command_script(self):
        """When commands are a dict, sort them and run."""
        outfile = self.tmp_path("output.log", dir=self.tmp)
        cmd1 = 'echo "HI" >> %s' % outfile
        cmd2 = 'echo "MOM" >> %s' % outfile
        commands = {"02": cmd1, "01": cmd2}
        mock_path = "cloudinit.config.cc_snap.sys.stderr"
        with mock.patch(mock_path, new_callable=StringIO):
            run_commands(commands=commands)

        expected_messages = ["DEBUG: Running user-provided snap commands"]
        for message in expected_messages:
            self.assertIn(message, self.logs.getvalue())
        self.assertEqual("MOM\nHI\n", util.load_file(outfile))


@skipUnlessJsonSchema()
class TestSnapSchema:
    @pytest.mark.parametrize(
        "config, error_msg",
        [
            # Valid
            ({"snap": {"commands": ["valid"]}}, None),
            ({"snap": {"commands": {"01": "also valid"}}}, None),
            ({"snap": {"assertions": ["valid"]}}, None),
            ({"snap": {"assertions": {"01": "also valid"}}}, None),
            ({"commands": [["echo", "bye"], ["echo", "bye"]]}, None),
            ({"commands": ["echo bye", "echo bye"]}, None),
            (
                {"commands": {"00": ["echo", "bye"], "01": ["echo", "bye"]}},
                None,
            ),
            ({"commands": {"00": "echo bye", "01": "echo bye"}}, None),
            # Invalid
            ({"snap": "wrong type"}, "'wrong type' is not of type 'object'"),
            (
                {"snap": {"commands": ["ls"], "invalid-key": ""}},
                "Additional properties are not allowed",
            ),
            ({"snap": {}}, "{} does not have enough properties"),
            (
                {"snap": {"commands": "broken"}},
                "'broken' is not of type 'object', 'array'",
            ),
            ({"snap": {"commands": []}}, r"snap.commands: \[\] is too short"),
            (
                {"snap": {"commands": {}}},
                r"snap.commands: {} does not have enough properties",
            ),
            ({"snap": {"commands": [123]}}, ""),
            ({"snap": {"commands": {"01": 123}}}, ""),
            ({"snap": {"commands": [["snap", "install", 123]]}}, ""),
            ({"snap": {"commands": {"01": ["snap", "install", 123]}}}, ""),
            ({"snap": {"assertions": [123]}}, "123 is not of type 'string'"),
            (
                {"snap": {"assertions": {"01": 123}}},
                "123 is not of type 'string'",
            ),
            (
                {"snap": {"assertions": "broken"}},
                "'broken' is not of type 'object', 'array'",
            ),
            ({"snap": {"assertions": []}}, r"\[\] is too short"),
            (
                {"snap": {"assertions": {}}},
                r"\{} does not have enough properties",
            ),
        ],
    )
    @skipUnlessJsonSchema()
    def test_schema_validation(self, config, error_msg):
        if error_msg is None:
            validate_cloudconfig_schema(config, get_schema(), strict=True)
        else:
            with pytest.raises(SchemaValidationError, match=error_msg):
                validate_cloudconfig_schema(config, get_schema(), strict=True)


class TestHandle:
    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_handle_adds_assertions(self, m_subp, fake_cloud, tmpdir):
        """Any configured snap assertions are provided to add_assertions."""
        assert_file = os.path.join(
            fake_cloud.paths.get_ipath_cur(), "snapd.assertions"
        )
        compare_file = tmpdir.join("comparison")
        cfg = {
            "snap": {"assertions": [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]}
        }
        handle("snap", cfg=cfg, cloud=fake_cloud, args=None)
        content = "\n".join(cfg["snap"]["assertions"])
        util.write_file(compare_file, content.encode("utf-8"))
        assert util.load_file(compare_file) == util.load_file(assert_file)


# vi: ts=4 expandtab

haha - 2025