TUN-7125: Add management streaming logs WebSocket protocol

This commit is contained in:
Devin Carr
2023-04-04 15:45:32 -07:00
parent 5972540efa
commit 93acdaface
53 changed files with 12367 additions and 2 deletions

View File

@@ -1,5 +1,67 @@
package management
import (
"context"
"errors"
"fmt"
"io"
jsoniter "github.com/json-iterator/go"
"github.com/rs/zerolog"
"nhooyr.io/websocket"
)
var (
errInvalidMessageType = fmt.Errorf("invalid message type was provided")
)
// ServerEventType represents the event types that can come from the server
type ServerEventType string
// ClientEventType represents the event types that can come from the client
type ClientEventType string
const (
UnknownClientEventType ClientEventType = ""
StartStreaming ClientEventType = "start_streaming"
StopStreaming ClientEventType = "stop_streaming"
UnknownServerEventType ServerEventType = ""
Logs ServerEventType = "logs"
)
// ServerEvent is the base struct that informs, based of the Type field, which Event type was provided from the server.
type ServerEvent struct {
Type ServerEventType `json:"type,omitempty"`
// The raw json message is provided to allow better deserialization once the type is known
event jsoniter.RawMessage
}
// ClientEvent is the base struct that informs, based of the Type field, which Event type was provided from the client.
type ClientEvent struct {
Type ClientEventType `json:"type,omitempty"`
// The raw json message is provided to allow better deserialization once the type is known
event jsoniter.RawMessage
}
// EventStartStreaming signifies that the client wishes to start receiving log events.
// Additional filters can be provided to augment the log events requested.
type EventStartStreaming struct {
ClientEvent
Filters []string `json:"filters"`
}
// EventStopStreaming signifies that the client wishes to halt receiving log events.
type EventStopStreaming struct {
ClientEvent
}
// EventLog is the event that the server sends to the client with the log events.
type EventLog struct {
ServerEvent
Logs []Log `json:"logs"`
}
// LogEventType is the way that logging messages are able to be filtered.
// Example: assigning LogEventType.Cloudflared to a zerolog event will allow the client to filter for only
// the Cloudflared-related events.
@@ -38,3 +100,113 @@ const (
Warn LogLevel = "warn"
Error LogLevel = "error"
)
// Log is the basic structure of the events that are sent to the client.
type Log struct {
Event LogEventType `json:"event"`
Timestamp string `json:"timestamp"`
Level LogLevel `json:"level"`
Message string `json:"message"`
}
// IntoClientEvent unmarshals the provided ClientEvent into the proper type.
func IntoClientEvent[T EventStartStreaming | EventStopStreaming](e *ClientEvent, eventType ClientEventType) (*T, bool) {
if e.Type != eventType {
return nil, false
}
event := new(T)
err := json.Unmarshal(e.event, event)
if err != nil {
return nil, false
}
return event, true
}
// IntoServerEvent unmarshals the provided ServerEvent into the proper type.
func IntoServerEvent[T EventLog](e *ServerEvent, eventType ServerEventType) (*T, bool) {
if e.Type != eventType {
return nil, false
}
event := new(T)
err := json.Unmarshal(e.event, event)
if err != nil {
return nil, false
}
return event, true
}
// ReadEvent will read a message from the websocket connection and parse it into a valid ServerEvent.
func ReadServerEvent(c *websocket.Conn, ctx context.Context) (*ServerEvent, error) {
message, err := readMessage(c, ctx)
if err != nil {
return nil, err
}
event := ServerEvent{}
if err := json.Unmarshal(message, &event); err != nil {
return nil, err
}
switch event.Type {
case Logs:
event.event = message
return &event, nil
case UnknownServerEventType:
return nil, errInvalidMessageType
default:
return nil, fmt.Errorf("invalid server message type was provided: %s", event.Type)
}
}
// ReadEvent will read a message from the websocket connection and parse it into a valid ClientEvent.
func ReadClientEvent(c *websocket.Conn, ctx context.Context) (*ClientEvent, error) {
message, err := readMessage(c, ctx)
if err != nil {
return nil, err
}
event := ClientEvent{}
if err := json.Unmarshal(message, &event); err != nil {
return nil, err
}
switch event.Type {
case StartStreaming, StopStreaming:
event.event = message
return &event, nil
case UnknownClientEventType:
return nil, errInvalidMessageType
default:
return nil, fmt.Errorf("invalid client message type was provided: %s", event.Type)
}
}
// readMessage will read a message from the websocket connection and return the payload.
func readMessage(c *websocket.Conn, ctx context.Context) ([]byte, error) {
messageType, reader, err := c.Reader(ctx)
if err != nil {
return nil, err
}
if messageType != websocket.MessageText {
return nil, errInvalidMessageType
}
return io.ReadAll(reader)
}
// WriteEvent will write a Event type message to the websocket connection.
func WriteEvent(c *websocket.Conn, ctx context.Context, event any) error {
payload, err := json.Marshal(event)
if err != nil {
return err
}
return c.Write(ctx, websocket.MessageText, payload)
}
// IsClosed returns true if the websocket error is a websocket.CloseError; returns false if not a
// websocket.CloseError
func IsClosed(err error, log *zerolog.Logger) bool {
var closeErr websocket.CloseError
if errors.As(err, &closeErr) {
if closeErr.Code != websocket.StatusNormalClosure {
log.Debug().Msgf("connection is already closed: (%d) %s", closeErr.Code, closeErr.Reason)
}
return true
}
return false
}

