Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/distros/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/distros/test_debian.py
# This file is part of cloud-init. See LICENSE file for license information.
from itertools import count, cycle
from unittest import mock

import pytest

from cloudinit import distros, subp, util
from cloudinit.distros.debian import APT_GET_COMMAND, APT_GET_WRAPPER
from tests.unittests.helpers import FilesystemMockingTestCase


@mock.patch("cloudinit.distros.debian.subp.subp")
class TestDebianApplyLocale(FilesystemMockingTestCase):
    """Tests for the debian distro's ``apply_locale``.

    ``cloudinit.distros.debian.subp.subp`` is patched for every ``test_*``
    method and handed in as ``m_subp``, so no external command is executed;
    the tests only inspect which commands would have been run.
    """

    def setUp(self):
        """Build a debian distro and a locale-file path under a temp root."""
        super().setUp()
        self.new_root = self.tmp_dir()
        # NOTE(review): patchOS/patchUtils come from the test helper base
        # class; presumably they redirect filesystem access under new_root.
        self.patchOS(self.new_root)
        self.patchUtils(self.new_root)
        self.spath = self.tmp_path("etc/default/locale", self.new_root)
        cls = distros.fetch("debian")
        self.distro = cls("debian", {}, None)

    @staticmethod
    def _commands_run(m_subp):
        """Return the first positional argument of each recorded subp call.

        The name deliberately lacks the ``test`` prefix so the class-level
        ``mock.patch`` does not wrap it and inject an extra argument.
        """
        return [call[0][0] for call in m_subp.call_args_list]

    def test_no_rerun(self, m_subp):
        """If system has defined locale, no re-run is expected."""
        m_subp.return_value = (None, None)
        locale = "en_US.UTF-8"
        util.write_file(self.spath, "LANG=%s\n" % locale, omode="w")
        self.distro.apply_locale(locale, out_fn=self.spath)
        m_subp.assert_not_called()

    def test_no_regen_on_c_utf8(self, m_subp):
        """If locale is set to C.UTF8, do not attempt to call locale-gen"""
        m_subp.return_value = (None, None)
        locale = "C.UTF-8"
        util.write_file(self.spath, "LANG=%s\n" % "en_US.UTF-8", omode="w")
        self.distro.apply_locale(locale, out_fn=self.spath)
        self.assertEqual(
            [
                [
                    "update-locale",
                    "--locale-file=" + self.spath,
                    "LANG=%s" % locale,
                ]
            ],
            self._commands_run(m_subp),
        )

    def test_rerun_if_different(self, m_subp):
        """If system has different locale, locale-gen should be called."""
        m_subp.return_value = (None, None)
        locale = "en_US.UTF-8"
        util.write_file(self.spath, "LANG=fr_FR.UTF-8", omode="w")
        self.distro.apply_locale(locale, out_fn=self.spath)
        self.assertEqual(
            [
                ["locale-gen", locale],
                [
                    "update-locale",
                    "--locale-file=" + self.spath,
                    "LANG=%s" % locale,
                ],
            ],
            self._commands_run(m_subp),
        )

    def test_rerun_if_no_file(self, m_subp):
        """If system has no locale file, locale-gen should be called."""
        m_subp.return_value = (None, None)
        locale = "en_US.UTF-8"
        # Nothing is written to self.spath here: the file must be absent.
        self.distro.apply_locale(locale, out_fn=self.spath)
        self.assertEqual(
            [
                ["locale-gen", locale],
                [
                    "update-locale",
                    "--locale-file=" + self.spath,
                    "LANG=%s" % locale,
                ],
            ],
            self._commands_run(m_subp),
        )

    def test_rerun_on_unset_system_locale(self, m_subp):
        """If system has unset locale, locale-gen should be called."""
        m_subp.return_value = (None, None)
        locale = "en_US.UTF-8"
        util.write_file(self.spath, "LANG=", omode="w")
        self.distro.apply_locale(locale, out_fn=self.spath)
        self.assertEqual(
            [
                ["locale-gen", locale],
                [
                    "update-locale",
                    "--locale-file=" + self.spath,
                    "LANG=%s" % locale,
                ],
            ],
            self._commands_run(m_subp),
        )

    def test_rerun_on_mismatched_keys(self, m_subp):
        """If key is LC_ALL and system has only LANG, rerun is expected."""
        m_subp.return_value = (None, None)
        locale = "en_US.UTF-8"
        util.write_file(self.spath, "LANG=", omode="w")
        self.distro.apply_locale(locale, out_fn=self.spath, keyname="LC_ALL")
        self.assertEqual(
            [
                ["locale-gen", locale],
                [
                    "update-locale",
                    "--locale-file=" + self.spath,
                    "LC_ALL=%s" % locale,
                ],
            ],
            self._commands_run(m_subp),
        )

    def test_falseish_locale_raises_valueerror(self, m_subp):
        """locale as None or "" is invalid and should raise ValueError."""
        for falseish_locale in (None, ""):
            with self.assertRaises(ValueError) as ctext_m:
                self.distro.apply_locale(falseish_locale)
            self.assertEqual(
                "Failed to provide locale value.", str(ctext_m.exception)
            )
            # Checked outside the assertRaises block: a statement placed
            # after the raising call inside that block would never execute.
            m_subp.assert_not_called()


