|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/unittests/config/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import pytest
from cloudinit import subp, util
from cloudinit.config import cc_wireguard
from cloudinit.config.schema import (
SchemaValidationError,
get_schema,
validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema
NL = "\n"
# Module path used in mocks
MPATH = "cloudinit.config.cc_wireguard"
MIN_KERNEL_VERSION = (5, 6)
class FakeCloud:
def __init__(self, distro):
self.distro = distro
class TestWireGuard(CiTestCase):
with_logs = True
allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
super(TestWireGuard, self).setUp()
self.tmp = self.tmp_dir()
def test_readiness_probe_schema_non_string_values(self):
"""ValueError raised for any values expected as string type."""
wg_readinessprobes = [1, ["not-a-valid-command"]]
errors = [
"Expected a string for readinessprobe at 0. Found 1",
"Expected a string for readinessprobe at 1."
" Found ['not-a-valid-command']",
]
with self.assertRaises(ValueError) as context_mgr:
cc_wireguard.readinessprobe_command_validation(wg_readinessprobes)
error_msg = str(context_mgr.exception)
for error in errors:
self.assertIn(error, error_msg)
def test_suppl_schema_error_on_missing_keys(self):
"""ValueError raised reporting any missing required keys"""
cfg = {}
match = (
f"Invalid wireguard interface configuration:{NL}"
"Missing required wg:interfaces keys: config_path, content, name"
)
with self.assertRaisesRegex(ValueError, match):
cc_wireguard.supplemental_schema_validation(cfg)
def test_suppl_schema_error_on_non_string_values(self):
"""ValueError raised for any values expected as string type."""
cfg = {"name": 1, "config_path": 2, "content": 3}
errors = [
"Expected a string for wg:interfaces:config_path. Found 2",
"Expected a string for wg:interfaces:content. Found 3",
"Expected a string for wg:interfaces:name. Found 1",
]
with self.assertRaises(ValueError) as context_mgr:
cc_wireguard.supplemental_schema_validation(cfg)
error_msg = str(context_mgr.exception)
for error in errors:
self.assertIn(error, error_msg)
def test_write_config_failed(self):
"""Errors when writing config are raised."""
wg_int = {"name": "wg0", "config_path": "/no/valid/path"}
with self.assertRaises(RuntimeError) as context_mgr:
cc_wireguard.write_config(wg_int)
self.assertIn(
"Failure writing Wireguard configuration file /no/valid/path:\n",
str(context_mgr.exception),
)
@mock.patch("%s.subp.subp" % MPATH)
def test_readiness_probe_invalid_command(self, m_subp):
"""Errors when executing readinessprobes are raised."""
wg_readinessprobes = ["not-a-valid-command"]
def fake_subp(cmd, capture=None, shell=None):
fail_cmds = ["not-a-valid-command"]
if cmd in fail_cmds and capture and shell:
raise subp.ProcessExecutionError(
"not-a-valid-command: command not found"
)
m_subp.side_effect = fake_subp
with self.assertRaises(RuntimeError) as context_mgr:
cc_wireguard.readinessprobe(wg_readinessprobes)
self.assertIn(
"Failed running readinessprobe command:\n"
"not-a-valid-command: Unexpected error while"
" running command.\n"
"Command: -\nExit code: -\nReason: -\n"
"Stdout: not-a-valid-command: command not found\nStderr: -",
str(context_mgr.exception),
)
@mock.patch("%s.subp.subp" % MPATH)
def test_enable_wg_on_error(self, m_subp):
"""Errors when enabling wireguard interfaces are raised."""
wg_int = {"name": "wg0"}
distro = mock.MagicMock() # No errors raised
distro.manage_service.side_effect = subp.ProcessExecutionError(
"systemctl start wg-quik@wg0 failed: exit code 1"
)
mycloud = FakeCloud(distro)
with self.assertRaises(RuntimeError) as context_mgr:
cc_wireguard.enable_wg(wg_int, mycloud)
self.assertEqual(
"Failed enabling/starting Wireguard interface(s):\n"
"Unexpected error while running command.\n"
"Command: -\nExit code: -\nReason: -\n"
"Stdout: systemctl start wg-quik@wg0 failed: exit code 1\n"
"Stderr: -",
str(context_mgr.exception),
)
@mock.patch("%s.subp.which" % MPATH)
def test_maybe_install_wg_packages_noop_when_wg_tools_present(
self, m_which
):
"""Do nothing if wireguard-tools already exists."""
m_which.return_value = "/usr/bin/wg" # already installed
distro = mock.MagicMock()
distro.update_package_sources.side_effect = RuntimeError(
"Some apt error"
)
cc_wireguard.maybe_install_wireguard_packages(cloud=FakeCloud(distro))
@mock.patch("%s.subp.which" % MPATH)
def test_maybe_install_wf_tools_raises_update_errors(self, m_which):
"""maybe_install_wireguard_packages logs and raises
apt update errors."""
m_which.return_value = None
distro = mock.MagicMock()
distro.update_package_sources.side_effect = RuntimeError(
"Some apt error"
)
with self.assertRaises(RuntimeError) as context_manager:
cc_wireguard.maybe_install_wireguard_packages(
cloud=FakeCloud(distro)
)
self.assertEqual("Some apt error", str(context_manager.exception))
self.assertIn("Package update failed\nTraceback", self.logs.getvalue())
@mock.patch("%s.subp.which" % MPATH)
def test_maybe_install_wg_raises_install_errors(self, m_which):
"""maybe_install_wireguard_packages logs and raises package
install errors."""
m_which.return_value = None
distro = mock.MagicMock()
distro.update_package_sources.return_value = None
distro.install_packages.side_effect = RuntimeError(
"Some install error"
)
with self.assertRaises(RuntimeError) as context_manager:
cc_wireguard.maybe_install_wireguard_packages(
cloud=FakeCloud(distro)
)
self.assertEqual("Some install error", str(context_manager.exception))
self.assertIn(
"Failed to install wireguard-tools\n", self.logs.getvalue()
)
@mock.patch("%s.subp.subp" % MPATH)
def test_load_wg_module_failed(self, m_subp):
"""load_wireguard_kernel_module logs and raises
kernel modules loading error."""
m_subp.side_effect = subp.ProcessExecutionError(
"Some kernel module load error"
)
with self.assertRaises(subp.ProcessExecutionError) as context_manager:
cc_wireguard.load_wireguard_kernel_module()
self.assertEqual(
"Unexpected error while running command.\n"
"Command: -\nExit code: -\nReason: -\n"
"Stdout: Some kernel module load error\n"
"Stderr: -",
str(context_manager.exception),
)
self.assertIn(
"WARNING: Could not load wireguard module:\n", self.logs.getvalue()
)
@mock.patch("%s.subp.which" % MPATH)
def test_maybe_install_wg_packages_happy_path(self, m_which):
"""maybe_install_wireguard_packages installs wireguard-tools."""
packages = ["wireguard-tools"]
if util.kernel_version() < MIN_KERNEL_VERSION:
packages.append("wireguard")
m_which.return_value = None
distro = mock.MagicMock() # No errors raised
cc_wireguard.maybe_install_wireguard_packages(cloud=FakeCloud(distro))
distro.update_package_sources.assert_called_once_with()
distro.install_packages.assert_called_once_with(packages)
@mock.patch("%s.maybe_install_wireguard_packages" % MPATH)
def test_handle_no_config(self, m_maybe_install_wireguard_packages):
"""When no wireguard configuration is provided, nothing happens."""
cfg = {}
cc_wireguard.handle("wg", cfg=cfg, cloud=None, args=None)
self.assertIn(
"DEBUG: Skipping module named wg, no 'wireguard'"
" configuration found",
self.logs.getvalue(),
)
self.assertEqual(m_maybe_install_wireguard_packages.call_count, 0)
def test_readiness_probe_with_non_string_values(self):
"""ValueError raised for any values expected as string type."""
cfg = [1, 2]
errors = [
"Expected a string for readinessprobe at 0. Found 1",
"Expected a string for readinessprobe at 1. Found 2",
]
with self.assertRaises(ValueError) as context_manager:
cc_wireguard.readinessprobe_command_validation(cfg)
error_msg = str(context_manager.exception)
for error in errors:
self.assertIn(error, error_msg)
class TestWireguardSchema:
@pytest.mark.parametrize(
"config, error_msg",
[
# Valid schemas
(
{
"wireguard": {
"interfaces": [
{
"name": "wg0",
"config_path": "/etc/wireguard/wg0.conf",
"content": "test",
}
]
}
},
None,
),
],
)
@skipUnlessJsonSchema()
def test_schema_validation(self, config, error_msg):
if error_msg is not None:
with pytest.raises(SchemaValidationError, match=error_msg):
validate_cloudconfig_schema(config, get_schema(), strict=True)
else:
validate_cloudconfig_schema(config, get_schema(), strict=True)
# vi: ts=4 expandtab