Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/config/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/config/test_cc_users_groups.py
# This file is part of cloud-init. See LICENSE file for license information.
import re

import pytest

from cloudinit.config import cc_users_groups
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from tests.unittests.helpers import (
    CiTestCase,
    does_not_raise,
    mock,
    skipUnlessJsonSchema,
)

# Dotted import path of the module under test (the same module imported
# above as ``cc_users_groups``). NOTE(review): not referenced in the visible
# part of this file; presumably kept as a prefix for mock.patch targets.
MODPATH = "cloudinit.config.cc_users_groups"


@mock.patch("cloudinit.distros.ubuntu.Distro.create_group")
@mock.patch("cloudinit.distros.ubuntu.Distro.create_user")
class TestHandleUsersGroups(CiTestCase):
    """Test cc_users_groups handling of config.

    The class-level patches replace the Ubuntu distro's ``create_user`` and
    ``create_group``. Every ``test*`` method receives the two mocks as
    ``m_user`` and ``m_group`` (in that order). The private helpers below do
    not start with ``test``, so the class decorators leave them unwrapped.
    """

    # self.logs is read by the warning assertion in the last test;
    # presumably this flag makes CiTestCase capture logs -- TODO confirm.
    with_logs = True

    @staticmethod
    def _ubuntu_sys_cfg():
        """Return a fresh system config defining the distro default user.

        A new dict is built on every call so no test can leak mutations
        into another one.
        """
        return {
            "default_user": {
                "name": "ubuntu",
                "lock_passwd": True,
                "groups": ["lxd", "sudo"],
                "shell": "/bin/bash",
            }
        }

    @staticmethod
    def _ubuntu_default_user_call():
        """Return the create_user call expected for the default user."""
        return mock.call(
            "ubuntu",
            groups="lxd,sudo",
            lock_passwd=True,
            shell="/bin/bash",
        )

    def _ubuntu_cloud(self, metadata, with_default_user=True):
        """Build a temporary Ubuntu cloud object for a test.

        :param metadata: Datasource metadata dict handed to ``tmp_cloud``.
        :param with_default_user: When True the system config defines the
            ``ubuntu`` default user; when False the system config is empty.
        :return: Whatever ``self.tmp_cloud`` returns.
        """
        sys_cfg = self._ubuntu_sys_cfg() if with_default_user else {}
        return self.tmp_cloud(
            distro="ubuntu", sys_cfg=sys_cfg, metadata=metadata
        )

    def test_handle_no_cfg_creates_no_users_or_groups(self, m_user, m_group):
        """Test handle with no config will not create users or groups."""
        cfg = {}  # merged cloud-config
        cloud = self._ubuntu_cloud(metadata={})
        cc_users_groups.handle("modulename", cfg, cloud, None)
        m_user.assert_not_called()
        m_group.assert_not_called()

    def test_handle_users_in_cfg_calls_create_users(self, m_user, m_group):
        """When users in config, create users with distro.create_user."""
        cfg = {"users": ["default", {"name": "me2"}]}  # merged cloud-config
        cloud = self._ubuntu_cloud(metadata={})
        cc_users_groups.handle("modulename", cfg, cloud, None)
        self.assertCountEqual(
            m_user.call_args_list,
            [
                self._ubuntu_default_user_call(),
                mock.call("me2", default=False),
            ],
        )
        m_group.assert_not_called()

    @mock.patch("cloudinit.distros.freebsd.Distro.create_group")
    @mock.patch("cloudinit.distros.freebsd.Distro.create_user")
    def test_handle_users_in_cfg_calls_create_users_on_bsd(
        self,
        m_fbsd_user,
        m_fbsd_group,
        m_linux_user,
        m_linux_group,
    ):
        """When users in config, create users with freebsd.create_user."""
        cfg = {
            "users": ["default", {"name": "me2", "uid": 1234}]
        }  # merged cloud-config
        # System config defines a default user for the distro.
        sys_cfg = {
            "default_user": {
                "name": "freebsd",
                "lock_passwd": True,
                "groups": ["wheel"],
                "shell": "/bin/tcsh",
            }
        }
        metadata = {}
        # patch ifconfig -a
        with mock.patch(
            "cloudinit.distros.networking.subp.subp", return_value=("", None)
        ):
            cloud = self.tmp_cloud(
                distro="freebsd", sys_cfg=sys_cfg, metadata=metadata
            )
        cc_users_groups.handle("modulename", cfg, cloud, None)
        self.assertCountEqual(
            m_fbsd_user.call_args_list,
            [
                mock.call(
                    "freebsd",
                    groups="wheel",
                    lock_passwd=True,
                    shell="/bin/tcsh",
                ),
                mock.call("me2", uid=1234, default=False),
            ],
        )
        # Only the FreeBSD distro may be touched; the Ubuntu mocks injected
        # by the class decorators must stay unused.
        m_fbsd_group.assert_not_called()
        m_linux_group.assert_not_called()
        m_linux_user.assert_not_called()

    def test_users_with_ssh_redirect_user_passes_keys(self, m_user, m_group):
        """When ssh_redirect_user is True pass default user and cloud keys."""
        cfg = {
            "users": ["default", {"name": "me2", "ssh_redirect_user": True}]
        }
        cloud = self._ubuntu_cloud(metadata={"public-keys": ["key1"]})
        cc_users_groups.handle("modulename", cfg, cloud, None)
        self.assertCountEqual(
            m_user.call_args_list,
            [
                self._ubuntu_default_user_call(),
                mock.call(
                    "me2",
                    cloud_public_ssh_keys=["key1"],
                    default=False,
                    ssh_redirect_user="ubuntu",
                ),
            ],
        )
        m_group.assert_not_called()

    def test_users_with_ssh_redirect_user_default_str(self, m_user, m_group):
        """When ssh_redirect_user is 'default' pass default username."""
        cfg = {
            "users": [
                "default",
                {"name": "me2", "ssh_redirect_user": "default"},
            ]
        }
        cloud = self._ubuntu_cloud(metadata={"public-keys": ["key1"]})
        cc_users_groups.handle("modulename", cfg, cloud, None)
        self.assertCountEqual(
            m_user.call_args_list,
            [
                self._ubuntu_default_user_call(),
                mock.call(
                    "me2",
                    cloud_public_ssh_keys=["key1"],
                    default=False,
                    ssh_redirect_user="ubuntu",
                ),
            ],
        )
        m_group.assert_not_called()

    def test_users_without_home_cannot_import_ssh_keys(self, m_user, m_group):
        """Raise ValueError when ssh_import_id is set with no_create_home."""
        cfg = {
            "users": [
                "default",
                {
                    "name": "me2",
                    "ssh_import_id": ["snowflake"],
                    "no_create_home": True,
                },
            ]
        }
        cloud = self._ubuntu_cloud(metadata={}, with_default_user=False)
        with self.assertRaises(ValueError) as context_manager:
            cc_users_groups.handle("modulename", cfg, cloud, None)
        m_group.assert_not_called()
        self.assertEqual(
            "Not creating user me2. Key(s) ssh_import_id cannot be provided"
            " with no_create_home",
            str(context_manager.exception),
        )

    def test_users_with_ssh_redirect_user_non_default(self, m_user, m_group):
        """Raise ValueError when ssh_redirect_user is an unsupported str."""
        cfg = {
            "users": [
                "default",
                {"name": "me2", "ssh_redirect_user": "snowflake"},
            ]
        }
        cloud = self._ubuntu_cloud(metadata={"public-keys": ["key1"]})
        with self.assertRaises(ValueError) as context_manager:
            cc_users_groups.handle("modulename", cfg, cloud, None)
        m_group.assert_not_called()
        self.assertEqual(
            "Not creating user me2. Invalid value of ssh_redirect_user:"
            " snowflake. Expected values: true, default or false.",
            str(context_manager.exception),
        )

    def test_users_with_ssh_redirect_user_default_false(self, m_user, m_group):
        """When unspecified ssh_redirect_user is false and not set up."""
        cfg = {"users": ["default", {"name": "me2"}]}
        cloud = self._ubuntu_cloud(metadata={"public-keys": ["key1"]})
        cc_users_groups.handle("modulename", cfg, cloud, None)
        self.assertCountEqual(
            m_user.call_args_list,
            [
                self._ubuntu_default_user_call(),
                mock.call("me2", default=False),
            ],
        )
        m_group.assert_not_called()

    def test_users_ssh_redirect_user_and_no_default(self, m_user, m_group):
        """Warn when ssh_redirect_user is True and no default user present."""
        cfg = {
            "users": ["default", {"name": "me2", "ssh_redirect_user": True}]
        }
        # System config defines *no* default user for the distro and the
        # metadata carries no public-keys.
        cloud = self._ubuntu_cloud(metadata={}, with_default_user=False)
        cc_users_groups.handle("modulename", cfg, cloud, None)
        m_user.assert_called_once_with("me2", default=False)
        m_group.assert_not_called()
        self.assertEqual(
            "WARNING: Ignoring ssh_redirect_user: True for me2. No"
            " default_user defined. Perhaps missing"
            " cloud configuration users:  [default, ..].\n",
            self.logs.getvalue(),
        )


