From 4512264e8405bcdde10a20451a2a29ca21535a70 Mon Sep 17 00:00:00 2001 From: BBaoVanC Date: Thu, 23 Dec 2021 01:20:44 -0600 Subject: [PATCH] Support file locking TODO: Actually check for locks in cleanup.go --- backends/localfs/localfs.go | 27 ++++++++++++++++++++++++++- backends/s3/s3.go | 11 +++++++++++ backends/storage.go | 3 +++ cleanup/cleanup.go | 8 ++++---- server.go | 12 ++++++++++-- upload.go | 16 ++++++++++++++++ 6 files changed, 70 insertions(+), 7 deletions(-) diff --git a/backends/localfs/localfs.go b/backends/localfs/localfs.go index 3f48299..bd68d20 100644 --- a/backends/localfs/localfs.go +++ b/backends/localfs/localfs.go @@ -16,6 +16,7 @@ import ( type LocalfsBackend struct { metaPath string filesPath string + locksPath string } type MetadataJSON struct { @@ -127,6 +128,29 @@ func (b LocalfsBackend) writeMetadata(key string, metadata backends.Metadata) er return nil } +func (b LocalfsBackend) Lock(filename string) (err error) { + lockPath := path.Join(b.locksPath, filename) + + lock, err := os.Create(lockPath) + if err != nil { + return err + } + + lock.Close() + return +} + +func (b LocalfsBackend) Unlock(filename string) (err error) { + lockPath := path.Join(b.locksPath, filename) + + err = os.Remove(lockPath) + if err != nil { + return err + } + + return +} + func (b LocalfsBackend) Put(key string, r io.Reader, expiry time.Time, deleteKey, accessKey string, srcIp string) (m backends.Metadata, err error) { filePath := path.Join(b.filesPath, key) @@ -201,9 +225,10 @@ func (b LocalfsBackend) List() ([]string, error) { return output, nil } -func NewLocalfsBackend(metaPath string, filesPath string) LocalfsBackend { +func NewLocalfsBackend(metaPath string, filesPath string, locksPath string) LocalfsBackend { return LocalfsBackend{ metaPath: metaPath, filesPath: filesPath, + locksPath: locksPath, } } diff --git a/backends/s3/s3.go b/backends/s3/s3.go index 58ccd14..b48837f 100644 --- a/backends/s3/s3.go +++ b/backends/s3/s3.go @@ -3,6 +3,7 @@ package s3 import ( "io" "io/ioutil" + "log" "net/http" "os" "strconv" @@ -156,6 +157,16 @@ func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) { return } +func (b S3Backend) Lock(filename string) (err error) { + log.Printf("Locking is not supported on S3") + return +} + +func (b S3Backend) Unlock(filename string) (err error) { + log.Printf("Locking is not supported on S3") + return +} + func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey, accessKey string, srcIp string) (m backends.Metadata, err error) { tmpDst, err := ioutil.TempFile("", "linx-server-upload") if err != nil { diff --git a/backends/storage.go b/backends/storage.go index 4e2ad6b..119ddb6 100644 --- a/backends/storage.go +++ b/backends/storage.go @@ -12,6 +12,8 @@ type StorageBackend interface { Exists(key string) (bool, error) Head(key string) (Metadata, error) Get(key string) (Metadata, io.ReadCloser, error) + Lock(filename string) (error) + Unlock(filename string) (error) Put(key string, r io.Reader, expiry time.Time, deleteKey, accessKey string, srcIp string) (Metadata, error) PutMetadata(key string, m Metadata) error ServeFile(key string, w http.ResponseWriter, r *http.Request) error @@ -25,3 +27,4 @@ type MetaStorageBackend interface { var NotFoundErr = errors.New("File not found.") var FileEmptyError = errors.New("Empty file") +var FileLockedError = errors.New("Locked file") diff --git a/cleanup/cleanup.go b/cleanup/cleanup.go index 5920c22..6230b00 100644 --- a/cleanup/cleanup.go +++ b/cleanup/cleanup.go @@ -8,8 +8,8 @@ import ( "github.com/andreimarcu/linx-server/expiry" ) -func Cleanup(filesDir string, metaDir string, noLogs bool) { - fileBackend := localfs.NewLocalfsBackend(metaDir, filesDir) +func Cleanup(filesDir string, metaDir string, locksDir string, noLogs bool) { + fileBackend := localfs.NewLocalfsBackend(metaDir, filesDir, locksDir) files, err := fileBackend.List() if err != nil { @@ -33,10 +33,10 @@ func Cleanup(filesDir string, metaDir string, noLogs bool) { } } -func PeriodicCleanup(minutes time.Duration, filesDir string, metaDir string, noLogs bool) { +func PeriodicCleanup(minutes time.Duration, filesDir string, metaDir string, locksDir string, noLogs bool) { c := time.Tick(minutes) for range c { - Cleanup(filesDir, metaDir, noLogs) + Cleanup(filesDir, metaDir, locksDir, noLogs) } } diff --git a/server.go b/server.go index 41cd932..b34dfd4 100644 --- a/server.go +++ b/server.go @@ -43,6 +43,7 @@ var Config struct { bind string filesDir string metaDir string + locksDir string siteName string siteURL string sitePath string @@ -137,6 +138,11 @@ func setup() *web.Mux { log.Fatal("Could not create metadata directory:", err) } + err = os.MkdirAll(Config.locksDir, 0700) + if err != nil { + log.Fatal("Could not create locks directory:", err) + } + if Config.siteURL != "" { // ensure siteURL ends wth '/' if lastChar := Config.siteURL[len(Config.siteURL)-1:]; lastChar != "/" { @@ -161,9 +167,9 @@ func setup() *web.Mux { if Config.s3Bucket != "" { storageBackend = s3.NewS3Backend(Config.s3Bucket, Config.s3Region, Config.s3Endpoint, Config.s3ForcePathStyle) } else { - storageBackend = localfs.NewLocalfsBackend(Config.metaDir, Config.filesDir) + storageBackend = localfs.NewLocalfsBackend(Config.metaDir, Config.filesDir, Config.locksDir) if Config.cleanupEveryMinutes > 0 { - go cleanup.PeriodicCleanup(time.Duration(Config.cleanupEveryMinutes)*time.Minute, Config.filesDir, Config.metaDir, Config.noLogs) + go cleanup.PeriodicCleanup(time.Duration(Config.cleanupEveryMinutes)*time.Minute, Config.filesDir, Config.metaDir, Config.locksDir, Config.noLogs) } } @@ -249,6 +255,8 @@ func main() { "path to files directory") flag.StringVar(&Config.metaDir, "metapath", "meta/", "path to metadata directory") + flag.StringVar(&Config.locksDir, "lockspath", "locks/", + "path to metadata directory") flag.BoolVar(&Config.basicAuth, "basicauth", false, "allow logging by basic auth password") flag.BoolVar(&Config.noLogs, "nologs", false, diff --git a/upload.go b/upload.go index 8e5fc91..87ef6ba 100644 --- a/upload.go +++ b/upload.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "log" "net/http" "net/url" "path" @@ -318,6 +319,13 @@ func processUpload(upReq UploadRequest) (upload Upload, err error) { return upload, errors.New("Prohibited filename") } + // Lock the upload + log.Printf("Lock %s", upload.Filename) + err = storageBackend.Lock(upload.Filename) + if err != nil { + return upload, err + } + // Get the rest of the metadata needed for storage var fileExpiry time.Time maxDurationTime := time.Duration(Config.maxDurationTime) * time.Second @@ -341,11 +349,19 @@ func processUpload(upReq UploadRequest) (upload Upload, err error) { if Config.disableAccessKey == true { upReq.accessKey = "" } + log.Printf("Write %s", upload.Filename) upload.Metadata, err = storageBackend.Put(upload.Filename, io.MultiReader(bytes.NewReader(header), upReq.src), fileExpiry, upReq.deleteKey, upReq.accessKey, upReq.srcIp) if err != nil { return upload, err } + // Unlock the upload + log.Printf("Unlock %s", upload.Filename) + err = storageBackend.Unlock(upload.Filename) + if err != nil { + return upload, err + } + return }