xl/putObject: Should take care of the situation if an object already exists at the location. (#1606)

Fixes  #1598 #1594 #1595
This commit is contained in:
Krishna Srinivas 2016-05-14 00:22:36 +05:30 committed by Harshavardhana
parent d267696110
commit 8099396ff0
6 changed files with 301 additions and 176 deletions

View File

@ -17,6 +17,8 @@
package main
import (
"crypto/md5"
"encoding/hex"
"io"
"path/filepath"
"strings"
@ -143,7 +145,73 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// PutObject - create an object.
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
return putObjectCommon(fs.storage, bucket, object, size, data, metadata)
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(fs.storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
}
fileWriter, err := fs.storage.CreateFile(bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", toObjectErr(err, bucket, object)
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", toObjectErr(err, bucket, object)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
// md5Hex representation.
var md5Hex string
if len(metadata) != 0 {
md5Hex = metadata["md5Sum"]
}
if md5Hex != "" {
if newMD5Hex != md5Hex {
if err = safeCloseAndRemove(fileWriter); err != nil {
return "", err
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
}
func (fs fsObjects) DeleteObject(bucket, object string) error {

View File

@ -17,10 +17,6 @@
package main
import (
"crypto/md5"
"encoding/hex"
"io"
"path"
"sort"
"strings"
@ -142,85 +138,6 @@ func deleteBucket(storage StorageAPI, bucket string) error {
return nil
}
// putObjectCommon - create an object, is a common function for both object layers.
func putObjectCommon(storage StorageAPI, bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
}
tempObj := path.Join(tmpMetaPrefix, bucket, object)
fileWriter, err := storage.CreateFile(minioMetaBucket, tempObj)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", toObjectErr(err, bucket, object)
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", toObjectErr(err, bucket, object)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
// md5Hex representation.
var md5Hex string
if len(metadata) != 0 {
md5Hex = metadata["md5Sum"]
}
if md5Hex != "" {
if newMD5Hex != md5Hex {
if err = safeCloseAndRemove(fileWriter); err != nil {
return "", err
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
err = storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
if derr := storage.DeleteFile(minioMetaBucket, tempObj); derr != nil {
return "", derr
}
return "", toObjectErr(err, bucket, object)
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
}
func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var storage StorageAPI
switch l := layer.(type) {

View File

@ -650,7 +650,30 @@ func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) err
}).Errorf("getVolumeDir failed with %s", err)
return err
}
if err = os.MkdirAll(slashpath.Join(dstVolumeDir, slashpath.Dir(dstPath)), 0755); err != nil {
srcIsDir := strings.HasSuffix(srcPath, slashSeparator)
dstIsDir := strings.HasSuffix(dstPath, slashSeparator)
// for XL src and dst are always directories.
// for FS src and dst are always files.
if !(srcIsDir && dstIsDir || !srcIsDir && !dstIsDir) {
// Either src and dst have to be directories or files, else return error.
log.Errorf("source and destination are not of same file type. source=%s, destination=%s", srcPath, dstPath)
return errFileAccessDenied
}
if srcIsDir {
// If source is a directory we expect the destination to be non-existent always.
_, err = os.Stat(slashpath.Join(dstVolumeDir, dstPath))
if err == nil {
log.Errorf("Source is a directory and destination exists. source=%s, destination=%s", srcPath, dstPath)
return errFileAccessDenied
}
if !os.IsNotExist(err) {
// Return error for any error other than ENOENT.
log.Errorf("Stat failed with %s", err)
return err
}
// Destination does not exist, hence proceed with the rename.
}
if err = os.MkdirAll(slashpath.Dir(slashpath.Join(dstVolumeDir, dstPath)), 0755); err != nil {
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return errFileAccessDenied

View File

@ -628,54 +628,18 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
defer nsMutex.Unlock(dstVolume, dstPath)
errCount := 0
for index, disk := range xl.storageDisks {
// Make sure to rename all the files only, not directories.
srcErasurePartPath := slashpath.Join(srcPath, fmt.Sprintf("file.%d", index))
dstErasurePartPath := slashpath.Join(dstPath, fmt.Sprintf("file.%d", index))
err := disk.RenameFile(srcVolume, srcErasurePartPath, dstVolume, dstErasurePartPath)
if err != nil {
log.WithFields(logrus.Fields{
"srcVolume": srcVolume,
"srcPath": srcErasurePartPath,
"dstVolume": dstVolume,
"dstPath": dstErasurePartPath,
}).Errorf("RenameFile failed with %s", err)
errCount++
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
return err
}
srcXLMetaPath := slashpath.Join(srcPath, xlMetaV1File)
dstXLMetaPath := slashpath.Join(dstPath, xlMetaV1File)
err = disk.RenameFile(srcVolume, srcXLMetaPath, dstVolume, dstXLMetaPath)
if err != nil {
log.WithFields(logrus.Fields{
"srcVolume": srcVolume,
"srcPath": srcXLMetaPath,
"dstVolume": dstVolume,
"dstPath": dstXLMetaPath,
}).Errorf("RenameFile failed with %s", err)
errCount++
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
return err
}
err = disk.DeleteFile(srcVolume, srcPath)
for _, disk := range xl.storageDisks {
// Append "/" as srcPath and dstPath are either leaf-dirs or non-leaf-dris.
// If srcPath is an object instead of prefix we just rename the leaf-dir and
// not rename the part and metadata files separately.
err := disk.RenameFile(srcVolume, retainSlash(srcPath), dstVolume, retainSlash(dstPath))
if err != nil {
log.WithFields(logrus.Fields{
"srcVolume": srcVolume,
"srcPath": srcPath,
}).Errorf("DeleteFile failed with %s", err)
"dstVolume": dstVolume,
"dstPath": dstPath,
}).Errorf("RenameFile failed with %s", err)
errCount++
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum

View File

@ -92,6 +92,29 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return listObjectPartsCommon(xl.storage, bucket, object, uploadID, partNumberMarker, maxParts)
}
// This function does the following check, suppose
// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
// "a/b" and "a" do not exist.
func (xl xlObjects) parentDirIsObject(bucket, parent string) error {
var stat func(string) error
stat = func(p string) error {
if p == "." {
return nil
}
_, err := xl.getObjectInfo(bucket, p)
if err == nil {
// If there is already a file at prefix "p" return error.
return errFileAccessDenied
}
if err == errFileNotFound {
// Check if there is a file as one of the parent paths.
return stat(path.Dir(p))
}
return err
}
return stat(parent)
}
func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -149,13 +172,18 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
metadata.Size += fi.Size
}
// check if an object is present as one of the parent dir.
if err = xl.parentDirIsObject(bucket, path.Dir(object)); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Save successfully calculated md5sum.
metadata.MD5Sum = s3MD5
// Save modTime as well as the current time.
metadata.ModTime = time.Now().UTC()
// Create temporary multipart meta file to write and then rename.
tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, multipartMetaFile)
tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, uploadID, multipartMetaFile)
w, err := xl.storage.CreateFile(minioMetaBucket, tempMultipartMetaFile)
if err != nil {
return "", toObjectErr(err, bucket, object)
@ -164,21 +192,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
err = encoder.Encode(&metadata)
if err != nil {
if err = safeCloseAndRemove(w); err != nil {
return "", err
return "", toObjectErr(err, bucket, object)
}
return "", err
return "", toObjectErr(err, bucket, object)
}
// Close the writer.
if err = w.Close(); err != nil {
if err = safeCloseAndRemove(w); err != nil {
return "", err
return "", toObjectErr(err, bucket, object)
}
return "", err
return "", toObjectErr(err, bucket, object)
}
// Attempt a Rename of multipart meta file to final namespace.
multipartObjFile := path.Join(object, multipartMetaFile)
err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile)
multipartObjFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, multipartMetaFile)
err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, minioMetaBucket, multipartObjFile)
if err != nil {
if derr := xl.storage.DeleteFile(minioMetaBucket, tempMultipartMetaFile); derr != nil {
return "", toObjectErr(err, minioMetaBucket, tempMultipartMetaFile)
@ -192,11 +220,11 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
go func(index int, part completePart) {
defer wg.Done()
partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber))
errs[index] = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix)
src := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
dst := path.Join(mpartMetaPrefix, bucket, object, uploadID, partNumToPartFileName(part.PartNumber))
errs[index] = xl.storage.RenameFile(minioMetaBucket, src, minioMetaBucket, dst)
if errs[index] != nil {
log.Errorf("Unable to rename file %s to %s, failed with %s", multipartPartFile, multipartObjSuffix, errs[index])
log.Errorf("Unable to rename file %s to %s, failed with %s", src, dst, errs[index])
}
}(index, part)
}
@ -218,6 +246,18 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Delete if an object already exists.
// FIXME: rename it to tmp file and delete only after
// the newly uploaded file is renamed from tmp location to
// the original location.
err = xl.deleteObject(bucket, object)
if err != nil && err != errFileNotFound {
return "", toObjectErr(err, bucket, object)
}
if err = xl.storage.RenameFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID), bucket, object); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
var entries []string
@ -226,6 +266,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return s3MD5, nil
}
}
uploadsJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
err = xl.storage.DeleteFile(minioMetaBucket, uploadsJSONPath)
if err != nil {

View File

@ -17,10 +17,13 @@
package main
import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"path"
"path/filepath"
"strings"
"sync"
@ -223,29 +226,19 @@ func getMultipartObjectInfo(storage StorageAPI, bucket, object string) (info Mul
return info, nil
}
// GetObjectInfo - get object info.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(xl.storage, bucket) {
return ObjectInfo{}, BucketNotFound{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Return ObjectInfo.
func (xl xlObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) {
// First see if the object was a simple-PUT upload.
fi, err := xl.storage.StatFile(bucket, object)
if err != nil {
if err != errFileNotFound {
return ObjectInfo{}, toObjectErr(err, bucket, object)
return ObjectInfo{}, err
}
var info MultipartObjectInfo
// Check if the object was multipart upload.
info, err = getMultipartObjectInfo(xl.storage, bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
return ObjectInfo{}, err
}
fi.Size = info.Size
fi.ModTime = info.ModTime
@ -269,9 +262,118 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
}, nil
}
// GetObjectInfo - get object info.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(xl.storage, bucket) {
return ObjectInfo{}, BucketNotFound{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
info, err := xl.getObjectInfo(bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return info, nil
}
// PutObject - create an object.
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
return putObjectCommon(xl.storage, bucket, object, size, data, metadata)
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(xl.storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
}
tempObj := path.Join(tmpMetaPrefix, bucket, object)
fileWriter, err := xl.storage.CreateFile(minioMetaBucket, tempObj)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
// md5Hex representation.
var md5Hex string
if len(metadata) != 0 {
md5Hex = metadata["md5Sum"]
}
if md5Hex != "" {
if newMD5Hex != md5Hex {
if err = safeCloseAndRemove(fileWriter); err != nil {
return "", toObjectErr(err, bucket, object)
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
// check if an object is present as one of the parent dir.
if err = xl.parentDirIsObject(bucket, path.Dir(object)); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Delete if an object already exists.
// FIXME: rename it to tmp file and delete only after
// the newly uploaded file is renamed from tmp location to
// the original location.
err = xl.deleteObject(bucket, object)
if err != nil && err != errFileNotFound {
return "", toObjectErr(err, bucket, object)
}
err = xl.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
if derr := xl.storage.DeleteFile(minioMetaBucket, tempObj); derr != nil {
return "", toObjectErr(derr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
}
// isMultipartObject - verifies if an object is special multipart file.
@ -286,30 +388,21 @@ func isMultipartObject(storage StorageAPI, bucket, object string) (bool, error)
return true, nil
}
func (xl xlObjects) DeleteObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(xl.storage, bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Deletes and object.
func (xl xlObjects) deleteObject(bucket, object string) error {
// Verify if the object is a multipart object.
if ok, err := isMultipartObject(xl.storage, bucket, object); err != nil {
return toObjectErr(err, bucket, object)
return err
} else if !ok {
if err = xl.storage.DeleteFile(bucket, object); err != nil {
return toObjectErr(err, bucket, object)
return err
}
return nil
}
// Get parts info.
info, err := getMultipartObjectInfo(xl.storage, bucket, object)
if err != nil {
return toObjectErr(err, bucket, object)
return err
}
// Range through all files and delete it.
var wg = &sync.WaitGroup{}
@ -325,16 +418,35 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
}
// Wait for all the deletes to finish.
wg.Wait()
// Loop through and validate if any errors, return back the first
// error occurred.
for index, err := range errs {
// Loop through and validate if any errors, if we are unable to remove any part return
// "unexpected" error as returning any other error might be misleading. For ex.
// if DeleteFile() had returned errFileNotFound and we return it, then client would see
// ObjectNotFound which is misleading.
for _, err := range errs {
if err != nil {
partFileName := partNumToPartFileName(info.Parts[index].PartNumber)
return toObjectErr(err, bucket, pathJoin(object, partFileName))
return errUnexpected
}
}
err = xl.storage.DeleteFile(bucket, pathJoin(object, multipartMetaFile))
if err != nil {
return err
}
return nil
}
// DeleteObject - delete the object.
func (xl xlObjects) DeleteObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(xl.storage, bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
if err := xl.deleteObject(bucket, object); err != nil {
return toObjectErr(err, bucket, object)
}
return nil