Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/config/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/config/test_cc_mounts.py
# This file is part of cloud-init. See LICENSE file for license information.

import math
import os.path
import re
from collections import namedtuple
from unittest import mock

import pytest
from pytest import approx

from cloudinit.config import cc_mounts
from cloudinit.config.cc_mounts import (
    GB,
    MB,
    create_swapfile,
    suggested_swapsize,
)
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from cloudinit.subp import ProcessExecutionError
from tests.unittests import helpers as test_helpers

M_PATH = "cloudinit.config.cc_mounts."


class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
    def setUp(self):
        super(TestSanitizeDevname, self).setUp()
        self.new_root = self.tmp_dir()
        self.patchOS(self.new_root)

    def _touch(self, path):
        path = os.path.join(self.new_root, path.lstrip("/"))
        basedir = os.path.dirname(path)
        if not os.path.exists(basedir):
            os.makedirs(basedir)
        open(path, "a").close()

    def _makedirs(self, directory):
        directory = os.path.join(self.new_root, directory.lstrip("/"))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def mock_existence_of_disk(self, disk_path):
        self._touch(disk_path)
        self._makedirs(os.path.join("/sys/block", disk_path.split("/")[-1]))

    def mock_existence_of_partition(self, disk_path, partition_number):
        self.mock_existence_of_disk(disk_path)
        self._touch(disk_path + str(partition_number))
        disk_name = disk_path.split("/")[-1]
        self._makedirs(
            os.path.join(
                "/sys/block", disk_name, disk_name + str(partition_number)
            )
        )

    def test_existent_full_disk_path_is_returned(self):
        disk_path = "/dev/sda"
        self.mock_existence_of_disk(disk_path)
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(disk_path, lambda x: None),
        )

    def test_existent_disk_name_returns_full_path(self):
        disk_name = "sda"
        disk_path = "/dev/" + disk_name
        self.mock_existence_of_disk(disk_path)
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(disk_name, lambda x: None),
        )

    def test_existent_meta_disk_is_returned(self):
        actual_disk_path = "/dev/sda"
        self.mock_existence_of_disk(actual_disk_path)
        self.assertEqual(
            actual_disk_path,
            cc_mounts.sanitize_devname(
                "ephemeral0",
                lambda x: actual_disk_path,
            ),
        )

    def test_existent_meta_partition_is_returned(self):
        disk_name, partition_part = "/dev/sda", "1"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0.1",
                lambda x: disk_name,
            ),
        )

    def test_existent_meta_partition_with_p_is_returned(self):
        disk_name, partition_part = "/dev/sda", "p1"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0.1",
                lambda x: disk_name,
            ),
        )

    def test_first_partition_returned_if_existent_disk_is_partitioned(self):
        disk_name, partition_part = "/dev/sda", "1"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0",
                lambda x: disk_name,
            ),
        )

    def test_nth_partition_returned_if_requested(self):
        disk_name, partition_part = "/dev/sda", "3"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0.3",
                lambda x: disk_name,
            ),
        )

    def test_transformer_returning_none_returns_none(self):
        self.assertIsNone(
            cc_mounts.sanitize_devname(
                "ephemeral0",
                lambda x: None,
            )
        )

    def test_missing_device_returns_none(self):
        self.assertIsNone(
            cc_mounts.sanitize_devname(
                "/dev/sda",
                None,
            )
        )

    def test_missing_sys_returns_none(self):
        disk_path = "/dev/sda"
        self._makedirs(disk_path)
        self.assertIsNone(
            cc_mounts.sanitize_devname(
                disk_path,
                None,
            )
        )

    def test_existent_disk_but_missing_partition_returns_none(self):
        disk_path = "/dev/sda"
        self.mock_existence_of_disk(disk_path)
        self.assertIsNone(
            cc_mounts.sanitize_devname(
                "ephemeral0.1",
                lambda x: disk_path,
            )
        )

    def test_network_device_returns_network_device(self):
        disk_path = "netdevice:/path"
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(
                disk_path,
                None,
            ),
        )

    def test_device_aliases_remapping(self):
        disk_path = "/dev/sda"
        self.mock_existence_of_disk(disk_path)
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(
                "mydata", lambda x: None, {"mydata": disk_path}
            ),
        )


