|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/unittests/distros/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import os
import shutil
import tempfile
from unittest import mock
import pytest
from cloudinit import distros, util
from tests.unittests import helpers
M_PATH = "cloudinit.distros."
unknown_arch_info = {
"arches": ["default"],
"failsafe": {
"primary": "http://fs-primary-default",
"security": "http://fs-security-default",
},
}
package_mirrors = [
{
"arches": ["i386", "amd64"],
"failsafe": {
"primary": "http://fs-primary-intel",
"security": "http://fs-security-intel",
},
"search": {
"primary": [
"http://%(ec2_region)s.ec2/",
"http://%(availability_zone)s.clouds/",
],
"security": [
"http://security-mirror1-intel",
"http://security-mirror2-intel",
],
},
},
{
"arches": ["armhf", "armel"],
"failsafe": {
"primary": "http://fs-primary-arm",
"security": "http://fs-security-arm",
},
},
unknown_arch_info,
]
gpmi = distros._get_package_mirror_info
gapmi = distros._get_arch_package_mirror_info
class TestGenericDistro(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestGenericDistro, self).setUp()
# Make a temp directoy for tests to use.
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
def _write_load_sudoers(self, _user, rules):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
os.makedirs(os.path.join(self.tmp, "etc"))
os.makedirs(os.path.join(self.tmp, "etc", "sudoers.d"))
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
d.write_sudo_rules("harlowja", rules)
contents = util.load_file(d.ci_sudoers_fn)
return contents
def _count_in(self, lines_look_for, text_content):
found_amount = 0
for e in lines_look_for:
for line in text_content.splitlines():
line = line.strip()
if line == e:
found_amount += 1
return found_amount
def test_sudoers_ensure_rules(self):
rules = "ALL=(ALL:ALL) ALL"
contents = self._write_load_sudoers("harlowja", rules)
expected = ["harlowja ALL=(ALL:ALL) ALL"]
self.assertEqual(len(expected), self._count_in(expected, contents))
not_expected = [
"harlowja A",
"harlowja L",
"harlowja L",
]
self.assertEqual(0, self._count_in(not_expected, contents))
def test_sudoers_ensure_rules_list(self):
rules = [
"ALL=(ALL:ALL) ALL",
"B-ALL=(ALL:ALL) ALL",
"C-ALL=(ALL:ALL) ALL",
]
contents = self._write_load_sudoers("harlowja", rules)
expected = [
"harlowja ALL=(ALL:ALL) ALL",
"harlowja B-ALL=(ALL:ALL) ALL",
"harlowja C-ALL=(ALL:ALL) ALL",
]
self.assertEqual(len(expected), self._count_in(expected, contents))
not_expected = [
"harlowja A",
"harlowja L",
"harlowja L",
]
self.assertEqual(0, self._count_in(not_expected, contents))
def test_sudoers_ensure_new(self):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
d.ensure_sudo_dir("/b")
contents = util.load_file("/etc/sudoers")
self.assertIn("includedir /b", contents)
self.assertTrue(os.path.isdir("/b"))
def test_sudoers_ensure_append(self):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
util.write_file("/etc/sudoers", "josh, josh\n")
d.ensure_sudo_dir("/b")
contents = util.load_file("/etc/sudoers")
self.assertIn("includedir /b", contents)
self.assertTrue(os.path.isdir("/b"))
self.assertIn("josh", contents)
self.assertEqual(2, contents.count("josh"))
def test_sudoers_ensure_only_one_includedir(self):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
for char in ["#", "@"]:
util.write_file("/etc/sudoers", "{}includedir /b".format(char))
d.ensure_sudo_dir("/b")
contents = util.load_file("/etc/sudoers")
self.assertIn("includedir /b", contents)
self.assertTrue(os.path.isdir("/b"))
self.assertEqual(1, contents.count("includedir /b"))
def test_arch_package_mirror_info_unknown(self):
"""for an unknown arch, we should get back that with arch 'default'."""
arch_mirrors = gapmi(package_mirrors, arch="unknown")
self.assertEqual(unknown_arch_info, arch_mirrors)
def test_arch_package_mirror_info_known(self):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
self.assertEqual(package_mirrors[0], arch_mirrors)
def test_systemd_in_use(self):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
os.makedirs("/run/systemd/system")
self.assertTrue(d.uses_systemd())
def test_systemd_not_in_use(self):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
self.assertFalse(d.uses_systemd())
def test_systemd_symlink(self):
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
self.patchOS(self.tmp)
self.patchUtils(self.tmp)
os.makedirs("/run/systemd")
os.symlink("/", "/run/systemd/system")
self.assertFalse(d.uses_systemd())
@mock.patch("cloudinit.distros.debian.read_system_locale")
def test_get_locale_ubuntu(self, m_locale):
"""Test ubuntu distro returns locale set to C.UTF-8"""
m_locale.return_value = "C.UTF-8"
cls = distros.fetch("ubuntu")
d = cls("ubuntu", {}, None)
locale = d.get_locale()
self.assertEqual("C.UTF-8", locale)
@mock.patch("cloudinit.distros.rhel.Distro._read_system_locale")
def test_get_locale_rhel(self, m_locale):
"""Test rhel distro returns locale set to C.UTF-8"""
m_locale.return_value = "C.UTF-8"
cls = distros.fetch("rhel")
d = cls("rhel", {}, None)
locale = d.get_locale()
self.assertEqual("C.UTF-8", locale)
def test_expire_passwd_uses_chpasswd(self):
"""Test ubuntu.expire_passwd uses the passwd command."""
for d_name in ("ubuntu", "rhel"):
cls = distros.fetch(d_name)
d = cls(d_name, {}, None)
with mock.patch("cloudinit.subp.subp") as m_subp:
d.expire_passwd("myuser")
m_subp.assert_called_once_with(["passwd", "--expire", "myuser"])
def test_expire_passwd_freebsd_uses_pw_command(self):
"""Test FreeBSD.expire_passwd uses the pw command."""
cls = distros.fetch("freebsd")
# patch ifconfig -a
with mock.patch(
"cloudinit.distros.networking.subp.subp", return_value=("", None)
):
d = cls("freebsd", {}, None)
with mock.patch("cloudinit.subp.subp") as m_subp:
d.expire_passwd("myuser")
m_subp.assert_called_once_with(
["pw", "usermod", "myuser", "-p", "01-Jan-1970"]
)
class TestGetPackageMirrors:
def return_first(self, mlist):
if not mlist:
return None
return mlist[0]
def return_second(self, mlist):
if not mlist:
return None
return mlist[1] if len(mlist) > 1 else None
def return_none(self, _mlist):
return None
def return_last(self, mlist):
if not mlist:
return None
return mlist[-1]
@pytest.mark.parametrize(
"allow_ec2_mirror, platform_type, mirrors",
[
(
True,
"ec2",
[
{
"primary": "http://us-east-1.ec2/",
"security": "http://security-mirror1-intel",
},
{
"primary": "http://us-east-1a.clouds/",
"security": "http://security-mirror2-intel",
},
],
),
(
True,
"other",
[
{
"primary": "http://us-east-1.ec2/",
"security": "http://security-mirror1-intel",
},
{
"primary": "http://us-east-1a.clouds/",
"security": "http://security-mirror2-intel",
},
],
),
(
False,
"ec2",
[
{
"primary": "http://us-east-1.ec2/",
"security": "http://security-mirror1-intel",
},
{
"primary": "http://us-east-1a.clouds/",
"security": "http://security-mirror2-intel",
},
],
),
(
False,
"other",
[
{
"primary": "http://us-east-1a.clouds/",
"security": "http://security-mirror1-intel",
},
{
"primary": "http://fs-primary-intel",
"security": "http://security-mirror2-intel",
},
],
),
],
)
def test_get_package_mirror_info_az_ec2(
self, allow_ec2_mirror, platform_type, mirrors
):
flag_path = (
"cloudinit.distros.ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"
)
with mock.patch(flag_path, allow_ec2_mirror):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
data_source_mock = mock.Mock(
availability_zone="us-east-1a", platform_type=platform_type
)
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_first,
)
assert results == mirrors[0]
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_second,
)
assert results == mirrors[1]
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_none,
)
assert results == package_mirrors[0]["failsafe"]
def test_get_package_mirror_info_az_non_ec2(self):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
data_source_mock = mock.Mock(availability_zone="nova.cloudvendor")
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_first,
)
assert results == {
"primary": "http://nova.cloudvendor.clouds/",
"security": "http://security-mirror1-intel",
}
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_last,
)
assert results == {
"primary": "http://nova.cloudvendor.clouds/",
"security": "http://security-mirror2-intel",
}
def test_get_package_mirror_info_none(self):
arch_mirrors = gapmi(package_mirrors, arch="amd64")
data_source_mock = mock.Mock(availability_zone=None)
# because both search entries here replacement based on
# availability-zone, the filter will be called with an empty list and
# failsafe should be taken.
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_first,
)
assert results == {
"primary": "http://fs-primary-intel",
"security": "http://security-mirror1-intel",
}
results = gpmi(
arch_mirrors,
data_source=data_source_mock,
mirror_filter=self.return_last,
)
assert results == {
"primary": "http://fs-primary-intel",
"security": "http://security-mirror2-intel",
}
@pytest.mark.usefixtures("fake_filesystem")
class TestDistro:
@pytest.mark.parametrize("is_noexec", [False, True])
@mock.patch(M_PATH + "util.has_mount_opt")
@mock.patch(M_PATH + "temp_utils.get_tmp_ancestor", return_value="/tmp")
def test_get_tmp_exec_path(
self, m_get_tmp_ancestor, m_has_mount_opt, is_noexec, mocker
):
m_has_mount_opt.return_value = not is_noexec
cls = distros.fetch("ubuntu")
distro = cls("ubuntu", {}, None)
mocker.patch.object(distro, "usr_lib_exec", "/usr_lib_exec")
tmp_path = distro.get_tmp_exec_path()
if is_noexec:
assert "/tmp" == tmp_path
else:
assert "/usr_lib_exec/cloud-init/clouddir" == tmp_path
# vi: ts=4 expandtab