|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/unittests/config/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import re
import pytest
from cloudinit.config import cc_users_groups
from cloudinit.config.schema import (
SchemaValidationError,
get_schema,
validate_cloudconfig_schema,
)
from tests.unittests.helpers import (
CiTestCase,
does_not_raise,
mock,
skipUnlessJsonSchema,
)
MODPATH = "cloudinit.config.cc_users_groups"
@mock.patch("cloudinit.distros.ubuntu.Distro.create_group")
@mock.patch("cloudinit.distros.ubuntu.Distro.create_user")
class TestHandleUsersGroups(CiTestCase):
"""Test cc_users_groups handling of config."""
with_logs = True
def test_handle_no_cfg_creates_no_users_or_groups(self, m_user, m_group):
"""Test handle with no config will not create users or groups."""
cfg = {} # merged cloud-config
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "ubuntu",
"lock_passwd": True,
"groups": ["lxd", "sudo"],
"shell": "/bin/bash",
}
}
metadata = {}
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
m_user.assert_not_called()
m_group.assert_not_called()
def test_handle_users_in_cfg_calls_create_users(self, m_user, m_group):
"""When users in config, create users with distro.create_user."""
cfg = {"users": ["default", {"name": "me2"}]} # merged cloud-config
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "ubuntu",
"lock_passwd": True,
"groups": ["lxd", "sudo"],
"shell": "/bin/bash",
}
}
metadata = {}
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
self.assertCountEqual(
m_user.call_args_list,
[
mock.call(
"ubuntu",
groups="lxd,sudo",
lock_passwd=True,
shell="/bin/bash",
),
mock.call("me2", default=False),
],
)
m_group.assert_not_called()
@mock.patch("cloudinit.distros.freebsd.Distro.create_group")
@mock.patch("cloudinit.distros.freebsd.Distro.create_user")
def test_handle_users_in_cfg_calls_create_users_on_bsd(
self,
m_fbsd_user,
m_fbsd_group,
m_linux_user,
m_linux_group,
):
"""When users in config, create users with freebsd.create_user."""
cfg = {
"users": ["default", {"name": "me2", "uid": 1234}]
} # merged cloud-config
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "freebsd",
"lock_passwd": True,
"groups": ["wheel"],
"shell": "/bin/tcsh",
}
}
metadata = {}
# patch ifconfig -a
with mock.patch(
"cloudinit.distros.networking.subp.subp", return_value=("", None)
):
cloud = self.tmp_cloud(
distro="freebsd", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
self.assertCountEqual(
m_fbsd_user.call_args_list,
[
mock.call(
"freebsd",
groups="wheel",
lock_passwd=True,
shell="/bin/tcsh",
),
mock.call("me2", uid=1234, default=False),
],
)
m_fbsd_group.assert_not_called()
m_linux_group.assert_not_called()
m_linux_user.assert_not_called()
def test_users_with_ssh_redirect_user_passes_keys(self, m_user, m_group):
"""When ssh_redirect_user is True pass default user and cloud keys."""
cfg = {
"users": ["default", {"name": "me2", "ssh_redirect_user": True}]
}
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "ubuntu",
"lock_passwd": True,
"groups": ["lxd", "sudo"],
"shell": "/bin/bash",
}
}
metadata = {"public-keys": ["key1"]}
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
self.assertCountEqual(
m_user.call_args_list,
[
mock.call(
"ubuntu",
groups="lxd,sudo",
lock_passwd=True,
shell="/bin/bash",
),
mock.call(
"me2",
cloud_public_ssh_keys=["key1"],
default=False,
ssh_redirect_user="ubuntu",
),
],
)
m_group.assert_not_called()
def test_users_with_ssh_redirect_user_default_str(self, m_user, m_group):
"""When ssh_redirect_user is 'default' pass default username."""
cfg = {
"users": [
"default",
{"name": "me2", "ssh_redirect_user": "default"},
]
}
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "ubuntu",
"lock_passwd": True,
"groups": ["lxd", "sudo"],
"shell": "/bin/bash",
}
}
metadata = {"public-keys": ["key1"]}
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
self.assertCountEqual(
m_user.call_args_list,
[
mock.call(
"ubuntu",
groups="lxd,sudo",
lock_passwd=True,
shell="/bin/bash",
),
mock.call(
"me2",
cloud_public_ssh_keys=["key1"],
default=False,
ssh_redirect_user="ubuntu",
),
],
)
m_group.assert_not_called()
def test_users_without_home_cannot_import_ssh_keys(self, m_user, m_group):
cfg = {
"users": [
"default",
{
"name": "me2",
"ssh_import_id": ["snowflake"],
"no_create_home": True,
},
]
}
cloud = self.tmp_cloud(distro="ubuntu", sys_cfg={}, metadata={})
with self.assertRaises(ValueError) as context_manager:
cc_users_groups.handle("modulename", cfg, cloud, None)
m_group.assert_not_called()
self.assertEqual(
"Not creating user me2. Key(s) ssh_import_id cannot be provided"
" with no_create_home",
str(context_manager.exception),
)
def test_users_with_ssh_redirect_user_non_default(self, m_user, m_group):
"""Warn when ssh_redirect_user is not 'default'."""
cfg = {
"users": [
"default",
{"name": "me2", "ssh_redirect_user": "snowflake"},
]
}
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "ubuntu",
"lock_passwd": True,
"groups": ["lxd", "sudo"],
"shell": "/bin/bash",
}
}
metadata = {"public-keys": ["key1"]}
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
with self.assertRaises(ValueError) as context_manager:
cc_users_groups.handle("modulename", cfg, cloud, None)
m_group.assert_not_called()
self.assertEqual(
"Not creating user me2. Invalid value of ssh_redirect_user:"
" snowflake. Expected values: true, default or false.",
str(context_manager.exception),
)
def test_users_with_ssh_redirect_user_default_false(self, m_user, m_group):
"""When unspecified ssh_redirect_user is false and not set up."""
cfg = {"users": ["default", {"name": "me2"}]}
# System config defines a default user for the distro.
sys_cfg = {
"default_user": {
"name": "ubuntu",
"lock_passwd": True,
"groups": ["lxd", "sudo"],
"shell": "/bin/bash",
}
}
metadata = {"public-keys": ["key1"]}
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
self.assertCountEqual(
m_user.call_args_list,
[
mock.call(
"ubuntu",
groups="lxd,sudo",
lock_passwd=True,
shell="/bin/bash",
),
mock.call("me2", default=False),
],
)
m_group.assert_not_called()
def test_users_ssh_redirect_user_and_no_default(self, m_user, m_group):
"""Warn when ssh_redirect_user is True and no default user present."""
cfg = {
"users": ["default", {"name": "me2", "ssh_redirect_user": True}]
}
# System config defines *no* default user for the distro.
sys_cfg = {}
metadata = {} # no public-keys defined
cloud = self.tmp_cloud(
distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
)
cc_users_groups.handle("modulename", cfg, cloud, None)
m_user.assert_called_once_with("me2", default=False)
m_group.assert_not_called()
self.assertEqual(
"WARNING: Ignoring ssh_redirect_user: True for me2. No"
" default_user defined. Perhaps missing"
" cloud configuration users: [default, ..].\n",
self.logs.getvalue(),
)
class TestUsersGroupsSchema:
@pytest.mark.parametrize(
"config, expectation, has_errors",
[
# Validate default settings not covered by examples
({"groups": ["anygrp"]}, does_not_raise(), None),
(
{"groups": "anygrp,anyothergroup"},
does_not_raise(),
None,
), # DEPRECATED
# Create anygrp with user1 as member
({"groups": [{"anygrp": "user1"}]}, does_not_raise(), None),
# Create anygrp with user1 as member using object/string syntax
({"groups": {"anygrp": "user1"}}, does_not_raise(), None),
# Create anygrp with user1 as member using object/list syntax
({"groups": {"anygrp": ["user1"]}}, does_not_raise(), None),
(
{"groups": [{"anygrp": ["user1", "user2"]}]},
does_not_raise(),
None,
),
# Make default username "olddefault": DEPRECATED
({"user": "olddefault"}, does_not_raise(), None),
# Create multiple users, and include default user. DEPRECATED
({"users": [{"name": "bbsw"}]}, does_not_raise(), None),
(
{"users": [{"name": "bbsw", "garbage-key": None}]},
pytest.raises(
SchemaValidationError,
match="is not valid under any of the given schemas",
),
True,
),
(
{"groups": {"": "bbsw"}},
pytest.raises(
SchemaValidationError,
match="does not match any of the regexes",
),
True,
),
(
{"users": [{"name": "bbsw", "groups": ["anygrp"]}]},
does_not_raise(),
None,
), # user with a list of groups
({"groups": [{"yep": ["user1"]}]}, does_not_raise(), None),
({"users": "oldstyle,default"}, does_not_raise(), None),
({"users": ["default"]}, does_not_raise(), None),
({"users": ["default", ["aaa", "bbb"]]}, does_not_raise(), None),
# no user creation at all
({"users": []}, does_not_raise(), None),
# different default user creation
({"users": ["foobar"]}, does_not_raise(), None),
(
{"users": [{"name": "bbsw", "lock-passwd": True}]},
pytest.raises(
SchemaValidationError,
match=(
"Cloud config schema deprecations: "
"users.0.lock-passwd: Default: ``true`` "
"Deprecated in version 22.3. Use "
"``lock_passwd`` instead."
),
),
False,
),
# users.groups supports comma-delimited str, list and object type
(
{"users": [{"name": "bbsw", "groups": "adm, sudo"}]},
does_not_raise(),
None,
),
(
{
"users": [
{"name": "bbsw", "groups": {"adm": None, "sudo": None}}
]
},
pytest.raises(
SchemaValidationError,
match=(
"Cloud config schema deprecations: "
"users.0.groups.adm: When providing an object "
"for users.groups the ``<group_name>`` keys "
"are the groups to add this user to Deprecated"
" in version 23.1., users.0.groups.sudo: When "
"providing an object for users.groups the "
"``<group_name>`` keys are the groups to add "
"this user to Deprecated in version 23.1."
),
),
False,
),
({"groups": [{"yep": ["user1"]}]}, does_not_raise(), None),
(
{"user": ["no_list_allowed"]},
pytest.raises(
SchemaValidationError,
match=re.escape("user: ['no_list_allowed'] is not valid "),
),
True,
),
(
{"groups": {"anygrp": 1}},
pytest.raises(
SchemaValidationError,
match="groups.anygrp: 1 is not of type 'string', 'array'",
),
True,
),
(
{
"users": [{"inactive": True, "name": "cloudy"}],
},
pytest.raises(
SchemaValidationError,
match="errors: users.0: {'inactive': True",
),
True,
),
(
{
"users": [
{
"expiredate": "2038-01-19",
"groups": "users",
"name": "foobar",
}
]
},
does_not_raise(),
None,
),
(
{"user": {"name": "aciba", "groups": {"sbuild": None}}},
pytest.raises(
SchemaValidationError,
match=(
"Cloud config schema deprecations: "
"user.groups.sbuild: When providing an object "
"for users.groups the ``<group_name>`` keys "
"are the groups to add this user to Deprecated"
" in version 23.1."
),
),
False,
),
(
{"user": {"name": "mynewdefault", "sudo": False}},
pytest.raises(
SchemaValidationError,
match=(
"Cloud config schema deprecations: user.sudo:"
" Changed in version 22.2. The value "
"``false`` is deprecated for this key, use "
"``null`` instead."
),
),
False,
),
(
{"user": {"name": "mynewdefault", "sudo": None}},
does_not_raise(),
None,
),
(
{"users": [{"name": "a", "uid": "1743"}]},
pytest.raises(
SchemaValidationError,
match=(
"Cloud config schema deprecations: "
"users.0.uid: Changed in version 22.3. The "
"use of ``string`` type is deprecated. Use "
"an ``integer`` instead."
),
),
False,
),
(
{"users": [{"name": "a", "expiredate": "2038,1,19"}]},
pytest.raises(
SchemaValidationError,
match=(
"users.0: {'name': 'a', 'expiredate': '2038,1,19'}"
" is not valid under any of the given schemas"
),
),
True,
),
],
)
@skipUnlessJsonSchema()
def test_schema_validation(self, config, expectation, has_errors):
with expectation as exc_info:
validate_cloudconfig_schema(config, get_schema(), strict=True)
if has_errors is not None:
assert has_errors == exc_info.value.has_errors()