|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/unittests/distros/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import re
from cloudinit import distros, log, ssh_util
from tests.unittests.helpers import CiTestCase, mock
from tests.unittests.util import abstract_to_concrete
@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
@mock.patch("cloudinit.distros.subp.subp")
class TestCreateUser(CiTestCase):
with_logs = True
def setUp(self):
super(TestCreateUser, self).setUp()
self.dist = abstract_to_concrete(distros.Distro)(
name="test", cfg=None, paths=None
)
def _useradd2call(self, args):
# return a mock call for the useradd command in args
# with expected 'logstring'.
args = ["useradd"] + args
logcmd = [a for a in args]
for i in range(len(args)):
if args[i] in ("--password",):
logcmd[i + 1] = "REDACTED"
return mock.call(args, logstring=logcmd)
def test_basic(self, m_subp, m_is_snappy):
user = "foouser"
self.dist.create_user(user)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
def test_no_home(self, m_subp, m_is_snappy):
user = "foouser"
self.dist.create_user(user, no_create_home=True)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-M"]),
mock.call(["passwd", "-l", user]),
],
)
def test_system_user(self, m_subp, m_is_snappy):
# system user should have no home and get --system
user = "foouser"
self.dist.create_user(user, system=True)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "--system", "-M"]),
mock.call(["passwd", "-l", user]),
],
)
def test_explicit_no_home_false(self, m_subp, m_is_snappy):
user = "foouser"
self.dist.create_user(user, no_create_home=False)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
def test_unlocked(self, m_subp, m_is_snappy):
user = "foouser"
self.dist.create_user(user, lock_passwd=False)
self.assertEqual(
m_subp.call_args_list, [self._useradd2call([user, "-m"])]
)
def test_set_password(self, m_subp, m_is_snappy):
user = "foouser"
password = "passfoo"
self.dist.create_user(user, passwd=password)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "--password", password, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
@mock.patch("cloudinit.distros.util.is_group")
def test_group_added(self, m_is_group, m_subp, m_is_snappy):
m_is_group.return_value = False
user = "foouser"
self.dist.create_user(user, groups=["group1"])
expected = [
mock.call(["groupadd", "group1"]),
self._useradd2call([user, "--groups", "group1", "-m"]),
mock.call(["passwd", "-l", user]),
]
self.assertEqual(m_subp.call_args_list, expected)
@mock.patch("cloudinit.distros.util.is_group")
def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
ex_groups = ["existing_group"]
groups = ["group1", ex_groups[0]]
m_is_group.side_effect = lambda m: m in ex_groups
user = "foouser"
self.dist.create_user(user, groups=groups)
expected = [
mock.call(["groupadd", "group1"]),
self._useradd2call([user, "--groups", ",".join(groups), "-m"]),
mock.call(["passwd", "-l", user]),
]
self.assertEqual(m_subp.call_args_list, expected)
@mock.patch("cloudinit.distros.util.is_group")
def test_create_groups_with_whitespace_string(
self, m_is_group, m_subp, m_is_snappy
):
# groups supported as a comma delimeted string even with white space
m_is_group.return_value = False
user = "foouser"
self.dist.create_user(user, groups="group1, group2")
expected = [
mock.call(["groupadd", "group1"]),
mock.call(["groupadd", "group2"]),
self._useradd2call([user, "--groups", "group1,group2", "-m"]),
mock.call(["passwd", "-l", user]),
]
self.assertEqual(m_subp.call_args_list, expected)
@mock.patch("cloudinit.distros.util.is_group", return_value=False)
def test_create_groups_with_dict_deprecated(
self, m_is_group, m_subp, m_is_snappy
):
"""users.groups supports a dict value, but emit deprecation log."""
log.setupLogging()
user = "foouser"
self.dist.create_user(user, groups={"group1": None, "group2": None})
expected = [
mock.call(["groupadd", "group1"]),
mock.call(["groupadd", "group2"]),
self._useradd2call([user, "--groups", "group1,group2", "-m"]),
mock.call(["passwd", "-l", user]),
]
self.assertEqual(m_subp.call_args_list, expected)
self.assertIn(
"DEPRECAT",
self.logs.getvalue(),
)
self.assertIn(
"The user foouser has a 'groups' config value of type dict",
self.logs.getvalue(),
)
self.assertIn(
"Use a comma-delimited",
self.logs.getvalue(),
)
@mock.patch("cloudinit.distros.util.is_group", return_value=False)
def test_create_groups_with_list(self, m_is_group, m_subp, m_is_snappy):
"""users.groups supports a list value."""
user = "foouser"
self.dist.create_user(user, groups=["group1", "group2"])
expected = [
mock.call(["groupadd", "group1"]),
mock.call(["groupadd", "group2"]),
self._useradd2call([user, "--groups", "group1,group2", "-m"]),
mock.call(["passwd", "-l", user]),
]
self.assertEqual(m_subp.call_args_list, expected)
self.assertNotIn("WARNING: DEPRECATED: ", self.logs.getvalue())
def test_explicit_sudo_false(self, m_subp, m_is_snappy):
user = "foouser"
self.dist.create_user(user, sudo=False)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
self.assertIn(
"DEPRECATED: The value of 'false' in user foouser's 'sudo' "
"config is deprecated in 22.3 and scheduled to be removed"
" in 27.3. Use 'null' instead.",
self.logs.getvalue(),
)
def test_explicit_sudo_none(self, m_subp, m_is_snappy):
user = "foouser"
self.dist.create_user(user, sudo=None)
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
self.assertNotIn("WARNING: DEPRECATED: ", self.logs.getvalue())
@mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_setup_ssh_authorized_keys_with_string(
self, m_setup_user_keys, m_subp, m_is_snappy
):
"""ssh_authorized_keys allows string and calls setup_user_keys."""
user = "foouser"
self.dist.create_user(user, ssh_authorized_keys="mykey")
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
m_setup_user_keys.assert_called_once_with(set(["mykey"]), user)
@mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_setup_ssh_authorized_keys_with_list(
self, m_setup_user_keys, m_subp, m_is_snappy
):
"""ssh_authorized_keys allows lists and calls setup_user_keys."""
user = "foouser"
self.dist.create_user(user, ssh_authorized_keys=["key1", "key2"])
self.assertEqual(
m_subp.call_args_list,
[
self._useradd2call([user, "-m"]),
mock.call(["passwd", "-l", user]),
],
)
m_setup_user_keys.assert_called_once_with(set(["key1", "key2"]), user)
@mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_setup_ssh_authorized_keys_with_integer(
self, m_setup_user_keys, m_subp, m_is_snappy
):
"""ssh_authorized_keys warns on non-iterable/string type."""
user = "foouser"
self.dist.create_user(user, ssh_authorized_keys=-1)
m_setup_user_keys.assert_called_once_with(set([]), user)
match = re.match(
r".*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for"
" 'ssh_authorized_keys'.*",
self.logs.getvalue(),
re.DOTALL,
)
self.assertIsNotNone(
match, "Missing ssh_authorized_keys invalid type warning"
)
@mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_create_user_with_ssh_redirect_user_no_cloud_keys(
self, m_setup_user_keys, m_subp, m_is_snappy
):
"""Log a warning when trying to redirect a user no cloud ssh keys."""
user = "foouser"
self.dist.create_user(user, ssh_redirect_user="someuser")
self.assertIn(
"WARNING: Unable to disable SSH logins for foouser given "
"ssh_redirect_user: someuser. No cloud public-keys present.\n",
self.logs.getvalue(),
)
m_setup_user_keys.assert_not_called()
@mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_create_user_with_ssh_redirect_user_with_cloud_keys(
self, m_setup_user_keys, m_subp, m_is_snappy
):
"""Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
user = "foouser"
self.dist.create_user(
user, ssh_redirect_user="someuser", cloud_public_ssh_keys=["key1"]
)
disable_prefix = ssh_util.DISABLE_USER_OPTS
disable_prefix = disable_prefix.replace("$USER", "someuser")
disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
m_setup_user_keys.assert_called_once_with(
set(["key1"]), "foouser", options=disable_prefix
)
@mock.patch("cloudinit.ssh_util.setup_user_keys")
def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
self, m_setup_user_keys, m_subp, m_is_snappy
):
"""Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
user = "foouser"
self.dist.create_user(
user,
ssh_authorized_keys="auth1",
ssh_redirect_user="someuser",
cloud_public_ssh_keys=["key1"],
)
disable_prefix = ssh_util.DISABLE_USER_OPTS
disable_prefix = disable_prefix.replace("$USER", "someuser")
disable_prefix = disable_prefix.replace("$DISABLE_USER", user)
self.assertEqual(
m_setup_user_keys.call_args_list,
[
mock.call(set(["auth1"]), user), # not disabled
mock.call(set(["key1"]), "foouser", options=disable_prefix),
],
)
@mock.patch("cloudinit.distros.subp.which")
def test_lock_with_usermod_if_no_passwd(
self, m_which, m_subp, m_is_snappy
):
"""Lock uses usermod --lock if no 'passwd' cmd available."""
m_which.side_effect = lambda m: m in ("usermod",)
self.dist.lock_passwd("bob")
self.assertEqual(
[mock.call(["usermod", "--lock", "bob"])], m_subp.call_args_list
)
@mock.patch("cloudinit.distros.subp.which")
def test_lock_with_passwd_if_available(self, m_which, m_subp, m_is_snappy):
"""Lock with only passwd will use passwd."""
m_which.side_effect = lambda m: m in ("passwd",)
self.dist.lock_passwd("bob")
self.assertEqual(
[mock.call(["passwd", "-l", "bob"])], m_subp.call_args_list
)
@mock.patch("cloudinit.distros.subp.which")
def test_lock_raises_runtime_if_no_commands(
self, m_which, m_subp, m_is_snappy
):
"""Lock with no commands available raises RuntimeError."""
m_which.return_value = None
with self.assertRaises(RuntimeError):
self.dist.lock_passwd("bob")
# vi: ts=4 expandtab