168
management/events_test.go Normal file
View File

@@ -0,0 +1,168 @@
package management
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"nhooyr.io/websocket"
"github.com/cloudflare/cloudflared/internal/test"
)
func TestIntoClientEvent_StartStreaming(t *testing.T) {
event := ClientEvent{
Type: StartStreaming,
event: []byte(`{"type": "start_streaming"}`),
}
ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
require.True(t, ok)
require.Equal(t, EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}}, *ce)
}
func TestIntoClientEvent_StopStreaming(t *testing.T) {
event := ClientEvent{
Type: StopStreaming,
event: []byte(`{"type": "stop_streaming"}`),
}
ce, ok := IntoClientEvent[EventStopStreaming](&event, StopStreaming)
require.True(t, ok)
require.Equal(t, EventStopStreaming{ClientEvent: ClientEvent{Type: StopStreaming}}, *ce)
}
func TestIntoClientEvent_Invalid(t *testing.T) {
event := ClientEvent{
Type: UnknownClientEventType,
event: []byte(`{"type": "invalid"}`),
}
_, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
require.False(t, ok)
}
func TestIntoServerEvent_Logs(t *testing.T) {
event := ServerEvent{
Type: Logs,
event: []byte(`{"type": "logs"}`),
}
ce, ok := IntoServerEvent(&event, Logs)
require.True(t, ok)
require.Equal(t, EventLog{ServerEvent: ServerEvent{Type: Logs}}, *ce)
}
func TestIntoServerEvent_Invalid(t *testing.T) {
event := ServerEvent{
Type: UnknownServerEventType,
event: []byte(`{"type": "invalid"}`),
}
_, ok := IntoServerEvent(&event, Logs)
require.False(t, ok)
}
func TestReadServerEvent(t *testing.T) {
sentEvent := EventLog{
ServerEvent: ServerEvent{Type: Logs},
Logs: []Log{
{
Timestamp: time.Now().UTC().Format(time.RFC3339),
Event: HTTP,
Level: Info,
Message: "test",
},
},
}
client, server := test.WSPipe(nil, nil)
server.CloseRead(context.Background())
defer func() {
server.Close(websocket.StatusInternalError, "")
}()
go func() {
err := WriteEvent(server, context.Background(), &sentEvent)
require.NoError(t, err)
}()
event, err := ReadServerEvent(client, context.Background())
require.NoError(t, err)
require.Equal(t, sentEvent.Type, event.Type)
client.Close(websocket.StatusInternalError, "")
}
func TestReadServerEvent_InvalidWebSocketMessageType(t *testing.T) {
client, server := test.WSPipe(nil, nil)
server.CloseRead(context.Background())
defer func() {
server.Close(websocket.StatusInternalError, "")
}()
go func() {
err := server.Write(context.Background(), websocket.MessageBinary, []byte("test1234"))
require.NoError(t, err)
}()
_, err := ReadServerEvent(client, context.Background())
require.Error(t, err)
client.Close(websocket.StatusInternalError, "")
}
func TestReadServerEvent_InvalidMessageType(t *testing.T) {
sentEvent := ClientEvent{Type: ClientEventType(UnknownServerEventType)}
client, server := test.WSPipe(nil, nil)
server.CloseRead(context.Background())
defer func() {
server.Close(websocket.StatusInternalError, "")
}()
go func() {
err := WriteEvent(server, context.Background(), &sentEvent)
require.NoError(t, err)
}()
_, err := ReadServerEvent(client, context.Background())
require.ErrorIs(t, err, errInvalidMessageType)
client.Close(websocket.StatusInternalError, "")
}
func TestReadClientEvent(t *testing.T) {
sentEvent := EventStartStreaming{
ClientEvent: ClientEvent{Type: StartStreaming},
}
client, server := test.WSPipe(nil, nil)
client.CloseRead(context.Background())
defer func() {
client.Close(websocket.StatusInternalError, "")
}()
go func() {
err := WriteEvent(client, context.Background(), &sentEvent)
require.NoError(t, err)
}()
event, err := ReadClientEvent(server, context.Background())
require.NoError(t, err)
require.Equal(t, sentEvent.Type, event.Type)
server.Close(websocket.StatusInternalError, "")
}
func TestReadClientEvent_InvalidWebSocketMessageType(t *testing.T) {
client, server := test.WSPipe(nil, nil)
client.CloseRead(context.Background())
defer func() {
client.Close(websocket.StatusInternalError, "")
}()
go func() {
err := client.Write(context.Background(), websocket.MessageBinary, []byte("test1234"))
require.NoError(t, err)
}()
_, err := ReadClientEvent(server, context.Background())
require.Error(t, err)
server.Close(websocket.StatusInternalError, "")
}
func TestReadClientEvent_InvalidMessageType(t *testing.T) {
sentEvent := ClientEvent{Type: UnknownClientEventType}
client, server := test.WSPipe(nil, nil)
client.CloseRead(context.Background())
defer func() {
client.Close(websocket.StatusInternalError, "")
}()
go func() {
err := WriteEvent(client, context.Background(), &sentEvent)
require.NoError(t, err)
}()
_, err := ReadClientEvent(server, context.Background())
require.ErrorIs(t, err, errInvalidMessageType)
server.Close(websocket.StatusInternalError, "")
}

