Support file locking
TODO: Actually check for locks in cleanup.go
This commit is contained in:
parent
3f503442f1
commit
4512264e84
|
@ -16,6 +16,7 @@ import (
|
|||
type LocalfsBackend struct {
|
||||
metaPath string
|
||||
filesPath string
|
||||
locksPath string
|
||||
}
|
||||
|
||||
type MetadataJSON struct {
|
||||
|
@ -127,6 +128,29 @@ func (b LocalfsBackend) writeMetadata(key string, metadata backends.Metadata) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Lock(filename string) (err error) {
|
||||
lockPath := path.Join(b.locksPath, filename)
|
||||
|
||||
lock, err := os.Create(lockPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lock.Close()
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Unlock(filename string) (err error) {
|
||||
lockPath := path.Join(b.locksPath, filename)
|
||||
|
||||
err = os.Remove(lockPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Put(key string, r io.Reader, expiry time.Time, deleteKey, accessKey string, srcIp string) (m backends.Metadata, err error) {
|
||||
filePath := path.Join(b.filesPath, key)
|
||||
|
||||
|
@ -201,9 +225,10 @@ func (b LocalfsBackend) List() ([]string, error) {
|
|||
return output, nil
|
||||
}
|
||||
|
||||
func NewLocalfsBackend(metaPath string, filesPath string) LocalfsBackend {
|
||||
func NewLocalfsBackend(metaPath string, filesPath string, locksPath string) LocalfsBackend {
|
||||
return LocalfsBackend{
|
||||
metaPath: metaPath,
|
||||
filesPath: filesPath,
|
||||
locksPath: locksPath,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package s3
|
|||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
@ -156,6 +157,16 @@ func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (b S3Backend) Lock(filename string) (err error) {
|
||||
log.Printf("Locking is not supported on S3")
|
||||
return
|
||||
}
|
||||
|
||||
func (b S3Backend) Unlock(filename string) (err error) {
|
||||
log.Printf("Locking is not supported on S3")
|
||||
return
|
||||
}
|
||||
|
||||
func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey, accessKey string, srcIp string) (m backends.Metadata, err error) {
|
||||
tmpDst, err := ioutil.TempFile("", "linx-server-upload")
|
||||
if err != nil {
|
||||
|
|
|
@ -12,6 +12,8 @@ type StorageBackend interface {
|
|||
Exists(key string) (bool, error)
|
||||
Head(key string) (Metadata, error)
|
||||
Get(key string) (Metadata, io.ReadCloser, error)
|
||||
Lock(filename string) (error)
|
||||
Unlock(filename string) (error)
|
||||
Put(key string, r io.Reader, expiry time.Time, deleteKey, accessKey string, srcIp string) (Metadata, error)
|
||||
PutMetadata(key string, m Metadata) error
|
||||
ServeFile(key string, w http.ResponseWriter, r *http.Request) error
|
||||
|
@ -25,3 +27,4 @@ type MetaStorageBackend interface {
|
|||
|
||||
var NotFoundErr = errors.New("File not found.")
|
||||
var FileEmptyError = errors.New("Empty file")
|
||||
var FileLockedError = errors.New("Locked file")
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"github.com/andreimarcu/linx-server/expiry"
|
||||
)
|
||||
|
||||
func Cleanup(filesDir string, metaDir string, noLogs bool) {
|
||||
fileBackend := localfs.NewLocalfsBackend(metaDir, filesDir)
|
||||
func Cleanup(filesDir string, metaDir string, locksDir string, noLogs bool) {
|
||||
fileBackend := localfs.NewLocalfsBackend(metaDir, filesDir, locksDir)
|
||||
|
||||
files, err := fileBackend.List()
|
||||
if err != nil {
|
||||
|
@ -33,10 +33,10 @@ func Cleanup(filesDir string, metaDir string, noLogs bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func PeriodicCleanup(minutes time.Duration, filesDir string, metaDir string, noLogs bool) {
|
||||
func PeriodicCleanup(minutes time.Duration, filesDir string, metaDir string, locksDir string, noLogs bool) {
|
||||
c := time.Tick(minutes)
|
||||
for range c {
|
||||
Cleanup(filesDir, metaDir, noLogs)
|
||||
Cleanup(filesDir, metaDir, locksDir, noLogs)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
12
server.go
12
server.go
|
@ -43,6 +43,7 @@ var Config struct {
|
|||
bind string
|
||||
filesDir string
|
||||
metaDir string
|
||||
locksDir string
|
||||
siteName string
|
||||
siteURL string
|
||||
sitePath string
|
||||
|
@ -137,6 +138,11 @@ func setup() *web.Mux {
|
|||
log.Fatal("Could not create metadata directory:", err)
|
||||
}
|
||||
|
||||
err = os.MkdirAll(Config.locksDir, 0700)
|
||||
if err != nil {
|
||||
log.Fatal("Could not create locks directory:", err)
|
||||
}
|
||||
|
||||
if Config.siteURL != "" {
|
||||
// ensure siteURL ends wth '/'
|
||||
if lastChar := Config.siteURL[len(Config.siteURL)-1:]; lastChar != "/" {
|
||||
|
@ -161,9 +167,9 @@ func setup() *web.Mux {
|
|||
if Config.s3Bucket != "" {
|
||||
storageBackend = s3.NewS3Backend(Config.s3Bucket, Config.s3Region, Config.s3Endpoint, Config.s3ForcePathStyle)
|
||||
} else {
|
||||
storageBackend = localfs.NewLocalfsBackend(Config.metaDir, Config.filesDir)
|
||||
storageBackend = localfs.NewLocalfsBackend(Config.metaDir, Config.filesDir, Config.locksDir)
|
||||
if Config.cleanupEveryMinutes > 0 {
|
||||
go cleanup.PeriodicCleanup(time.Duration(Config.cleanupEveryMinutes)*time.Minute, Config.filesDir, Config.metaDir, Config.noLogs)
|
||||
go cleanup.PeriodicCleanup(time.Duration(Config.cleanupEveryMinutes)*time.Minute, Config.filesDir, Config.metaDir, Config.locksDir, Config.noLogs)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -249,6 +255,8 @@ func main() {
|
|||
"path to files directory")
|
||||
flag.StringVar(&Config.metaDir, "metapath", "meta/",
|
||||
"path to metadata directory")
|
||||
flag.StringVar(&Config.locksDir, "lockspath", "locks/",
|
||||
"path to metadata directory")
|
||||
flag.BoolVar(&Config.basicAuth, "basicauth", false,
|
||||
"allow logging by basic auth password")
|
||||
flag.BoolVar(&Config.noLogs, "nologs", false,
|
||||
|
|
16
upload.go
16
upload.go
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
|
@ -318,6 +319,13 @@ func processUpload(upReq UploadRequest) (upload Upload, err error) {
|
|||
return upload, errors.New("Prohibited filename")
|
||||
}
|
||||
|
||||
// Lock the upload
|
||||
log.Printf("Lock %s", upload.Filename)
|
||||
err = storageBackend.Lock(upload.Filename)
|
||||
if err != nil {
|
||||
return upload, err
|
||||
}
|
||||
|
||||
// Get the rest of the metadata needed for storage
|
||||
var fileExpiry time.Time
|
||||
maxDurationTime := time.Duration(Config.maxDurationTime) * time.Second
|
||||
|
@ -341,11 +349,19 @@ func processUpload(upReq UploadRequest) (upload Upload, err error) {
|
|||
if Config.disableAccessKey == true {
|
||||
upReq.accessKey = ""
|
||||
}
|
||||
log.Printf("Write %s", upload.Filename)
|
||||
upload.Metadata, err = storageBackend.Put(upload.Filename, io.MultiReader(bytes.NewReader(header), upReq.src), fileExpiry, upReq.deleteKey, upReq.accessKey, upReq.srcIp)
|
||||
if err != nil {
|
||||
return upload, err
|
||||
}
|
||||
|
||||
// Unlock the upload
|
||||
log.Printf("Unlock %s", upload.Filename)
|
||||
err = storageBackend.Unlock(upload.Filename)
|
||||
if err != nil {
|
||||
return upload, err
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue