Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/reporting/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/reporting/test_webhook_handler.py
# This file is part of cloud-init. See LICENSE file for license information.
import time
from contextlib import suppress
from unittest.mock import PropertyMock

import pytest
import responses

from cloudinit.reporting import flush_events
from cloudinit.reporting.events import report_start_event
from cloudinit.reporting.handlers import WebHookHandler


class TestWebHookHandler:
    @pytest.fixture(autouse=True)
    def setup(self, mocker):
        handler = WebHookHandler(endpoint="http://localhost")
        m_registered_items = mocker.patch(
            "cloudinit.registry.DictRegistry.registered_items",
            new_callable=PropertyMock,
        )
        m_registered_items.return_value = {"webhook": handler}

    @responses.activate
    def test_webhook_handler(self, caplog):
        """Test the happy path."""
        responses.add(responses.POST, "http://localhost", status=200)
        report_start_event("name", "description")
        flush_events()
        assert 1 == caplog.text.count(
            "Read from http://localhost (200, 0b) after 1 attempts"
        )

    @responses.activate
    def test_404(self, caplog):
        """Test failure"""
        responses.add(responses.POST, "http://localhost", status=404)
        report_start_event("name", "description")
        flush_events()
        assert 1 == caplog.text.count("Failed posting event")

    @responses.activate
    def test_background_processing(self, caplog):
        """Test that processing happens in background.

        In the non-flush case, ensure that the event is still posted.
        Since the event is posted in the background, wait while looping.
        """
        responses.add(responses.POST, "http://localhost", status=200)
        report_start_event("name", "description")
        start_time = time.time()
        while time.time() - start_time < 3:
            with suppress(AssertionError):
                assert (
                    "Read from http://localhost (200, 0b) after 1 attempts"
                    in caplog.text
                )
                break
        else:
            pytest.fail("Never got expected log message")

    @responses.activate
    @pytest.mark.parametrize(
        "num_failures,expected_log_count,expected_cancel",
        [(2, 2, False), (3, 3, True), (50, 3, True)],
    )
    def test_failures_cancel_flush(
        self, caplog, num_failures, expected_log_count, expected_cancel
    ):
        """Test that too many failures will cancel further processing on flush.

        2 messages should not cancel on flush
        3 or more should cancel on flush
        The number of received messages will be based on how many have
        been processed before the flush was initiated.
        """
        responses.add(responses.POST, "http://localhost", status=404)
        for _ in range(num_failures):
            report_start_event("name", "description")
        flush_events()
        # Force a context switch. Without this, it's possible that the
        # expected log message hasn't made it to the log file yet
        time.sleep(0.01)

        # If we've pushed a bunch of messages, any number could have been
        # processed before we get to the flush.
        assert (
            expected_log_count
            <= caplog.text.count("Failed posting event")
            <= num_failures
        )
        cancelled_message = (
            "Multiple consecutive failures in WebHookHandler. "
            "Cancelling all queued events"
        )
        if expected_cancel:
            assert cancelled_message in caplog.text
        else:
            assert cancelled_message not in caplog.text

    @responses.activate
    def test_multiple_failures_no_flush(self, caplog):
        """Test we don't cancel posting if flush hasn't been requested.

        Since processing happens in the background, wait in a loop
        for all messages to be posted
        """
        responses.add(responses.POST, "http://localhost", status=404)
        for _ in range(10):
            report_start_event("name", "description")
        start_time = time.time()
        while time.time() - start_time < 3:
            with suppress(AssertionError):
                assert 10 == caplog.text.count("Failed posting event")
                break
            time.sleep(0.01)  # Force context switch
        else:
            pytest.fail(
                "Expected 20 failures, only got "
                f"{caplog.text.count('Failed posting event')}"
            )

haha - 2025