Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/config/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/config/test_cc_growpart.py
# This file is part of cloud-init. See LICENSE file for license information.

import errno
import logging
import os
import re
import shutil
import stat
import unittest
from contextlib import ExitStack
from itertools import chain
from unittest import mock

import pytest

from cloudinit import cloud, subp, temp_utils
from cloudinit.config import cc_growpart
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from tests.unittests.helpers import (
    TestCase,
    does_not_raise,
    skipUnlessJsonSchema,
)

# growpart:
#   mode: auto  # off, on, auto, 'growpart'
#   devices: ['root']

HELP_GROWPART_RESIZE = """
growpart disk partition
   rewrite partition table so that partition takes up all the space it can
   options:
    -h | --help       print Usage and exit
<SNIP>
    -u | --update  R  update the the kernel partition table info after growing
                      this requires kernel support and 'partx --update'
                      R is one of:
                       - 'auto'  : [default] update partition if possible
<SNIP>
   Example:
    - growpart /dev/sda 1
      Resize partition 1 on /dev/sda
"""

HELP_GROWPART_NO_RESIZE = """
growpart disk partition
   rewrite partition table so that partition takes up all the space it can
   options:
    -h | --help       print Usage and exit
<SNIP>
   Example:
    - growpart /dev/sda 1
      Resize partition 1 on /dev/sda
"""

HELP_GPART = """
usage: gpart add -t type [-a alignment] [-b start] <SNIP> geom
       gpart backup geom
       gpart bootcode [-b bootcode] [-p partcode -i index] [-f flags] geom
<SNIP>
       gpart resize -i index [-a alignment] [-s size] [-f flags] geom
       gpart restore [-lF] [-f flags] provider [...]
       gpart recover [-f flags] geom
       gpart help
<SNIP>
"""


class Dir:
    """Stub object"""

    def __init__(self, name):
        self.name = name
        self.st_mode = name

    def is_dir(self, *args, **kwargs):
        return True

    def stat(self, *args, **kwargs):
        return self


class Scanner:
    """Stub object"""

    def __enter__(self):
        return (
            Dir(""),
            Dir(""),
        )

    def __exit__(self, *args):
        pass


class TestDisabled(unittest.TestCase):
    def setUp(self):
        super(TestDisabled, self).setUp()
        self.name = "growpart"
        self.cloud = None
        self.args = []

        self.handle = cc_growpart.handle

    def test_mode_off(self):
        # Test that nothing is done if mode is off.

        # this really only verifies that resizer_factory isn't called
        config = {"growpart": {"mode": "off"}}

        with mock.patch.object(cc_growpart, "resizer_factory") as mockobj:
            self.handle(self.name, config, self.cloud, self.args)
            self.assertEqual(mockobj.call_count, 0)


