Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/distros/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/distros/test_create_users.py
# This file is part of cloud-init. See LICENSE file for license information.

import re

from cloudinit import distros, log, ssh_util
from tests.unittests.helpers import CiTestCase, mock
from tests.unittests.util import abstract_to_concrete


@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
@mock.patch("cloudinit.distros.subp.subp")
class TestCreateUser(CiTestCase):

    with_logs = True

    def setUp(self):
        """Create the distro object under test and store it on ``self.dist``.

        The callable returned by ``abstract_to_concrete(distros.Distro)`` is
        invoked with a placeholder name and no config or paths.
        """
        super().setUp()
        distro_factory = abstract_to_concrete(distros.Distro)
        self.dist = distro_factory(name="test", cfg=None, paths=None)

    def _useradd2call(self, args):
        """Return the ``mock.call`` expected for a ``useradd`` invocation.

        ``args`` are the arguments that follow ``useradd``. The returned call
        has the full command as its single positional argument and, as the
        ``logstring`` keyword, a copy of the command in which the value
        following each ``--password`` flag is replaced by ``"REDACTED"``.
        """
        redact_flags = ("--password",)
        cmd = ["useradd"] + list(args)
        logcmd = list(cmd)
        redact_next = False
        for idx, token in enumerate(cmd):
            if redact_next:
                # This token is the value of a redacted flag: mask it and do
                # not interpret it as a flag, even if it happens to look
                # like one (otherwise the token after it would be masked).
                logcmd[idx] = "REDACTED"
                redact_next = False
            elif token in redact_flags:
                redact_next = True
        return mock.call(cmd, logstring=logcmd)

    def test_basic(self, m_subp, m_is_snappy):
        """create_user with only a name runs useradd -m, then passwd -l."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username)
        self.assertEqual(m_subp.call_args_list, expected_calls)

    def test_no_home(self, m_subp, m_is_snappy):
        """no_create_home=True makes useradd receive -M instead of -m."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-M"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, no_create_home=True)
        self.assertEqual(m_subp.call_args_list, expected_calls)

    def test_system_user(self, m_subp, m_is_snappy):
        """system=True adds --system and suppresses the home dir (-M)."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "--system", "-M"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, system=True)
        self.assertEqual(m_subp.call_args_list, expected_calls)

    def test_explicit_no_home_false(self, m_subp, m_is_snappy):
        """An explicit no_create_home=False still passes -m to useradd."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, no_create_home=False)
        self.assertEqual(m_subp.call_args_list, expected_calls)

    def test_unlocked(self, m_subp, m_is_snappy):
        """lock_passwd=False results in useradd only, with no passwd -l."""
        username = "foouser"
        expected_calls = [self._useradd2call([username, "-m"])]
        self.dist.create_user(username, lock_passwd=False)
        self.assertEqual(m_subp.call_args_list, expected_calls)

    def test_set_password(self, m_subp, m_is_snappy):
        """passwd is forwarded to useradd as --password and redacted in logs.

        The expected ``logstring`` (built by ``_useradd2call``) carries
        "REDACTED" in place of the password value.
        """
        username = "foouser"
        secret = "passfoo"
        expected_calls = [
            self._useradd2call([username, "--password", secret, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, passwd=secret)
        self.assertEqual(m_subp.call_args_list, expected_calls)

    @mock.patch("cloudinit.distros.util.is_group")
    def test_group_added(self, m_is_group, m_subp, m_is_snappy):
        """A group reported as missing is created with groupadd first."""
        m_is_group.return_value = False
        username = "foouser"
        self.dist.create_user(username, groups=["group1"])
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(["groupadd", "group1"]),
                self._useradd2call([username, "--groups", "group1", "-m"]),
                mock.call(["passwd", "-l", username]),
            ],
        )

    @mock.patch("cloudinit.distros.util.is_group")
    def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
        """groupadd runs only for groups that is_group reports as missing."""
        already_present = ["existing_group"]
        requested = ["group1"] + already_present
        # is_group answers True solely for the pre-existing group.
        m_is_group.side_effect = lambda name: name in already_present
        username = "foouser"
        self.dist.create_user(username, groups=requested)
        joined_groups = ",".join(requested)
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(["groupadd", "group1"]),
                self._useradd2call(
                    [username, "--groups", joined_groups, "-m"]
                ),
                mock.call(["passwd", "-l", username]),
            ],
        )

    @mock.patch("cloudinit.distros.util.is_group")
    def test_create_groups_with_whitespace_string(
        self, m_is_group, m_subp, m_is_snappy
    ):
        """groups may be a comma-delimited string, even with whitespace."""
        m_is_group.return_value = False
        username = "foouser"
        self.dist.create_user(username, groups="group1, group2")
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(["groupadd", "group1"]),
                mock.call(["groupadd", "group2"]),
                self._useradd2call(
                    [username, "--groups", "group1,group2", "-m"]
                ),
                mock.call(["passwd", "-l", username]),
            ],
        )

    @mock.patch("cloudinit.distros.util.is_group", return_value=False)
    def test_create_groups_with_dict_deprecated(
        self, m_is_group, m_subp, m_is_snappy
    ):
        """A dict value for groups still works but logs a deprecation."""
        log.setupLogging()
        username = "foouser"
        self.dist.create_user(
            username, groups={"group1": None, "group2": None}
        )
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(["groupadd", "group1"]),
                mock.call(["groupadd", "group2"]),
                self._useradd2call(
                    [username, "--groups", "group1,group2", "-m"]
                ),
                mock.call(["passwd", "-l", username]),
            ],
        )
        # Each fragment must appear somewhere in the captured log output.
        expected_fragments = (
            "DEPRECAT",
            "The user foouser has a 'groups' config value of type dict",
            "Use a comma-delimited",
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, self.logs.getvalue())

    @mock.patch("cloudinit.distros.util.is_group", return_value=False)
    def test_create_groups_with_list(self, m_is_group, m_subp, m_is_snappy):
        """A list value for groups works and logs no deprecation warning."""
        username = "foouser"
        self.dist.create_user(username, groups=["group1", "group2"])
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(["groupadd", "group1"]),
                mock.call(["groupadd", "group2"]),
                self._useradd2call(
                    [username, "--groups", "group1,group2", "-m"]
                ),
                mock.call(["passwd", "-l", username]),
            ],
        )
        captured = self.logs.getvalue()
        self.assertNotIn("WARNING: DEPRECATED: ", captured)

    def test_explicit_sudo_false(self, m_subp, m_is_snappy):
        """sudo=False creates the user normally but logs a deprecation."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        deprecation_msg = (
            "DEPRECATED: The value of 'false' in user foouser's 'sudo' "
            "config is deprecated in 22.3 and scheduled to be removed"
            " in 27.3. Use 'null' instead."
        )
        self.dist.create_user(username, sudo=False)
        self.assertEqual(m_subp.call_args_list, expected_calls)
        self.assertIn(deprecation_msg, self.logs.getvalue())

    def test_explicit_sudo_none(self, m_subp, m_is_snappy):
        """sudo=None creates the user without any deprecation warning."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, sudo=None)
        self.assertEqual(m_subp.call_args_list, expected_calls)
        captured = self.logs.getvalue()
        self.assertNotIn("WARNING: DEPRECATED: ", captured)

    @mock.patch("cloudinit.ssh_util.setup_user_keys")
    def test_setup_ssh_authorized_keys_with_string(
        self, m_setup_user_keys, m_subp, m_is_snappy
    ):
        """A string ssh_authorized_keys reaches setup_user_keys as a set."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, ssh_authorized_keys="mykey")
        self.assertEqual(m_subp.call_args_list, expected_calls)
        m_setup_user_keys.assert_called_once_with({"mykey"}, username)

    @mock.patch("cloudinit.ssh_util.setup_user_keys")
    def test_setup_ssh_authorized_keys_with_list(
        self, m_setup_user_keys, m_subp, m_is_snappy
    ):
        """A list ssh_authorized_keys reaches setup_user_keys as a set."""
        username = "foouser"
        expected_calls = [
            self._useradd2call([username, "-m"]),
            mock.call(["passwd", "-l", username]),
        ]
        self.dist.create_user(username, ssh_authorized_keys=["key1", "key2"])
        self.assertEqual(m_subp.call_args_list, expected_calls)
        m_setup_user_keys.assert_called_once_with({"key1", "key2"}, username)

    @mock.patch("cloudinit.ssh_util.setup_user_keys")
    def test_setup_ssh_authorized_keys_with_integer(
        self, m_setup_user_keys, m_subp, m_is_snappy
    ):
        """An integer ssh_authorized_keys logs a warning; no keys are set.

        setup_user_keys is still called once, with an empty set.
        """
        username = "foouser"
        self.dist.create_user(username, ssh_authorized_keys=-1)
        m_setup_user_keys.assert_called_once_with(set(), username)
        # DOTALL lets the leading/trailing ".*" span the multi-line log.
        warning_re = re.compile(
            r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for"
            " 'ssh_authorized_keys'.*",
            re.DOTALL,
        )
        found = warning_re.match(self.logs.getvalue())
        self.assertIsNotNone(
            found, "Missing ssh_authorized_keys invalid type warning"
        )

    @mock.patch("cloudinit.ssh_util.setup_user_keys")
    def test_create_user_with_ssh_redirect_user_no_cloud_keys(
        self, m_setup_user_keys, m_subp, m_is_snappy
    ):
        """ssh_redirect_user without cloud keys only logs a warning.

        setup_user_keys must not be called in that case.
        """
        username = "foouser"
        expected_warning = (
            "WARNING: Unable to disable SSH logins for foouser given "
            "ssh_redirect_user: someuser. No cloud public-keys present.\n"
        )
        self.dist.create_user(username, ssh_redirect_user="someuser")
        self.assertIn(expected_warning, self.logs.getvalue())
        m_setup_user_keys.assert_not_called()

    @mock.patch("cloudinit.ssh_util.setup_user_keys")
    def test_create_user_with_ssh_redirect_user_with_cloud_keys(
        self, m_setup_user_keys, m_subp, m_is_snappy
    ):
        """Cloud keys are installed with the disable-user options prefix.

        The prefix is ``ssh_util.DISABLE_USER_OPTS`` with ``$USER`` replaced
        by the redirect target and ``$DISABLE_USER`` by the created user.
        """
        username = "foouser"
        self.dist.create_user(
            username,
            ssh_redirect_user="someuser",
            cloud_public_ssh_keys=["key1"],
        )
        expected_options = ssh_util.DISABLE_USER_OPTS.replace(
            "$USER", "someuser"
        ).replace("$DISABLE_USER", username)
        m_setup_user_keys.assert_called_once_with(
            {"key1"}, "foouser", options=expected_options
        )

    @mock.patch("cloudinit.ssh_util.setup_user_keys")
    def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
        self, m_setup_user_keys, m_subp, m_is_snappy
    ):
        """ssh_authorized_keys stay enabled when ssh_redirect_user is set.

        Two setup_user_keys calls are expected: the user's own keys without
        options, then the cloud keys with the disable-user options prefix.
        """
        username = "foouser"
        self.dist.create_user(
            username,
            ssh_authorized_keys="auth1",
            ssh_redirect_user="someuser",
            cloud_public_ssh_keys=["key1"],
        )
        expected_options = ssh_util.DISABLE_USER_OPTS.replace(
            "$USER", "someuser"
        ).replace("$DISABLE_USER", username)
        expected_key_calls = [
            mock.call({"auth1"}, username),  # not disabled
            mock.call({"key1"}, "foouser", options=expected_options),
        ]
        self.assertEqual(m_setup_user_keys.call_args_list, expected_key_calls)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_with_usermod_if_no_passwd(
        self, m_which, m_subp, m_is_snappy
    ):
        """lock_passwd runs usermod --lock when only usermod is found."""
        available_cmds = ("usermod",)
        m_which.side_effect = lambda name: name in available_cmds
        self.dist.lock_passwd("bob")
        expected_calls = [mock.call(["usermod", "--lock", "bob"])]
        self.assertEqual(expected_calls, m_subp.call_args_list)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy):
        """lock_passwd runs passwd -l when only passwd is found."""
        available_cmds = ("passwd",)
        m_which.side_effect = lambda name: name in available_cmds
        self.dist.lock_passwd("bob")
        expected_calls = [mock.call(["passwd", "-l", "bob"])]
        self.assertEqual(expected_calls, m_subp.call_args_list)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_raises_runtime_if_no_commands(
        self, m_which, m_subp, m_is_snappy
    ):
        """lock_passwd raises RuntimeError when which() finds no command."""
        m_which.return_value = None
        self.assertRaises(RuntimeError, self.dist.lock_passwd, "bob")


# vi: ts=4 expandtab

haha - 2025