|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /usr/src/cloud-init/tests/integration_tests/ |
Upload File : |
import logging
import multiprocessing
import os
import re
import time
from collections import namedtuple
from contextlib import contextmanager
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import Set
import pytest
from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance
# Logger shared by the helpers in this module.
log = logging.getLogger("integration_testing")
# Simple container pairing the text of a public key with its private key.
key_pair = namedtuple("key_pair", "public_key private_key")
# Relative path to the integration-test assets directory.
ASSETS_DIR = Path("tests/integration_tests/assets")
# Directory under ASSETS_DIR holding the test SSH key files.
KEY_PATH = ASSETS_DIR / "keys"
def verify_ordered_items_in_text(to_verify: list, text: str):
    """Assert all items in list appear in order in text.

    Each item is first tried as a regular expression; if it fails to
    compile, it is matched as a literal string instead. The search for an
    item starts at the position where the previous item matched, so
    consecutive items may match the same or overlapping text.

    Examples:
      verify_ordered_items_in_text(['a', '1'], 'ab1')  # passes
      verify_ordered_items_in_text(['1', 'a'], 'ab1')  # raises AssertionError

    :param to_verify: patterns (or literal strings) expected in ``text``,
        listed in the order they must appear.
    :param text: the text to search.
    :raises AssertionError: if an item is not found at or after the
        position of the previous item's match.
    """
    index = 0
    for item in to_verify:
        remaining = text[index:]
        try:
            matched = re.search(item, remaining)
        except re.error:
            # Not a valid regex: fall back to a literal match.
            matched = re.search(re.escape(item), remaining)
        assert matched, "Expected item not found: '{}'".format(item)
        # matched.start() is relative to the slice that was searched, so it
        # must be added to the running offset. Plain assignment would let
        # the search position jump backwards and accept out-of-order items.
        index += matched.start()
def verify_clean_log(log: str, ignore_deprecations: bool = True):
    """Assert no unexpected tracebacks or warnings in logs.

    :param log: full text of the log to inspect.
    :param ignore_deprecations: when True, every line containing
        "deprecat" (case-insensitive) is dropped before any check runs.
    :raises AssertionError: if a CRITICAL/ERROR entry is present, or if
        the number of "WARN" / "Traceback" occurrences differs from the
        number of occurrences of the known, tolerated messages.
    """
    if ignore_deprecations:
        deprecation_re = re.compile("deprecat", flags=re.IGNORECASE)
        log = "\n".join(
            line
            for line in log.split("\n")
            if not deprecation_re.search(line)
        )

    # CRITICAL entries are reported ahead of ERROR entries.
    error_logs = [
        *re.findall("CRITICAL.*", log),
        *re.findall("ERROR.*", log),
    ]
    if error_logs:
        raise AssertionError(
            "Found unexpected errors: %s" % "\n".join(error_logs)
        )

    # Warnings that are tolerated wherever they appear.
    warning_texts = [
        # Consistently on all Azure launches:
        # azure.py[WARNING]: No lease found; using default endpoint
        "No lease found; using default endpoint",
        # Ubuntu lxd storage
        "thinpool by default on Ubuntu due to LP #1982780",
        "WARNING]: Could not match supplied host pattern, ignoring:",
    ]
    traceback_texts = []

    if "oracle" in log:
        # LP: #1842752
        lease_exists_text = "Stderr: RTNETLINK answers: File exists"
        # LP: #1833446
        fetch_error_text = (
            "UrlError: 404 Client Error: Not Found for url: "
            "http://169.254.169.254/latest/meta-data/"
        )
        # Both messages are counted as a warning and as a traceback.
        traceback_texts += [lease_exists_text, fetch_error_text]
        warning_texts += [
            lease_exists_text,
            fetch_error_text,
            # Oracle has a file in /etc/cloud/cloud.cfg.d that contains
            # users:
            # - default
            # - name: opc
            #   ssh_redirect_user: true
            # This can trigger a warning about opc having no public key
            "Unable to disable SSH logins for opc given ssh_redirect_user",
        ]

    expected_warnings = sum(log.count(text) for text in warning_texts)
    expected_tracebacks = sum(log.count(text) for text in traceback_texts)

    warning_count = log.count("WARN")
    assert warning_count == expected_warnings, (
        f"Unexpected warning count != {expected_warnings}. Found: "
        f"{re.findall('WARNING.*', log)}"
    )
    traceback_count = log.count("Traceback")
    assert traceback_count == expected_tracebacks
