Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/config/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/config/test_cc_wireguard.py
# This file is part of cloud-init. See LICENSE file for license information.
import pytest

from cloudinit import subp, util
from cloudinit.config import cc_wireguard
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema

# Newline, interpolated into the expected multi-line error message in
# test_suppl_schema_error_on_missing_keys.
NL = "\n"
# Module path used in mocks
MPATH = "cloudinit.config.cc_wireguard"
# Threshold used by test_maybe_install_wg_packages_happy_path: when the
# running kernel is older than this, "wireguard" is expected in the install
# list in addition to "wireguard-tools".
MIN_KERNEL_VERSION = (5, 6)


class FakeCloud:
    """Bare-bones cloud stand-in exposing only a ``distro`` attribute."""

    def __init__(self, distro):
        # Keep the supplied object untouched so callers get back exactly
        # what the test handed in (typically a mock).
        self.distro = distro


class TestWireGuard(CiTestCase):

    with_logs = True
    allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]

    def setUp(self):
        """Run base-class setup, then keep ``tmp_dir()``'s result on self."""
        super().setUp()
        self.tmp = self.tmp_dir()

    def test_readiness_probe_schema_non_string_values(self):
        """Non-string readinessprobe entries are rejected with ValueError."""
        probes = [1, ["not-a-valid-command"]]

        with self.assertRaises(ValueError) as raised:
            cc_wireguard.readinessprobe_command_validation(probes)

        message = str(raised.exception)
        # Every offending entry must be reported together with its index.
        for expected in (
            "Expected a string for readinessprobe at 0. Found 1",
            "Expected a string for readinessprobe at 1."
            " Found ['not-a-valid-command']",
        ):
            self.assertIn(expected, message)

    def test_suppl_schema_error_on_missing_keys(self):
        """An empty interface config reports all missing required keys."""
        # Used as a regular expression by assertRaisesRegex.
        expected_pattern = (
            f"Invalid wireguard interface configuration:{NL}"
            "Missing required wg:interfaces keys: config_path, content, name"
        )
        with self.assertRaisesRegex(ValueError, expected_pattern):
            cc_wireguard.supplemental_schema_validation({})

    def test_suppl_schema_error_on_non_string_values(self):
        """Each non-string interface value is named in the ValueError."""
        bad_cfg = {"name": 1, "config_path": 2, "content": 3}

        with self.assertRaises(ValueError) as raised:
            cc_wireguard.supplemental_schema_validation(bad_cfg)

        message = str(raised.exception)
        # One complaint is expected per key, each quoting the bad value.
        for expected in (
            "Expected a string for wg:interfaces:config_path. Found 2",
            "Expected a string for wg:interfaces:content. Found 3",
            "Expected a string for wg:interfaces:name. Found 1",
        ):
            self.assertIn(expected, message)

    def test_write_config_failed(self):
        """write_config reports failures as RuntimeError naming the path."""
        interface = {"name": "wg0", "config_path": "/no/valid/path"}
        expected_prefix = (
            "Failure writing Wireguard configuration file /no/valid/path:\n"
        )

        with self.assertRaises(RuntimeError) as raised:
            cc_wireguard.write_config(interface)

        self.assertIn(expected_prefix, str(raised.exception))

    @mock.patch("%s.subp.subp" % MPATH)
    def test_readiness_probe_invalid_command(self, m_subp):
        """A failing readinessprobe command surfaces as RuntimeError."""
        failing_cmd = "not-a-valid-command"

        def fake_subp(cmd, capture=None, shell=None):
            # Fail only for the known-bad command, and only when it is run
            # with both capture and shell enabled; otherwise do nothing.
            if not (capture and shell):
                return
            if cmd == failing_cmd:
                raise subp.ProcessExecutionError(
                    "not-a-valid-command: command not found"
                )

        m_subp.side_effect = fake_subp

        with self.assertRaises(RuntimeError) as raised:
            cc_wireguard.readinessprobe([failing_cmd])

        expected = (
            "Failed running readinessprobe command:\n"
            "not-a-valid-command: Unexpected error while"
            " running command.\n"
            "Command: -\nExit code: -\nReason: -\n"
            "Stdout: not-a-valid-command: command not found\nStderr: -"
        )
        self.assertIn(expected, str(raised.exception))

    @mock.patch("%s.subp.subp" % MPATH)
    def test_enable_wg_on_error(self, m_subp):
        """A manage_service failure surfaces as RuntimeError from enable_wg."""
        service_error = subp.ProcessExecutionError(
            "systemctl start wg-quik@wg0 failed: exit code 1"
        )
        # The mocked distro raises whenever manage_service is invoked.
        distro = mock.MagicMock()
        distro.manage_service.side_effect = service_error
        expected = (
            "Failed enabling/starting Wireguard interface(s):\n"
            "Unexpected error while running command.\n"
            "Command: -\nExit code: -\nReason: -\n"
            "Stdout: systemctl start wg-quik@wg0 failed: exit code 1\n"
            "Stderr: -"
        )

        with self.assertRaises(RuntimeError) as raised:
            cc_wireguard.enable_wg({"name": "wg0"}, FakeCloud(distro))

        self.assertEqual(expected, str(raised.exception))

    @mock.patch("%s.subp.which" % MPATH)
    def test_maybe_install_wg_packages_noop_when_wg_tools_present(
        self, m_which
    ):
        """Do nothing if wireguard-tools already exists.

        ``which`` is mocked to report an installed ``wg`` binary; the distro
        mock must then see neither a package-source update nor an install.
        """
        m_which.return_value = "/usr/bin/wg"  # already installed
        distro = mock.MagicMock()
        # Tripwire: an unexpected package-source refresh raises immediately.
        distro.update_package_sources.side_effect = RuntimeError(
            "Some apt error"
        )
        cc_wireguard.maybe_install_wireguard_packages(cloud=FakeCloud(distro))
        # State the no-op explicitly: the tripwire above cannot detect an
        # install attempt, and a swallowed exception would hide an update.
        distro.update_package_sources.assert_not_called()
        distro.install_packages.assert_not_called()

    @mock.patch("%s.subp.which" % MPATH)
    def test_maybe_install_wf_tools_raises_update_errors(self, m_which):
        """Package-source update failures are logged and re-raised."""
        m_which.return_value = None
        update_error = RuntimeError("Some apt error")
        distro = mock.MagicMock()
        distro.update_package_sources.side_effect = update_error

        with self.assertRaises(RuntimeError) as raised:
            cc_wireguard.maybe_install_wireguard_packages(
                cloud=FakeCloud(distro)
            )

        self.assertEqual("Some apt error", str(raised.exception))
        # The failure must also reach the log, traceback included.
        captured_logs = self.logs.getvalue()
        self.assertIn("Package update failed\nTraceback", captured_logs)

    @mock.patch("%s.subp.which" % MPATH)
    def test_maybe_install_wg_raises_install_errors(self, m_which):
        """Package install failures are logged and re-raised."""
        m_which.return_value = None
        install_error = RuntimeError("Some install error")
        distro = mock.MagicMock()
        # Updating package sources succeeds; only the install step fails.
        distro.update_package_sources.return_value = None
        distro.install_packages.side_effect = install_error

        with self.assertRaises(RuntimeError) as raised:
            cc_wireguard.maybe_install_wireguard_packages(
                cloud=FakeCloud(distro)
            )

        self.assertEqual("Some install error", str(raised.exception))
        captured_logs = self.logs.getvalue()
        self.assertIn("Failed to install wireguard-tools\n", captured_logs)

    @mock.patch("%s.subp.subp" % MPATH)
    def test_load_wg_module_failed(self, m_subp):
        """Kernel module load failures are logged and re-raised."""
        m_subp.side_effect = subp.ProcessExecutionError(
            "Some kernel module load error"
        )
        expected_error = (
            "Unexpected error while running command.\n"
            "Command: -\nExit code: -\nReason: -\n"
            "Stdout: Some kernel module load error\n"
            "Stderr: -"
        )

        with self.assertRaises(subp.ProcessExecutionError) as raised:
            cc_wireguard.load_wireguard_kernel_module()

        self.assertEqual(expected_error, str(raised.exception))
        # A warning must be logged in addition to the exception propagating.
        self.assertIn(
            "WARNING: Could not load wireguard module:\n", self.logs.getvalue()
        )

    @mock.patch("%s.subp.which" % MPATH)
    def test_maybe_install_wg_packages_happy_path(self, m_which):
        """maybe_install_wireguard_packages installs wireguard-tools.

        The kernel version is mocked so the expected package list no longer
        depends on the machine running the tests: a kernel just below
        MIN_KERNEL_VERSION must additionally get "wireguard", while a kernel
        at MIN_KERNEL_VERSION must get "wireguard-tools" only.
        """
        m_which.return_value = None
        older_kernel = (MIN_KERNEL_VERSION[0], MIN_KERNEL_VERSION[1] - 1)
        cases = (
            (older_kernel, ["wireguard-tools", "wireguard"]),
            (MIN_KERNEL_VERSION, ["wireguard-tools"]),
        )
        for kernel, packages in cases:
            with self.subTest(kernel=kernel):
                distro = mock.MagicMock()  # No errors raised
                # NOTE(review): assumes cc_wireguard looks the version up
                # via ``util.kernel_version()`` - confirm against the module.
                with mock.patch.object(
                    util, "kernel_version", return_value=kernel
                ):
                    cc_wireguard.maybe_install_wireguard_packages(
                        cloud=FakeCloud(distro)
                    )
                distro.update_package_sources.assert_called_once_with()
                distro.install_packages.assert_called_once_with(packages)

    @mock.patch("%s.maybe_install_wireguard_packages" % MPATH)
    def test_handle_no_config(self, m_maybe_install_wireguard_packages):
        """handle() logs a skip and installs nothing for an empty config."""
        cc_wireguard.handle("wg", cfg={}, cloud=None, args=None)

        expected_log = (
            "DEBUG: Skipping module named wg, no 'wireguard'"
            " configuration found"
        )
        self.assertIn(expected_log, self.logs.getvalue())
        # The package-install helper must never have been reached.
        self.assertEqual(m_maybe_install_wireguard_packages.call_count, 0)

    def test_readiness_probe_with_non_string_values(self):
        """Integer readinessprobe entries are rejected with ValueError."""
        probes = [1, 2]

        with self.assertRaises(ValueError) as raised:
            cc_wireguard.readinessprobe_command_validation(probes)

        message = str(raised.exception)
        # Both entries must be reported, each with its list index.
        for expected in (
            "Expected a string for readinessprobe at 0. Found 1",
            "Expected a string for readinessprobe at 1. Found 2",
        ):
            self.assertIn(expected, message)


class TestWireguardSchema:
    """Schema validation checks for the ``wireguard`` config key."""

    @pytest.mark.parametrize(
        "config, error_msg",
        [
            # Valid schemas
            (
                {
                    "wireguard": {
                        "interfaces": [
                            {
                                "name": "wg0",
                                "config_path": "/etc/wireguard/wg0.conf",
                                "content": "test",
                            }
                        ]
                    }
                },
                None,
            ),
        ],
    )
    @skipUnlessJsonSchema()
    def test_schema_validation(self, config, error_msg):
        """Validate ``config`` strictly; expect ``error_msg`` when given.

        A ``None`` error_msg means the config must validate cleanly;
        otherwise a SchemaValidationError matching error_msg is required.
        """
        if error_msg is None:
            validate_cloudconfig_schema(config, get_schema(), strict=True)
            return
        with pytest.raises(SchemaValidationError, match=error_msg):
            validate_cloudconfig_schema(config, get_schema(), strict=True)


# vi: ts=4 expandtab

haha - 2025