xl: add quorum support for DeleteFile() (#1426)

Fixes #1396
This commit is contained in:
Bala FA 2016-04-30 07:13:18 +05:30 committed by Harshavardhana
parent dc45ea3946
commit a978975eea
2 changed files with 78 additions and 10 deletions

View File

@ -29,6 +29,7 @@ type xlMetaV1 struct {
Size int64 `json:"size"` Size int64 `json:"size"`
ModTime time.Time `json:"modTime"` ModTime time.Time `json:"modTime"`
Version int64 `json:"version"` Version int64 `json:"version"`
Deleted bool `json:"deleted"`
} `json:"stat"` } `json:"stat"`
Erasure struct { Erasure struct {
DataBlocks int `json:"data"` DataBlocks int `json:"data"`

View File

@ -18,6 +18,7 @@ package main
import ( import (
"fmt" "fmt"
"io"
"os" "os"
slashpath "path" slashpath "path"
"sort" "sort"
@ -547,10 +548,54 @@ func (xl XL) DeleteFile(volume, path string) error {
if !isValidPath(path) { if !isValidPath(path) {
return errInvalidArgument return errInvalidArgument
} }
// Lock right before reading from disk.
nsMutex.RLock(volume, path)
partsMetadata, errs := xl.getPartsMetadata(volume, path)
nsMutex.RUnlock(volume, path)
// List all the file versions on existing files.
versions, err := listFileVersions(partsMetadata, errs)
// Get highest file version.
higherVersion := highestInt(versions)
// Increment to have next higher version.
higherVersion++
// Take last meta data to use later
var mdata xlMetaV1
onlineDisksCount := 0
for index, metadata := range partsMetadata {
if errs[index] == nil {
mdata = metadata
onlineDisksCount++
}
}
if onlineDisksCount < xl.writeQuorum {
return errWriteQuorum
}
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
deleteMetaData := (onlineDisksCount == len(xl.storageDisks))
// Set higher version to indicate file operation
mdata.Stat.Version = higherVersion
mdata.Stat.Deleted = true
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
// Loop through and delete each chunks. // Loop through and delete each chunks.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index)) if errs[index] != nil {
err := disk.DeleteFile(volume, erasureFilePart) continue
}
// TODO: errors can be allowed up to len(xl.storageDisks) - xl.writeQuorum
// TODO: Fix: meta data will be lost if deleteMetaData is true and anyone DeleteFile failure.
if deleteMetaData {
err = disk.DeleteFile(volume, xlMetaV1FilePath)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,
@ -558,8 +603,30 @@ func (xl XL) DeleteFile(volume, path string) error {
}).Errorf("DeleteFile failed with %s", err) }).Errorf("DeleteFile failed with %s", err)
return err return err
} }
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File) } else {
err = disk.DeleteFile(volume, xlMetaV1FilePath) var metadataWriter io.WriteCloser
metadataWriter, err = disk.CreateFile(volume, xlMetaV1FilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("CreateFile failed with %s", err)
return err
}
err = mdata.Write(metadataWriter)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
"diskIndex": index,
}).Errorf("Writing metadata failed with %s", err)
return err
}
}
erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
err = disk.DeleteFile(volume, erasureFilePart)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,