TUN-2307: Capnp is the only serialization format used in tunnelpogs

This commit is contained in:
Chung-Ting Huang
2019-09-16 18:25:00 -05:00
committed by Chung Ting Huang
parent ff795a7beb
commit fe032843f3
9 changed files with 397 additions and 877 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net"
"net/http"
@@ -29,11 +28,43 @@ import (
// ClientConfig is a collection of FallibleConfig that determines how cloudflared should function
type ClientConfig struct {
Version Version `json:"version"`
SupervisorConfig *SupervisorConfig `json:"supervisor_config"`
EdgeConnectionConfig *EdgeConnectionConfig `json:"edge_connection_config"`
DoHProxyConfigs []*DoHProxyConfig `json:"doh_proxy_configs" capnp:"dohProxyConfigs"`
ReverseProxyConfigs []*ReverseProxyConfig `json:"reverse_proxy_configs"`
Version Version
SupervisorConfig *SupervisorConfig
EdgeConnectionConfig *EdgeConnectionConfig
DoHProxyConfigs []*DoHProxyConfig `capnp:"dohProxyConfigs"`
ReverseProxyConfigs []*ReverseProxyConfig
}
func (c *ClientConfig) MarshalBytes() ([]byte, error) {
msg, firstSeg, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
return nil, err
}
capnpEntity, err := tunnelrpc.NewRootClientConfig(firstSeg)
if err != nil {
return nil, err
}
err = MarshalClientConfig(capnpEntity, c)
if err != nil {
return nil, err
}
return msg.Marshal()
}
func UnmarshalClientConfigFromBytes(clientConfigBytes []byte) (*ClientConfig, error) {
msg, err := capnp.Unmarshal(clientConfigBytes)
if err != nil {
return nil, err
}
capnpClientConfig, err := tunnelrpc.ReadRootClientConfig(msg)
if err != nil {
return nil, err
}
pogsClientConfig, err := UnmarshalClientConfig(capnpClientConfig)
if err != nil {
return nil, err
}
return pogsClientConfig, nil
}
// Version type models the version of a ClientConfig
@@ -52,16 +83,17 @@ func (v Version) String() string {
}
// FallibleConfig is an interface implemented by configs that cloudflared might not be able to apply
//go-sumtype:decl FallibleConfig
type FallibleConfig interface {
FailReason(err error) string
jsonType() string
isFallibleConfig()
}
// SupervisorConfig specifies config of components managed by Supervisor other than ConnectionManager
type SupervisorConfig struct {
AutoUpdateFrequency time.Duration `json:"auto_update_frequency"`
MetricsUpdateFrequency time.Duration `json:"metrics_update_frequency"`
GracePeriod time.Duration `json:"grace_period"`
AutoUpdateFrequency time.Duration
MetricsUpdateFrequency time.Duration
GracePeriod time.Duration
}
// FailReason impelents FallibleConfig interface for SupervisorConfig
@@ -69,23 +101,15 @@ func (sc *SupervisorConfig) FailReason(err error) string {
return fmt.Sprintf("Cannot apply SupervisorConfig, err: %v", err)
}
func (sc *SupervisorConfig) MarshalJSON() ([]byte, error) {
marshaler := make(map[string]SupervisorConfig, 1)
marshaler[sc.jsonType()] = *sc
return json.Marshal(marshaler)
}
func (sc *SupervisorConfig) jsonType() string {
return "supervisor_config"
}
func (_ *SupervisorConfig) isFallibleConfig() {}
// EdgeConnectionConfig specifies what parameters and how may connections should ConnectionManager establish with edge
type EdgeConnectionConfig struct {
NumHAConnections uint8 `json:"num_ha_connections"`
HeartbeatInterval time.Duration `json:"heartbeat_interval"`
Timeout time.Duration `json:"timeout"`
MaxFailedHeartbeats uint64 `json:"max_failed_heartbeats"`
UserCredentialPath string `json:"user_credential_path"`
NumHAConnections uint8
HeartbeatInterval time.Duration
Timeout time.Duration
MaxFailedHeartbeats uint64
UserCredentialPath string
}
// FailReason impelents FallibleConfig interface for EdgeConnectionConfig
@@ -93,21 +117,13 @@ func (cmc *EdgeConnectionConfig) FailReason(err error) string {
return fmt.Sprintf("Cannot apply EdgeConnectionConfig, err: %v", err)
}
func (cmc *EdgeConnectionConfig) MarshalJSON() ([]byte, error) {
marshaler := make(map[string]EdgeConnectionConfig, 1)
marshaler[cmc.jsonType()] = *cmc
return json.Marshal(marshaler)
}
func (cmc *EdgeConnectionConfig) jsonType() string {
return "edge_connection_config"
}
func (_ *EdgeConnectionConfig) isFallibleConfig() {}
// DoHProxyConfig is configuration for DNS over HTTPS service
type DoHProxyConfig struct {
ListenHost string `json:"listen_host"`
ListenPort uint16 `json:"listen_port"`
Upstreams []string `json:"upstreams"`
ListenHost string
ListenPort uint16
Upstreams []string
}
// FailReason impelents FallibleConfig interface for DoHProxyConfig
@@ -115,23 +131,15 @@ func (dpc *DoHProxyConfig) FailReason(err error) string {
return fmt.Sprintf("Cannot apply DoHProxyConfig, err: %v", err)
}
func (dpc *DoHProxyConfig) MarshalJSON() ([]byte, error) {
marshaler := make(map[string]DoHProxyConfig, 1)
marshaler[dpc.jsonType()] = *dpc
return json.Marshal(marshaler)
}
func (dpc *DoHProxyConfig) jsonType() string {
return "doh_proxy_config"
}
func (_ *DoHProxyConfig) isFallibleConfig() {}
// ReverseProxyConfig how and for what hostnames can this cloudflared proxy
type ReverseProxyConfig struct {
TunnelHostname h2mux.TunnelHostname `json:"tunnel_hostname"`
OriginConfigJSONHandler *OriginConfigJSONHandler `json:"origin_config"`
Retries uint64 `json:"retries"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
CompressionQuality uint64 `json:"compression_quality"`
TunnelHostname h2mux.TunnelHostname
OriginConfig OriginConfig
Retries uint64
ConnectionTimeout time.Duration
CompressionQuality uint64
}
func NewReverseProxyConfig(
@@ -145,11 +153,11 @@ func NewReverseProxyConfig(
return nil, fmt.Errorf("NewReverseProxyConfig: originConfigUnmarshaler was null")
}
return &ReverseProxyConfig{
TunnelHostname: h2mux.TunnelHostname(tunnelHostname),
OriginConfigJSONHandler: &OriginConfigJSONHandler{originConfig},
Retries: retries,
ConnectionTimeout: connectionTimeout,
CompressionQuality: compressionQuality,
TunnelHostname: h2mux.TunnelHostname(tunnelHostname),
OriginConfig: originConfig,
Retries: retries,
ConnectionTimeout: connectionTimeout,
CompressionQuality: compressionQuality,
}, nil
}
@@ -158,58 +166,29 @@ func (rpc *ReverseProxyConfig) FailReason(err error) string {
return fmt.Sprintf("Cannot apply ReverseProxyConfig, err: %v", err)
}
func (rpc *ReverseProxyConfig) MarshalJSON() ([]byte, error) {
marshaler := make(map[string]ReverseProxyConfig, 1)
marshaler[rpc.jsonType()] = *rpc
return json.Marshal(marshaler)
}
func (rpc *ReverseProxyConfig) jsonType() string {
return "reverse_proxy_config"
}
func (_ *ReverseProxyConfig) isFallibleConfig() {}
//go-sumtype:decl OriginConfig
type OriginConfig interface {
// Service returns a OriginService used to proxy to the origin
Service() (originservice.OriginService, error)
// go-sumtype requires at least one unexported method, otherwise it will complain that interface is not sealed
jsonType() string
}
type originType int
const (
httpType originType = iota
wsType
helloWorldType
)
func (ot originType) String() string {
switch ot {
case httpType:
return "Http"
case wsType:
return "WebSocket"
case helloWorldType:
return "HelloWorld"
default:
return "unknown"
}
isOriginConfig()
}
type HTTPOriginConfig struct {
URLString string `capnp:"urlString" json:"url_string" mapstructure:"url_string"`
TCPKeepAlive time.Duration `capnp:"tcpKeepAlive" json:"tcp_keep_alive" mapstructure:"tcp_keep_alive"`
DialDualStack bool `json:"dial_dual_stack" mapstructure:"dial_dual_stack"`
TLSHandshakeTimeout time.Duration `capnp:"tlsHandshakeTimeout" json:"tls_handshake_timeout" mapstructure:"tls_handshake_timeout"`
TLSVerify bool `capnp:"tlsVerify" json:"tls_verify" mapstructure:"tls_verify"`
OriginCAPool string `json:"origin_ca_pool" mapstructure:"origin_ca_pool"`
OriginServerName string `json:"origin_server_name" mapstructure:"origin_server_name"`
MaxIdleConnections uint64 `json:"max_idle_connections" mapstructure:"max_idle_connections"`
IdleConnectionTimeout time.Duration `json:"idle_connection_timeout" mapstructure:"idle_connection_timeout"`
ProxyConnectionTimeout time.Duration `json:"proxy_connection_timeout" mapstructure:"proxy_connection_timeout"`
ExpectContinueTimeout time.Duration `json:"expect_continue_timeout" mapstructure:"expect_continue_timeout"`
ChunkedEncoding bool `json:"chunked_encoding" mapstructure:"chunked_encoding"`
URLString string `capnp:"urlString"`
TCPKeepAlive time.Duration `capnp:"tcpKeepAlive"`
DialDualStack bool
TLSHandshakeTimeout time.Duration `capnp:"tlsHandshakeTimeout"`
TLSVerify bool `capnp:"tlsVerify"`
OriginCAPool string
OriginServerName string
MaxIdleConnections uint64
IdleConnectionTimeout time.Duration
ProxyConnectionTimeout time.Duration
ExpectContinueTimeout time.Duration
ChunkedEncoding bool
}
func (hc *HTTPOriginConfig) Service() (originservice.OriginService, error) {
@@ -248,15 +227,13 @@ func (hc *HTTPOriginConfig) Service() (originservice.OriginService, error) {
return originservice.NewHTTPService(transport, url, hc.ChunkedEncoding), nil
}
func (_ *HTTPOriginConfig) jsonType() string {
return httpType.String()
}
func (*HTTPOriginConfig) isOriginConfig() {}
type WebSocketOriginConfig struct {
URLString string `capnp:"urlString" json:"url_string" mapstructure:"url_string"`
TLSVerify bool `capnp:"tlsVerify" json:"tls_verify" mapstructure:"tls_verify"`
OriginCAPool string `json:"origin_ca_pool" mapstructure:"origin_ca_pool"`
OriginServerName string `json:"origin_server_name" mapstructure:"origin_server_name"`
URLString string `capnp:"urlString"`
TLSVerify bool `capnp:"tlsVerify"`
OriginCAPool string
OriginServerName string
}
func (wsc *WebSocketOriginConfig) Service() (originservice.OriginService, error) {
@@ -277,13 +254,11 @@ func (wsc *WebSocketOriginConfig) Service() (originservice.OriginService, error)
return originservice.NewWebSocketService(tlsConfig, url)
}
func (_ *WebSocketOriginConfig) jsonType() string {
return wsType.String()
}
func (*WebSocketOriginConfig) isOriginConfig() {}
type HelloWorldOriginConfig struct{}
func (_ *HelloWorldOriginConfig) Service() (originservice.OriginService, error) {
func (*HelloWorldOriginConfig) Service() (originservice.OriginService, error) {
helloCert, err := tlsconfig.GetHelloCertificateX509()
if err != nil {
return nil, errors.Wrap(err, "Cannot get Hello World server certificate")
@@ -308,9 +283,7 @@ func (_ *HelloWorldOriginConfig) Service() (originservice.OriginService, error)
return originservice.NewHelloWorldService(transport)
}
func (_ *HelloWorldOriginConfig) jsonType() string {
return helloWorldType.String()
}
func (*HelloWorldOriginConfig) isOriginConfig() {}
/*
* Boilerplate to convert between these structs and the primitive structs
@@ -519,9 +492,9 @@ func UnmarshalDoHProxyConfig(s tunnelrpc.DoHProxyConfig) (*DoHProxyConfig, error
func MarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig, p *ReverseProxyConfig) error {
s.SetTunnelHostname(p.TunnelHostname.String())
switch config := p.OriginConfigJSONHandler.OriginConfig.(type) {
switch config := p.OriginConfig.(type) {
case *HTTPOriginConfig:
ss, err := s.Origin().NewHttp()
ss, err := s.OriginConfig().NewHttp()
if err != nil {
return err
}
@@ -529,7 +502,7 @@ func MarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig, p *ReverseProxyCo
return err
}
case *WebSocketOriginConfig:
ss, err := s.Origin().NewWebsocket()
ss, err := s.OriginConfig().NewWebsocket()
if err != nil {
return err
}
@@ -537,7 +510,7 @@ func MarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig, p *ReverseProxyCo
return err
}
case *HelloWorldOriginConfig:
ss, err := s.Origin().NewHelloWorld()
ss, err := s.OriginConfig().NewHelloWorld()
if err != nil {
return err
}
@@ -560,9 +533,9 @@ func UnmarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig) (*ReverseProxyC
return nil, err
}
p.TunnelHostname = h2mux.TunnelHostname(tunnelHostname)
switch s.Origin().Which() {
case tunnelrpc.ReverseProxyConfig_origin_Which_http:
ss, err := s.Origin().Http()
switch s.OriginConfig().Which() {
case tunnelrpc.ReverseProxyConfig_originConfig_Which_http:
ss, err := s.OriginConfig().Http()
if err != nil {
return nil, err
}
@@ -570,9 +543,9 @@ func UnmarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig) (*ReverseProxyC
if err != nil {
return nil, err
}
p.OriginConfigJSONHandler = &OriginConfigJSONHandler{config}
case tunnelrpc.ReverseProxyConfig_origin_Which_websocket:
ss, err := s.Origin().Websocket()
p.OriginConfig = config
case tunnelrpc.ReverseProxyConfig_originConfig_Which_websocket:
ss, err := s.OriginConfig().Websocket()
if err != nil {
return nil, err
}
@@ -580,9 +553,9 @@ func UnmarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig) (*ReverseProxyC
if err != nil {
return nil, err
}
p.OriginConfigJSONHandler = &OriginConfigJSONHandler{config}
case tunnelrpc.ReverseProxyConfig_origin_Which_helloWorld:
ss, err := s.Origin().HelloWorld()
p.OriginConfig = config
case tunnelrpc.ReverseProxyConfig_originConfig_Which_helloWorld:
ss, err := s.OriginConfig().HelloWorld()
if err != nil {
return nil, err
}
@@ -590,7 +563,7 @@ func UnmarshalReverseProxyConfig(s tunnelrpc.ReverseProxyConfig) (*ReverseProxyC
if err != nil {
return nil, err
}
p.OriginConfigJSONHandler = &OriginConfigJSONHandler{config}
p.OriginConfig = config
}
p.Retries = s.Retries()
p.ConnectionTimeout = time.Duration(s.ConnectionTimeout())
@@ -690,13 +663,13 @@ func (i ClientService_PogsImpl) UseConfiguration(p tunnelrpc.ClientService_useCo
}
type UseConfigurationResult struct {
Success bool `json:"success"`
FailedConfigs []*FailedConfig `json:"failed_configs"`
Success bool
FailedConfigs []*FailedConfig
}
type FailedConfig struct {
Config FallibleConfig `json:"config"`
Reason string `json:"reason"`
Config FallibleConfig
Reason string
}
func MarshalFailedConfig(s tunnelrpc.FailedConfig, p *FailedConfig) error {