View File

@@ -1,10 +1,24 @@
package management
import (
"context"
"net/http"
"sync"
"sync/atomic"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog"
"nhooyr.io/websocket"
)
const (
// In the current state, an invalid command was provided by the client
StatusInvalidCommand websocket.StatusCode = 4001
reasonInvalidCommand = "expected start streaming as first event"
// There are a limited number of available streaming log sessions that cloudflared will service, exceeding this
// value will return this error to incoming requests.
StatusSessionLimitExceeded websocket.StatusCode = 4002
reasonSessionLimitExceeded = "limit exceeded for streaming sessions"
)
type ManagementService struct {
@@ -13,7 +27,15 @@ type ManagementService struct {
log *zerolog.Logger
router chi.Router
logger LoggerListener
// streaming signifies if the service is already streaming logs. Helps limit the number of active users streaming logs
// from this cloudflared instance.
streaming atomic.Bool
// streamingMut is a lock to prevent concurrent requests to start streaming. Utilizing the atomic.Bool is not
// sufficient to complete this operation since many other checks during an incoming new request are needed
// to validate this before setting streaming to true.
streamingMut sync.Mutex
logger LoggerListener
}
func New(managementHostname string, log *zerolog.Logger, logger LoggerListener) *ManagementService {
@@ -25,6 +47,7 @@ func New(managementHostname string, log *zerolog.Logger, logger LoggerListener)
r := chi.NewRouter()
r.Get("/ping", ping)
r.Head("/ping", ping)
r.Get("/logs", s.logs)
s.router = r
return s
}
@@ -37,3 +60,127 @@ func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func ping(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
// readEvents will loop through all incoming websocket messages from a client and marshal them into the
// proper Event structure and pass through to the events channel. Any invalid messages sent will automatically
// terminate the connection.
func (m *ManagementService) readEvents(c *websocket.Conn, ctx context.Context, events chan<- *ClientEvent) {
for {
event, err := ReadClientEvent(c, ctx)
select {
case <-ctx.Done():
return
default:
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if !IsClosed(err, m.log) {
m.log.Err(err).Send()
m.log.Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
}
// Any errors when reading the messages from the client will close the connection
return
}
events <- event
}
}
}
// streamLogs will begin the process of reading from the Session listener and write the log events to the client.
func (m *ManagementService) streamLogs(c *websocket.Conn, ctx context.Context, session *Session) {
defer m.logger.Close(session)
for m.streaming.Load() {
select {
case <-ctx.Done():
m.streaming.Store(false)
return
case event := <-session.listener:
err := WriteEvent(c, ctx, &EventLog{
ServerEvent: ServerEvent{Type: Logs},
Logs: []Log{{
Event: Cloudflared,
Timestamp: event.Time,
Level: event.Level,
Message: event.Message,
}},
})
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if !IsClosed(err, m.log) {
m.log.Err(err).Send()
m.log.Err(c.Close(websocket.StatusInternalError, err.Error())).Send()
}
// Any errors when writing the messages to the client will stop streaming and close the connection
m.streaming.Store(false)
return
}
default:
// No messages to send
}
}
}
// startStreaming will check the conditions of the request and begin streaming or close the connection for invalid
// requests.
func (m *ManagementService) startStreaming(c *websocket.Conn, ctx context.Context, event *ClientEvent) {
m.streamingMut.Lock()
defer m.streamingMut.Unlock()
// Limits to one user for streaming logs
if m.streaming.Load() {
m.log.Warn().
Msgf("Another management session request was attempted but one session already being served; there is a limit of streaming log sessions to reduce overall performance impact.")
m.log.Err(c.Close(StatusSessionLimitExceeded, reasonSessionLimitExceeded)).Send()
return
}
// Expect the first incoming request
_, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
if !ok {
m.log.Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Msgf("expected start_streaming as first recieved event")
return
}
m.streaming.Store(true)
listener := m.logger.Listen()
m.log.Debug().Msgf("Streaming logs")
go m.streamLogs(c, ctx, listener)
}
// Management Streaming Logs accept handler
func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, nil)
if err != nil {
m.log.Debug().Msgf("management handshake: %s", err.Error())
return
}
// Make sure the connection is closed if other go routines fail to close the connection after completing.
defer c.Close(websocket.StatusInternalError, "")
ctx := r.Context()
events := make(chan *ClientEvent)
go m.readEvents(c, ctx, events)
for {
select {
case <-ctx.Done():
m.log.Debug().Msgf("management logs: context cancelled")
c.Close(websocket.StatusNormalClosure, "context closed")
return
case event := <-events:
switch event.Type {
case StartStreaming:
m.startStreaming(c, ctx, event)
continue
case StopStreaming:
// TODO: limit StopStreaming to only halt streaming for clients that are already streaming
m.streaming.Store(false)
case UnknownClientEventType:
fallthrough
default:
// Drop unknown events and close connection
m.log.Debug().Msgf("unexpected management message received: %s", event.Type)
// If the client (or the server) already closed the connection, don't attempt to close it again
if !IsClosed(err, m.log) {
m.log.Err(err).Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
}
return
}
}
}
}

