mirror of
https://github.com/cloudflare/cloudflared.git
synced 2025-07-27 00:29:58 +00:00
AUTH-2021 - s3 bucket uploading for SSH logs
This commit is contained in:
106
awsuploader/directory_upload_manager.go
Normal file
106
awsuploader/directory_upload_manager.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package awsuploader
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// DirectoryUploadManager is used to manage file uploads on an interval from a directory
|
||||
type DirectoryUploadManager struct {
|
||||
logger *logrus.Logger
|
||||
uploader Uploader
|
||||
rootDirectory string
|
||||
sweepInterval time.Duration
|
||||
ticker *time.Ticker
|
||||
shutdownC chan struct{}
|
||||
workQueue chan string
|
||||
}
|
||||
|
||||
// NewDirectoryUploadManager create a new DirectoryUploadManager
|
||||
// uploader is an Uploader to use as an actual uploading engine
|
||||
// directory is the directory to sweep for files to upload
|
||||
// sweepInterval is how often to iterate the directory and upload the files within
|
||||
func NewDirectoryUploadManager(logger *logrus.Logger, uploader Uploader, directory string, sweepInterval time.Duration, shutdownC chan struct{}) *DirectoryUploadManager {
|
||||
workerCount := 10
|
||||
manager := &DirectoryUploadManager{
|
||||
logger: logger,
|
||||
uploader: uploader,
|
||||
rootDirectory: directory,
|
||||
sweepInterval: sweepInterval,
|
||||
shutdownC: shutdownC,
|
||||
workQueue: make(chan string, workerCount),
|
||||
}
|
||||
|
||||
//start workers
|
||||
for i := 0; i < workerCount; i++ {
|
||||
go manager.worker()
|
||||
}
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
// Upload a file using the uploader
|
||||
// This is useful for "out of band" uploads that need to be triggered immediately instead of waiting for the sweep
|
||||
func (m *DirectoryUploadManager) Upload(filepath string) error {
|
||||
return m.uploader.Upload(filepath)
|
||||
}
|
||||
|
||||
// Start the upload ticker to walk the directories
|
||||
func (m *DirectoryUploadManager) Start() {
|
||||
m.ticker = time.NewTicker(m.sweepInterval)
|
||||
go m.run()
|
||||
}
|
||||
|
||||
func (m *DirectoryUploadManager) run() {
|
||||
for {
|
||||
select {
|
||||
case <-m.shutdownC:
|
||||
m.ticker.Stop()
|
||||
return
|
||||
case <-m.ticker.C:
|
||||
m.sweep()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sweep the directory and kick off uploads
|
||||
func (m *DirectoryUploadManager) sweep() {
|
||||
filepath.Walk(m.rootDirectory, func(path string, info os.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
//30 days ago
|
||||
retentionTime := 30 * (time.Hour * 24)
|
||||
checkTime := time.Now().Add(-time.Duration(retentionTime))
|
||||
|
||||
//delete the file it is stale
|
||||
if info.ModTime().After(checkTime) {
|
||||
os.Remove(path)
|
||||
return nil
|
||||
}
|
||||
//add the upload to the work queue
|
||||
go func() {
|
||||
m.workQueue <- path
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// worker handles upload requests
|
||||
func (m *DirectoryUploadManager) worker() {
|
||||
for {
|
||||
select {
|
||||
case <-m.shutdownC:
|
||||
return
|
||||
case filepath := <-m.workQueue:
|
||||
if err := m.Upload(filepath); err != nil {
|
||||
m.logger.WithError(err).Error("Cannot upload file to s3 bucket")
|
||||
} else {
|
||||
os.Remove(filepath)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
137
awsuploader/directory_upload_manager_test.go
Normal file
137
awsuploader/directory_upload_manager_test.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package awsuploader
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type MockUploader struct {
|
||||
shouldFail bool
|
||||
}
|
||||
|
||||
func (m *MockUploader) Upload(filepath string) error {
|
||||
if m.shouldFail {
|
||||
return errors.New("upload set to fail")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewMockUploader(shouldFail bool) Uploader {
|
||||
return &MockUploader{shouldFail: shouldFail}
|
||||
}
|
||||
|
||||
func getDirectoryPath(t *testing.T) string {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatal("couldn't create the test directory!", err)
|
||||
}
|
||||
return filepath.Join(dir, "uploads")
|
||||
}
|
||||
|
||||
func setupTestDirectory(t *testing.T) string {
|
||||
path := getDirectoryPath(t)
|
||||
os.RemoveAll(path)
|
||||
time.Sleep(100 * time.Millisecond) //short way to wait for the OS to delete the folder
|
||||
err := os.MkdirAll(path, os.ModePerm)
|
||||
if err != nil {
|
||||
t.Fatal("couldn't create the test directory!", err)
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
func createUploadManager(t *testing.T, shouldFailUpload bool) *DirectoryUploadManager {
|
||||
rootDirectory := setupTestDirectory(t)
|
||||
uploader := NewMockUploader(shouldFailUpload)
|
||||
logger := logrus.New()
|
||||
shutdownC := make(chan struct{})
|
||||
return NewDirectoryUploadManager(logger, uploader, rootDirectory, 1*time.Second, shutdownC)
|
||||
}
|
||||
|
||||
func createFile(t *testing.T, fileName string) (*os.File, string) {
|
||||
path := filepath.Join(getDirectoryPath(t), fileName)
|
||||
f, err := os.Create(path)
|
||||
if err != nil {
|
||||
t.Fatal("upload to create file for sweep test", err)
|
||||
}
|
||||
return f, path
|
||||
}
|
||||
|
||||
func TestUploadSuccess(t *testing.T) {
|
||||
manager := createUploadManager(t, false)
|
||||
path := filepath.Join(getDirectoryPath(t), "test_file")
|
||||
if err := manager.Upload(path); err != nil {
|
||||
t.Fatal("the upload request method failed", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFailure(t *testing.T) {
|
||||
manager := createUploadManager(t, true)
|
||||
path := filepath.Join(getDirectoryPath(t), "test_file")
|
||||
if err := manager.Upload(path); err == nil {
|
||||
t.Fatal("the upload request method should have failed and didn't", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSweepSuccess(t *testing.T) {
|
||||
manager := createUploadManager(t, false)
|
||||
f, path := createFile(t, "test_file")
|
||||
defer f.Close()
|
||||
|
||||
manager.Start()
|
||||
time.Sleep(2 * time.Second)
|
||||
if _, err := os.Stat(path); os.IsExist(err) {
|
||||
//the file should have been deleted
|
||||
t.Fatal("the manager failed to delete the file", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSweepFailure(t *testing.T) {
|
||||
manager := createUploadManager(t, true)
|
||||
f, path := createFile(t, "test_file")
|
||||
defer f.Close()
|
||||
|
||||
manager.Start()
|
||||
time.Sleep(2 * time.Second)
|
||||
_, serr := f.Stat()
|
||||
if serr != nil {
|
||||
//the file should still exist
|
||||
os.Remove(path)
|
||||
t.Fatal("the manager failed to delete the file", serr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHighLoad(t *testing.T) {
|
||||
manager := createUploadManager(t, false)
|
||||
for i := 0; i < 30; i++ {
|
||||
f, _ := createFile(t, randomString(6))
|
||||
defer f.Close()
|
||||
}
|
||||
manager.Start()
|
||||
time.Sleep(4 * time.Second)
|
||||
|
||||
directory := getDirectoryPath(t)
|
||||
files, err := ioutil.ReadDir(directory)
|
||||
if err != nil || len(files) > 0 {
|
||||
t.Fatalf("the manager failed to upload all the files: %s files left: %d", err, len(files))
|
||||
}
|
||||
}
|
||||
|
||||
// LowerCase [a-z]
|
||||
const randSet = "abcdefghijklmnopqrstuvwxyz"
|
||||
|
||||
// String returns a string of length 'n' from a set of letters 'lset'
|
||||
func randomString(n int) string {
|
||||
b := make([]byte, n)
|
||||
lsetLen := len(randSet)
|
||||
for i := range b {
|
||||
b[i] = randSet[rand.Intn(lsetLen)]
|
||||
}
|
||||
return string(b)
|
||||
}
|
62
awsuploader/file_uploader.go
Normal file
62
awsuploader/file_uploader.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package awsuploader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
)
|
||||
|
||||
//FileUploader aws compliant bucket upload
|
||||
type FileUploader struct {
|
||||
storage *s3.S3
|
||||
bucketName string
|
||||
clientID string
|
||||
secretID string
|
||||
}
|
||||
|
||||
// NewFileUploader creates a new S3 compliant bucket uploader
|
||||
func NewFileUploader(bucketName, region, accessKeyID, secretID, token, s3Host string) (*FileUploader, error) {
|
||||
sess, err := session.NewSession(&aws.Config{
|
||||
Region: aws.String(region),
|
||||
Credentials: credentials.NewStaticCredentials(accessKeyID, secretID, token),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var storage *s3.S3
|
||||
if s3Host != "" {
|
||||
storage = s3.New(sess, &aws.Config{Endpoint: aws.String(s3Host)})
|
||||
} else {
|
||||
storage = s3.New(sess)
|
||||
}
|
||||
|
||||
return &FileUploader{
|
||||
storage: storage,
|
||||
bucketName: bucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Upload a file to the bucket
|
||||
func (u *FileUploader) Upload(filepath string) error {
|
||||
info, err := os.Stat(filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
file, err := os.Open(filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, serr := u.storage.PutObjectWithContext(context.Background(), &s3.PutObjectInput{
|
||||
Bucket: aws.String(u.bucketName),
|
||||
Key: aws.String(info.Name()),
|
||||
Body: file,
|
||||
})
|
||||
return serr
|
||||
}
|
7
awsuploader/upload_manager.go
Normal file
7
awsuploader/upload_manager.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package awsuploader
|
||||
|
||||
// UploadManager is used to manage file uploads on an interval
|
||||
type UploadManager interface {
|
||||
Upload(string) error
|
||||
Start()
|
||||
}
|
7
awsuploader/uploader.go
Normal file
7
awsuploader/uploader.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package awsuploader
|
||||
|
||||
// Uploader the functions required to upload to a bucket
|
||||
type Uploader interface {
|
||||
//Upload a file to the bucket
|
||||
Upload(string) error
|
||||
}
|
Reference in New Issue
Block a user