def get_inactive_modules(log: str) -> Set[str]:
    """Return the module names the log reports as skipped.

    Scans ``log`` for "Skipping modules '...' because no applicable config
    is provided." entries, splits each quoted list on commas, and returns
    the whitespace-stripped names as a set.
    """
    skipped_lists = re.findall(
        r"Skipping modules '(.*)' because no applicable config is provided.",
        log,
    )
    return {
        name.strip()
        for module_list in skipped_lists
        for name in module_list.split(",")
    }
@contextmanager
def emit_dots_on_travis():
    """Keep a Travis job alive by logging a dot every 60 seconds.

    Travis will kill jobs that don't emit output for a certain amount of
    time. When the ``TRAVIS`` environment variable equals "true", this
    context manager starts a background process that logs "." once a
    minute for the duration of the ``with`` body, then terminates it.
    Anywhere else it does nothing.

    It should be wrapped selectively around operations that are known to
    take a long time.
    """
    running_on_travis = os.environ.get("TRAVIS") == "true"
    if not running_on_travis:
        # Not on Travis: behave as a no-op context manager.
        yield
        return

    def _heartbeat():
        while True:
            log.info(".")
            time.sleep(60)

    heartbeat_process = multiprocessing.Process(target=_heartbeat)
    heartbeat_process.start()
    try:
        yield
    finally:
        # Always stop the background process, even if the body raised.
        heartbeat_process.terminate()
def get_test_rsa_keypair(key_name: str = "test1") -> key_pair:
    """Load a test RSA key pair from the keys asset directory.

    Reads ``id_rsa.<key_name>.pub`` and ``id_rsa.<key_name>`` under
    ``KEY_PATH`` and returns their contents as a ``key_pair``.

    :param key_name: suffix identifying which key files to read.
    :return: ``key_pair`` holding the public and private key text.
    """
    private_key_path = KEY_PATH / f"id_rsa.{key_name}"
    public_key_path = KEY_PATH / f"id_rsa.{key_name}.pub"
    # The public key is read first, then the private key.
    public_key = public_key_path.read_text()
    private_key = private_key_path.read_text()
    return key_pair(public_key, private_key)
def get_console_log(client: IntegrationInstance):
    """Return the console log of the instance behind ``client``.

    The current test is skipped if requesting the console log raises
    NotImplementedError, and failed if the returned text starts with
    "no console output" (case-insensitive).
    """
    try:
        output = client.instance.console_log()
    except NotImplementedError:
        pytest.skip("NotImplementedError when requesting console log")
    has_no_output = output.lower().startswith("no console output")
    if has_no_output:
        pytest.fail("no console output")
    return output
@lru_cache()
def lxd_has_nocloud(client: IntegrationInstance) -> bool:
    """Report whether the instance's LXD metadata mentions a NoCloud seed.

    Bionic or Focal may be detected as NoCloud rather than LXD. This runs
    ``lxc config metadata show <instance name>`` and checks its stdout for
    the NoCloud seed directory. Results are cached per ``client``.
    """
    show_metadata_cmd = [
        "lxc",
        "config",
        "metadata",
        "show",
        client.instance.name,
    ]
    metadata_output = subp(show_metadata_cmd).stdout
    return "/var/lib/cloud/seed/nocloud" in metadata_output
def get_feature_flag_value(client: IntegrationInstance, key):
    """Return the printed value of ``cloudinit.features.<key>`` on the client.

    Runs a ``python3 -c`` one-liner through ``client.execute`` that prints
    the attribute, and returns the stripped output.

    :param client: instance on which the command is executed.
    :param key: name of the attribute to read from ``cloudinit.features``.
    :raises NameError: if the command output contains "NameError".
    """
    command = (
        "python3 -c "
        f'"from cloudinit import features; print(features.{key})"'
    )
    output = client.execute(command).strip()
    if "NameError" not in output:
        return output
    raise NameError(f"name '{key}' is not defined")