class TestConfig(TestCase):
    def setUp(self):
        super(TestConfig, self).setUp()
        self.name = "growpart"
        self.paths = None
        self.distro = mock.Mock()
        self.cloud = cloud.Cloud(None, self.paths, None, self.distro, None)
        self.log = logging.getLogger("TestConfig")
        self.args = []

        self.handle = cc_growpart.handle
        self.tmppath = "/tmp/cloudinit-test-file"
        self.tmpdir = os.scandir("/tmp")
        self.tmpfile = open(self.tmppath, "w")

    def tearDown(self):
        self.tmpfile.close()
        os.remove(self.tmppath)

    @mock.patch.dict("os.environ", clear=True)
    def test_no_resizers_auto_is_fine(self):
        with mock.patch.object(
            subp, "subp", return_value=(HELP_GROWPART_NO_RESIZE, "")
        ) as mockobj:

            config = {"growpart": {"mode": "auto"}}
            self.handle(self.name, config, self.cloud, self.args)

            mockobj.assert_has_calls(
                [
                    mock.call(["growpart", "--help"], env={"LANG": "C"}),
                    mock.call(
                        ["gpart", "help"], env={"LANG": "C"}, rcs=[0, 1]
                    ),
                ]
            )

    @mock.patch.dict("os.environ", clear=True)
    def test_no_resizers_mode_growpart_is_exception(self):
        with mock.patch.object(
            subp, "subp", return_value=(HELP_GROWPART_NO_RESIZE, "")
        ) as mockobj:
            config = {"growpart": {"mode": "growpart"}}
            self.assertRaises(
                ValueError,
                self.handle,
                self.name,
                config,
                self.cloud,
                self.args,
            )

            mockobj.assert_called_once_with(
                ["growpart", "--help"], env={"LANG": "C"}
            )

    @mock.patch.dict("os.environ", clear=True)
    def test_mode_auto_prefers_growpart(self):
        with mock.patch.object(
            subp, "subp", return_value=(HELP_GROWPART_RESIZE, "")
        ) as mockobj:
            ret = cc_growpart.resizer_factory(mode="auto", distro=mock.Mock())
            self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)

            mockobj.assert_called_once_with(
                ["growpart", "--help"], env={"LANG": "C"}
            )

    @mock.patch.dict("os.environ", {"LANG": "cs_CZ.UTF-8"}, clear=True)
    @mock.patch.object(temp_utils, "mkdtemp", return_value="/tmp/much-random")
    @mock.patch.object(stat, "S_ISDIR", return_value=False)
    @mock.patch.object(os.path, "samestat", return_value=True)
    @mock.patch.object(os.path, "join", return_value="/tmp")
    @mock.patch.object(os, "scandir", return_value=Scanner())
    @mock.patch.object(os, "mkdir")
    @mock.patch.object(os, "unlink")
    @mock.patch.object(os, "rmdir")
    @mock.patch.object(os, "open", return_value=1)
    @mock.patch.object(os, "close")
    @mock.patch.object(shutil, "rmtree")
    @mock.patch.object(os, "lseek", return_value=1024)
    @mock.patch.object(os, "lstat", return_value="interesting metadata")
    def test_force_lang_check_tempfile(self, *args, **kwargs):
        with mock.patch.object(
            subp, "subp", return_value=(HELP_GROWPART_RESIZE, "")
        ) as mockobj:

            ret = cc_growpart.resizer_factory(mode="auto", distro=mock.Mock())
            self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)
            diskdev = "/dev/sdb"
            partnum = 1
            partdev = "/dev/sdb"
            ret.resize(diskdev, partnum, partdev)
        mockobj.assert_has_calls(
            [
                mock.call(
                    ["growpart", "--dry-run", diskdev, partnum],
                    env={"LANG": "C", "TMPDIR": "/tmp"},
                ),
                mock.call(
                    ["growpart", diskdev, partnum],
                    env={"LANG": "C", "TMPDIR": "/tmp"},
                ),
            ]
        )

    @mock.patch.dict("os.environ", {"LANG": "cs_CZ.UTF-8"}, clear=True)
    def test_mode_auto_falls_back_to_gpart(self):
        with mock.patch.object(
            subp, "subp", return_value=("", HELP_GPART)
        ) as mockobj:
            ret = cc_growpart.resizer_factory(mode="auto", distro=mock.Mock())
            self.assertIsInstance(ret, cc_growpart.ResizeGpart)

            mockobj.assert_has_calls(
                [
                    mock.call(["growpart", "--help"], env={"LANG": "C"}),
                    mock.call(
                        ["gpart", "help"], env={"LANG": "C"}, rcs=[0, 1]
                    ),
                ]
            )

    def test_handle_with_no_growpart_entry(self):
        # if no 'growpart' entry in config, then mode=auto should be used

        myresizer = object()
        retval = (
            (
                "/",
                cc_growpart.RESIZE.CHANGED,
                "my-message",
            ),
        )

        with ExitStack() as mocks:
            factory = mocks.enter_context(
                mock.patch.object(
                    cc_growpart, "resizer_factory", return_value=myresizer
                )
            )
            rsdevs = mocks.enter_context(
                mock.patch.object(
                    cc_growpart, "resize_devices", return_value=retval
                )
            )
            mocks.enter_context(
                mock.patch.object(
                    cc_growpart, "RESIZERS", (("mysizer", object),)
                )
            )

            self.handle(self.name, {}, self.cloud, self.args)

            factory.assert_called_once_with("auto", self.distro)
            rsdevs.assert_called_once_with(myresizer, ["/"])


