Test curl-impersonate's HTTP headers

Add tests to verify that the HTTP headers and HTTP/2 pseudo-headers
generated by curl-impersonate match the expected ones from the browser.

The test uses a local nghttpd HTTP/2 server instance with a self-signed
certificate.
This commit is contained in:
lwthiker
2022-03-04 16:40:58 +02:00
parent cf5c661c5a
commit 9bab04c170
9 changed files with 422 additions and 34 deletions

View File

@@ -1,5 +1,6 @@
import os
import io
import re
import logging
import subprocess
@@ -7,7 +8,11 @@ import yaml
import dpkt
import pytest
from signature import BrowserSignature, TLSClientHelloSignature
from signature import (
BrowserSignature,
TLSClientHelloSignature,
HTTP2Signature
)
@pytest.fixture
@@ -93,14 +98,11 @@ class TestSignatureModule:
"""
Test the TLS Client Hello parsing code.
"""
sig = BrowserSignature(
tls_client_hello=TLSClientHelloSignature.from_bytes(
self.CLIENT_HELLO
)
)
sig2 = BrowserSignature.from_dict(
browser_signatures["chrome_98.0.4758.102_win10"]["signature"]
sig = TLSClientHelloSignature.from_bytes(self.CLIENT_HELLO)
sig2 = TLSClientHelloSignature.from_dict(
browser_signatures["chrome_98.0.4758.102_win10"] \
["signature"] \
["tls_client_hello"]
)
equals, reason = sig.equals(sig2, reason=True)
@@ -199,7 +201,22 @@ class TestImpersonation:
p.terminate()
p.wait(timeout=10)
def _run_curl(self, curl_binary, env_vars, url):
@pytest.fixture
def nghttpd(self):
"""Initiailize an HTTP/2 server"""
logging.debug(f"Running nghttpd on :8443")
p = subprocess.Popen([
"nghttpd", "-v",
"8443", "ssl/server.key", "ssl/server.crt"
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
yield p
p.terminate()
p.wait(timeout=10)
def _run_curl(self, curl_binary, env_vars, extra_args, url):
env = os.environ.copy()
if env_vars:
env |= env_vars
@@ -209,13 +226,16 @@ class TestImpersonation:
logging.debug("Environment variables: {}".format(
" ".join([f"{k}={v}" for k, v in env_vars.items()])))
curl = subprocess.Popen([
args = [
curl_binary,
"-o", "/dev/null",
"--local-port", f"{self.LOCAL_PORTS[0]}-{self.LOCAL_PORTS[1]}",
url
], env=env)
"--local-port", f"{self.LOCAL_PORTS[0]}-{self.LOCAL_PORTS[1]}"
]
if extra_args:
args += extra_args
args.append(url)
curl = subprocess.Popen(args, env=env)
return curl.wait(timeout=10)
def _extract_client_hello(self, pcap: bytes) -> bytes:
@@ -249,6 +269,41 @@ class TestImpersonation:
return None
def _parse_nghttpd2_output(self, output):
"""Parse the output of nghttpd2.
nghttpd2 in verbose mode writes out the HTTP/2
headers that the client had sent.
"""
lines = output.decode("utf-8").splitlines()
stream_id = None
for line in lines:
m = re.search(r"recv HEADERS frame.*stream_id=(\d+)", line)
if m:
stream_id = m.group(1)
break
assert stream_id is not None, \
"Failed to find HEADERS frame in nghttpd2 output"
pseudo_headers = []
headers = []
for line in lines:
m = re.search(rf"recv \(stream_id={stream_id}\) (.*)", line)
if m:
header = m.group(1)
# If the headers starts with ":" it is a pseudo-header,
# i.e. ":authority". In this case keep only the header name and
# discard the value
if header.startswith(":"):
m = re.match(r"(:\w+):", header)
if m:
pseudo_headers.append(m.group(1))
else:
headers.append(header)
return pseudo_headers, headers
@pytest.mark.parametrize(
"curl_binary, env_vars, expected_signature",
CURL_BINARIES_AND_SIGNATURES
@@ -267,7 +322,10 @@ class TestImpersonation:
extracts the Client Hello packet from the capture and compares its
signature with the expected one defined in the YAML database.
"""
ret = self._run_curl(curl_binary, env_vars, self.TEST_URL)
ret = self._run_curl(curl_binary,
env_vars=env_vars,
extra_args=None,
url=self.TEST_URL)
assert ret == 0
try:
@@ -300,3 +358,48 @@ class TestImpersonation:
equals, msg = sig.equals(expected_sig, reason=True)
assert equals, msg
@pytest.mark.parametrize(
"curl_binary, env_vars, expected_signature",
CURL_BINARIES_AND_SIGNATURES
)
def test_http2_headers(self,
nghttpd,
curl_binary,
env_vars,
browser_signatures,
expected_signature):
ret = self._run_curl(curl_binary,
env_vars=env_vars,
extra_args=["-k"],
url="https://localhost:8443")
assert ret == 0
try:
output, stderr = nghttpd.communicate(timeout=2)
# If nghttpd finished running before timeout, it's likely it failed
# with an error.
assert nghttpd.returncode == 0, \
(f"nghttpd failed with error code {nghttpd.returncode}, "
f"stderr: {stderr}")
except subprocess.TimeoutExpired:
nghttpd.kill()
output, stderr = nghttpd.communicate(timeout=3)
assert len(output) > 0
pseudo_headers, headers = self._parse_nghttpd2_output(output)
logging.debug(
f"Received {len(pseudo_headers)} HTTP/2 pseudo-headers "
f"and {len(headers)} HTTP/2 headers"
)
sig = HTTP2Signature(pseudo_headers, headers)
expected_sig = HTTP2Signature.from_dict(
browser_signatures[expected_signature] \
["signature"] \
["http2"]
)
equals, msg = sig.equals(expected_sig, reason=True)
assert equals, msg