Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/sources/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/sources/test_vmware.py
# Copyright (c) 2021-2022 VMware, Inc. All Rights Reserved.
#
# Authors: Andrew Kutz <akutz@vmware.com>
#          Pengpeng Sun <pengpengs@vmware.com>
#
# This file is part of cloud-init. See LICENSE file for license information.

import base64
import gzip
import os
from contextlib import ExitStack
from textwrap import dedent

import pytest

from cloudinit import dmi, helpers, safeyaml, settings, util
from cloudinit.sources import DataSourceVMware
from cloudinit.sources.helpers.vmware.imc import guestcust_util
from tests.unittests.helpers import (
    CiTestCase,
    FilesystemMockingTestCase,
    mock,
    populate_dir,
    wrap_and_call,
)

MPATH = "cloudinit.sources.DataSourceVMware."
PRODUCT_NAME_FILE_PATH = "/sys/class/dmi/id/product_name"
PRODUCT_NAME = "VMware7,1"
PRODUCT_UUID = "82343CED-E4C7-423B-8F6B-0D34D19067AB"
REROOT_FILES = {
    DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID,
    PRODUCT_NAME_FILE_PATH: PRODUCT_NAME,
}

VMW_MULTIPLE_KEYS = [
    "ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@vmw.com",
    "ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@vmw.com",
]
VMW_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... test@vmw.com"

VMW_METADATA_YAML = """instance-id: cloud-vm
local-hostname: cloud-vm
network:
  version: 2
  ethernets:
    nics:
      match:
        name: ens*
      dhcp4: yes
"""

VMW_USERDATA_YAML = """## template: jinja
#cloud-config
users:
- default
"""

VMW_VENDORDATA_YAML = """## template: jinja
#cloud-config
runcmd:
- echo "Hello, world."
"""


@pytest.fixture(autouse=True)
def common_patches():
    mocks = [
        mock.patch("cloudinit.util.platform.platform", return_value="Linux"),
        mock.patch.multiple(
            "cloudinit.dmi",
            is_container=mock.Mock(return_value=False),
            is_FreeBSD=mock.Mock(return_value=False),
        ),
        mock.patch(
            "cloudinit.sources.DataSourceVMware.netifaces.interfaces",
            return_value=[],
        ),
        mock.patch(
            "cloudinit.sources.DataSourceVMware.getfqdn",
            return_value="host.cloudinit.test",
        ),
    ]
    with ExitStack() as stack:
        for some_mock in mocks:
            stack.enter_context(some_mock)
        yield