@mock.patch.dict("os.environ", {}, clear=True)
@mock.patch("cloudinit.distros.debian.subp.which", return_value=True)
@mock.patch("cloudinit.distros.debian.subp.subp")
class TestPackageCommand:
    """Tests for the debian distro's apt invocation and apt-lock waiting.

    The class-level patches clear ``os.environ`` and replace ``subp.which``
    and ``subp.subp`` for every test method. Mocks arrive as positional
    arguments, innermost decorator first: method-level mocks, then
    ``m_subp``, then ``m_which``.
    """

    # One distro instance shared by all tests, created when the class body
    # is evaluated.
    distro = distros.fetch("debian")("debian", {}, None)

    @mock.patch(
        "cloudinit.distros.debian.Distro._apt_lock_available",
        return_value=True,
    )
    def test_simple_command(self, m_apt_avail, m_subp, m_which):
        """package_command("update") runs the wrapped apt-get update."""
        self.distro.package_command("update")
        wanted = mock.call(
            args=[APT_GET_WRAPPER["command"], *APT_GET_COMMAND, "update"],
            capture=False,
            env={"DEBIAN_FRONTEND": "noninteractive"},
        )
        assert m_subp.call_args == wanted

    @mock.patch(
        "cloudinit.distros.debian.Distro._apt_lock_available",
        side_effect=[False, False, True],
    )
    @mock.patch("cloudinit.distros.debian.time.sleep")
    def test_wait_for_lock(self, m_sleep, m_apt_avail, m_subp, m_which):
        """Two unavailable-lock checks mean two 1s sleeps, then one subp."""
        subp_kwargs = {"args": "stub2"}
        self.distro._wait_for_apt_command("stub", subp_kwargs)
        one_second_nap = mock.call(1)
        assert m_sleep.call_args_list == [one_second_nap, one_second_nap]
        assert m_subp.call_args_list == [mock.call(args="stub2")]

    @mock.patch(
        "cloudinit.distros.debian.Distro._apt_lock_available",
        return_value=False,
    )
    @mock.patch("cloudinit.distros.debian.time.sleep")
    @mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
    def test_lock_wait_timeout(
        self, m_time, m_sleep, m_apt_avail, m_subp, m_which
    ):
        """A lock that never frees raises TimeoutError; subp is never run.

        ``time.time`` is replaced by a counter, so each reading of the
        clock returns the next integer.
        """
        with pytest.raises(TimeoutError):
            self.distro._wait_for_apt_command("stub", "stub2", timeout=5)
        no_calls = []
        assert m_subp.call_args_list == no_calls

    @mock.patch(
        "cloudinit.distros.debian.Distro._apt_lock_available",
        side_effect=cycle([True, False]),
    )
    @mock.patch("cloudinit.distros.debian.time.sleep")
    def test_lock_exception_wait(self, m_sleep, m_apt_avail, m_subp, m_which):
        """subp failing twice with a lock error is retried until it succeeds."""
        lock_error = subp.ProcessExecutionError(
            exit_code=100, stderr="Could not get apt lock"
        )
        # Fail on the first two attempts, succeed on the third.
        m_subp.side_effect = [lock_error, lock_error, "return_thing"]
        outcome = self.distro._wait_for_apt_command("stub", {"args": "stub2"})
        assert outcome == "return_thing"

    @mock.patch(
        "cloudinit.distros.debian.Distro._apt_lock_available",
        side_effect=cycle([True, False]),
    )
    @mock.patch("cloudinit.distros.debian.time.sleep")
    @mock.patch("cloudinit.distros.debian.time.time", side_effect=count())
    def test_lock_exception_timeout(
        self, m_time, m_sleep, m_apt_avail, m_subp, m_which
    ):
        """subp always failing with a lock error ends in TimeoutError."""
        lock_error = subp.ProcessExecutionError(
            exit_code=100, stderr="Could not get apt lock"
        )
        m_subp.side_effect = lock_error
        subp_kwargs = {"args": "stub2"}
        with pytest.raises(TimeoutError):
            self.distro._wait_for_apt_command("stub", subp_kwargs, timeout=5)

haha - 2025