class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase):
    def setUp(self):
        super(TestSwapFileCreation, self).setUp()
        self.new_root = self.tmp_dir()
        self.patchOS(self.new_root)

        self.fstab_path = os.path.join(self.new_root, "etc/fstab")
        self.swap_path = os.path.join(self.new_root, "swap.img")
        self._makedirs("/etc")

        self.add_patch(
            "cloudinit.config.cc_mounts.FSTAB_PATH",
            "mock_fstab_path",
            self.fstab_path,
            autospec=False,
        )

        self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp")

        self.add_patch(
            "cloudinit.config.cc_mounts.util.mounts",
            "mock_util_mounts",
            return_value={
                "/dev/sda1": {
                    "fstype": "ext4",
                    "mountpoint": "/",
                    "opts": "rw,relatime,discard",
                }
            },
        )

        self.mock_cloud = mock.Mock()
        self.mock_log = mock.Mock()
        self.mock_cloud.device_name_to_device = self.device_name_to_device

        self.cc = {
            "swap": {
                "filename": self.swap_path,
                "size": "512",
                "maxsize": "512",
            }
        }

    def _makedirs(self, directory):
        directory = os.path.join(self.new_root, directory.lstrip("/"))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def device_name_to_device(self, path):
        if path == "swap":
            return self.swap_path
        else:
            dev = None

        return dev

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_fallocate_on_xfs(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (4, 20)
        m_get_mount_info.return_value = ["", "xfs"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, [])
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(
                    ["fallocate", "-l", "0M", self.swap_path], capture=True
                ),
                mock.call(["mkswap", self.swap_path]),
                mock.call(["swapon", "-a"]),
            ]
        )

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_xfs(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (3, 18)
        m_get_mount_info.return_value = ["", "xfs"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, [])
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(
                    [
                        "dd",
                        "if=/dev/zero",
                        "of=" + self.swap_path,
                        "bs=1M",
                        "count=0",
                    ],
                    capture=True,
                ),
                mock.call(["mkswap", self.swap_path]),
                mock.call(["swapon", "-a"]),
            ]
        )

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_btrfs(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (4, 20)
        m_get_mount_info.return_value = ["", "btrfs"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, [])
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(
                    [
                        "dd",
                        "if=/dev/zero",
                        "of=" + self.swap_path,
                        "bs=1M",
                        "count=0",
                    ],
                    capture=True,
                ),
                mock.call(["mkswap", self.swap_path]),
                mock.call(["swapon", "-a"]),
            ]
        )

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_ext4(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (5, 14)
        m_get_mount_info.return_value = ["", "ext4"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, [])
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(
                    ["fallocate", "-l", "0M", self.swap_path], capture=True
                ),
                mock.call(["mkswap", self.swap_path]),
                mock.call(["swapon", "-a"]),
            ]
        )


