cloudflared/management/service.go
Devin Carr 38cd455e4d TUN-7373: Streaming logs override for same actor
To help accommodate web browser interactions with websockets, when a
streaming logs session is requested for the same actor while already
serving a session for that user in a separate request, the original
request will be closed and the new request start streaming logs
instead. This should help with rogue sessions holding on for too long
with no client on the other side (before idle timeout or connection
close).
2023-04-21 11:54:37 -07:00

303 lines
9.5 KiB
Go

package management
import (
"context"
"fmt"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/rs/zerolog"
"nhooyr.io/websocket"
)
const (
// In the current state, an invalid command was provided by the client
StatusInvalidCommand websocket.StatusCode = 4001
reasonInvalidCommand = "expected start streaming as first event"
// There are a limited number of available streaming log sessions that cloudflared will service, exceeding this
// value will return this error to incoming requests.
StatusSessionLimitExceeded websocket.StatusCode = 4002
reasonSessionLimitExceeded = "limit exceeded for streaming sessions"
// There is a limited idle time while not actively serving a session for a request before dropping the connection.
StatusIdleLimitExceeded websocket.StatusCode = 4003
reasonIdleLimitExceeded = "session was idle for too long"
)
type ManagementService struct {
// The management tunnel hostname
Hostname string
// Host details related configurations
serviceIP string
clientID uuid.UUID
label string
log *zerolog.Logger
router chi.Router
// streamingMut is a lock to prevent concurrent requests to start streaming. Utilizing the atomic.Bool is not
// sufficient to complete this operation since many other checks during an incoming new request are needed
// to validate this before setting streaming to true.
streamingMut sync.Mutex
logger LoggerListener
}
func New(managementHostname string,
serviceIP string,
clientID uuid.UUID,
label string,
log *zerolog.Logger,
logger LoggerListener,
) *ManagementService {
s := &ManagementService{
Hostname: managementHostname,
log: log,
logger: logger,
serviceIP: serviceIP,
clientID: clientID,
label: label,
}
r := chi.NewRouter()
r.Use(ValidateAccessTokenQueryMiddleware)
r.Get("/ping", ping)
r.Head("/ping", ping)
r.Get("/logs", s.logs)
r.Get("/host_details", s.getHostDetails)
s.router = r
return s
}
func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.router.ServeHTTP(w, r)
}
// Management Ping handler
func ping(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
// The response provided by the /host_details endpoint
type getHostDetailsResponse struct {
ClientID string `json:"connector_id"`
IP string `json:"ip,omitempty"`
HostName string `json:"hostname,omitempty"`
}
func (m *ManagementService) getHostDetails(w http.ResponseWriter, r *http.Request) {
var getHostDetailsResponse = getHostDetailsResponse{
ClientID: m.clientID.String(),
}
if ip, err := getPrivateIP(m.serviceIP); err == nil {
getHostDetailsResponse.IP = ip
}
getHostDetailsResponse.HostName = m.getLabel()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
json.NewEncoder(w).Encode(getHostDetailsResponse)
}
func (m *ManagementService) getLabel() string {
if m.label != "" {
return fmt.Sprintf("custom:%s", m.label)
}
// If no label is provided we return the system hostname. This is not
// a fqdn hostname.
hostname, err := os.Hostname()
if err != nil {
return "unknown"
}
return hostname
}
// Get preferred private ip of this machine
func getPrivateIP(addr string) (string, error) {
conn, err := net.DialTimeout("tcp", addr, 1*time.Second)
if err != nil {
return "", err
}
defer conn.Close()
localAddr := conn.LocalAddr().String()
host, _, err := net.SplitHostPort(localAddr)
return host, err
}
// readEvents will loop through all incoming websocket messages from a client and marshal them into the
// proper Event structure and pass through to the events channel. Any invalid messages sent will automatically
// terminate the connection.
func (m *ManagementService) readEvents(c *websocket.Conn, ctx context.Context, events chan<- *ClientEvent) {
for {
event, err := ReadClientEvent(c, ctx)
select {
case <-ctx.Done():
return
default:
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if !IsClosed(err, m.log) {
m.log.Err(err).Send()
m.log.Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
}
// Any errors when reading the messages from the client will close the connection
return
}
events <- event
}
}
}
// streamLogs will begin the process of reading from the Session listener and write the log events to the client.
func (m *ManagementService) streamLogs(c *websocket.Conn, ctx context.Context, session *session) {
for session.Active() {
select {
case <-ctx.Done():
session.Stop()
return
case event := <-session.listener:
err := WriteEvent(c, ctx, &EventLog{
ServerEvent: ServerEvent{Type: Logs},
Logs: []*Log{event},
})
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if !IsClosed(err, m.log) {
m.log.Err(err).Send()
m.log.Err(c.Close(websocket.StatusInternalError, err.Error())).Send()
}
// Any errors when writing the messages to the client will stop streaming and close the connection
session.Stop()
return
}
default:
// No messages to send
}
}
}
// canStartStream will check the conditions of the request and return if the session can begin streaming.
func (m *ManagementService) canStartStream(session *session) bool {
m.streamingMut.Lock()
defer m.streamingMut.Unlock()
// Limits to one actor for streaming logs
if m.logger.ActiveSessions() > 0 {
// Allow the same user to preempt their existing session to disconnect their old session and start streaming
// with this new session instead.
if existingSession := m.logger.ActiveSession(session.actor); existingSession != nil {
m.log.Info().
Msgf("Another management session request for the same actor was requested; the other session will be disconnected to handle the new request.")
existingSession.Stop()
m.logger.Remove(existingSession)
existingSession.cancel()
} else {
m.log.Warn().
Msgf("Another management session request was attempted but one session already being served; there is a limit of streaming log sessions to reduce overall performance impact.")
return false
}
}
return true
}
// parseFilters will check the ClientEvent for start_streaming and assign filters if provided to the session
func (m *ManagementService) parseFilters(c *websocket.Conn, event *ClientEvent, session *session) bool {
// Expect the first incoming request
startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
if !ok {
return false
}
session.Filters(startEvent.Filters)
return true
}
// Management Streaming Logs accept handler
func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, nil)
if err != nil {
m.log.Debug().Msgf("management handshake: %s", err.Error())
return
}
// Make sure the connection is closed if other go routines fail to close the connection after completing.
defer c.Close(websocket.StatusInternalError, "")
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
events := make(chan *ClientEvent)
go m.readEvents(c, ctx, events)
// Send a heartbeat ping to hold the connection open even if not streaming.
ping := time.NewTicker(15 * time.Second)
defer ping.Stop()
// Close the connection if no operation has occurred after the idle timeout. The timeout is halted
// when streaming logs is active.
idleTimeout := 5 * time.Minute
idle := time.NewTimer(idleTimeout)
defer idle.Stop()
// Fetch the claims from the request context to acquire the actor
claims, ok := ctx.Value(accessClaimsCtxKey).(*managementTokenClaims)
if !ok || claims == nil {
// Typically should never happen as it is provided in the context from the middleware
m.log.Err(c.Close(websocket.StatusInternalError, "missing access_token")).Send()
return
}
session := newSession(logWindow, claims.Actor, cancel)
defer m.logger.Remove(session)
for {
select {
case <-ctx.Done():
m.log.Debug().Msgf("management logs: context cancelled")
c.Close(websocket.StatusNormalClosure, "context closed")
return
case event := <-events:
switch event.Type {
case StartStreaming:
idle.Stop()
// Expect the first incoming request
startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
if !ok {
m.log.Warn().Msgf("expected start_streaming as first recieved event")
m.log.Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Send()
return
}
// Make sure the session can start
if !m.canStartStream(session) {
m.log.Err(c.Close(StatusSessionLimitExceeded, reasonSessionLimitExceeded)).Send()
return
}
session.Filters(startEvent.Filters)
m.logger.Listen(session)
m.log.Debug().Msgf("Streaming logs")
go m.streamLogs(c, ctx, session)
continue
case StopStreaming:
idle.Reset(idleTimeout)
// Stop the current session for the current actor who requested it
session.Stop()
m.logger.Remove(session)
case UnknownClientEventType:
fallthrough
default:
// Drop unknown events and close connection
m.log.Debug().Msgf("unexpected management message received: %s", event.Type)
// If the client (or the server) already closed the connection, don't attempt to close it again
if !IsClosed(err, m.log) {
m.log.Err(err).Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
}
return
}
case <-ping.C:
go c.Ping(ctx)
case <-idle.C:
c.Close(StatusIdleLimitExceeded, reasonIdleLimitExceeded)
return
}
}
}