|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/unittests/sources/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import json
import socket
import sys
from urllib.parse import SplitResult, urlsplit
import requests
import responses
from requests.exceptions import ConnectionError, ConnectTimeout
from cloudinit import helpers, settings, sources
from cloudinit.sources import DataSourceScaleway
from tests.unittests.helpers import CiTestCase, ResponsesTestCase, mock
class DataResponses:
    """
    Possible responses of the API endpoint
    169.254.42.42/user_data/cloud-init and
    169.254.42.42/vendor_data/cloud-init.

    Every callback takes the intercepted request and returns a
    ``(status, headers, body)`` tuple; the request headers are echoed
    back unchanged as the response headers.
    """

    # Payload served for both user data and vendor data in these tests.
    FAKE_USER_DATA = '#!/bin/bash\necho "user-data"'

    @staticmethod
    def rate_limited(request):
        """
        The API refuses the call with HTTP 429 and an empty body.
        """
        return 429, request.headers, ""

    @staticmethod
    def api_error(request):
        """
        The API fails with HTTP 500 and an empty body.
        """
        return 500, request.headers, ""

    @classmethod
    def get_ok(cls, request):
        """
        The API answers HTTP 200 with FAKE_USER_DATA as body.
        """
        return 200, request.headers, cls.FAKE_USER_DATA

    @staticmethod
    def empty(request):
        """
        No user data for this server.
        """
        return 404, request.headers, ""
class MetadataResponses:
    """
    Possible responses of the metadata API.
    """

    # Metadata document returned, JSON-encoded, by get_ok().
    FAKE_METADATA = {
        "id": "00000000-0000-0000-0000-000000000000",
        "hostname": "scaleway.host",
        "net_in_use": "ipv4",
        "tags": [
            "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
        ],
        "ssh_public_keys": [
            {
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "fingerprint": "2048 06:ae:... login (RSA)",
            },
            {
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "fingerprint": "2048 06:ff:... login2 (RSA)",
            },
        ],
    }

    @classmethod
    def get_ok(cls, response):
        """
        The API answers HTTP 200 with FAKE_METADATA serialized as JSON.

        Despite its name, ``response`` is the object handed to the
        callback (only its ``headers`` attribute is read and echoed
        back); the parameter name is kept for compatibility.
        """
        return 200, response.headers, json.dumps(cls.FAKE_METADATA)