View File

@@ -0,0 +1,61 @@
package management
import (
"context"
"io"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"nhooyr.io/websocket"
"github.com/cloudflare/cloudflared/internal/test"
)
var (
noopLogger = zerolog.New(io.Discard)
)
func TestReadEventsLoop(t *testing.T) {
sentEvent := EventStartStreaming{
ClientEvent: ClientEvent{Type: StartStreaming},
}
client, server := test.WSPipe(nil, nil)
client.CloseRead(context.Background())
defer func() {
client.Close(websocket.StatusInternalError, "")
}()
go func() {
err := WriteEvent(client, context.Background(), &sentEvent)
require.NoError(t, err)
}()
m := ManagementService{
log: &noopLogger,
}
events := make(chan *ClientEvent)
go m.readEvents(server, context.Background(), events)
event := <-events
require.Equal(t, sentEvent.Type, event.Type)
server.Close(websocket.StatusInternalError, "")
}
func TestReadEventsLoop_ContextCancelled(t *testing.T) {
client, server := test.WSPipe(nil, nil)
ctx, cancel := context.WithCancel(context.Background())
client.CloseRead(ctx)
defer func() {
client.Close(websocket.StatusInternalError, "")
}()
m := ManagementService{
log: &noopLogger,
}
events := make(chan *ClientEvent)
go func() {
time.Sleep(time.Second)
cancel()
}()
// Want to make sure this function returns when context is cancelled
m.readEvents(server, ctx, events)
server.Close(websocket.StatusInternalError, "")
}