TUN-2309: Split ConnectResult into ConnectError and ConnectSuccess, each implementing its own capnp serialization logic

This commit is contained in:
Chung-Ting Huang
2019-09-17 16:58:49 -05:00
parent 4f23da2a6d
commit 5bcb2da0fe
6 changed files with 564 additions and 315 deletions

View File

@@ -50,7 +50,7 @@ func (c *Connection) Serve(ctx context.Context) error {
}
// Connect is used to establish connections with cloudflare's edge network
func (c *Connection) Connect(ctx context.Context, parameters *tunnelpogs.ConnectParameters, logger *logrus.Entry) (*pogs.ConnectResult, error) {
func (c *Connection) Connect(ctx context.Context, parameters *tunnelpogs.ConnectParameters, logger *logrus.Entry) (pogs.ConnectResult, error) {
openStreamCtx, cancel := context.WithTimeout(ctx, openStreamTimeout)
defer cancel()

View File

@@ -17,8 +17,9 @@ import (
)
const (
quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
defaultRetryAfter = time.Second * 5
)
// EdgeManager manages connections with the edge
@@ -87,8 +88,12 @@ func (em *EdgeManager) Run(ctx context.Context) error {
// Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
// in shouldCreateConnection or shouldReduceConnection calculation
if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
if err := em.newConnection(ctx); err != nil {
em.logger.WithError(err).Error("cannot create new connection")
if connErr := em.newConnection(ctx); connErr != nil {
if !connErr.ShouldRetry {
em.logger.WithError(connErr).Error(em.noRetryMessage())
return connErr
}
em.logger.WithError(connErr).Error("cannot create new connection")
}
} else if em.state.shouldReduceConnection() {
if err := em.closeConnection(ctx); err != nil {
@@ -103,11 +108,11 @@ func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurab
em.state.updateConfigurable(newConfigurable)
}
func (em *EdgeManager) newConnection(ctx context.Context) error {
func (em *EdgeManager) newConnection(ctx context.Context) *pogs.ConnectError {
edgeIP := em.serviceDiscoverer.Addr()
edgeConn, err := em.dialEdge(ctx, edgeIP)
if err != nil {
return errors.Wrap(err, "dial edge error")
return retryConnection(fmt.Sprintf("dial edge error: %v", err))
}
configurable := em.state.getConfigurable()
// Establish a muxed connection with the edge
@@ -121,12 +126,12 @@ func (em *EdgeManager) newConnection(ctx context.Context) error {
Logger: em.logger.WithField("subsystem", "muxer"),
})
if err != nil {
return errors.Wrap(err, "couldn't perform handshake with edge")
retryConnection(fmt.Sprintf("couldn't perform handshake with edge: %v", err))
}
h2muxConn, err := newConnection(muxer, edgeIP)
if err != nil {
return errors.Wrap(err, "couldn't create h2mux connection")
return retryConnection(fmt.Sprintf("couldn't create h2mux connection: %v", err))
}
go em.serveConn(ctx, h2muxConn)
@@ -141,18 +146,15 @@ func (em *EdgeManager) newConnection(ctx context.Context) error {
}, em.logger)
if err != nil {
h2muxConn.Shutdown()
return errors.Wrap(err, "couldn't connect to edge")
return retryConnection(fmt.Sprintf("couldn't connect to edge: %v", err))
}
if connErr := connResult.Err; connErr != nil {
if !connErr.ShouldRetry {
return errors.Wrap(connErr, em.noRetryMessage())
}
return errors.Wrapf(connErr, "edge responded with RetryAfter=%v", connErr.RetryAfter)
if connErr := connResult.ConnectError(); connErr != nil {
return connErr
}
em.state.newConnection(h2muxConn)
em.logger.Infof("connected to %s", connResult.ServerInfo.LocationName)
em.logger.Infof("connected to %s", connResult.ConnectedTo())
return nil
}
@@ -282,3 +284,11 @@ func (ems *edgeManagerState) getUserCredential() []byte {
defer ems.RUnlock()
return ems.userCredential
}
func retryConnection(cause string) *pogs.ConnectError {
return &pogs.ConnectError{
Cause: cause,
RetryAfter: defaultRetryAfter,
ShouldRetry: true,
}
}