class TestResize(unittest.TestCase):
    def setUp(self):
        super(TestResize, self).setUp()
        self.name = "growpart"
        self.log = logging.getLogger("TestResize")

    def test_simple_devices(self):
        # test simple device list
        # this patches out devent2dev, os.stat, and device_part_info
        # so in the end, doesn't test a lot
        devs = ["/dev/XXda1", "/dev/YYda2"]
        devstat_ret = Bunch(
            st_mode=25008,
            st_ino=6078,
            st_dev=5,
            st_nlink=1,
            st_uid=0,
            st_gid=6,
            st_size=0,
            st_atime=0,
            st_mtime=0,
            st_ctime=0,
        )
        enoent = ["/dev/NOENT"]
        real_stat = os.stat
        resize_calls = []

        class myresizer:
            def resize(self, diskdev, partnum, partdev):
                resize_calls.append((diskdev, partnum, partdev))
                if partdev == "/dev/YYda2":
                    return (1024, 2048)
                return (1024, 1024)  # old size, new size

        def mystat(path):
            if path in devs:
                return devstat_ret
            if path in enoent:
                e = OSError("%s: does not exist" % path)
                e.errno = errno.ENOENT
                raise e
            return real_stat(path)

        opinfo = cc_growpart.device_part_info
        try:
            cc_growpart.device_part_info = simple_device_part_info
            os.stat = mystat

            resized = cc_growpart.resize_devices(myresizer(), devs + enoent)

            def find(name, res):
                for f in res:
                    if f[0] == name:
                        return f
                return None

            self.assertEqual(
                cc_growpart.RESIZE.NOCHANGE, find("/dev/XXda1", resized)[1]
            )
            self.assertEqual(
                cc_growpart.RESIZE.CHANGED, find("/dev/YYda2", resized)[1]
            )
            self.assertEqual(
                cc_growpart.RESIZE.SKIPPED, find(enoent[0], resized)[1]
            )
            # self.assertEqual(resize_calls,
            #                 [("/dev/XXda", "1", "/dev/XXda1"),
            #                  ("/dev/YYda", "2", "/dev/YYda2")])
        finally:
            cc_growpart.device_part_info = opinfo
            os.stat = real_stat