class TestDataSourceVMware(CiTestCase):
    """
    Test common functionality that is not transport specific.
    """

    with_logs = True

    def setUp(self):
        super(TestDataSourceVMware, self).setUp()
        self.tmp = self.tmp_dir()

    def test_no_data_access_method(self):
        ds = get_ds(self.tmp)
        ds.vmware_rpctool = None
        with mock.patch(
            "cloudinit.sources.DataSourceVMware.is_vmware_platform",
            return_value=False,
        ):
            ret = ds.get_data()
        self.assertFalse(ret)

    @mock.patch("cloudinit.sources.DataSourceVMware.get_default_ip_addrs")
    def test_get_host_info_ipv4(self, m_fn_ipaddr):
        m_fn_ipaddr.return_value = ("10.10.10.1", None)
        host_info = DataSourceVMware.get_host_info()
        self.assertTrue(host_info)
        self.assertTrue(host_info["hostname"])
        self.assertTrue(host_info["hostname"] == "host.cloudinit.test")
        self.assertTrue(host_info["local-hostname"])
        self.assertTrue(host_info["local_hostname"])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4] == "10.10.10.1")
        self.assertFalse(host_info.get(DataSourceVMware.LOCAL_IPV6))

    @mock.patch("cloudinit.sources.DataSourceVMware.get_default_ip_addrs")
    def test_get_host_info_ipv6(self, m_fn_ipaddr):
        m_fn_ipaddr.return_value = (None, "2001:db8::::::8888")
        host_info = DataSourceVMware.get_host_info()
        self.assertTrue(host_info)
        self.assertTrue(host_info["hostname"])
        self.assertTrue(host_info["hostname"] == "host.cloudinit.test")
        self.assertTrue(host_info["local-hostname"])
        self.assertTrue(host_info["local_hostname"])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV6])
        self.assertTrue(
            host_info[DataSourceVMware.LOCAL_IPV6] == "2001:db8::::::8888"
        )
        self.assertFalse(host_info.get(DataSourceVMware.LOCAL_IPV4))

    @mock.patch("cloudinit.sources.DataSourceVMware.get_default_ip_addrs")
    def test_get_host_info_dual(self, m_fn_ipaddr):
        m_fn_ipaddr.return_value = ("10.10.10.1", "2001:db8::::::8888")
        host_info = DataSourceVMware.get_host_info()
        self.assertTrue(host_info)
        self.assertTrue(host_info["hostname"])
        self.assertTrue(host_info["hostname"] == "host.cloudinit.test")
        self.assertTrue(host_info["local-hostname"])
        self.assertTrue(host_info["local_hostname"])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4] == "10.10.10.1")
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV6])
        self.assertTrue(
            host_info[DataSourceVMware.LOCAL_IPV6] == "2001:db8::::::8888"
        )

    @mock.patch("cloudinit.sources.DataSourceVMware.get_host_info")
    def test_wait_on_network(self, m_fn):
        metadata = {
            DataSourceVMware.WAIT_ON_NETWORK: {
                DataSourceVMware.WAIT_ON_NETWORK_IPV4: True,
                DataSourceVMware.WAIT_ON_NETWORK_IPV6: False,
            },
        }
        m_fn.side_effect = [
            {
                "hostname": "host.cloudinit.test",
                "local-hostname": "host.cloudinit.test",
                "local_hostname": "host.cloudinit.test",
                "network": {
                    "interfaces": {
                        "by-ipv4": {},
                        "by-ipv6": {},
                        "by-mac": {
                            "aa:bb:cc:dd:ee:ff": {"ipv4": [], "ipv6": []}
                        },
                    },
                },
            },
            {
                "hostname": "host.cloudinit.test",
                "local-hostname": "host.cloudinit.test",
                "local-ipv4": "10.10.10.1",
                "local_hostname": "host.cloudinit.test",
                "network": {
                    "interfaces": {
                        "by-ipv4": {
                            "10.10.10.1": {
                                "mac": "aa:bb:cc:dd:ee:ff",
                                "netmask": "255.255.255.0",
                            }
                        },
                        "by-mac": {
                            "aa:bb:cc:dd:ee:ff": {
                                "ipv4": [
                                    {
                                        "addr": "10.10.10.1",
                                        "broadcast": "10.10.10.255",
                                        "netmask": "255.255.255.0",
                                    }
                                ],
                                "ipv6": [],
                            }
                        },
                    },
                },
            },
        ]

        host_info = DataSourceVMware.wait_on_network(metadata)

        logs = self.logs.getvalue()
        expected_logs = [
            "DEBUG: waiting on network: wait4=True, "
            + "ready4=False, wait6=False, ready6=False\n",
            "DEBUG: waiting on network complete\n",
        ]
        for log in expected_logs:
            self.assertIn(log, logs)

        self.assertTrue(host_info)
        self.assertTrue(host_info["hostname"])
        self.assertTrue(host_info["hostname"] == "host.cloudinit.test")
        self.assertTrue(host_info["local-hostname"])
        self.assertTrue(host_info["local_hostname"])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4])
        self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4] == "10.10.10.1")