class TestOnScaleway(CiTestCase):
    """
    Tests for DataSourceScaleway.ds_detect() with the three detection
    inputs (DMI data, file existence, kernel command line) mocked.
    """

    def setUp(self):
        super().setUp()
        self.tmp = self.tmp_dir()

    def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline):
        """
        Configure the return value of the three detection mocks.

        Each argument is a ``(mock_object, faked)`` pair; ``faked`` tells
        whether that input should look like a Scaleway instance.
        """
        # Local names deliberately avoid "mock" so the imported mock
        # module is not shadowed inside this method.
        m_dmi, faked = fake_dmi
        m_dmi.return_value = "Scaleway" if faked else "Whatever"

        m_exists, faked = fake_file_exists
        m_exists.return_value = faked

        m_cmdline, faked = fake_cmdline
        m_cmdline.return_value = (
            "initrd=initrd showopts scaleway nousb"
            if faked
            else "BOOT_IMAGE=/vmlinuz-3.11.0-26-generic"
        )

    @mock.patch("cloudinit.util.get_cmdline")
    @mock.patch("os.path.exists")
    @mock.patch("cloudinit.dmi.read_dmi_data")
    def test_not_ds_detect(
        self, m_read_dmi_data, m_file_exists, m_get_cmdline
    ):
        """
        None of the three detection inputs looks like Scaleway.
        """
        self.install_mocks(
            fake_dmi=(m_read_dmi_data, False),
            fake_file_exists=(m_file_exists, False),
            fake_cmdline=(m_get_cmdline, False),
        )
        self.assertFalse(DataSourceScaleway.DataSourceScaleway.ds_detect())

        # When not on Scaleway, get_data() returns False.
        datasource = DataSourceScaleway.DataSourceScaleway(
            settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp})
        )
        self.assertFalse(datasource.get_data())

    @mock.patch("cloudinit.util.get_cmdline")
    @mock.patch("os.path.exists")
    @mock.patch("cloudinit.dmi.read_dmi_data")
    def test_ds_detect_dmi(
        self, m_read_dmi_data, m_file_exists, m_get_cmdline
    ):
        """
        dmidecode returns "Scaleway".
        """
        self.install_mocks(
            fake_dmi=(m_read_dmi_data, True),
            fake_file_exists=(m_file_exists, False),
            fake_cmdline=(m_get_cmdline, False),
        )
        self.assertTrue(DataSourceScaleway.DataSourceScaleway.ds_detect())

    @mock.patch("cloudinit.util.get_cmdline")
    @mock.patch("os.path.exists")
    @mock.patch("cloudinit.dmi.read_dmi_data")
    def test_ds_detect_var_run_scaleway(
        self, m_read_dmi_data, m_file_exists, m_get_cmdline
    ):
        """
        /var/run/scaleway exists.
        """
        self.install_mocks(
            fake_dmi=(m_read_dmi_data, False),
            fake_file_exists=(m_file_exists, True),
            fake_cmdline=(m_get_cmdline, False),
        )
        self.assertTrue(DataSourceScaleway.DataSourceScaleway.ds_detect())

    @mock.patch("cloudinit.util.get_cmdline")
    @mock.patch("os.path.exists")
    @mock.patch("cloudinit.dmi.read_dmi_data")
    def test_ds_detect_cmdline(
        self, m_read_dmi_data, m_file_exists, m_get_cmdline
    ):
        """
        "scaleway" in /proc/cmdline.
        """
        self.install_mocks(
            fake_dmi=(m_read_dmi_data, False),
            fake_file_exists=(m_file_exists, False),
            fake_cmdline=(m_get_cmdline, True),
        )
        self.assertTrue(DataSourceScaleway.DataSourceScaleway.ds_detect())
def get_source_address_adapter(*args, **kwargs):
    """
    Scaleway user/vendor data API requires to be called with a privileged port.

    If the unittests are run as non-root, the user doesn't have the permission
    to bind on ports below 1024.

    This function removes the bind on a privileged address, since anyway the
    HTTP call is mocked by responses.

    All remaining positional and keyword arguments are forwarded to
    ``requests.adapters.HTTPAdapter``, whose instance is returned.
    """
    # Drop the argument if present; a caller that never passed
    # "source_address" must not trigger a KeyError here.
    kwargs.pop("source_address", None)
    return requests.adapters.HTTPAdapter(*args, **kwargs)
def _fix_mocking_url(url: str) -> str:
    """
    Return ``url`` with its query string removed.

    Scheme, network location, path and fragment are kept as parsed by
    ``urlsplit``; only the query component is blanked.
    """
    # Workaround https://github.com/getsentry/responses/pull/166
    # This function can be removed when Bionic is EOL
    # SplitResult is a named tuple, so _replace() yields a copy with only
    # the query component changed.
    return urlsplit(url)._replace(query="").geturl()