class TestFstabHandling(test_helpers.FilesystemMockingTestCase):

    swap_path = "/dev/sdb1"

    def setUp(self):
        super(TestFstabHandling, self).setUp()
        self.new_root = self.tmp_dir()
        self.patchOS(self.new_root)

        self.fstab_path = os.path.join(self.new_root, "etc/fstab")
        self._makedirs("/etc")

        self.add_patch(
            "cloudinit.config.cc_mounts.FSTAB_PATH",
            "mock_fstab_path",
            self.fstab_path,
            autospec=False,
        )

        self.add_patch(
            "cloudinit.config.cc_mounts._is_block_device",
            "mock_is_block_device",
            return_value=True,
        )

        self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp")

        self.add_patch(
            "cloudinit.config.cc_mounts.util.mounts",
            "mock_util_mounts",
            return_value={
                "/dev/sda1": {
                    "fstype": "ext4",
                    "mountpoint": "/",
                    "opts": "rw,relatime,discard",
                }
            },
        )

        self.mock_cloud = mock.Mock()
        self.mock_log = mock.Mock()
        self.mock_cloud.device_name_to_device = self.device_name_to_device

    def _makedirs(self, directory):
        directory = os.path.join(self.new_root, directory.lstrip("/"))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def device_name_to_device(self, path):
        if path == "swap":
            return self.swap_path
        else:
            dev = None

        return dev

    def test_no_fstab(self):
        """Handle images which do not include an fstab."""
        self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH))
        fstab_expected_content = (
            "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n"
            % (self.swap_path,)
        )
        cc_mounts.handle(None, {}, self.mock_cloud, [])
        with open(cc_mounts.FSTAB_PATH, "r") as fd:
            fstab_new_content = fd.read()
            self.assertEqual(fstab_expected_content, fstab_new_content)

    def test_swap_integrity(self):
        """Ensure that the swap file is correctly created and can
        swapon successfully. Fixing the corner case of:
        kernel: swapon: swapfile has holes"""

        fstab = "/swap.img swap swap defaults 0 0\n"

        with open(cc_mounts.FSTAB_PATH, "w") as fd:
            fd.write(fstab)
        cc = {"swap": ["filename: /swap.img", "size: 512", "maxsize: 512"]}
        cc_mounts.handle(None, cc, self.mock_cloud, [])

    def test_fstab_no_swap_device(self):
        """Ensure that cloud-init adds a discovered swap partition
        to /etc/fstab."""

        fstab_original_content = ""
        fstab_expected_content = (
            "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n"
            % (self.swap_path,)
        )

        with open(cc_mounts.FSTAB_PATH, "w") as fd:
            fd.write(fstab_original_content)

        cc_mounts.handle(None, {}, self.mock_cloud, [])

        with open(cc_mounts.FSTAB_PATH, "r") as fd:
            fstab_new_content = fd.read()
            self.assertEqual(fstab_expected_content, fstab_new_content)

    def test_fstab_same_swap_device_already_configured(self):
        """Ensure that cloud-init will not add a swap device if the same
        device already exists in /etc/fstab."""

        fstab_original_content = "%s swap swap defaults 0 0\n" % (
            self.swap_path,
        )
        fstab_expected_content = fstab_original_content

        with open(cc_mounts.FSTAB_PATH, "w") as fd:
            fd.write(fstab_original_content)

        cc_mounts.handle(None, {}, self.mock_cloud, [])

        with open(cc_mounts.FSTAB_PATH, "r") as fd:
            fstab_new_content = fd.read()
            self.assertEqual(fstab_expected_content, fstab_new_content)

    def test_fstab_alternate_swap_device_already_configured(self):
        """Ensure that cloud-init will add a discovered swap device to
        /etc/fstab even when there exists a swap definition on another
        device."""

        fstab_original_content = "/dev/sdc1 swap swap defaults 0 0\n"
        fstab_expected_content = (
            fstab_original_content
            + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n"
            % (self.swap_path,)
        )

        with open(cc_mounts.FSTAB_PATH, "w") as fd:
            fd.write(fstab_original_content)

        cc_mounts.handle(None, {}, self.mock_cloud, [])

        with open(cc_mounts.FSTAB_PATH, "r") as fd:
            fstab_new_content = fd.read()
            self.assertEqual(fstab_expected_content, fstab_new_content)

    def test_no_change_fstab_sets_needs_mount_all(self):
        """verify unchanged fstab entries are mounted if not call mount -a"""
        fstab_original_content = (
            "LABEL=cloudimg-rootfs / ext4 defaults 0 0\n"
            "LABEL=UEFI /boot/efi vfat defaults 0 0\n"
            "/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n"
        )
        fstab_expected_content = fstab_original_content
        cc = {"mounts": [["/dev/vdb", "/mnt", "auto", "defaults,noexec"]]}
        with open(cc_mounts.FSTAB_PATH, "w") as fd:
            fd.write(fstab_original_content)
        with open(cc_mounts.FSTAB_PATH, "r") as fd:
            fstab_new_content = fd.read()
            self.assertEqual(fstab_expected_content, fstab_new_content)
        cc_mounts.handle(None, cc, self.mock_cloud, [])
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(["mount", "-a"]),
                mock.call(["systemctl", "daemon-reload"]),
            ]
        )