class TestEncrypted:
    """Attempt end-to-end scenarios using encrypted devices.

    Things are mocked such that:
     - "/fake_encrypted" is mounted onto "/dev/mapper/fake"
     - "/dev/mapper/fake" is a LUKS device and symlinked to /dev/dm-1
     - The partition backing "/dev/mapper/fake" is "/dev/vdx1"
     - "/" is not encrypted and mounted onto "/dev/vdz1"

    Note that we don't (yet) support non-encrypted mapped drives, such
    as LVM volumes. If our mount point is /dev/mapper/*, then we will
    not resize it if it is not encrypted.
    """

    def _subp_side_effect(self, value, good=True, **kwargs):
        if value[0] == "dmsetup":
            return ("1 dependencies : (vdx1)",)
        return mock.Mock()

    def _device_part_info_side_effect(self, value):
        if value.startswith("/dev/mapper/"):
            raise TypeError(f"{value} not a partition")
        return (1024, 1024)

    def _devent2dev_side_effect(self, value):
        if value == "/fake_encrypted":
            return "/dev/mapper/fake"
        elif value == "/":
            return "/dev/vdz"
        elif value.startswith("/dev"):
            return value
        raise RuntimeError(f"unexpected value {value}")

    def _realpath_side_effect(self, value):
        return "/dev/dm-1" if value.startswith("/dev/mapper") else value

    def assert_resize_and_cleanup(self):
        all_subp_args = list(
            chain(*[args[0][0] for args in self.m_subp.call_args_list])
        )
        assert "resize" in all_subp_args
        assert "luksKillSlot" in all_subp_args
        self.m_unlink.assert_called_once()

    def assert_no_resize_or_cleanup(self):
        all_subp_args = list(
            chain(*[args[0][0] for args in self.m_subp.call_args_list])
        )
        assert "resize" not in all_subp_args
        assert "luksKillSlot" not in all_subp_args
        self.m_unlink.assert_not_called()

    @pytest.fixture
    def common_mocks(self, mocker):
        # These are all "happy path" mocks which will get overridden
        # when needed
        mocker.patch(
            "cloudinit.config.cc_growpart.device_part_info",
            side_effect=self._device_part_info_side_effect,
        )
        mocker.patch("os.stat")
        mocker.patch("stat.S_ISBLK")
        mocker.patch("stat.S_ISCHR")
        mocker.patch(
            "cloudinit.config.cc_growpart.devent2dev",
            side_effect=self._devent2dev_side_effect,
        )
        mocker.patch(
            "os.path.realpath", side_effect=self._realpath_side_effect
        )
        # Only place subp.which is used in cc_growpart is for cryptsetup
        mocker.patch(
            "cloudinit.config.cc_growpart.subp.which",
            return_value="/usr/sbin/cryptsetup",
        )
        self.m_subp = mocker.patch(
            "cloudinit.config.cc_growpart.subp.subp",
            side_effect=self._subp_side_effect,
        )
        mocker.patch(
            "pathlib.Path.open",
            new_callable=mock.mock_open,
            read_data=(
                '{"key":"XFmCwX2FHIQp0LBWaLEMiHIyfxt1SGm16VvUAVledlY=",'
                '"slot":5}'
            ),
        )
        mocker.patch("pathlib.Path.exists", return_value=True)
        self.m_unlink = mocker.patch("pathlib.Path.unlink", autospec=True)

        self.resizer = mock.Mock()
        self.resizer.resize = mock.Mock(return_value=(1024, 1024))

    def test_resize_when_encrypted(self, common_mocks, caplog):
        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])
        assert len(info) == 2
        assert info[0][0] == "/dev/vdx1"
        assert info[0][2].startswith("no change necessary")
        assert info[1][0] == "/fake_encrypted"
        assert (
            info[1][2]
            == "Successfully resized encrypted volume '/dev/mapper/fake'"
        )
        assert (
            "/dev/mapper/fake is a mapped device pointing to /dev/dm-1"
            in caplog.text
        )
        assert "Determined that /dev/dm-1 is encrypted" in caplog.text

        self.assert_resize_and_cleanup()

    def test_resize_when_unencrypted(self, common_mocks):
        info = cc_growpart.resize_devices(self.resizer, ["/"])
        assert len(info) == 1
        assert info[0][0] == "/"
        assert "encrypted" not in info[0][2]
        self.assert_no_resize_or_cleanup()

    def test_encrypted_but_cryptsetup_not_found(
        self, common_mocks, mocker, caplog
    ):
        mocker.patch(
            "cloudinit.config.cc_growpart.subp.which",
            return_value=None,
        )
        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])

        assert len(info) == 1
        assert "skipped as it is not encrypted" in info[0][2]
        assert "cryptsetup not found" in caplog.text
        self.assert_no_resize_or_cleanup()

    def test_dmsetup_not_found(self, common_mocks, mocker, caplog):
        def _subp_side_effect(value, **kwargs):
            if value[0] == "dmsetup":
                raise subp.ProcessExecutionError()

        mocker.patch(
            "cloudinit.config.cc_growpart.subp.subp",
            side_effect=_subp_side_effect,
        )
        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])
        assert len(info) == 1
        assert info[0][0] == "/fake_encrypted"
        assert info[0][1] == "FAILED"
        assert (
            "Resizing encrypted device (/dev/mapper/fake) failed" in info[0][2]
        )
        self.assert_no_resize_or_cleanup()

    def test_unparsable_dmsetup(self, common_mocks, mocker, caplog):
        def _subp_side_effect(value, **kwargs):
            if value[0] == "dmsetup":
                return ("2 dependencies",)
            return mock.Mock()

        mocker.patch(
            "cloudinit.config.cc_growpart.subp.subp",
            side_effect=_subp_side_effect,
        )
        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])
        assert len(info) == 1
        assert info[0][0] == "/fake_encrypted"
        assert info[0][1] == "FAILED"
        assert (
            "Resizing encrypted device (/dev/mapper/fake) failed" in info[0][2]
        )
        self.assert_no_resize_or_cleanup()

    def test_missing_keydata(self, common_mocks, mocker, caplog):
        # Note that this will be standard behavior after first boot
        # on a system with an encrypted root partition
        mocker.patch("pathlib.Path.open", side_effect=FileNotFoundError())
        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])
        assert len(info) == 2
        assert info[0][0] == "/dev/vdx1"
        assert info[0][2].startswith("no change necessary")
        assert info[1][0] == "/fake_encrypted"
        assert info[1][1] == "FAILED"
        assert (
            info[1][2]
            == "Resizing encrypted device (/dev/mapper/fake) failed: Could "
            "not load encryption key. This is expected if the volume has "
            "been previously resized."
        )
        self.assert_no_resize_or_cleanup()

    def test_resize_failed(self, common_mocks, mocker, caplog):
        def _subp_side_effect(value, **kwargs):
            if value[0] == "dmsetup":
                return ("1 dependencies : (vdx1)",)
            elif value[0] == "cryptsetup" and "resize" in value:
                raise subp.ProcessExecutionError()
            return mock.Mock()

        self.m_subp = mocker.patch(
            "cloudinit.config.cc_growpart.subp.subp",
            side_effect=_subp_side_effect,
        )

        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])
        assert len(info) == 2
        assert info[0][0] == "/dev/vdx1"
        assert info[0][2].startswith("no change necessary")
        assert info[1][0] == "/fake_encrypted"
        assert info[1][1] == "FAILED"
        assert (
            "Resizing encrypted device (/dev/mapper/fake) failed" in info[1][2]
        )
        # Assert we still cleanup
        all_subp_args = list(
            chain(*[args[0][0] for args in self.m_subp.call_args_list])
        )
        assert "luksKillSlot" in all_subp_args
        self.m_unlink.assert_called_once()

    def test_resize_skipped(self, common_mocks, mocker, caplog):
        mocker.patch("pathlib.Path.exists", return_value=False)
        info = cc_growpart.resize_devices(self.resizer, ["/fake_encrypted"])
        assert len(info) == 2
        assert info[1] == (
            "/fake_encrypted",
            "SKIPPED",
            "No encryption keyfile found",
        )