class TestDataSourceScaleway(ResponsesTestCase):
    """
    Tests for DataSourceScaleway metadata retrieval, SSH key extraction
    and network configuration generation, with HTTP calls mocked through
    ``responses``.
    """

    def setUp(self):
        tmp = self.tmp_dir()
        distro = mock.MagicMock()
        distro.get_tmp_exec_path = self.tmp_dir
        self.datasource = DataSourceScaleway.DataSourceScaleway(
            settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": tmp})
        )
        super().setUp()

        self.base_urls = DataSourceScaleway.DS_BASE_URLS
        for url in self.base_urls:
            # Make sure that API answers on the first try.
            # The trailing / at the end of the URL is needed to
            # workaround a bug in responses 3.6 which does not match
            # the URL otherwise.
            self.responses.add_callback(
                responses.GET,
                f"{url}/",
                callback=MetadataResponses.get_ok,
            )

        # Pretend we always run on Scaleway with a known fallback NIC.
        self.add_patch(
            "cloudinit.sources.DataSourceScaleway."
            "DataSourceScaleway.ds_detect",
            "_m_ds_detect",
            return_value=True,
        )
        self.add_patch(
            "cloudinit.sources.DataSourceScaleway.net.find_fallback_nic",
            "_m_find_fallback_nic",
            return_value="scalewaynic0",
        )

    def test_set_metadata_url_ipv4_ok(self):
        """
        _set_metadata_url() selects the first (IPv4) base URL.
        """
        self.datasource._set_metadata_url([self.base_urls[0]])
        self.assertIn(self.base_urls[0], self.datasource.metadata_url)

    def test_set_metadata_url_ipv6_ok(self):
        """
        _set_metadata_url() selects the second (IPv6) base URL.
        """
        self.datasource._set_metadata_url([self.base_urls[1]])
        self.assertIn(self.base_urls[1], self.datasource.metadata_url)

    @mock.patch(
        "cloudinit.sources.DataSourceScaleway.DataSourceScaleway"
        ".override_ds_detect"
    )
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    def test_ipv4_metadata_ok(self, dhcpv4, ds_detect):
        """
        get_data() returns metadata, user data and vendor data from IPv4.
        """
        ds_detect.return_value = True
        self.datasource._set_metadata_url([self.base_urls[0]])

        self.responses.reset()
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[0]}/",
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[1]}/",
            callback=MetadataResponses.get_ok,
        )
        # Use _fix_mocking_url to workaround py3.6 bug in responses
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(f"{self.base_urls[0]}/conf?format=json"),
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(f"{self.base_urls[1]}/conf?format=json"),
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[0]}/user_data/cloud-init",
            callback=DataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[0]}/vendor_data/cloud-init",
            callback=DataResponses.get_ok,
        )
        self.assertTrue(self.datasource.get_data())

        self.assertEqual(
            self.datasource.get_instance_id(),
            MetadataResponses.FAKE_METADATA["id"],
        )
        ssh_keys = self.datasource.get_public_ssh_keys()
        ssh_keys.sort()
        self.assertEqual(
            ssh_keys,
            [
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
            ],
        )
        self.assertEqual(
            self.datasource.get_hostname().hostname,
            MetadataResponses.FAKE_METADATA["hostname"],
        )
        self.assertEqual(
            self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA
        )
        self.assertEqual(
            self.datasource.get_vendordata_raw(), DataResponses.FAKE_USER_DATA
        )
        self.assertIsNone(self.datasource.availability_zone)
        self.assertIsNone(self.datasource.region)

    @mock.patch(
        "cloudinit.sources.DataSourceScaleway.DataSourceScaleway"
        ".override_ds_detect"
    )
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralIPv6Network")
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    def test_ipv4_metadata_timeout_ipv6_ok(self, dhcpv4, inet6, ds_detect):
        """
        get_data() returns metadata, user data and vendor data from IPv6
        after IPv4 has failed.
        """
        ds_detect.return_value = True
        self.datasource._set_metadata_url([self.base_urls[0]])

        self.responses.reset()
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[0]}/",
            callback=ConnectTimeout,
        )
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[1]}/",
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(f"{self.base_urls[1]}/conf?format=json"),
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(f"{self.base_urls[1]}/user_data/cloud-init"),
            callback=DataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[1]}/vendor_data/cloud-init",
            callback=DataResponses.get_ok,
        )
        self.datasource.get_data()

        # assert_call_count is not available in responses for py3.6
        if sys.version_info >= (3, 7):
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[0]}",
                1,
            )
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[1]}",
                1,
            )
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[1]}/conf?format=json", 1
            )

        self.assertEqual(
            self.datasource.get_instance_id(),
            MetadataResponses.FAKE_METADATA["id"],
        )
        ssh_keys = self.datasource.get_public_ssh_keys()
        ssh_keys.sort()
        self.assertEqual(
            ssh_keys,
            [
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
            ],
        )
        self.assertEqual(
            self.datasource.get_hostname().hostname,
            MetadataResponses.FAKE_METADATA["hostname"],
        )
        self.assertEqual(
            self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA
        )
        self.assertEqual(
            self.datasource.get_vendordata_raw(), DataResponses.FAKE_USER_DATA
        )
        self.assertIsNone(self.datasource.availability_zone)
        self.assertIsNone(self.datasource.region)

    @mock.patch(
        "cloudinit.sources.DataSourceScaleway.DataSourceScaleway"
        ".override_ds_detect"
    )
    @mock.patch("cloudinit.url_helper.time.sleep")
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralIPv6Network")
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    def test_ipv4_ipv6_metadata_timeout(self, dhcpv4, inet6, sleep, ds_detect):
        """
        get_data() fails to return metadata. Metadata, user data and
        vendor data are empty
        """
        ds_detect.return_value = True
        self.datasource._set_metadata_url([self.base_urls[0]])

        # Remove callbacks defined at class initialization
        self.responses.reset()
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[0]}/",
            callback=ConnectTimeout,
        )
        self.responses.add_callback(
            responses.GET,
            f"{self.base_urls[1]}/",
            callback=ConnectTimeout,
        )
        self.datasource.max_wait = 0
        ret = self.datasource.get_data()

        # assert_call_count is not available in responses for py3.6
        if sys.version_info >= (3, 7):
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[0]}",
                2,
            )
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[1]}",
                2,
            )
        self.assertFalse(ret)
        self.assertEqual(self.datasource.metadata, {})
        self.assertIsNone(self.datasource.get_userdata_raw())
        self.assertIsNone(self.datasource.get_vendordata_raw())

    @mock.patch(
        "cloudinit.sources.DataSourceScaleway.DataSourceScaleway"
        ".override_ds_detect"
    )
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    def test_metadata_ipv4_404(self, dhcpv4, ds_detect):
        """
        get_data() returns metadata, but no user data nor vendor data.
        """
        ds_detect.return_value = True
        self.datasource._set_metadata_url([self.base_urls[0]])

        # Make user and vendor data APIs return HTTP/404, which means there is
        # no user / vendor data for the server.
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(self.datasource.metadata_url),
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(self.datasource.userdata_url),
            callback=DataResponses.empty,
        )
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(self.datasource.vendordata_url),
            callback=DataResponses.empty,
        )
        self.datasource.get_data()

        self.assertEqual(
            self.datasource.metadata, MetadataResponses.FAKE_METADATA
        )
        self.assertIsNone(self.datasource.get_userdata_raw())
        self.assertIsNone(self.datasource.get_vendordata_raw())

    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    def test_metadata_connection_errors_legacy_ipv4_url(self, dhcpv4):
        """
        get_data() returns ConnectionError on legacy IPv4 URL
        """
        self.datasource._set_metadata_url([self.base_urls[0]])

        # Make metadata API fail to connect for legacy ipv4 url
        self.datasource.metadata_urls = [
            "http://169.254.42.42",
        ]
        self.responses.reset()
        with self.assertRaises(ConnectionError):
            self.responses.add_callback(
                responses.GET,
                f"{self.datasource.metadata_urls[0]}/",
                callback=ConnectionError,
            )
            self.datasource._set_metadata_url(self.datasource.metadata_urls)

        # assert_call_count is not available in responses for py3.6
        if sys.version_info >= (3, 7):
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[0]}/",
                self.datasource.retries,
            )
        self.assertEqual(self.datasource.metadata, {})
        self.assertIsNone(self.datasource.get_userdata_raw())
        self.assertIsNone(self.datasource.get_vendordata_raw())

    @mock.patch(
        "cloudinit.sources.DataSourceScaleway.DataSourceScaleway"
        ".override_ds_detect"
    )
    @mock.patch("cloudinit.url_helper.time.sleep")
    @mock.patch("cloudinit.sources.DataSourceScaleway.socket.getaddrinfo")
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralIPv6Network")
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    def test_metadata_connection_errors_two_urls(
        self, dhcpv4, net6, getaddr, sleep, ds_detect
    ):
        """
        get_data() returns ConnectionError on legacy or DNS URL
        The DNS URL is also tested for IPv6 connectivity
        """
        ds_detect.return_value = True
        self.datasource._set_metadata_url([self.base_urls[0]])
        getaddr.side_effect = [
            [
                [
                    socket.AF_INET,
                ]
            ],
        ] * 4

        # Make metadata API fail to connect for both legacy & DNS
        # DNS url will also be tested for IPv6 connectivity
        self.datasource.metadata_urls = [
            "http://169.254.42.42/",
            "http://api-metadata.com/",
        ]
        self.datasource.has_ipv4 = True
        self.responses.reset()
        self.responses.add_callback(
            responses.GET,
            self.datasource.metadata_urls[0],
            callback=ConnectionError,
        )
        self.responses.add_callback(
            responses.GET,
            self.datasource.metadata_urls[1],
            callback=ConnectionError,
        )
        self.datasource.max_wait = 0
        self.datasource.get_data()

        # url_helper.wait_on_url tests both URL in list each time so
        # called twice for each URL
        if sys.version_info >= (3, 7):
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[0]}",
                2,
            )
            self.responses.assert_call_count(
                f"{self.datasource.metadata_urls[1]}",
                2,
            )
        self.assertIsNone(self.datasource.get_userdata_raw())
        self.assertIsNone(self.datasource.get_vendordata_raw())

    @mock.patch(
        "cloudinit.sources.DataSourceScaleway.DataSourceScaleway"
        ".override_ds_detect"
    )
    @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
    @mock.patch("time.sleep", return_value=None)
    def test_metadata_ipv4_rate_limit(self, sleep, dhcpv4, ds_detect):
        """
        get_data() is rate limited two times by the metadata API when fetching
        user data.
        """
        ds_detect.return_value = True
        self.datasource._set_metadata_url([self.base_urls[0]])

        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(self.datasource.metadata_url),
            callback=MetadataResponses.get_ok,
        )
        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(self.datasource.vendordata_url),
            callback=DataResponses.empty,
        )

        # Temporary bump the retries count so the two rate limits do not
        # trigger an exception
        self.datasource.retries = 5

        # Workaround https://github.com/getsentry/responses/pull/171
        # This mocking can be unrolled when Bionic is EOL
        call_count = 0

        def _callback(request):
            # Rate-limit the first two calls, then serve the user data.
            nonlocal call_count
            call_count += 1
            if call_count <= 2:
                return DataResponses.rate_limited(request)
            return DataResponses.get_ok(request)

        self.responses.add_callback(
            responses.GET,
            _fix_mocking_url(self.datasource.userdata_url),
            callback=_callback,
        )
        self.datasource.get_data()

        self.assertEqual(
            self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA
        )
        self.assertEqual(sleep.call_count, 2)

    def test_ssh_keys_empty(self):
        """
        get_public_ssh_keys() should return empty list if no ssh key are
        available
        """
        self.datasource.metadata["tags"] = []
        self.datasource.metadata["ssh_public_keys"] = []
        self.assertEqual(self.datasource.get_public_ssh_keys(), [])

    def test_ssh_keys_only_tags(self):
        """
        get_public_ssh_keys() should return list of keys available in tags
        """
        self.datasource.metadata["tags"] = [
            "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
            "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABCCCCC",
        ]
        self.datasource.metadata["ssh_public_keys"] = []
        ssh_keys = self.datasource.get_public_ssh_keys()
        ssh_keys.sort()
        self.assertEqual(
            ssh_keys,
            [
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
            ],
        )

    def test_ssh_keys_only_conf(self):
        """
        get_public_ssh_keys() should return list of keys available in
        ssh_public_keys field
        """
        self.datasource.metadata["tags"] = []
        self.datasource.metadata["ssh_public_keys"] = [
            {
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "fingerprint": "2048 06:ae:... login (RSA)",
            },
            {
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "fingerprint": "2048 06:ff:... login2 (RSA)",
            },
        ]
        ssh_keys = self.datasource.get_public_ssh_keys()
        ssh_keys.sort()
        self.assertEqual(
            ssh_keys,
            [
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
            ],
        )

    def test_ssh_keys_both(self):
        """
        get_public_ssh_keys() should return a merge of keys available
        in ssh_public_keys and tags
        """
        self.datasource.metadata["tags"] = [
            "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
        ]
        self.datasource.metadata["ssh_public_keys"] = [
            {
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "fingerprint": "2048 06:ae:... login (RSA)",
            },
            {
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "fingerprint": "2048 06:ff:... login2 (RSA)",
            },
        ]
        ssh_keys = self.datasource.get_public_ssh_keys()
        ssh_keys.sort()
        self.assertEqual(
            ssh_keys,
            [
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
            ],
        )

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_legacy_network_config_ok(self, m_get_cmdline, fallback_nic):
        """
        network_config will only generate IPv4 config if no ipv6 data is
        available in the metadata
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["private_ip"] = "10.10.10.10"
        self.datasource.metadata["ipv6"] = None

        netcfg = self.datasource.network_config
        resp = {
            "version": 1,
            "config": [
                {
                    "type": "physical",
                    "name": "ens2",
                    "subnets": [{"type": "dhcp4"}],
                }
            ],
        }
        self.assertEqual(netcfg, resp)

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_legacy_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic):
        """
        network_config will generate both IPv4 and IPv6 configs if ipv6
        data is available in the metadata
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["private_ip"] = "10.10.10.10"
        self.datasource.metadata["ipv6"] = {
            "address": "2000:abc:4444:9876::42:999",
            "gateway": "2000:abc:4444:9876::42:000",
            "netmask": "127",
        }

        netcfg = self.datasource.network_config
        resp = {
            "version": 1,
            "config": [
                {
                    "type": "physical",
                    "name": "ens2",
                    "subnets": [
                        {"type": "dhcp4"},
                        {
                            "type": "static",
                            "address": "2000:abc:4444:9876::42:999",
                            "netmask": "127",
                            "routes": [
                                {
                                    "gateway": "2000:abc:4444:9876::42:000",
                                    "network": "::",
                                    "prefix": "0",
                                }
                            ],
                        },
                    ],
                }
            ],
        }
        self.assertEqual(netcfg, resp)

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_legacy_network_config_existing(self, m_get_cmdline, fallback_nic):
        """
        network_config() should return the same data if a network config
        already exists
        """
        m_get_cmdline.return_value = "scaleway"
        self.datasource._network_config = "0xdeadbeef"

        netcfg = self.datasource.network_config
        self.assertEqual(netcfg, "0xdeadbeef")

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_legacy_network_config_unset(self, m_get_cmdline, fallback_nic):
        """
        _network_config will be set to sources.UNSET after the first boot.
        Make sure it behaves correctly.
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["ipv6"] = None
        self.datasource.metadata["private_ip"] = "10.10.10.10"
        self.datasource._network_config = sources.UNSET

        resp = {
            "version": 1,
            "config": [
                {
                    "type": "physical",
                    "name": "ens2",
                    "subnets": [{"type": "dhcp4"}],
                }
            ],
        }

        netcfg = self.datasource.network_config
        self.assertEqual(netcfg, resp)

    @mock.patch("cloudinit.sources.DataSourceScaleway.LOG.warning")
    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_legacy_network_config_cached_none(
        self, m_get_cmdline, fallback_nic, logwarning
    ):
        """
        network_config() should return config data if cached data is None
        rather than sources.UNSET
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["ipv6"] = None
        self.datasource.metadata["private_ip"] = "10.10.10.10"
        self.datasource._network_config = None

        resp = {
            "version": 1,
            "config": [
                {
                    "type": "physical",
                    "name": "ens2",
                    "subnets": [{"type": "dhcp4"}],
                }
            ],
        }

        netcfg = self.datasource.network_config
        self.assertEqual(netcfg, resp)
        logwarning.assert_called_with(
            "Found None as cached _network_config. Resetting to %s",
            sources.UNSET,
        )

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_ipmob_primary_ipv4_config_ok(self, m_get_cmdline, fallback_nic):
        """
        With a single public IP equal to the ephemeral address, the
        expected version 2 config has dhcp4 enabled and one route to
        169.254.42.42/32
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["private_ip"] = None
        self.datasource.metadata["ipv6"] = None
        self.datasource.ephemeral_fixed_address = "10.10.10.10"
        self.datasource.metadata["public_ips"] = [{"address": "10.10.10.10"}]

        netcfg = self.datasource.network_config
        resp = {
            "version": 2,
            "ethernets": {
                fallback_nic.return_value: {
                    "routes": [
                        {"to": "169.254.42.42/32", "via": "62.210.0.1"}
                    ],
                    "dhcp4": True,
                },
            },
        }
        self.assertEqual(netcfg, resp)

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_ipmob_additional_ipv4_config_ok(
        self, m_get_cmdline, fallback_nic
    ):
        """
        With a second public IPv4 besides the ephemeral address, the
        expected version 2 config keeps dhcp4 and lists the additional
        IPv4 under "addresses"
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["private_ip"] = None
        self.datasource.metadata["ipv6"] = None
        self.datasource.ephemeral_fixed_address = "10.10.10.10"
        self.datasource.metadata["public_ips"] = [
            {
                "address": "10.10.10.10",
                "netmask": "32",
                "family": "inet",
            },
            {
                "address": "20.20.20.20",
                "netmask": "32",
                "family": "inet",
            },
        ]

        netcfg = self.datasource.network_config
        resp = {
            "version": 2,
            "ethernets": {
                fallback_nic.return_value: {
                    "dhcp4": True,
                    "routes": [
                        {"to": "169.254.42.42/32", "via": "62.210.0.1"}
                    ],
                    "addresses": ("20.20.20.20/32",),
                },
            },
        }
        self.assertEqual(netcfg, resp)

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_ipmob_primary_ipv6_config_ok(self, m_get_cmdline, fallback_nic):
        """
        Generate network_config with only IPv6
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["private_ip"] = None
        self.datasource.metadata["ipv6"] = None
        self.datasource.ephemeral_fixed_address = "10.10.10.10"
        self.datasource.metadata["public_ips"] = [
            {
                "address": "2001:aaa:aaaa:a:aaaa:aaaa:aaaa:1",
                "netmask": "64",
                "gateway": "fe80::ffff:ffff:ffff:fff1",
                "family": "inet6",
            },
        ]

        netcfg = self.datasource.network_config
        resp = {
            "version": 2,
            "ethernets": {
                fallback_nic.return_value: {
                    "addresses": ("2001:aaa:aaaa:a:aaaa:aaaa:aaaa:1/64",),
                    "routes": [
                        {
                            "via": "fe80::ffff:ffff:ffff:fff1",
                            "to": "::/0",
                        }
                    ],
                },
            },
        }
        self.assertEqual(netcfg, resp)

    @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
    @mock.patch("cloudinit.util.get_cmdline")
    def test_ipmob_primary_ipv4_v6_config_ok(
        self, m_get_cmdline, fallback_nic
    ):
        """
        Generate network_config with both a primary IPv4 and an IPv6
        """
        m_get_cmdline.return_value = "scaleway"
        fallback_nic.return_value = "ens2"
        self.datasource.metadata["private_ip"] = None
        self.datasource.metadata["ipv6"] = None
        self.datasource.ephemeral_fixed_address = "10.10.10.10"
        self.datasource.metadata["public_ips"] = [
            {
                "address": "10.10.10.10",
                "netmask": "32",
                "family": "inet",
            },
            {
                "address": "2001:aaa:aaaa:a:aaaa:aaaa:aaaa:1",
                "netmask": "64",
                "gateway": "fe80::ffff:ffff:ffff:fff1",
                "family": "inet6",
            },
        ]

        netcfg = self.datasource.network_config
        resp = {
            "version": 2,
            "ethernets": {
                fallback_nic.return_value: {
                    "dhcp4": True,
                    "routes": [
                        {"to": "169.254.42.42/32", "via": "62.210.0.1"},
                        {
                            "via": "fe80::ffff:ffff:ffff:fff1",
                            "to": "::/0",
                        },
                    ],
                    "addresses": ("2001:aaa:aaaa:a:aaaa:aaaa:aaaa:1/64",),
                },
            },
        }
        self.assertEqual(netcfg, resp)