From 6e672bc71939625637b57a5a6405fd52442c51cd Mon Sep 17 00:00:00 2001 From: lwthiker Date: Tue, 22 Feb 2022 18:03:20 +0200 Subject: [PATCH] Add automated tests to verify curl's TLS signature Add automated testing infrastructure with tests verifying that `curl-impersonate` has the same TLS signature as that of the impersonated browser. Each wrapper script (e.g. curl_chrome98) is launched to wikipedia.org while a packet capture is running in the background. The Client Hello is extracted from the capture, parsed and then compared to the known browser signature. The known signatures are stored in a YAML database. --- README.md | 3 + tests/Dockerfile | 21 + tests/README.md | 24 + tests/requirements.txt | 3 + tests/signature.py | 891 ++++++++++++++++++++++++++++++++++++++ tests/signatures.yaml | 225 ++++++++++ tests/test_impersonate.py | 240 ++++++++++ 7 files changed, 1407 insertions(+) create mode 100644 tests/Dockerfile create mode 100644 tests/README.md create mode 100644 tests/requirements.txt create mode 100644 tests/signature.py create mode 100644 tests/signatures.yaml create mode 100644 tests/test_impersonate.py diff --git a/README.md b/README.md index d7b448d..feff654 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,9 @@ The layout is similar for both. For example, the Firefox directory contains: * [curl-configure.patch](firefox/patches/curl-configure.patch) - Patch to make curl compile with a static libnghttp2. * [curl-static-libnss.patch](firefox/patches/curl-static-libnss.patch) - Patch to make curl compile with a static libnss. +Other files of interest: +* [tests/signatures.yaml](tests/signatures.yaml) - YAML database of known browser signatures that can be impersonated. + ## What's next? This was done in a very hacky way, but I hope it could be turned into a real project. 
Imagine that you could run: ``` diff --git a/tests/Dockerfile b/tests/Dockerfile new file mode 100644 index 0000000..5676cab --- /dev/null +++ b/tests/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.10.1-slim-buster + +WORKDIR /tests + +RUN apt-get update && \ + apt-get install -y tcpdump libbrotli1 libnss3 + +COPY requirements.txt requirements.txt + +RUN pip install --upgrade pip && \ + pip install -r requirements.txt + +RUN mkdir /tests/firefox /tests/chrome + +# Copy the built binaries from both containers +COPY --from=curl-impersonate-ff /build/out/* /tests/firefox/ +COPY --from=curl-impersonate-chrome /build/out/* /tests/chrome/ + +COPY . . + +ENTRYPOINT ["pytest"] diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..8b414b2 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,24 @@ +The tests verify that `curl-impersonate` has the same network signature as that of the supported browsers. They do not test curl's functionality itself. + +## Running the tests + +The tests assume that you've built both `curl-impersonate-chrome` and `curl-impersonate-ff` docker images before (see [Installation](https://github.com/lwthiker/curl-impersonate#installation)). + +To run the tests, build with: +``` +docker build -t curl-impersonate-tests tests/ +``` +then run with: +``` +docker run --rm curl-impersonate-tests +``` +This simply runs `pytest` in the container. You can pass additional flags to `pytest` such as `--log-cli-level DEBUG`. + +## How the tests work +For each supported browser, a packet capture is started while `curl-impersonate` is run with the relevant wrapper script. The Client Hello message is extracted from the capture, and compared against the known signature of the browser. + +## What's missing +The following tests are still missing: +* Test that `curl-impersonate` sends the HTTP headers in the same order as the browser. +* Test that `curl-impersonate` sends the HTTP/2 pseudo-headers in the same order as the browser. 
+* Test that `curl-impersonate` sends the same HTTP/2 SETTINGS as the browser. diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000..fa2223a --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,3 @@ +pyyaml +pytest +dpkt diff --git a/tests/signature.py b/tests/signature.py new file mode 100644 index 0000000..016cb85 --- /dev/null +++ b/tests/signature.py @@ -0,0 +1,891 @@ +import enum +import struct +import collections +from typing import List, Any + +import yaml + + +# Special value to denote GREASE in various placements in the Client Hello. +# Intentionally negative so that it won't conflict with any real field. +TLS_GREASE = -1 + + +class TLSVersion(enum.Enum): + # See https://github.com/openssl/openssl/blob/master/include/openssl/prov_ssl.h + TLS_VERSION_1_0 = 0x0301 + TLS_VERSION_1_1 = 0x0302 + TLS_VERSION_1_2 = 0x0303 + TLS_VERSION_1_3 = 0x0304 + + # Special value to denote a GREASE randomized value. + GREASE = TLS_GREASE + + @classmethod + def has_value(cls, value): + return value in [x.value for x in cls] + + +class TLSExtensionType(enum.Enum): + # TLS extensions list + # See https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml + # for the official list, and + # https://github.com/google/boringssl/blob/master/include/openssl/tls1.h + # for BoringSSL's list of supported extensions + server_name = 0 + status_request = 5 + supported_groups = 10 + ec_point_formats = 11 + signature_algorithms = 13 + application_layer_protocol_negotiation = 16 + signed_certificate_timestamp = 18 + padding = 21 + extended_master_secret = 23 + compress_certificate = 27 + record_size_limit = 28 + delegated_credentials = 34 + session_ticket = 35 + supported_versions = 43 + psk_key_exchange_modes = 45 + keyshare = 51 + application_settings = 17513 + renegotiation_info = 65281 + + # Special value to denote a GREASE extension. 
+ GREASE = TLS_GREASE + + +# Possible values for GREASE +TLS_GREASE_VALUES = [ + 0x0a0a, 0x1a1a, 0x2a2a, 0x3a3a, 0x4a4a, 0x5a5a, 0x6a6a, 0x7a7a, + 0x8a8a, 0x9a9a, 0xaaaa, 0xbaba, 0xcaca, 0xdada, 0xeaea, 0xfafa +] + +# Structs for parsing TLS packets +TLS_RECORD_HEADER = "!BHH" +TLSRecordHeader = collections.namedtuple( + "TLSRecordHeader", + "type, version, length" +) + +TLS_HANDSHAKE_HEADER = "!BBHH32sB" +TLSHandshakeHeader = collections.namedtuple( + "TLSHandshakeHeader", + "type, length_high, length_low, version, random, session_id_length" +) + +TLS_EXTENSION_HEADER = "!HH" +TLSExtensionHeader = collections.namedtuple( + "TLSExtensionHeader", + "type, length" +) + + +def serialize_grease(l: List[Any]) -> List[Any]: + return list(map(lambda x: "GREASE" if x == TLS_GREASE else x, l)) + + +def unserialize_grease(l: List[Any]) -> List[Any]: + return list(map(lambda x: TLS_GREASE if x == "GREASE" else x, l)) + + +def parse_tls_int_list(data: bytes, + entry_size: int, + header_size: int = 2, + replace_grease=True): + """Parse a TLS-encoded list of integers. + + This list format is common in TLS packets. + It consists of a length header (two bytes when header_size is 2, one byte + otherwise) giving the list's total length, with the entries following. + + The entries may be one of TLS_GREASE_VALUES, in which case they + are replaced with the constant TLS_GREASE (unless replace_grease=False). + + Returns + ------- + entries : list[int] + List of entries extracted from the TLS-encoded list. + size : int + Total size, in bytes, of the list, including the length header. 
+ """ + + off = 0 + h = "!H" if header_size == 2 else "!B" + (list_length, ) = struct.unpack_from(h, data, off) + off += struct.calcsize(h) + if list_length > len(data) - off: + raise Exception(f"TLS list of integers too long: {list_length} bytes") + + entries = [] + s = "!H" if entry_size == 2 else "!B" + for i in range(list_length // entry_size): + (entry, ) = struct.unpack_from(s, data, off) + off += struct.calcsize(s) + if replace_grease and entry in TLS_GREASE_VALUES: + entry = TLS_GREASE + entries.append(entry) + + return entries, struct.calcsize(h) + list_length + + +def parse_tls_str_list(data: bytes): + """Parse a TLS-encoded list of strings. + + Returns + ------- + entries : list[str] + List of entries extracted from the TLS-encoded list. + size : int + Total size, in bytes, of the list. + """ + off = 0 + header_size = struct.calcsize("!H") + (list_length, ) = struct.unpack_from("!H", data, off) + off += header_size + if list_length > len(data) - off: + raise Exception("TLS list of strings too long") + + entries = [] + while off - header_size < list_length: + (strlen, ) = struct.unpack_from("!B", data, off) + off += struct.calcsize("!B") + entries.append(data[off:off + strlen].decode()) + off += strlen + + return entries, struct.calcsize("!H") + list_length + + +class TLSExtensionSignature(): + """ + Signature of a TLS extension. + + Used to check if two TLS extensions are configured similarly. + + For TLS extensions that have internal parameters to be checked, + a subclass should be created. Subclasses should implement to_dict(), + from_dict() and from_bytes() classmethods. See the subclasses below. 
+ """ + + # A registry of subclasses + registry = {} + + def __init__(self, + ext_type: TLSExtensionType, + length=None): + self.ext_type = ext_type + self.length = length + + def __init_subclass__(cls, /, ext_type: TLSExtensionType, **kwargs): + """Register subclasses to the registry""" + super().__init_subclass__(**kwargs) + cls.registry[ext_type] = cls + cls.ext_type = ext_type + + def to_dict(self): + """Serialize to a dict object. + + By default we serialize the type and length only. + To serialize additional parameters, override this in a subclass. + """ + d = { + "type": self.ext_type.name, + } + if self.length is not None: + d["length"] = self.length + return d + + def equals(self, other: 'TLSExtensionSignature'): + # To check equality, we just compare the dict serializations. + return self.to_dict() == other.to_dict() + + @classmethod + def from_dict(cls, d): + """Unserialize a TLSExtensionSignature from a dict. + + Initializes the suitable subclass if exists, otherwise initializes + a TLSExtensionSignature proper instance. + """ + d = d.copy() + ext_type = TLSExtensionType[d.pop("type")] + if ext_type in cls.registry: + return cls.registry[ext_type].from_dict(d) + else: + return TLSExtensionSignature( + ext_type=ext_type, + length=d.pop("length", None) + ) + + @classmethod + def from_bytes(cls, ext: bytes): + """Build a TLSExtensionSignature from a raw TLS extension. + + Parameters + ---------- + ext : bytes + Raw over-the-wire contents of the TLS extension. 
+ """ + off = 0 + header = TLSExtensionHeader._make(struct.unpack_from( + TLS_EXTENSION_HEADER, ext, off + )) + off += struct.calcsize(TLS_EXTENSION_HEADER) + if header.type in TLS_GREASE_VALUES: + ext_type = TLSExtensionType.GREASE + else: + ext_type = TLSExtensionType(header.type) + + if ext_type in cls.registry: + return cls.registry[ext_type].from_bytes( + length=header.length, + data=ext[off:off + header.length] + ) + else: + return TLSExtensionSignature( + ext_type=ext_type, + length=header.length + ) + + +class TLSExtensionGrease(TLSExtensionSignature, + ext_type=TLSExtensionType.GREASE): + def __init__(self, length, data=None): + super().__init__(self.ext_type, length) + self.data = data + + def to_dict(self): + # Add the binary data to the serialization. + d = super().to_dict() + if self.data: + d["data"] = self.data + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionGrease(d["length"], d.get("data")) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + return TLSExtensionGrease(length, data) + + +class TLSExtensionServerName(TLSExtensionSignature, + ext_type=TLSExtensionType.server_name): + def __init__(self): + # Set length to None. Server names have differing lengths, + # so the length should not be part of the signature. 
+ super().__init__(self.ext_type, length=None) + + @classmethod + def from_dict(cls, d): + return TLSExtensionServerName() + + @classmethod + def from_bytes(cls, length: int, data: bytes): + return TLSExtensionServerName() + + +class TLSExtensionStatusRequest(TLSExtensionSignature, + ext_type=TLSExtensionType.status_request): + def __init__(self, length, status_request_type: int): + super().__init__(self.ext_type, length=length) + self.status_request_type = status_request_type + + def to_dict(self): + d = super().to_dict() + d["status_request_type"] = self.status_request_type + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionStatusRequest(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + (status_request_type, ) = struct.unpack_from("!B", data, 0) + return TLSExtensionStatusRequest(length, status_request_type) + + +class TLSExtensionSupportedGroups(TLSExtensionSignature, + ext_type=TLSExtensionType.supported_groups): + def __init__(self, length, supported_groups: List[int]): + super().__init__(self.ext_type, length) + self.supported_groups = supported_groups + + def to_dict(self): + d = super().to_dict() + d["supported_groups"] = serialize_grease(self.supported_groups) + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionSupportedGroups( + length=d["length"], + supported_groups=unserialize_grease(d["supported_groups"]) + ) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + groups, _ = parse_tls_int_list(data, entry_size=2) + return TLSExtensionSupportedGroups(length, groups) + + +class TLSExtensionECPointFormats(TLSExtensionSignature, + ext_type=TLSExtensionType.ec_point_formats): + def __init__(self, length, ec_point_formats: List[int]): + super().__init__(self.ext_type, length) + self.ec_point_formats = ec_point_formats + + def to_dict(self): + d = super().to_dict() + d["ec_point_formats"] = self.ec_point_formats + return d + + @classmethod + def from_dict(cls, d): + return 
TLSExtensionECPointFormats(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + ec_point_formats, _ = parse_tls_int_list( + data, entry_size=1, header_size=1 + ) + return TLSExtensionECPointFormats(length, ec_point_formats) + + +class TLSExtensionSignatureAlgorithms(TLSExtensionSignature, + ext_type=TLSExtensionType.signature_algorithms): + def __init__(self, length, sig_hash_algs: List[int]): + super().__init__(self.ext_type, length=length) + self.sig_hash_algs = sig_hash_algs + + def to_dict(self): + d = super().to_dict() + d["sig_hash_algs"] = self.sig_hash_algs + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionSignatureAlgorithms(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + sig_hash_algs, _ = parse_tls_int_list(data, entry_size=2) + return TLSExtensionSignatureAlgorithms(length, sig_hash_algs) + + +class TLSExtensionALPN(TLSExtensionSignature, + ext_type=TLSExtensionType.application_layer_protocol_negotiation): + def __init__(self, length, alpn_list: List[str]): + super().__init__(self.ext_type, length=length) + self.alpn_list = alpn_list + + def to_dict(self): + d = super().to_dict() + d["alpn_list"] = self.alpn_list + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionALPN(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + alpn_list, _ = parse_tls_str_list(data) + return TLSExtensionALPN(length, alpn_list) + + +class TLSExtensionPadding(TLSExtensionSignature, + ext_type=TLSExtensionType.padding): + def __init__(self): + # Padding has varying lengths, so don't include in the signature + super().__init__(self.ext_type, length=None) + + @classmethod + def from_dict(cls, d): + return TLSExtensionPadding() + + @classmethod + def from_bytes(cls, length: int, data: bytes): + return TLSExtensionPadding() + + +class TLSExtensionCompressCertificate(TLSExtensionSignature, + ext_type=TLSExtensionType.compress_certificate): + def __init__(self, length, 
algorithms): + super().__init__(self.ext_type, length=length) + self.algorithms = algorithms + + def to_dict(self): + d = super().to_dict() + d["algorithms"] = self.algorithms + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionCompressCertificate(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + algos, _ = parse_tls_int_list(data, entry_size=2, header_size=1) + return TLSExtensionCompressCertificate(length, algos) + + +class TLSExtensionRecordSizeLimit(TLSExtensionSignature, + ext_type=TLSExtensionType.record_size_limit): + def __init__(self, length, record_size_limit): + super().__init__(self.ext_type, length=length) + self.record_size_limit = record_size_limit + + def to_dict(self): + d = super().to_dict() + d["record_size_limit"] = self.record_size_limit + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionRecordSizeLimit(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + (limit, ) = struct.unpack("!H", data) + return TLSExtensionRecordSizeLimit(length, limit) + + +class TLSExtensionDelegatedCredentials(TLSExtensionSignature, + ext_type=TLSExtensionType.delegated_credentials): + def __init__(self, length, sig_hash_algs): + super().__init__(self.ext_type, length=length) + self.sig_hash_algs = sig_hash_algs + + def to_dict(self): + d = super().to_dict() + d["sig_hash_algs"] = self.sig_hash_algs + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionDelegatedCredentials(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + algs, _ = parse_tls_int_list(data, entry_size=2) + return TLSExtensionDelegatedCredentials(length, algs) + + +class TLSExtensionSupportedVersions(TLSExtensionSignature, + ext_type=TLSExtensionType.supported_versions): + def __init__(self, length, supported_versions: List[TLSVersion]): + super().__init__(self.ext_type, length=length) + self.supported_versions = supported_versions + + def to_dict(self): + d = 
super().to_dict() + d["supported_versions"] = list(map( + lambda v: v.name, + self.supported_versions + )) + return d + + @classmethod + def from_dict(cls, d): + supported_versions = list(map( + lambda v: TLSVersion[v], + d["supported_versions"] + )) + return TLSExtensionSupportedVersions(d["length"], supported_versions) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + versions, _ = parse_tls_int_list(data, entry_size=2, header_size=1) + versions = list(map(lambda v: TLSVersion(v), versions)) + return TLSExtensionSupportedVersions(length, versions) + + +class TLSExtensionPSKKeyExchangeModes(TLSExtensionSignature, + ext_type=TLSExtensionType.psk_key_exchange_modes): + def __init__(self, length, psk_ke_mode): + super().__init__(self.ext_type, length=length) + self.psk_ke_mode = psk_ke_mode + + def to_dict(self): + d = super().to_dict() + d["psk_ke_mode"] = self.psk_ke_mode + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionPSKKeyExchangeModes(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + (ke_length, ke_mode) = struct.unpack_from("!BB", data, 0) + if ke_length > 1: + # Unsupported + raise Exception("Failed to parse psk_key_exchange_modes extension") + + return TLSExtensionPSKKeyExchangeModes(length, ke_mode) + + +class TLSExtensionKeyshare(TLSExtensionSignature, + ext_type=TLSExtensionType.keyshare): + def __init__(self, length, key_shares): + super().__init__(self.ext_type, length=length) + self.key_shares = key_shares + + def to_dict(self): + d = super().to_dict() + d["key_shares"] = [ + { + "group": "GREASE" if ks["group"] == TLS_GREASE else ks["group"], + "length": ks["length"] + } + for ks in self.key_shares + ] + return d + + @classmethod + def from_dict(cls, d): + key_shares = [ + { + "group": TLS_GREASE if ks["group"] == "GREASE" else ks["group"], + "length": ks["length"] + } + for ks in d["key_shares"] + ] + return TLSExtensionKeyshare(d["length"], d["key_shares"]) + + @classmethod + def 
from_bytes(cls, length: int, data: bytes): + off = 0 + (key_share_length, ) = struct.unpack_from("!H", data, off) + off += struct.calcsize("!H") + + key_shares = [] + while off < length: + (group, key_ex_length) = struct.unpack_from("!HH", data, off) + key_shares.append({ + "group": TLS_GREASE if group in TLS_GREASE_VALUES else group, + "length": key_ex_length + }) + off += struct.calcsize("!HH") + off += key_ex_length + + return TLSExtensionKeyshare(length, key_shares) + + +class TLSExtensionApplicationSettings(TLSExtensionSignature, + ext_type=TLSExtensionType.application_settings): + def __init__(self, length, alps_alpn_list): + super().__init__(self.ext_type, length=length) + self.alps_alpn_list = alps_alpn_list + + def to_dict(self): + d = super().to_dict() + d["alps_alpn_list"] = self.alps_alpn_list + return d + + @classmethod + def from_dict(cls, d): + return TLSExtensionApplicationSettings(**d) + + @classmethod + def from_bytes(cls, length: int, data: bytes): + alpn, _ = parse_tls_str_list(data) + return TLSExtensionApplicationSettings(length, alpn) + + +class TLSClientHelloSignature(): + """ + Signature of a TLS Client Hello message. + + Combines multiple parameters from a TLS Client Hello message into a + signature that is used to check if two such messages are identical, up to + various random values which may be present. + + Why not use JA3? (https://github.com/salesforce/ja3) + Our signature is more extensive and covers more parameters. For example, it + checks whether a session ID is present, or what values are sent inside + TLS extensions such as ALPN. + """ + + def __init__(self, + record_version: TLSVersion, + handshake_version: TLSVersion, + session_id_length: int, + ciphersuites: List[int], + comp_methods: List[int], + extensions: List[TLSExtensionSignature]): + """ + Initialize a new TLSClientHelloSignature. + + Signatures can be compared with one another to check if they are equal. 
+ + Parameters + ---------- + record_version : TLSVersion + Represents the "tls.record.version" field of the Client Hello. + handshake_version : TLSVersion + Represents the "tls.handshake.type" field. + session_id_length : int + Represents the "tls.handshake.session_id_length" field. + ciphersuites : list[int] + Represents the "tls.handshake.ciphersuites" list of ciphersuites. + comp_methods : list[int] + Represents the "tls.handshake.comp_methods" list of compression + methods. + extensions : list[TLSExtensionSignature] + Represents the list of TLS extensions in the Client Hello. + """ + self.record_version = record_version + self.handshake_version = handshake_version + self.session_id_length = session_id_length + self.ciphersuites = ciphersuites + self.comp_methods = comp_methods + self.extensions = extensions + + @property + def extension_list(self): + return list(map(lambda ext: ext.ext_type, self.extensions)) + + def _compare_extensions(self, other: 'TLSClientHelloSignature'): + """Compare the TLS extensions of two Client Hello messages.""" + # Check that the extension lists are identical in content. + if set(self.extension_list) != set(other.extension_list): + symdiff = list(set(self.extension_list).symmetric_difference( + other.extension_list + )) + return False, (f"TLS extension lists differ: " + f"Symmatric difference {symdiff}") + + if self.extension_list != other.extension_list: + return False, "TLS extension lists identical but differ in order" + + # Check the extensions' parameters. + for i, ext in enumerate(self.extensions): + if not ext.equals(other.extensions[i]): + ours = ext.to_dict() + ours.pop("type") + theirs = other.extensions[i].to_dict() + theirs.pop("type") + msg = (f"TLS extension {ext.ext_type.name} is different. 
" + f"{ours} != {theirs}") + return False, msg + + return True, None + + def _equals(self, other: 'TLSClientHelloSignature', reason: bool = False): + """Check if another TLSClientHelloSignature is identical.""" + if self.record_version != other.record_version: + msg = (f"TLS record versions differ: " + f"{self.record_version} != {other.record_version}") + return False, msg + + if self.handshake_version != other.handshake_version: + msg = (f"TLS handshake versions differ: " + f"{self.handshake_version} != " + f"{other.handshake_version}") + return False, msg + + if self.session_id_length != other.session_id_length: + msg = (f"TLS session ID lengths differ: " + f"{self.session_id_length} != {other.session_id_length}") + return False, msg + + if self.ciphersuites != other.ciphersuites: + msg = f"TLS ciphersuites differ in contents or order. " + return False, msg + + if self.comp_methods != other.comp_methods: + msg = f"TLS compression methods differ in contents or order. " + return False, msg + + return self._compare_extensions(other) + + def equals(self, other: 'TLSClientHelloSignature', reason: bool = False): + """Checks whether two Client Hello messages have the same signature. + + Parameters + ---------- + other : TLSClientHelloSignature + The signature of the other Client Hello message. + reason : bool + If True, returns an additional string describing the reason of the + difference in case of a difference, and None otherwise. 
+ """ + equal, msg = self._equals(other) + if reason: + return equal, msg + else: + return equal + + def to_dict(self): + """Serialize to a dict object.""" + return { + "record_version": self.record_version.name, + "handshake_version": self.handshake_version.name, + "session_id_length": self.session_id_length, + "ciphersuites": serialize_grease(self.ciphersuites), + "comp_methods": self.comp_methods, + "extensions": list(map(lambda ext: ext.to_dict(), self.extensions)) + } + + @classmethod + def from_dict(cls, d): + """Unserialize a TLSClientHelloSignature from a dict. + + Parameters + ---------- + d : dict + Client Hello signature encoded to a Python dict. + + Returns + ------- + sig : TLSClientHelloSignature + Signature constructed based on the dict representation. + """ + return TLSClientHelloSignature( + record_version=TLSVersion[d["record_version"]], + handshake_version=TLSVersion[d["handshake_version"]], + session_id_length=d["session_id_length"], + ciphersuites=unserialize_grease(d["ciphersuites"]), + comp_methods=d["comp_methods"], + extensions=list(map( + lambda ext: TLSExtensionSignature.from_dict(ext), + d["extensions"] + )) + ) + + @classmethod + def from_bytes(cls, record: bytes): + """Build a TLSClientHelloSignature from a Client Hello TLS record. + + Parameters + ---------- + record : bytes + Raw over-the-wire content of the Client Hello TLS record. + + Returns + ------- + sig : TLSClientHelloSignature + Signature of the TLS record. + """ + off = 0 + record_header = TLSRecordHeader._make(struct.unpack_from( + TLS_RECORD_HEADER, record, off + )) + off += struct.calcsize(TLS_RECORD_HEADER) + + if record_header.type != 0x16: + raise Exception( + f"TLS record not of type Handshake (0x16). 
" + f"Got 0x{record_header.type:02x}" + ) + + if not TLSVersion.has_value(record_header.version): + raise Exception( + f"Unknown TLS version 0x{record_header.version:04x}" + ) + + if len(record) - off != record_header.length: + raise Exception("Corrupt record length") + + handshake_header = TLSHandshakeHeader._make(struct.unpack_from( + TLS_HANDSHAKE_HEADER, record, off + )) + + if handshake_header.type != 0x01: + raise Exception( + f"TLS handshake not of type Client Hello (0x01). " + f"Got 0x{handshake_header.type:02x}" + ) + + if (len(record) - off - 4 != + (handshake_header.length_high << 16) + handshake_header.length_low): + raise Exception("Corrupt handshake length") + + off += struct.calcsize(TLS_HANDSHAKE_HEADER) + + if not TLSVersion.has_value(handshake_header.version): + raise Exception( + f"Unknown TLS version 0x{handshake_header.version:04x}" + ) + + off += handshake_header.session_id_length + + ciphersuites, s = parse_tls_int_list(record[off:], entry_size=2) + off += s + + comp_methods, s = parse_tls_int_list( + record[off:], entry_size=1, header_size=1, replace_grease=False + ) + off += s + + (extensions_length, ) = struct.unpack_from("!H", record, off) + off += struct.calcsize("!H") + + if len(record) - off != extensions_length: + raise Exception(f"Corrupt TLS extensions length") + + extensions = [] + while off < len(record): + (ext_type, ext_len) = struct.unpack_from( + TLS_EXTENSION_HEADER, record, off + ) + ext_total_len = ext_len + struct.calcsize(TLS_EXTENSION_HEADER) + extensions.append(TLSExtensionSignature.from_bytes( + record[off:off + ext_total_len] + )) + off += ext_total_len + + return TLSClientHelloSignature( + record_version=TLSVersion(record_header.version), + handshake_version=TLSVersion(handshake_header.version), + session_id_length=handshake_header.session_id_length, + ciphersuites=ciphersuites, + comp_methods=comp_methods, + extensions=extensions + ) + + +class BrowserSignature(): + """ + Represents the network signature of a 
specific browser based on multiple + network parameters. + + Currently includes only the signature of the Client Hello message, but + designed to include other parameters (HTTP headers, HTTP2 settings, etc.) + """ + + def __init__(self, tls_client_hello: TLSClientHelloSignature): + self.tls_client_hello = tls_client_hello + + def equals(self, other: 'BrowserSignature', reason=False): + """Checks whether two browsers have the same network signatures. + + Parameters + ---------- + other : BrowserSignature + The other browser's network signature + reason : bool + If True, returns an additional string describing the reason of the + difference in case of a difference, and None otherwise. + """ + return self.tls_client_hello.equals(other.tls_client_hello, reason) + + def to_dict(self): + """Serialize to a dict object.""" + return { + "tls_client_hello": self.tls_client_hello.to_dict() + } + + @classmethod + def from_dict(cls, d): + """Unserialize a BrowserSignature from a dict.""" + tls_client_hello = None + if d.get("tls_client_hello"): + tls_client_hello=TLSClientHelloSignature.from_dict( + d["tls_client_hello"] + ) + + return BrowserSignature(tls_client_hello=tls_client_hello) diff --git a/tests/signatures.yaml b/tests/signatures.yaml new file mode 100644 index 0000000..20aaa68 --- /dev/null +++ b/tests/signatures.yaml @@ -0,0 +1,225 @@ +# Browser signatures database +# +# Each signature refers to the browser's behavior upon browsing to a site +# not cached or visited before. Each signature contains the various parameters +# in the TLS Client Hello message, and is designed to accommodate other +# parameters as well (such as HTTP headers, HTTP/2 settings). 
+--- +name: chrome_98.0.4758.102_win10 +browser: + name: chrome + version: 98.0.4758.102 + os: win10 + mode: regular +signature: + tls_client_hello: + record_version: 'TLS_VERSION_1_0' + handshake_version: 'TLS_VERSION_1_2' + session_id_length: 32 + ciphersuites: [ + 'GREASE', + 0x1301, 0x1302, 0x1303, 0xc02b, 0xc02f, 0xc02c, 0xc030, + 0xcca9, 0xcca8, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, + 0x0035 + ] + comp_methods: [0x00] + extensions: + - type: GREASE + length: 0 + - type: server_name + - type: extended_master_secret + length: 0 + - type: renegotiation_info + length: 1 + - type: supported_groups + length: 10 + supported_groups: [ + 'GREASE', + 0x001d, 0x0017, 0x0018 + ] + - type: ec_point_formats + length: 2 + ec_point_formats: [0] + - type: session_ticket + length: 0 + - type: application_layer_protocol_negotiation + length: 14 + alpn_list: ['h2', 'http/1.1'] + - type: status_request + length: 5 + status_request_type: 0x01 + - type: signature_algorithms + length: 18 + sig_hash_algs: [ + 0x0403, 0x0804, 0x0401, 0x0503, + 0x0805, 0x0501, 0x0806, 0x0601 + ] + - type: signed_certificate_timestamp + length: 0 + - type: keyshare + length: 43 + key_shares: + - group: GREASE + length: 1 + - group: 29 + length: 32 + - type: psk_key_exchange_modes + length: 2 + psk_ke_mode: 1 + - type: supported_versions + length: 7 + supported_versions: [ + 'GREASE', 'TLS_VERSION_1_3', 'TLS_VERSION_1_2' + ] + - type: compress_certificate + length: 3 + algorithms: [0x02] + - type: application_settings + length: 5 + alps_alpn_list: ['h2'] + - type: GREASE + length: 1 + data: !!binary AA== + - type: padding +--- +name: firefox_91.6.0esr_win10 +browser: + name: firefox + version: 91.6.0esr + os: win10 + mode: regular +signature: + tls_client_hello: + record_version: 'TLS_VERSION_1_0' + handshake_version: 'TLS_VERSION_1_2' + session_id_length: 32 + ciphersuites: [ + 0x1301, 0x1303, 0x1302, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc02c, + 0xc030, 0xc00a, 0xc009, 0xc013, 0xc014, 0x009c, 0x009d, 
0x002f, + 0x0035, 0x000a + ] + comp_methods: [0x00] + extensions: + - type: server_name + - type: extended_master_secret + length: 0 + - type: renegotiation_info + length: 1 + - type: supported_groups + length: 14 + supported_groups: [ + 0x1d, 0x017, 0x18, 0x19, 0x0100, 0x0101 + ] + - type: ec_point_formats + length: 2 + ec_point_formats: [0] + - type: session_ticket + length: 0 + - type: application_layer_protocol_negotiation + length: 14 + alpn_list: ['h2', 'http/1.1'] + - type: status_request + length: 5 + status_request_type: 0x01 + - type: delegated_credentials + length: 10 + sig_hash_algs: [ + 0x0403, 0x0503, 0x0603, 0x0203 + ] + - type: keyshare + length: 107 + key_shares: + - group: 29 + length: 32 + - group: 23 + length: 65 + - type: supported_versions + length: 5 + supported_versions: [ + 'TLS_VERSION_1_3', 'TLS_VERSION_1_2' + ] + - type: signature_algorithms + length: 24 + sig_hash_algs: [ + 0x0403, 0x0503, 0x0603, 0x0804, + 0x0805, 0x0806, 0x0401, 0x0501, + 0x0601, 0x0203, 0x0201 + ] + - type: psk_key_exchange_modes + length: 2 + psk_ke_mode: 1 + - type: record_size_limit + length: 2 + record_size_limit: 16385 + - type: padding +--- +name: firefox_95.0.2_win10 +browser: + name: firefox + version: 95.0.2 + os: win10 + mode: regular +signature: + tls_client_hello: + record_version: 'TLS_VERSION_1_0' + handshake_version: 'TLS_VERSION_1_2' + session_id_length: 32 + ciphersuites: [ + 0x1301, 0x1303, 0x1302, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc02c, + 0xc030, 0xc00a, 0xc009, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, + 0x0035 + ] + comp_methods: [0x00] + extensions: + - type: server_name + - type: extended_master_secret + length: 0 + - type: renegotiation_info + length: 1 + - type: supported_groups + length: 14 + supported_groups: [ + 0x1d, 0x017, 0x18, 0x19, 0x0100, 0x0101 + ] + - type: ec_point_formats + length: 2 + ec_point_formats: [0] + - type: session_ticket + length: 0 + - type: application_layer_protocol_negotiation + length: 14 + alpn_list: ['h2', 
'http/1.1'] + - type: status_request + length: 5 + status_request_type: 0x01 + - type: delegated_credentials + length: 10 + sig_hash_algs: [ + 0x0403, 0x0503, 0x0603, 0x0203 + ] + - type: keyshare + length: 107 + key_shares: + - group: 29 + length: 32 + - group: 23 + length: 65 + - type: supported_versions + length: 5 + supported_versions: [ + 'TLS_VERSION_1_3', 'TLS_VERSION_1_2' + ] + - type: signature_algorithms + length: 24 + sig_hash_algs: [ + 0x0403, 0x0503, 0x0603, 0x0804, + 0x0805, 0x0806, 0x0401, 0x0501, + 0x0601, 0x0203, 0x0201 + ] + - type: psk_key_exchange_modes + length: 2 + psk_ke_mode: 1 + - type: record_size_limit + length: 2 + record_size_limit: 16385 + - type: padding
diff --git a/tests/test_impersonate.py b/tests/test_impersonate.py
new file mode 100644
index 0000000..3f2b108
--- /dev/null
+++ b/tests/test_impersonate.py
@@ -0,0 +1,240 @@
+import io
+import logging
+import os
+import subprocess
+
+import yaml
+import dpkt
+import pytest
+
+from signature import BrowserSignature, TLSClientHelloSignature
+
+
+@pytest.fixture
+def browser_signatures():
+    """Load the browser signatures database.
+
+    Returns a dict that maps the "name" of each signature to its whole YAML
+    document. Empty documents are skipped.
+    """
+    # signatures.yaml lives next to this file. Resolve it from there so the
+    # tests don't depend on the directory pytest was launched from.
+    path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), "signatures.yaml"
+    )
+    with open(path, "r", encoding="utf-8") as f:
+        # Parse signatures.yaml database.
+        # safe_load_all() is lazy, so consume it while the file is still open.
+        return {
+            doc["name"]: doc
+            for doc in yaml.safe_load_all(f)
+            if doc
+        }
+
+
+class TestSignatureModule:
+    """Test the signature.py module.
+
+    signature.py is responsible for decoding signatures from the YAML format,
+    parsing raw TLS packets, and comparing signatures.
+    """
+
+    # Client Hello record sent by Chrome 98.
+    CLIENT_HELLO = (
+        b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03\x06\x84\xbd\x63\xac"
+        b"\xa4\x0a\x5b\xbe\x79\x7d\x14\x48\xcc\x1f\xf8\x62\x8c\x7d\xf4\xc7"
+        b"\xfe\x04\xe3\x30\xb7\x56\xec\x87\x40\xf2\x63\x20\x92\x9d\x01\xc8"
+        b"\x82\x3c\x92\xe1\x8a\x75\x4e\xaa\x6b\xf1\x31\xd2\xb7\x4d\x18\xc6"
+        b"\xda\x3d\x31\xa6\x35\xb2\x08\xbc\x5b\x82\x2f\x97\x00\x20\x9a\x9a"
+        b"\x13\x01\x13\x02\x13\x03\xc0\x2b\xc0\x2f\xc0\x2c\xc0\x30\xcc\xa9"
+        b"\xcc\xa8\xc0\x13\xc0\x14\x00\x9c\x00\x9d\x00\x2f\x00\x35\x01\x00"
+        b"\x01\x93\xca\xca\x00\x00\x00\x00\x00\x16\x00\x14\x00\x00\x11\x77"
+        b"\x77\x77\x2e\x77\x69\x6b\x69\x70\x65\x64\x69\x61\x2e\x6f\x72\x67"
+        b"\x00\x17\x00\x00\xff\x01\x00\x01\x00\x00\x0a\x00\x0a\x00\x08\xaa"
+        b"\xaa\x00\x1d\x00\x17\x00\x18\x00\x0b\x00\x02\x01\x00\x00\x23\x00"
+        b"\x00\x00\x10\x00\x0e\x00\x0c\x02\x68\x32\x08\x68\x74\x74\x70\x2f"
+        b"\x31\x2e\x31\x00\x05\x00\x05\x01\x00\x00\x00\x00\x00\x0d\x00\x12"
+        b"\x00\x10\x04\x03\x08\x04\x04\x01\x05\x03\x08\x05\x05\x01\x08\x06"
+        b"\x06\x01\x00\x12\x00\x00\x00\x33\x00\x2b\x00\x29\xaa\xaa\x00\x01"
+        b"\x00\x00\x1d\x00\x20\xfc\x58\xaa\x8b\xd6\x2d\x65\x9c\x58\xa2\xc9"
+        b"\x0c\x5a\x6f\x69\xa5\xef\xc0\x05\xb3\xd1\xb4\x01\x9d\x61\x84\x00"
+        b"\x42\x74\xc7\xa9\x43\x00\x2d\x00\x02\x01\x01\x00\x2b\x00\x07\x06"
+        b"\xaa\xaa\x03\x04\x03\x03\x00\x1b\x00\x03\x02\x00\x02\x44\x69\x00"
+        b"\x05\x00\x03\x02\x68\x32\xfa\xfa\x00\x01\x00\x00\x15\x00\xc6\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+        b"\x00\x00\x00\x00\x00"
+    )
+
+    def test_serialization(self, browser_signatures):
+        """
+        Test that deserializing and then serializing the YAML signatures
+        produces identical results.
+        """
+        for browser_name, data in browser_signatures.items():
+            sig = data["signature"]
+
+            # Unserialize and serialize back.
+            sig2 = BrowserSignature.from_dict(sig).to_dict()
+
+            exts = sig["tls_client_hello"]["extensions"]
+            exts2 = sig2["tls_client_hello"]["extensions"]
+
+            # Compare the counts first: a dropped extension would otherwise
+            # surface as a bare IndexError in the loop below.
+            assert len(exts) == len(exts2), \
+                (f"Inconsistent serialization in signature "
+                 f"{browser_name}: expected {len(exts)} extensions, "
+                 f"got {len(exts2)}.")
+
+            # Go extension by extension and check equality.
+            # It could be done with a single comparison, but this way the error
+            # will be more indicative.
+            for ext, ext2 in zip(exts, exts2):
+                assert ext == ext2, \
+                    (f"Inconsistent serialization in signature "
+                     f"{browser_name}: Serialized extension "
+                     f"{ext['type']} differs.")
+
+            assert sig == sig2, \
+                (f"Inconsistent serialization in signature "
+                 f"{browser_name}")
+
+    def test_tls_client_hello_parsing(self, browser_signatures):
+        """
+        Test the TLS Client Hello parsing code.
+
+        Parses the raw CLIENT_HELLO record and checks that the result equals
+        the Chrome 98 signature stored in the YAML database.
+        """
+        sig = BrowserSignature(
+            tls_client_hello=TLSClientHelloSignature.from_bytes(
+                self.CLIENT_HELLO
+            )
+        )
+
+        sig2 = BrowserSignature.from_dict(
+            browser_signatures["chrome_98.0.4758.102_win10"]["signature"]
+        )
+
+        equals, reason = sig.equals(sig2, reason=True)
+        assert equals, reason
+
+
+class TestImpersonation:
+    """
+    Test that the network signature of curl-impersonate is identical to that of
+    a real browser, by comparing with known signatures
+    """
+
+    TCPDUMP_CAPTURE_INTERFACE = "eth0"
+
+    # When running curl use a specific range of local ports.
+    # This ensures we will capture the correct traffic in tcpdump.
+    LOCAL_PORTS = (50000, 50100)
+
+    TEST_URL = "https://www.wikipedia.org"
+
+    @pytest.fixture
+    def tcpdump(self):
+        """Initialize a sniffer to capture curl's traffic.
+
+        Yields the tcpdump process (a subprocess.Popen) which writes the raw
+        capture to its stdout pipe. The process is stopped once the test is
+        over.
+        """
+        logging.debug(
+            f"Running tcpdump on interface {self.TCPDUMP_CAPTURE_INTERFACE}"
+        )
+
+        p = subprocess.Popen([
+            "tcpdump", "-n",
+            "-i", self.TCPDUMP_CAPTURE_INTERFACE,
+            "-s", "0",
+            "-w", "-",
+            "-U", # Important, makes tcpdump unbuffered
+            (f"(tcp src portrange {self.LOCAL_PORTS[0]}-{self.LOCAL_PORTS[1]}"
+             f" and tcp dst port 443) or"
+             f"(tcp dst portrange {self.LOCAL_PORTS[0]}-{self.LOCAL_PORTS[1]}"
+             f" and tcp src port 443)")
+        ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        # NOTE(review): nothing here waits for tcpdump to actually start
+        # capturing before the test launches curl - TODO confirm that this
+        # cannot make the test miss the Client Hello.
+        yield p
+
+        p.terminate()
+        try:
+            p.wait(timeout=10)
+        except subprocess.TimeoutExpired:
+            # tcpdump ignored SIGTERM; don't leave it running behind us.
+            p.kill()
+            p.wait()
+
+    def _extract_client_hello(self, pcap: bytes) -> "bytes | None":
+        """Find and return the Client Hello TLS record from a pcap.
+
+        Only TCP segments sent to port 443 are considered. The returned value
+        is the whole payload of the TCP segment holding the Client Hello.
+
+        If there are multiple, returns the first.
+        If there are none, returns None.
+        """
+        for _ts, buf in dpkt.pcap.Reader(io.BytesIO(pcap)):
+            eth = dpkt.ethernet.Ethernet(buf)
+            if not isinstance(eth.data, dpkt.ip.IP):
+                continue
+            ip = eth.data
+            if not isinstance(ip.data, dpkt.tcp.TCP):
+                continue
+            tcp = ip.data
+            if tcp.dport != 443 or not tcp.data:
+                continue
+            # We hope that the record is in a single TCP packet
+            # and wasn't split across multiple packets. This is usually the case.
+            try:
+                tls = dpkt.ssl.TLSRecord(tcp.data)
+                # Check if it's a Handshake record
+                if tls.type != 0x16:
+                    continue
+                handshake = dpkt.ssl.TLSHandshake(tls.data)
+            except (dpkt.dpkt.UnpackError, dpkt.ssl.SSL3Exception):
+                # The payload is not a complete, parseable TLS record (for
+                # example a record split across segments). Keep looking
+                # instead of aborting the whole search.
+                continue
+            # Check if it's a Client Hello
+            if handshake.type != 0x01:
+                continue
+            # Return the whole TLS record
+            return tcp.data
+
+        return None
+
+    @pytest.mark.parametrize(
+        "curl_binary, expected_signature",
+        [
+            ("chrome/curl_chrome98", "chrome_98.0.4758.102_win10"),
+            ("firefox/curl_ff91esr", "firefox_91.6.0esr_win10"),
+            ("firefox/curl_ff95", "firefox_95.0.2_win10")
+        ]
+    )
+    def test_impersonation(self,
+                           tcpdump,
+                           curl_binary,
+                           browser_signatures,
+                           expected_signature):
+        """
+        Check that curl's network signature is identical to that of a
+        real browser.
+
+        Launches curl while sniffing its TLS traffic with tcpdump. Then
+        extracts the Client Hello packet from the capture and compares its
+        signature with the expected one defined in the YAML database.
+        """
+        logging.debug(f"Launching '{curl_binary}' to {self.TEST_URL}")
+        # subprocess.run() kills curl if it exceeds the timeout, so a hung
+        # transfer doesn't leave a stray process behind.
+        ret = subprocess.run([
+            curl_binary,
+            "-o", "/dev/null",
+            "--local-port", f"{self.LOCAL_PORTS[0]}-{self.LOCAL_PORTS[1]}",
+            self.TEST_URL
+        ], timeout=10).returncode
+        assert ret == 0, f"'{curl_binary}' exited with code {ret}"
+
+        try:
+            pcap, stderr = tcpdump.communicate(timeout=5)
+
+            # If tcpdump finished running before timeout, it's likely it failed
+            # with an error.
+            assert tcpdump.returncode == 0, \
+                (f"tcpdump failed with error code {tcpdump.returncode}, "
+                 f"stderr: {stderr}")
+        except subprocess.TimeoutExpired:
+            # Expected path: tcpdump is still capturing. Stop it and collect
+            # whatever it has written so far.
+            tcpdump.kill()
+            pcap, stderr = tcpdump.communicate(timeout=3)
+
+        assert len(pcap) > 0, "tcpdump did not capture anything"
+        logging.debug(f"Captured pcap of length {len(pcap)} bytes")
+
+        client_hello = self._extract_client_hello(pcap)
+        assert client_hello is not None, "No Client Hello found in the capture"
+
+        logging.debug(f"Found Client Hello, "
+                      f"comparing to signature '{expected_signature}'")
+
+        sig = BrowserSignature(
+            tls_client_hello=TLSClientHelloSignature.from_bytes(client_hello)
+        )
+
+        expected_sig = BrowserSignature.from_dict(
+            browser_signatures[expected_signature]["signature"]
+        )
+
+        equals, reason = sig.equals(expected_sig, reason=True)
+        assert equals, reason