TUN-6016: Push local managed tunnels configuration to the edge

This commit is contained in:
João Oliveirinha
2022-04-27 11:51:06 +01:00
parent 0180b6d733
commit 99d4e48656
20 changed files with 441 additions and 50 deletions

View File

@@ -30,6 +30,7 @@ var switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols,
type Orchestrator interface {
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
GetConfigJSON() ([]byte, error)
GetOriginProxy() (OriginProxy, error)
}

View File

@@ -42,6 +42,10 @@ type mockOrchestrator struct {
originProxy OriginProxy
}
func (mcr *mockOrchestrator) GetConfigJSON() ([]byte, error) {
return nil, fmt.Errorf("not implemented")
}
func (*mockOrchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
return &tunnelpogs.UpdateConfigurationResponse{
LastAppliedVersion: version,

View File

@@ -30,11 +30,15 @@ type controlStream struct {
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
type ControlStreamHandler interface {
// ServeControlStream handles the control plane of the transport in the current goroutine calling this
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error
// IsStopped tells whether the method above has finished
IsStopped() bool
}
type TunnelConfigJSONGetter interface {
GetConfigJSON() ([]byte, error)
}
// NewControlStream returns a new instance of ControlStreamHandler
func NewControlStream(
observer *Observer,
@@ -63,15 +67,28 @@ func (c *controlStream) ServeControlStream(
ctx context.Context,
rw io.ReadWriteCloser,
connOptions *tunnelpogs.ConnectionOptions,
tunnelConfigGetter TunnelConfigJSONGetter,
) error {
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
if err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer); err != nil {
registrationDetails, err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer)
if err != nil {
rpcClient.Close()
return err
}
c.connectedFuse.Connected()
// if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged {
if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil {
if err := rpcClient.SendLocalConfiguration(ctx, tunnelConfig, c.observer); err != nil {
c.observer.log.Err(err).Msg("unable to send local configuration")
}
} else {
c.observer.log.Err(err).Msg("failed to obtain current configuration")
}
}
c.waitForUnregister(ctx, rpcClient)
return nil
}

View File

@@ -117,7 +117,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch connType {
case TypeControlStream:
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil {
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, c.orchestrator); err != nil {
c.controlStreamErr = err
c.log.Error().Err(err)
respWriter.WriteErrorResponse()

View File

@@ -15,6 +15,7 @@ import (
"time"
"github.com/gobwas/ws/wsutil"
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -166,18 +167,26 @@ type mockNamedTunnelRPCClient struct {
unregistered chan struct{}
}
func (mc mockNamedTunnelRPCClient) SendLocalConfiguration(c context.Context, config []byte, observer *Observer) error {
return nil
}
func (mc mockNamedTunnelRPCClient) RegisterConnection(
c context.Context,
properties *NamedTunnelProperties,
options *tunnelpogs.ConnectionOptions,
connIndex uint8,
observer *Observer,
) error {
) (*tunnelpogs.ConnectionDetails, error) {
if mc.shouldFail != nil {
return mc.shouldFail
return nil, mc.shouldFail
}
close(mc.registered)
return nil
return &tunnelpogs.ConnectionDetails{
Location: "LIS",
UUID: uuid.New(),
TunnelIsRemotelyManaged: false,
}, nil
}
func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
@@ -477,7 +486,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
select {
case <-rpcClientFactory.registered:
break //ok
break // ok
case <-time.Tick(time.Second):
t.Fatal("timeout out waiting for registration")
}
@@ -487,7 +496,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
select {
case <-rpcClientFactory.unregistered:
break //ok
break // ok
case <-time.Tick(time.Second):
t.Fatal("timeout out waiting for unregistered signal")
}

View File

@@ -13,6 +13,7 @@ const (
MetricsNamespace = "cloudflared"
TunnelSubsystem = "tunnel"
muxerSubsystem = "muxer"
configSubsystem = "config"
)
type muxerMetrics struct {
@@ -36,6 +37,11 @@ type muxerMetrics struct {
compRateAve *prometheus.GaugeVec
}
type localConfigMetrics struct {
pushes prometheus.Counter
pushesErrors prometheus.Counter
}
type tunnelMetrics struct {
timerRetries prometheus.Gauge
serverLocations *prometheus.GaugeVec
@@ -51,6 +57,39 @@ type tunnelMetrics struct {
muxerMetrics *muxerMetrics
tunnelsHA tunnelsForHA
userHostnamesCounts *prometheus.CounterVec
localConfigMetrics *localConfigMetrics
}
func newLocalConfigMetrics() *localConfigMetrics {
pushesMetric := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: configSubsystem,
Name: "local_config_pushes",
Help: "Number of local configuration pushes to the edge",
},
)
pushesErrorsMetric := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: configSubsystem,
Name: "local_config_pushes_errors",
Help: "Number of errors occurred during local configuration pushes",
},
)
prometheus.MustRegister(
pushesMetric,
pushesErrorsMetric,
)
return &localConfigMetrics{
pushes: pushesMetric,
pushesErrors: pushesErrorsMetric,
}
}
func newMuxerMetrics() *muxerMetrics {
@@ -386,6 +425,7 @@ func initTunnelMetrics() *tunnelMetrics {
regFail: registerFail,
rpcFail: rpcFail,
userHostnamesCounts: userHostnamesCounts,
localConfigMetrics: newLocalConfigMetrics(),
}
}

View File

@@ -111,7 +111,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
// This blocks until the control plane is done.
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions)
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator)
if err != nil {
// Not wrapping error here to be consistent with the http2 message.
return err

View File

@@ -163,7 +163,7 @@ type fakeControlStream struct {
ControlStreamHandler
}
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error {
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error {
<-ctx.Done()
return nil
}

View File

@@ -58,6 +58,11 @@ type NamedTunnelRPCClient interface {
options *tunnelpogs.ConnectionOptions,
connIndex uint8,
observer *Observer,
) (*tunnelpogs.ConnectionDetails, error)
SendLocalConfiguration(
c context.Context,
config []byte,
observer *Observer,
) error
GracefulShutdown(ctx context.Context, gracePeriod time.Duration)
Close()
@@ -90,7 +95,7 @@ func (rsc *registrationServerClient) RegisterConnection(
options *tunnelpogs.ConnectionOptions,
connIndex uint8,
observer *Observer,
) error {
) (*tunnelpogs.ConnectionDetails, error) {
conn, err := rsc.client.RegisterConnection(
ctx,
properties.Credentials.Auth(),
@@ -101,10 +106,10 @@ func (rsc *registrationServerClient) RegisterConnection(
if err != nil {
if err.Error() == DuplicateConnectionError {
observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc()
return errDuplicationConnection
return nil, errDuplicationConnection
}
observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc()
return serverRegistrationErrorFromRPC(err)
return nil, serverRegistrationErrorFromRPC(err)
}
observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
@@ -112,7 +117,18 @@ func (rsc *registrationServerClient) RegisterConnection(
observer.logServerInfo(connIndex, conn.Location, fmt.Sprintf("Connection %s registered", conn.UUID))
observer.sendConnectedEvent(connIndex, conn.Location)
return nil
return conn, nil
}
func (rsc *registrationServerClient) SendLocalConfiguration(ctx context.Context, config []byte, observer *Observer) (err error) {
observer.metrics.localConfigMetrics.pushes.Inc()
defer func() {
if err != nil {
observer.metrics.localConfigMetrics.pushesErrors.Inc()
}
}()
return rsc.client.SendLocalConfiguration(ctx, config)
}
func (rsc *registrationServerClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
@@ -274,7 +290,7 @@ func (h *h2muxConnection) registerNamedTunnel(
rpcClient := h.newRPCClientFunc(ctx, stream, h.observer.log)
defer rpcClient.Close()
if err = rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, h.observer); err != nil {
if _, err = rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, h.observer); err != nil {
return err
}
return nil