class TestCreateSwapfile:
    @pytest.mark.parametrize("fstype", ("xfs", "btrfs", "ext4", "other"))
    @mock.patch(M_PATH + "util.get_mount_info")
    @mock.patch(M_PATH + "subp.subp")
    def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir):
        swap_file = tmpdir.join("swap-file")
        fname = str(swap_file)

        # Some of the calls to subp.subp should create the swap file; this
        # roughly approximates that
        m_subp.side_effect = lambda *args, **kwargs: swap_file.write("")

        m_get_mount_info.return_value = (mock.ANY, fstype)

        create_swapfile(fname, "")
        assert mock.call(["mkswap", fname]) in m_subp.call_args_list

    @mock.patch(M_PATH + "util.get_mount_info")
    @mock.patch(M_PATH + "subp.subp")
    def test_fallback_from_fallocate_to_dd(
        self, m_subp, m_get_mount_info, caplog, tmpdir
    ):
        swap_file = tmpdir.join("swap-file")
        fname = str(swap_file)

        def subp_side_effect(cmd, *args, **kwargs):
            # Mock fallocate failing, to initiate fallback
            if cmd[0] == "fallocate":
                raise ProcessExecutionError()

        m_subp.side_effect = subp_side_effect
        # Use ext4 so both fallocate and dd are valid swap creation methods
        m_get_mount_info.return_value = (mock.ANY, "ext4")

        create_swapfile(fname, "")

        cmds = [args[0][0] for args, _kwargs in m_subp.call_args_list]
        assert "fallocate" in cmds, "fallocate was not called"
        assert "dd" in cmds, "fallocate failure did not fallback to dd"

        assert cmds.index("dd") > cmds.index(
            "fallocate"
        ), "dd ran before fallocate"

        assert mock.call(["mkswap", fname]) in m_subp.call_args_list

        msg = "fallocate swap creation failed, will attempt with dd"
        assert msg in caplog.text

    # See https://help.ubuntu.com/community/SwapFaq
    @pytest.mark.parametrize(
        "memsize,expected",
        [
            (256 * MB, 256 * MB),
            (512 * MB, 512 * MB),
            (1 * GB, 1 * GB),
            (2 * GB, 2 * GB),
            (4 * GB, 4 * GB),
            (8 * GB, 4 * GB),
            (16 * GB, 4 * GB),
            (32 * GB, 6 * GB),
            (64 * GB, 8 * GB),
            (128 * GB, 11 * GB),
            (256 * GB, 16 * GB),
            (512 * GB, 23 * GB),
        ],
    )
    def test_suggested_swapsize(self, memsize, expected, mocker):
        mock_stat = namedtuple("mock_stat", "f_frsize f_bfree")
        mocker.patch(
            "os.statvfs",
            # Don't care about available disk space for the purposes of this
            # test
            return_value=mock_stat(math.inf, math.inf),
        )
        size = suggested_swapsize(memsize, math.inf, "dontcare")
        assert expected == approx(size)


class TestMountsSchema:
    @pytest.mark.parametrize(
        "config, error_msg",
        [
            # We expect to see one mount if provided in user-data.
            ({"mounts": []}, re.escape("mounts: [] is too short")),
            # Disallow less than 1 item per mount entry
            ({"mounts": [[]]}, re.escape("mounts.0: [] is too short")),
            # Disallow more than 6 items per mount entry
            ({"mounts": [["1"] * 7]}, "mounts.0:.* is too long"),
            # Disallow mount_default_fields will anything other than 6 items
            (
                {"mount_default_fields": ["1"] * 5},
                "mount_default_fields:.* is too short",
            ),
            (
                {"mount_default_fields": ["1"] * 7},
                "mount_default_fields:.* is too long",
            ),
            (
                {"swap": {"invalidprop": True}},
                re.escape(
                    "Additional properties are not allowed ('invalidprop'"
                ),
            ),
            # Swap size/maxsize positive test cases
            ({"swap": {"size": ".5T", "maxsize": ".5T"}}, None),
            ({"swap": {"size": "1G", "maxsize": "1G"}}, None),
            ({"swap": {"size": "200K", "maxsize": "200K"}}, None),
            ({"swap": {"size": "10485760B", "maxsize": "10485760B"}}, None),
            # Swap size/maxsize negative test cases
            ({"swap": {"size": "1.5MB"}}, "swap.size:"),
            (
                {"swap": {"maxsize": "1.5MT"}},
                "swap.maxsize: '1.5MT' is not valid",
            ),
            (
                {"swap": {"maxsize": "..5T"}},
                "swap.maxsize: '..5T' is not valid",
            ),
            ({"swap": {"size": "K"}}, "swap.size: 'K' is not valid"),
        ],
    )
    @test_helpers.skipUnlessJsonSchema()
    def test_schema_validation(self, config, error_msg):
        if error_msg is None:
            validate_cloudconfig_schema(config, get_schema(), strict=True)
        else:
            with pytest.raises(SchemaValidationError, match=error_msg):
                validate_cloudconfig_schema(config, get_schema(), strict=True)


# vi: ts=4 expandtab

haha - 2025