cloudflared/component-tests/test_termination.py
GoncaloGarcia e05939f1c9
Some checks failed
Check / check (1.22.x, macos-latest) (push) Has been cancelled
Check / check (1.22.x, ubuntu-latest) (push) Has been cancelled
Check / check (1.22.x, windows-latest) (push) Has been cancelled
TUN-8621: Prevent QUIC connection from closing before grace period after unregistering
Whenever cloudflared receives a SIGTERM or SIGINT it goes into graceful shutdown mode, which unregisters the connection and closes the control stream. Unregistering makes it so we no longer receive any new requests and makes the edge close the connection, allowing in-flight requests to finish (within a 3 minute period).
 This was working fine for http2 connections, but the quic proxy was cancelling the context as soon as the controls stream ended, forcing the process to stop immediately.

 This commit changes the behavior so that we wait the full grace period before cancelling the request
2024-09-05 13:15:00 +00:00

128 lines
5.4 KiB
Python

#!/usr/bin/env python
from contextlib import contextmanager
import platform
import signal
import threading
import time
import pytest
import requests
from constants import protocols
from util import start_cloudflared, wait_tunnel_ready, check_tunnel_not_connected
def supported_signals():
if platform.system() == "Windows":
return [signal.SIGTERM]
return [signal.SIGTERM, signal.SIGINT]
class TestTermination:
grace_period = 5
timeout = 10
sse_endpoint = "/sse?freq=1s"
def _extra_config(self, protocol):
return {
"grace-period": f"{self.grace_period}s",
"protocol": protocol,
}
@pytest.mark.parametrize("signal", supported_signals())
@pytest.mark.parametrize("protocol", protocols())
def test_graceful_shutdown(self, tmp_path, component_tests_config, signal, protocol):
config = component_tests_config(self._extra_config(protocol))
with start_cloudflared(
tmp_path, config, cfd_pre_args=["tunnel", "--ha-connections", "1"], new_process=True, capture_output=False) as cloudflared:
wait_tunnel_ready(tunnel_url=config.get_url())
connected = threading.Condition()
in_flight_req = threading.Thread(
target=self.stream_request, args=(config, connected, False, ))
in_flight_req.start()
with connected:
connected.wait(self.timeout)
# Send signal after the SSE connection is established
with self.within_grace_period():
self.terminate_by_signal(cloudflared, signal)
self.wait_eyeball_thread(
in_flight_req, self.grace_period + self.timeout)
# test cloudflared terminates before grace period expires when all eyeball
# connections are drained
@pytest.mark.parametrize("signal", supported_signals())
@pytest.mark.parametrize("protocol", protocols())
def test_shutdown_once_no_connection(self, tmp_path, component_tests_config, signal, protocol):
config = component_tests_config(self._extra_config(protocol))
with start_cloudflared(
tmp_path, config, cfd_pre_args=["tunnel", "--ha-connections", "1"], new_process=True, capture_output=False) as cloudflared:
wait_tunnel_ready(tunnel_url=config.get_url())
connected = threading.Condition()
in_flight_req = threading.Thread(
target=self.stream_request, args=(config, connected, True, ))
in_flight_req.start()
with connected:
connected.wait(self.timeout)
with self.within_grace_period(has_connection=False):
# Send signal after the SSE connection is established
self.terminate_by_signal(cloudflared, signal)
self.wait_eyeball_thread(in_flight_req, self.grace_period)
@pytest.mark.parametrize("signal", supported_signals())
@pytest.mark.parametrize("protocol", protocols())
def test_no_connection_shutdown(self, tmp_path, component_tests_config, signal, protocol):
config = component_tests_config(self._extra_config(protocol))
with start_cloudflared(
tmp_path, config, cfd_pre_args=["tunnel", "--ha-connections", "1"], new_process=True, capture_output=False) as cloudflared:
wait_tunnel_ready(tunnel_url=config.get_url())
with self.within_grace_period(has_connection=False):
self.terminate_by_signal(cloudflared, signal)
def terminate_by_signal(self, cloudflared, sig):
cloudflared.send_signal(sig)
check_tunnel_not_connected()
cloudflared.wait()
def wait_eyeball_thread(self, thread, timeout):
thread.join(timeout)
assert thread.is_alive() == False, "eyeball thread is still alive"
# Using this context asserts logic within the context is executed within grace period
@contextmanager
def within_grace_period(self, has_connection=True):
try:
start = time.time()
yield
finally:
# If the request takes longer than the grace period then we need to wait at most the grace period.
# If the request fell within the grace period cloudflared can close earlier, but to ensure that it doesn't
# close immediately we add a minimum boundary. If cloudflared shutdown in less than 1s it's likely that
# it shutdown as soon as it received SIGINT. The only way cloudflared can close immediately is if it has no
# in-flight requests
minimum = 1 if has_connection else 0
duration = time.time() - start
# Here we truncate to ensure that we don't fail on minute differences like 10.1 instead of 10
assert minimum <= int(duration) <= self.grace_period
def stream_request(self, config, connected, early_terminate):
expected_terminate_message = "502 Bad Gateway"
url = config.get_url() + self.sse_endpoint
with requests.get(url, timeout=5, stream=True) as resp:
with connected:
connected.notifyAll()
lines = 0
for line in resp.iter_lines():
if expected_terminate_message.encode() == line:
break
lines += 1
if early_terminate and lines == 2:
return
# /sse returns count followed by 2 new lines
assert lines >= (self.grace_period * 2)