class TestUsersGroupsSchema:
    """Schema validation tests for the users/groups cloud-config keys."""

    @pytest.mark.parametrize(
        "config, expectation, has_errors",
        [
            # Validate default settings not covered by examples
            ({"groups": ["anygrp"]}, does_not_raise(), None),
            (
                {"groups": "anygrp,anyothergroup"},
                does_not_raise(),
                None,
            ),  # DEPRECATED
            # Create anygrp with user1 as member
            ({"groups": [{"anygrp": "user1"}]}, does_not_raise(), None),
            # Create anygrp with user1 as member using object/string syntax
            ({"groups": {"anygrp": "user1"}}, does_not_raise(), None),
            # Create anygrp with user1 as member using object/list syntax
            ({"groups": {"anygrp": ["user1"]}}, does_not_raise(), None),
            (
                {"groups": [{"anygrp": ["user1", "user2"]}]},
                does_not_raise(),
                None,
            ),
            # Make default username "olddefault": DEPRECATED
            ({"user": "olddefault"}, does_not_raise(), None),
            # Create multiple users, and include default user. DEPRECATED
            ({"users": [{"name": "bbsw"}]}, does_not_raise(), None),
            (
                {"users": [{"name": "bbsw", "garbage-key": None}]},
                pytest.raises(
                    SchemaValidationError,
                    match="is not valid under any of the given schemas",
                ),
                True,
            ),
            (
                {"groups": {"": "bbsw"}},
                pytest.raises(
                    SchemaValidationError,
                    match="does not match any of the regexes",
                ),
                True,
            ),
            (
                {"users": [{"name": "bbsw", "groups": ["anygrp"]}]},
                does_not_raise(),
                None,
            ),  # user with a list of groups
            ({"groups": [{"yep": ["user1"]}]}, does_not_raise(), None),
            ({"users": "oldstyle,default"}, does_not_raise(), None),
            ({"users": ["default"]}, does_not_raise(), None),
            ({"users": ["default", ["aaa", "bbb"]]}, does_not_raise(), None),
            # no user creation at all
            ({"users": []}, does_not_raise(), None),
            # different default user creation
            ({"users": ["foobar"]}, does_not_raise(), None),
            (
                {"users": [{"name": "bbsw", "lock-passwd": True}]},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "Cloud config schema deprecations: "
                        "users.0.lock-passwd: Default: ``true`` "
                        "Deprecated in version 22.3. Use "
                        "``lock_passwd`` instead."
                    ),
                ),
                False,
            ),
            # users.groups supports comma-delimited str, list and object type
            (
                {"users": [{"name": "bbsw", "groups": "adm, sudo"}]},
                does_not_raise(),
                None,
            ),
            (
                {
                    "users": [
                        {"name": "bbsw", "groups": {"adm": None, "sudo": None}}
                    ]
                },
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "Cloud config schema deprecations: "
                        "users.0.groups.adm: When providing an object "
                        "for users.groups the ``<group_name>`` keys "
                        "are the groups to add this user to Deprecated"
                        " in version 23.1., users.0.groups.sudo: When "
                        "providing an object for users.groups the "
                        "``<group_name>`` keys are the groups to add "
                        "this user to Deprecated in version 23.1."
                    ),
                ),
                False,
            ),
            ({"groups": [{"yep": ["user1"]}]}, does_not_raise(), None),
            (
                {"user": ["no_list_allowed"]},
                pytest.raises(
                    SchemaValidationError,
                    match=re.escape("user: ['no_list_allowed'] is not valid "),
                ),
                True,
            ),
            (
                {"groups": {"anygrp": 1}},
                pytest.raises(
                    SchemaValidationError,
                    match="groups.anygrp: 1 is not of type 'string', 'array'",
                ),
                True,
            ),
            (
                {
                    "users": [{"inactive": True, "name": "cloudy"}],
                },
                pytest.raises(
                    SchemaValidationError,
                    match="errors: users.0: {'inactive': True",
                ),
                True,
            ),
            (
                {
                    "users": [
                        {
                            "expiredate": "2038-01-19",
                            "groups": "users",
                            "name": "foobar",
                        }
                    ]
                },
                does_not_raise(),
                None,
            ),
            (
                {"user": {"name": "aciba", "groups": {"sbuild": None}}},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "Cloud config schema deprecations: "
                        "user.groups.sbuild: When providing an object "
                        "for users.groups the ``<group_name>`` keys "
                        "are the groups to add this user to Deprecated"
                        " in version 23.1."
                    ),
                ),
                False,
            ),
            (
                {"user": {"name": "mynewdefault", "sudo": False}},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "Cloud config schema deprecations: user.sudo:"
                        "  Changed in version 22.2. The value "
                        "``false`` is deprecated for this key, use "
                        "``null`` instead."
                    ),
                ),
                False,
            ),
            (
                {"user": {"name": "mynewdefault", "sudo": None}},
                does_not_raise(),
                None,
            ),
            (
                {"users": [{"name": "a", "uid": "1743"}]},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "Cloud config schema deprecations: "
                        "users.0.uid:  Changed in version 22.3. The "
                        "use of ``string`` type is deprecated. Use "
                        "an ``integer`` instead."
                    ),
                ),
                False,
            ),
            (
                {"users": [{"name": "a", "expiredate": "2038,1,19"}]},
                pytest.raises(
                    SchemaValidationError,
                    match=(
                        "users.0: {'name': 'a', 'expiredate': '2038,1,19'}"
                        " is not valid under any of the given schemas"
                    ),
                ),
                True,
            ),
        ],
    )
    @skipUnlessJsonSchema()
    def test_schema_validation(self, config, expectation, has_errors):
        """Validate ``config`` strictly and check the expected outcome.

        :param config: Cloud-config dict passed to the schema validator.
        :param expectation: Context manager wrapping the validation call;
            either ``does_not_raise()`` or a ``pytest.raises(...)``.
        :param has_errors: ``None`` for the passing cases; otherwise the
            value expected from ``has_errors()`` on the raised
            SchemaValidationError.
        """
        with expectation as exc_info:
            validate_cloudconfig_schema(config, get_schema(), strict=True)
        # This check must live outside the ``with`` block: when the call
        # above raises, the remainder of the block is skipped, so an
        # assertion placed inside it would never run. ``exc_info.value`` is
        # also only populated once the context manager has exited.
        if has_errors is not None:
            assert has_errors == exc_info.value.has_errors()

haha - 2025