class TestDataSourceVMwareEnvVars(FilesystemMockingTestCase):
    """
    Test the envvar transport.
    """

    def setUp(self):
        super(TestDataSourceVMwareEnvVars, self).setUp()
        self.tmp = self.tmp_dir()
        os.environ[DataSourceVMware.VMX_GUESTINFO] = "1"
        self.create_system_files()

    def tearDown(self):
        del os.environ[DataSourceVMware.VMX_GUESTINFO]
        return super(TestDataSourceVMwareEnvVars, self).tearDown()

    def create_system_files(self):
        rootd = self.tmp_dir()
        populate_dir(
            rootd,
            {
                DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID,
            },
        )
        self.assertTrue(self.reRoot(rootd))

    def assert_get_data_ok(self, m_fn, m_fn_call_count=6):
        ds = get_ds(self.tmp)
        ds.vmware_rpctool = None
        ret = ds.get_data()
        self.assertTrue(ret)
        self.assertEqual(m_fn_call_count, m_fn.call_count)
        self.assertEqual(
            ds.data_access_method, DataSourceVMware.DATA_ACCESS_METHOD_ENVVAR
        )
        return ds

    def assert_metadata(self, metadata, m_fn, m_fn_call_count=6):
        ds = self.assert_get_data_ok(m_fn, m_fn_call_count)
        assert_metadata(self, ds, metadata)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_subplatform(self, m_fn):
        m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""]
        ds = self.assert_get_data_ok(m_fn, m_fn_call_count=4)
        self.assertEqual(
            ds.subplatform,
            "%s (%s)"
            % (
                DataSourceVMware.DATA_ACCESS_METHOD_ENVVAR,
                DataSourceVMware.get_guestinfo_envvar_key_name("metadata"),
            ),
        )

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_metadata_only(self, m_fn):
        m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_userdata_only(self, m_fn):
        m_fn.side_effect = ["", VMW_USERDATA_YAML, "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_vendordata_only(self, m_fn):
        m_fn.side_effect = ["", "", VMW_VENDORDATA_YAML, ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_metadata_base64(self, m_fn):
        data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8"))
        m_fn.side_effect = [data, "base64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_metadata_b64(self, m_fn):
        data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8"))
        m_fn.side_effect = [data, "b64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_metadata_gzip_base64(self, m_fn):
        data = VMW_METADATA_YAML.encode("utf-8")
        data = gzip.compress(data)
        data = base64.b64encode(data)
        m_fn.side_effect = [data, "gzip+base64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_get_data_metadata_gz_b64(self, m_fn):
        data = VMW_METADATA_YAML.encode("utf-8")
        data = gzip.compress(data)
        data = base64.b64encode(data)
        m_fn.side_effect = [data, "gz+b64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_metadata_single_ssh_key(self, m_fn):
        metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML)
        metadata["public_keys"] = VMW_SINGLE_KEY
        metadata_yaml = safeyaml.dumps(metadata)
        m_fn.side_effect = [metadata_yaml, "", "", ""]
        self.assert_metadata(metadata, m_fn, m_fn_call_count=4)

    @mock.patch(
        "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value"
    )
    def test_metadata_multiple_ssh_keys(self, m_fn):
        metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML)
        metadata["public_keys"] = VMW_MULTIPLE_KEYS
        metadata_yaml = safeyaml.dumps(metadata)
        m_fn.side_effect = [metadata_yaml, "", "", ""]
        self.assert_metadata(metadata, m_fn, m_fn_call_count=4)


class TestDataSourceVMwareGuestInfo(FilesystemMockingTestCase):
    """
    Test the guestinfo transport on a VMware platform.
    """

    def setUp(self):
        super(TestDataSourceVMwareGuestInfo, self).setUp()
        self.tmp = self.tmp_dir()
        self.create_system_files()

    def create_system_files(self):
        rootd = self.tmp_dir()
        populate_dir(
            rootd,
            {
                DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID,
                PRODUCT_NAME_FILE_PATH: PRODUCT_NAME,
            },
        )
        self.assertTrue(self.reRoot(rootd))

    def assert_get_data_ok(self, m_fn, m_fn_call_count=6):
        ds = get_ds(self.tmp)
        ds.vmware_rpctool = "vmware-rpctool"
        ret = ds.get_data()
        self.assertTrue(ret)
        self.assertEqual(m_fn_call_count, m_fn.call_count)
        self.assertEqual(
            ds.data_access_method,
            DataSourceVMware.DATA_ACCESS_METHOD_GUESTINFO,
        )
        return ds

    def assert_metadata(self, metadata, m_fn, m_fn_call_count=6):
        ds = self.assert_get_data_ok(m_fn, m_fn_call_count)
        assert_metadata(self, ds, metadata)

    def test_ds_valid_on_vmware_platform(self):
        system_type = dmi.read_dmi_data("system-product-name")
        self.assertEqual(system_type, PRODUCT_NAME)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_subplatform(self, m_fn):
        m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""]
        ds = self.assert_get_data_ok(m_fn, m_fn_call_count=4)
        self.assertEqual(
            ds.subplatform,
            "%s (%s)"
            % (
                DataSourceVMware.DATA_ACCESS_METHOD_GUESTINFO,
                DataSourceVMware.get_guestinfo_key_name("metadata"),
            ),
        )

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_data_userdata_only(self, m_fn):
        m_fn.side_effect = ["", VMW_USERDATA_YAML, "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_data_vendordata_only(self, m_fn):
        m_fn.side_effect = ["", "", VMW_VENDORDATA_YAML, ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_metadata_single_ssh_key(self, m_fn):
        metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML)
        metadata["public_keys"] = VMW_SINGLE_KEY
        metadata_yaml = safeyaml.dumps(metadata)
        m_fn.side_effect = [metadata_yaml, "", "", ""]
        self.assert_metadata(metadata, m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_metadata_multiple_ssh_keys(self, m_fn):
        metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML)
        metadata["public_keys"] = VMW_MULTIPLE_KEYS
        metadata_yaml = safeyaml.dumps(metadata)
        m_fn.side_effect = [metadata_yaml, "", "", ""]
        self.assert_metadata(metadata, m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_data_metadata_base64(self, m_fn):
        data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8"))
        m_fn.side_effect = [data, "base64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_data_metadata_b64(self, m_fn):
        data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8"))
        m_fn.side_effect = [data, "b64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_data_metadata_gzip_base64(self, m_fn):
        data = VMW_METADATA_YAML.encode("utf-8")
        data = gzip.compress(data)
        data = base64.b64encode(data)
        m_fn.side_effect = [data, "gzip+base64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_get_data_metadata_gz_b64(self, m_fn):
        data = VMW_METADATA_YAML.encode("utf-8")
        data = gzip.compress(data)
        data = base64.b64encode(data)
        m_fn.side_effect = [data, "gz+b64", "", ""]
        self.assert_get_data_ok(m_fn, m_fn_call_count=4)


class TestDataSourceVMwareGuestInfo_InvalidPlatform(FilesystemMockingTestCase):
    """
    Test the guestinfo transport on a non-VMware platform.
    """

    def setUp(self):
        super(TestDataSourceVMwareGuestInfo_InvalidPlatform, self).setUp()
        self.tmp = self.tmp_dir()
        self.create_system_files()

    def create_system_files(self):
        rootd = self.tmp_dir()
        populate_dir(
            rootd,
            {
                DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID,
            },
        )
        self.assertTrue(self.reRoot(rootd))

    @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value")
    def test_ds_invalid_on_non_vmware_platform(self, m_fn):
        system_type = dmi.read_dmi_data("system-product-name")
        self.assertEqual(system_type, None)

        m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""]
        ds = get_ds(self.tmp)
        ds.vmware_rpctool = "vmware-rpctool"
        ret = ds.get_data()
        self.assertFalse(ret)


class TestDataSourceVMwareIMC(CiTestCase):
    """
    Test the VMware Guest OS Customization transport
    """

    with_logs = True

    def setUp(self):
        super(TestDataSourceVMwareIMC, self).setUp()
        self.datasource = DataSourceVMware.DataSourceVMware
        self.tdir = self.tmp_dir()

    def test_get_data_false_on_none_dmi_data(self):
        """When dmi for system-product-name is None, get_data returns False."""
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
        result = wrap_and_call(
            "cloudinit.sources.DataSourceVMware",
            {
                "dmi.read_dmi_data": None,
            },
            ds.get_data,
        )
        self.assertFalse(result, "Expected False return from ds.get_data")
        self.assertIn("No system-product-name found", self.logs.getvalue())

    def test_get_imc_data_vmware_customization_disabled(self):
        """
        When vmware customization is disabled via sys_cfg and
        allow_raw_data is disabled via ds_cfg, log a message.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={
                "disable_vmware_customization": True,
                "datasource": {"VMware": {"allow_raw_data": False}},
            },
            distro={},
            paths=paths,
        )
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [MISC]
            MARKER-ID = 12345345
            """
        )
        util.write_file(conf_file, conf_content)
        result = wrap_and_call(
            "cloudinit.sources.DataSourceVMware",
            {
                "dmi.read_dmi_data": "vmware",
            },
            ds.get_imc_data_fn,
        )
        self.assertEqual(result, (None, None, None))
        self.assertIn(
            "Customization for VMware platform is disabled",
            self.logs.getvalue(),
        )

    def test_get_imc_data_vmware_customization_sys_cfg_disabled(self):
        """
        When vmware customization is disabled via sys_cfg and
        no meta data is found, log a message.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={
                "disable_vmware_customization": True,
                "datasource": {"VMware": {"allow_raw_data": True}},
            },
            distro={},
            paths=paths,
        )
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [MISC]
            MARKER-ID = 12345345
            """
        )
        util.write_file(conf_file, conf_content)
        result = wrap_and_call(
            "cloudinit.sources.DataSourceVMware",
            {
                "dmi.read_dmi_data": "vmware",
                "util.del_dir": True,
                "guestcust_util.search_file": self.tdir,
                "guestcust_util.wait_for_cust_cfg_file": conf_file,
            },
            ds.get_imc_data_fn,
        )
        self.assertEqual(result, (None, None, None))
        self.assertIn(
            "No allowed customization configuration data found",
            self.logs.getvalue(),
        )

    def test_get_imc_data_allow_raw_data_disabled(self):
        """
        When allow_raw_data is disabled via ds_cfg and
        meta data is found, log a message.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={
                "disable_vmware_customization": False,
                "datasource": {"VMware": {"allow_raw_data": False}},
            },
            distro={},
            paths=paths,
        )

        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            """
        )
        util.write_file(conf_file, conf_content)
        result = wrap_and_call(
            "cloudinit.sources.DataSourceVMware",
            {
                "dmi.read_dmi_data": "vmware",
                "util.del_dir": True,
                "guestcust_util.search_file": self.tdir,
                "guestcust_util.wait_for_cust_cfg_file": conf_file,
            },
            ds.get_imc_data_fn,
        )
        self.assertEqual(result, (None, None, None))
        self.assertIn(
            "No allowed customization configuration data found",
            self.logs.getvalue(),
        )

    def test_get_imc_data_vmware_customization_enabled(self):
        """
        When cloud-init workflow for vmware is enabled via sys_cfg log a
        message.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": False},
            distro={},
            paths=paths,
        )
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CUSTOM-SCRIPT]
            SCRIPT-NAME = test-script
            [MISC]
            MARKER-ID = 12345345
            """
        )
        util.write_file(conf_file, conf_content)
        with mock.patch(
            MPATH + "guestcust_util.get_tools_config",
            return_value="true",
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                },
                ds.get_imc_data_fn,
            )
            self.assertEqual(result, (None, None, None))
        custom_script = self.tmp_path("test-script", self.tdir)
        self.assertIn(
            "Script %s not found!!" % custom_script,
            self.logs.getvalue(),
        )

    def test_get_imc_data_cust_script_disabled(self):
        """
        If custom script is disabled by VMware tools configuration,
        log a message.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": False},
            distro={},
            paths=paths,
        )
        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CUSTOM-SCRIPT]
            SCRIPT-NAME = test-script
            [MISC]
            MARKER-ID = 12345346
            """
        )
        util.write_file(conf_file, conf_content)
        # Prepare the custom sript
        customscript = self.tmp_path("test-script", self.tdir)
        util.write_file(customscript, "This is the post cust script")

        with mock.patch(
            MPATH + "guestcust_util.get_tools_config",
            return_value="invalid",
        ):
            with mock.patch(
                MPATH + "guestcust_util.set_customization_status",
                return_value=("msg", b""),
            ):
                result = wrap_and_call(
                    "cloudinit.sources.DataSourceVMware",
                    {
                        "dmi.read_dmi_data": "vmware",
                        "util.del_dir": True,
                        "guestcust_util.search_file": self.tdir,
                        "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    },
                    ds.get_imc_data_fn,
                )
                self.assertEqual(result, (None, None, None))
        self.assertIn(
            "Custom script is disabled by VM Administrator",
            self.logs.getvalue(),
        )

    def test_get_imc_data_cust_script_enabled(self):
        """
        If custom script is enabled by VMware tools configuration,
        execute the script.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": False},
            distro={},
            paths=paths,
        )
        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CUSTOM-SCRIPT]
            SCRIPT-NAME = test-script
            [MISC]
            MARKER-ID = 12345346
            """
        )
        util.write_file(conf_file, conf_content)

        # Mock custom script is enabled by return true when calling
        # get_tools_config
        with mock.patch(
            MPATH + "guestcust_util.get_tools_config",
            return_value="true",
        ):
            with mock.patch(
                MPATH + "guestcust_util.set_customization_status",
                return_value=("msg", b""),
            ):
                result = wrap_and_call(
                    "cloudinit.sources.DataSourceVMware",
                    {
                        "dmi.read_dmi_data": "vmware",
                        "util.del_dir": True,
                        "guestcust_util.search_file": self.tdir,
                        "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    },
                    ds.get_imc_data_fn,
                )
                self.assertEqual(result, (None, None, None))
        # Verify custom script is trying to be executed
        custom_script = self.tmp_path("test-script", self.tdir)
        self.assertIn(
            "Script %s not found!!" % custom_script,
            self.logs.getvalue(),
        )

    def test_get_imc_data_force_run_post_script_is_yes(self):
        """
        If DEFAULT-RUN-POST-CUST-SCRIPT is yes, custom script could run if
        enable-custom-scripts is not defined in VM Tools configuration
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": False},
            distro={},
            paths=paths,
        )
        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        # set DEFAULT-RUN-POST-CUST-SCRIPT = yes so that enable-custom-scripts
        # default value is TRUE
        conf_content = dedent(
            """\
            [CUSTOM-SCRIPT]
            SCRIPT-NAME = test-script
            [MISC]
            MARKER-ID = 12345346
            DEFAULT-RUN-POST-CUST-SCRIPT = yes
            """
        )
        util.write_file(conf_file, conf_content)

        # Mock get_tools_config(section, key, defaultVal) to return
        # defaultVal
        def my_get_tools_config(*args, **kwargs):
            return args[2]

        with mock.patch(
            MPATH + "guestcust_util.get_tools_config",
            side_effect=my_get_tools_config,
        ):
            with mock.patch(
                MPATH + "guestcust_util.set_customization_status",
                return_value=("msg", b""),
            ):
                result = wrap_and_call(
                    "cloudinit.sources.DataSourceVMware",
                    {
                        "dmi.read_dmi_data": "vmware",
                        "util.del_dir": True,
                        "guestcust_util.search_file": self.tdir,
                        "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    },
                    ds.get_imc_data_fn,
                )
                self.assertEqual(result, (None, None, None))
        # Verify custom script still runs although it is
        # disabled by VMware Tools
        custom_script = self.tmp_path("test-script", self.tdir)
        self.assertIn(
            "Script %s not found!!" % custom_script,
            self.logs.getvalue(),
        )

    def test_get_data_cloudinit_metadata_json(self):
        """
        Test metadata can be loaded to cloud-init metadata and network.
        The metadata format is json.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": True},
            distro={},
            paths=paths,
        )
        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            """
        )
        util.write_file(conf_file, conf_content)
        # Prepare the meta data file
        metadata_file = self.tmp_path("test-meta", self.tdir)
        metadata_content = dedent(
            """\
            {
              "instance-id": "cloud-vm",
              "local-hostname": "my-host.domain.com",
              "network": {
                "version": 2,
                "ethernets": {
                  "eths": {
                    "match": {
                      "name": "ens*"
                    },
                    "dhcp4": true
                  }
                }
              }
            }
            """
        )
        util.write_file(metadata_file, metadata_content)

        with mock.patch(
            MPATH + "guestcust_util.set_customization_status",
            return_value=("msg", b""),
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    "guestcust_util.get_imc_dir_path": self.tdir,
                },
                ds._get_data,
            )
            self.assertTrue(result)
        self.assertEqual("cloud-vm", ds.metadata["instance-id"])
        self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"])
        self.assertEqual(2, ds.network_config["version"])
        self.assertTrue(ds.network_config["ethernets"]["eths"]["dhcp4"])

    def test_get_data_cloudinit_metadata_yaml(self):
        """
        Test metadata can be loaded to cloud-init metadata and network.
        The metadata format is yaml.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": True},
            distro={},
            paths=paths,
        )
        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            """
        )
        util.write_file(conf_file, conf_content)
        # Prepare the meta data file
        metadata_file = self.tmp_path("test-meta", self.tdir)
        metadata_content = dedent(
            """\
            instance-id: cloud-vm
            local-hostname: my-host.domain.com
            network:
                version: 2
                ethernets:
                    nics:
                        match:
                            name: ens*
                        dhcp4: yes
            """
        )
        util.write_file(metadata_file, metadata_content)

        with mock.patch(
            MPATH + "guestcust_util.set_customization_status",
            return_value=("msg", b""),
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    "guestcust_util.get_imc_dir_path": self.tdir,
                },
                ds._get_data,
            )
            self.assertTrue(result)
        self.assertEqual("cloud-vm", ds.metadata["instance-id"])
        self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"])
        self.assertEqual(2, ds.network_config["version"])
        self.assertTrue(ds.network_config["ethernets"]["nics"]["dhcp4"])

    def test_get_imc_data_cloudinit_metadata_not_valid(self):
        """
        Test metadata is not JSON or YAML format, log a message
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": True},
            distro={},
            paths=paths,
        )

        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            """
        )
        util.write_file(conf_file, conf_content)

        # Prepare the meta data file
        metadata_file = self.tmp_path("test-meta", self.tdir)
        metadata_content = "[This is not json or yaml format]a=b"
        util.write_file(metadata_file, metadata_content)

        with mock.patch(
            MPATH + "guestcust_util.set_customization_status",
            return_value=("msg", b""),
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    "guestcust_util.get_imc_dir_path": self.tdir,
                },
                ds.get_data,
            )
        self.assertFalse(result)
        self.assertIn(
            "expected '<document start>', but found '<scalar>'",
            self.logs.getvalue(),
        )

    def test_get_imc_data_cloudinit_metadata_not_found(self):
        """
        Test metadata file can't be found, log a message
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": True},
            distro={},
            paths=paths,
        )
        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            """
        )
        util.write_file(conf_file, conf_content)
        # Don't prepare the meta data file

        with mock.patch(
            MPATH + "guestcust_util.set_customization_status",
            return_value=("msg", b""),
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    "guestcust_util.get_imc_dir_path": self.tdir,
                },
                ds.get_imc_data_fn,
            )
            self.assertEqual(result, (None, None, None))
        self.assertIn("Meta data file is not found", self.logs.getvalue())

    def test_get_data_cloudinit_userdata(self):
        """
        Test user data can be loaded to cloud-init user data.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": False},
            distro={},
            paths=paths,
        )

        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            USERDATA = test-user
            """
        )
        util.write_file(conf_file, conf_content)

        # Prepare the meta data file
        metadata_file = self.tmp_path("test-meta", self.tdir)
        metadata_content = dedent(
            """\
            instance-id: cloud-vm
            local-hostname: my-host.domain.com
            network:
                version: 2
                ethernets:
                    nics:
                        match:
                            name: ens*
                        dhcp4: yes
            """
        )
        util.write_file(metadata_file, metadata_content)

        # Prepare the user data file
        userdata_file = self.tmp_path("test-user", self.tdir)
        userdata_content = "This is the user data"
        util.write_file(userdata_file, userdata_content)

        with mock.patch(
            MPATH + "guestcust_util.set_customization_status",
            return_value=("msg", b""),
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    "guestcust_util.get_imc_dir_path": self.tdir,
                },
                ds._get_data,
            )
            self.assertTrue(result)
        self.assertEqual("cloud-vm", ds.metadata["instance-id"])
        self.assertEqual(userdata_content, ds.userdata_raw)

    def test_get_imc_data_cloudinit_userdata_not_found(self):
        """
        Test userdata file can't be found.
        """
        paths = helpers.Paths({"cloud_dir": self.tdir})
        ds = self.datasource(
            sys_cfg={"disable_vmware_customization": True},
            distro={},
            paths=paths,
        )

        # Prepare the conf file
        conf_file = self.tmp_path("test-cust", self.tdir)
        conf_content = dedent(
            """\
            [CLOUDINIT]
            METADATA = test-meta
            USERDATA = test-user
            """
        )
        util.write_file(conf_file, conf_content)

        # Prepare the meta data file
        metadata_file = self.tmp_path("test-meta", self.tdir)
        metadata_content = dedent(
            """\
            instance-id: cloud-vm
            local-hostname: my-host.domain.com
            network:
                version: 2
                ethernets:
                    nics:
                        match:
                            name: ens*
                        dhcp4: yes
            """
        )
        util.write_file(metadata_file, metadata_content)

        # Don't prepare the user data file

        with mock.patch(
            MPATH + "guestcust_util.set_customization_status",
            return_value=("msg", b""),
        ):
            result = wrap_and_call(
                "cloudinit.sources.DataSourceVMware",
                {
                    "dmi.read_dmi_data": "vmware",
                    "util.del_dir": True,
                    "guestcust_util.search_file": self.tdir,
                    "guestcust_util.wait_for_cust_cfg_file": conf_file,
                    "guestcust_util.get_imc_dir_path": self.tdir,
                },
                ds.get_imc_data_fn,
            )
            self.assertEqual(result, (None, None, None))
        self.assertIn("Userdata file is not found", self.logs.getvalue())


class TestDataSourceVMwareIMC_MarkerFiles(CiTestCase):
    def setUp(self):
        super(TestDataSourceVMwareIMC_MarkerFiles, self).setUp()
        self.tdir = self.tmp_dir()

    def test_false_when_markerid_none(self):
        """Return False when markerid provided is None."""
        self.assertFalse(
            guestcust_util.check_marker_exists(
                markerid=None, marker_dir=self.tdir
            )
        )

    def test_markerid_file_exist(self):
        """Return False when markerid file path does not exist,
        True otherwise."""
        self.assertFalse(guestcust_util.check_marker_exists("123", self.tdir))
        marker_file = self.tmp_path(".markerfile-123.txt", self.tdir)
        util.write_file(marker_file, "")
        self.assertTrue(guestcust_util.check_marker_exists("123", self.tdir))

    def test_marker_file_setup(self):
        """Test creation of marker files."""
        markerfilepath = self.tmp_path(".markerfile-hi.txt", self.tdir)
        self.assertFalse(os.path.exists(markerfilepath))
        guestcust_util.setup_marker_files(marker_id="hi", marker_dir=self.tdir)
        self.assertTrue(os.path.exists(markerfilepath))


def assert_metadata(test_obj, ds, metadata):
    test_obj.assertEqual(metadata.get("instance-id"), ds.get_instance_id())
    test_obj.assertEqual(
        metadata.get("local-hostname"), ds.get_hostname().hostname
    )

    expected_public_keys = metadata.get("public_keys")
    if not isinstance(expected_public_keys, list):
        expected_public_keys = [expected_public_keys]

    test_obj.assertEqual(expected_public_keys, ds.get_public_ssh_keys())
    test_obj.assertIsInstance(ds.get_public_ssh_keys(), list)


def get_ds(temp_dir):
    ds = DataSourceVMware.DataSourceVMware(
        settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": temp_dir})
    )
    ds.vmware_rpctool = "vmware-rpctool"
    return ds


# vi: ts=4 expandtab

haha - 2025