def simple_device_part_info(devpath):
    # simple stupid return (/dev/vda, 1) for /dev/vda
    ret = re.search("([^0-9]*)([0-9]*)$", devpath)
    x = (ret.group(1), ret.group(2))
    return x


class Bunch:
    def __init__(self, **kwds):
        self.__dict__.update(kwds)


class TestGrowpartSchema:
    @pytest.mark.parametrize(
        "config, expectation",
        (
            ({"growpart": {"mode": "off"}}, does_not_raise()),
            (
                {"growpart": {"mode": False}},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "Cloud config schema deprecations: "
                        "growpart.mode:  Changed in version 22.3. "
                        "Specifying a boolean ``false`` value for "
                        "``mode`` is deprecated. Use ``off`` instead."
                    ),
                ),
            ),
            (
                {"growpart": {"mode": "false"}},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "growpart.mode: 'false' is not valid under any of the"
                        " given schemas"
                    ),
                ),
            ),
            (
                {"growpart": {"mode": "a"}},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "growpart.mode: 'a' is not valid under any of the"
                        " given schemas"
                    ),
                ),
            ),
            (
                {"growpart": {"devices": "/"}},
                pytest.raises(
                    SchemaValidationError, match="'/' is not of type 'array'"
                ),
            ),
            (
                {"growpart": {"ignore_growroot_disabled": "off"}},
                pytest.raises(
                    SchemaValidationError,
                    match="'off' is not of type 'boolean'",
                ),
            ),
            (
                {"growpart": {"a": "b"}},
                pytest.raises(
                    SchemaValidationError,
                    match="Additional properties are not allowed",
                ),
            ),
        ),
    )
    @skipUnlessJsonSchema()
    def test_schema_validation(self, config, expectation):
        """Assert expected schema validation and error messages."""
        schema = get_schema()
        with expectation:
            validate_cloudconfig_schema(config, schema, strict=True)

haha - 2025