mirror of
https://github.com/minio/minio.git
synced 2025-01-25 21:53:16 -05:00
Avoid removing 'tmp' directory inside '.minio.sys' (#3294)
This commit is contained in:
parent
2c3a2241e7
commit
ffbee70e04
@ -18,7 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"path"
|
||||
"sort"
|
||||
)
|
||||
|
||||
@ -93,17 +92,17 @@ func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1,
|
||||
|
||||
// Write fsMeta to fs.json or fs-append.json.
|
||||
func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) error {
|
||||
tmpPath := path.Join(tmpMetaPrefix, getUUID())
|
||||
tmpPath := getUUID()
|
||||
metadataBytes, err := json.Marshal(fsMeta)
|
||||
if err != nil {
|
||||
return traceError(err)
|
||||
}
|
||||
if err = disk.AppendFile(minioMetaBucket, tmpPath, metadataBytes); err != nil {
|
||||
if err = disk.AppendFile(minioMetaTmpBucket, tmpPath, metadataBytes); err != nil {
|
||||
return traceError(err)
|
||||
}
|
||||
err = disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath)
|
||||
err = disk.RenameFile(minioMetaTmpBucket, tmpPath, bucket, filePath)
|
||||
if err != nil {
|
||||
err = disk.DeleteFile(minioMetaBucket, tmpPath)
|
||||
err = disk.DeleteFile(minioMetaTmpBucket, tmpPath)
|
||||
if err != nil {
|
||||
return traceError(err)
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
|
||||
func (fs fsObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error {
|
||||
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
||||
uniqueID := getUUID()
|
||||
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
||||
tmpUploadsPath := uniqueID
|
||||
|
||||
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
|
||||
if errorCause(err) == errFileNotFound {
|
||||
|
@ -289,12 +289,12 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo,
|
||||
// Returns metadata path for the file holding info about the parts that
|
||||
// have been appended to the "append-file"
|
||||
func getFSAppendMetaPath(uploadID string) string {
|
||||
return path.Join(tmpMetaPrefix, uploadID+".json")
|
||||
return uploadID + ".json"
|
||||
}
|
||||
|
||||
// Returns path for the append-file.
|
||||
func getFSAppendDataPath(uploadID string) string {
|
||||
return path.Join(tmpMetaPrefix, uploadID+".data")
|
||||
return uploadID + ".data"
|
||||
}
|
||||
|
||||
// Append parts to fsAppendDataFile.
|
||||
@ -303,8 +303,8 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
|
||||
// In case of any error, cleanup the append data and json files
|
||||
// from the tmp so that we do not have any inconsistent append
|
||||
// data/json files.
|
||||
disk.DeleteFile(bucket, getFSAppendDataPath(uploadID))
|
||||
disk.DeleteFile(bucket, getFSAppendMetaPath(uploadID))
|
||||
disk.DeleteFile(minioMetaTmpBucket, getFSAppendDataPath(uploadID))
|
||||
disk.DeleteFile(minioMetaTmpBucket, getFSAppendMetaPath(uploadID))
|
||||
}
|
||||
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
||||
// fs-append.json path
|
||||
@ -322,11 +322,11 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
|
||||
}
|
||||
|
||||
// Lock fs-append.json so that there is no parallel append to the file.
|
||||
appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath)
|
||||
appendPathLock := nsMutex.NewNSLock(minioMetaTmpBucket, fsAppendMetaPath)
|
||||
appendPathLock.Lock()
|
||||
defer appendPathLock.Unlock()
|
||||
|
||||
fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath)
|
||||
fsAppendMeta, err := readFSMetadata(disk, minioMetaTmpBucket, fsAppendMetaPath)
|
||||
if err != nil {
|
||||
if errorCause(err) != errFileNotFound {
|
||||
cleanupAppendPaths()
|
||||
@ -362,7 +362,7 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
|
||||
var n int64
|
||||
n, err = disk.ReadFile(minioMetaBucket, partPath, offset, buf[:curLeft])
|
||||
if n > 0 {
|
||||
if err = disk.AppendFile(minioMetaBucket, fsAppendDataPath, buf[:n]); err != nil {
|
||||
if err = disk.AppendFile(minioMetaTmpBucket, fsAppendDataPath, buf[:n]); err != nil {
|
||||
cleanupAppendPaths()
|
||||
return
|
||||
}
|
||||
@ -379,7 +379,7 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
|
||||
}
|
||||
fsAppendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size)
|
||||
// Overwrite previous fs-append.json
|
||||
if err = writeFSMetadata(disk, minioMetaBucket, fsAppendMetaPath, fsAppendMeta); err != nil {
|
||||
if err = writeFSMetadata(disk, minioMetaTmpBucket, fsAppendMetaPath, fsAppendMeta); err != nil {
|
||||
cleanupAppendPaths()
|
||||
return
|
||||
}
|
||||
@ -419,7 +419,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
}
|
||||
|
||||
partSuffix := fmt.Sprintf("object%d", partID)
|
||||
tmpPartPath := path.Join(tmpMetaPrefix, uploadID+"."+getUUID()+"."+partSuffix)
|
||||
tmpPartPath := uploadID + "." + getUUID() + "." + partSuffix
|
||||
|
||||
// Initialize md5 writer.
|
||||
md5Writer := md5.New()
|
||||
@ -451,28 +451,28 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
|
||||
if size > 0 {
|
||||
// Prepare file to avoid disk fragmentation
|
||||
err := fs.storage.PrepareFile(minioMetaBucket, tmpPartPath, size)
|
||||
err := fs.storage.PrepareFile(minioMetaTmpBucket, tmpPartPath, size)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, tmpPartPath)
|
||||
}
|
||||
}
|
||||
|
||||
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tmpPartPath)
|
||||
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaTmpBucket, tmpPartPath)
|
||||
if cErr != nil {
|
||||
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
|
||||
return "", toObjectErr(cErr, minioMetaBucket, tmpPartPath)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, tmpPartPath)
|
||||
return "", toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath)
|
||||
}
|
||||
// Should return IncompleteBody{} error when reader has fewer
|
||||
// bytes than specified in request header.
|
||||
if bytesWritten < size {
|
||||
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, tmpPartPath)
|
||||
return "", traceError(IncompleteBody{})
|
||||
}
|
||||
|
||||
// Delete temporary part in case of failure. If
|
||||
// PutObjectPart succeeds then there would be nothing to
|
||||
// delete.
|
||||
defer fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
|
||||
defer fs.storage.DeleteFile(minioMetaTmpBucket, tmpPartPath)
|
||||
|
||||
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
||||
if md5Hex != "" {
|
||||
@ -506,7 +506,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
|
||||
|
||||
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
|
||||
err = fs.storage.RenameFile(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
|
||||
err = fs.storage.RenameFile(minioMetaTmpBucket, tmpPartPath, minioMetaBucket, partPath)
|
||||
if err != nil {
|
||||
return "", toObjectErr(traceError(err), minioMetaBucket, partPath)
|
||||
}
|
||||
@ -653,7 +653,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// fs-append.json path
|
||||
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
|
||||
// Lock fs-append.json so that no parallel appendParts() is being done.
|
||||
appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath)
|
||||
appendPathLock := nsMutex.NewNSLock(minioMetaTmpBucket, fsAppendMetaPath)
|
||||
appendPathLock.Lock()
|
||||
defer appendPathLock.Unlock()
|
||||
|
||||
@ -672,16 +672,16 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
|
||||
}
|
||||
|
||||
fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsAppendMetaPath)
|
||||
fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaTmpBucket, fsAppendMetaPath)
|
||||
if err == nil && isPartsSame(fsAppendMeta.Parts, parts) {
|
||||
if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil {
|
||||
return "", toObjectErr(traceError(err), minioMetaBucket, fsAppendDataPath)
|
||||
if err = fs.storage.RenameFile(minioMetaTmpBucket, fsAppendDataPath, bucket, object); err != nil {
|
||||
return "", toObjectErr(traceError(err), minioMetaTmpBucket, fsAppendDataPath)
|
||||
}
|
||||
} else {
|
||||
// Remove append data temporary file since it is no longer needed at this point
|
||||
fs.storage.DeleteFile(minioMetaBucket, fsAppendDataPath)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, fsAppendDataPath)
|
||||
|
||||
tempObj := path.Join(tmpMetaPrefix, uploadID+"-"+"part.1")
|
||||
tempObj := uploadID + "-" + "part.1"
|
||||
|
||||
// Allocate staging buffer.
|
||||
var buf = make([]byte, readSizeV1)
|
||||
@ -693,7 +693,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
}
|
||||
if objSize > 0 {
|
||||
// Prepare file to avoid disk fragmentation
|
||||
err = fs.storage.PrepareFile(minioMetaBucket, tempObj, objSize)
|
||||
err = fs.storage.PrepareFile(minioMetaTmpBucket, tempObj, objSize)
|
||||
if err != nil {
|
||||
return "", traceError(err)
|
||||
}
|
||||
@ -729,8 +729,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
var n int64
|
||||
n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf[:curLeft])
|
||||
if n > 0 {
|
||||
if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil {
|
||||
return "", toObjectErr(traceError(err), minioMetaBucket, tempObj)
|
||||
if err = fs.storage.AppendFile(minioMetaTmpBucket, tempObj, buf[:n]); err != nil {
|
||||
return "", toObjectErr(traceError(err), minioMetaTmpBucket, tempObj)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@ -748,17 +748,17 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
}
|
||||
|
||||
// Rename the file back to original location, if not delete the temporary object.
|
||||
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
|
||||
err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object)
|
||||
if err != nil {
|
||||
if dErr := fs.storage.DeleteFile(minioMetaBucket, tempObj); dErr != nil {
|
||||
return "", toObjectErr(traceError(dErr), minioMetaBucket, tempObj)
|
||||
if dErr := fs.storage.DeleteFile(minioMetaTmpBucket, tempObj); dErr != nil {
|
||||
return "", toObjectErr(traceError(dErr), minioMetaTmpBucket, tempObj)
|
||||
}
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the append-file metadata file in tmp location as we no longer need it.
|
||||
fs.storage.DeleteFile(minioMetaBucket, fsAppendMetaPath)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, fsAppendMetaPath)
|
||||
|
||||
// No need to save part info, since we have concatenated all parts.
|
||||
fsMeta.Parts = nil
|
||||
@ -853,7 +853,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
|
||||
|
||||
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
|
||||
// Lock fs-append.json so that no parallel appendParts() is being done.
|
||||
appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath)
|
||||
appendPathLock := nsMutex.NewNSLock(minioMetaTmpBucket, fsAppendMetaPath)
|
||||
appendPathLock.Lock()
|
||||
defer appendPathLock.Unlock()
|
||||
|
||||
|
18
cmd/fs-v1.go
18
cmd/fs-v1.go
@ -353,7 +353,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
// Uploaded object will first be written to the temporary location which will eventually
|
||||
// be renamed to the actual location. It is first written to the temporary location
|
||||
// so that cleaning it up will be easy if the server goes down.
|
||||
tempObj := path.Join(tmpMetaPrefix, uniqueID)
|
||||
tempObj := uniqueID
|
||||
|
||||
// Initialize md5 writer.
|
||||
md5Writer := md5.New()
|
||||
@ -379,16 +379,16 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
|
||||
if size == 0 {
|
||||
// For size 0 we write a 0byte file.
|
||||
err = fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
|
||||
err = fs.storage.AppendFile(minioMetaTmpBucket, tempObj, []byte(""))
|
||||
if err != nil {
|
||||
fs.storage.DeleteFile(minioMetaBucket, tempObj)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, tempObj)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
} else {
|
||||
|
||||
// Prepare file to avoid disk fragmentation
|
||||
if size > 0 {
|
||||
err = fs.storage.PrepareFile(minioMetaBucket, tempObj, size)
|
||||
err = fs.storage.PrepareFile(minioMetaTmpBucket, tempObj, size)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -402,9 +402,9 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
buf := make([]byte, int(bufSize))
|
||||
teeReader := io.TeeReader(limitDataReader, multiWriter)
|
||||
var bytesWritten int64
|
||||
bytesWritten, err = fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj)
|
||||
bytesWritten, err = fsCreateFile(fs.storage, teeReader, buf, minioMetaTmpBucket, tempObj)
|
||||
if err != nil {
|
||||
fs.storage.DeleteFile(minioMetaBucket, tempObj)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, tempObj)
|
||||
errorIf(err, "Failed to create object %s/%s", bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -412,14 +412,14 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
// Should return IncompleteBody{} error when reader has fewer
|
||||
// bytes than specified in request header.
|
||||
if bytesWritten < size {
|
||||
fs.storage.DeleteFile(minioMetaBucket, tempObj)
|
||||
fs.storage.DeleteFile(minioMetaTmpBucket, tempObj)
|
||||
return ObjectInfo{}, traceError(IncompleteBody{})
|
||||
}
|
||||
}
|
||||
// Delete the temporary object in the case of a
|
||||
// failure. If PutObject succeeds, then there would be
|
||||
// nothing to delete.
|
||||
defer fs.storage.DeleteFile(minioMetaBucket, tempObj)
|
||||
defer fs.storage.DeleteFile(minioMetaTmpBucket, tempObj)
|
||||
|
||||
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
||||
// Update the md5sum if not set with the newly calculated one.
|
||||
@ -449,7 +449,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
|
||||
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
|
||||
err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
|
@ -325,9 +325,9 @@ func testObjectAPIPutObjectStaleFiles(obj ObjectLayer, instanceType string, disk
|
||||
}
|
||||
|
||||
for _, disk := range disks {
|
||||
tmpMetaDir := path.Join(disk, minioMetaBucket, tmpMetaPrefix)
|
||||
tmpMetaDir := path.Join(disk, minioMetaTmpBucket)
|
||||
if !isDirEmpty(tmpMetaDir) {
|
||||
t.Fatalf("%s: expected: empty, got: non-empty", tmpMetaDir)
|
||||
t.Fatalf("%s: expected: empty, got: non-empty", minioMetaTmpBucket)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -392,7 +392,7 @@ func testObjectAPIMultipartPutObjectStaleFiles(obj ObjectLayer, instanceType str
|
||||
}
|
||||
|
||||
for _, disk := range disks {
|
||||
tmpMetaDir := path.Join(disk, minioMetaBucket, tmpMetaPrefix)
|
||||
tmpMetaDir := path.Join(disk, minioMetaTmpBucket)
|
||||
files, err := ioutil.ReadDir(tmpMetaDir)
|
||||
if err != nil {
|
||||
// Its OK to have non-existen tmpMetaDir.
|
||||
|
@ -79,7 +79,7 @@ func houseKeeping(storageDisks []StorageAPI) error {
|
||||
defer wg.Done()
|
||||
|
||||
// Cleanup all temp entries upon start.
|
||||
err := cleanupDir(disk, minioMetaBucket, tmpMetaPrefix)
|
||||
err := cleanupDir(disk, minioMetaTmpBucket, "")
|
||||
if err != nil {
|
||||
switch errorCause(err) {
|
||||
case errDiskNotFound, errVolumeNotFound, errFileNotFound:
|
||||
@ -98,7 +98,7 @@ func houseKeeping(storageDisks []StorageAPI) error {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
return toObjectErr(err, minioMetaBucket, tmpMetaPrefix)
|
||||
return toObjectErr(err, minioMetaTmpBucket, "*")
|
||||
}
|
||||
|
||||
// Return success here.
|
||||
@ -224,6 +224,17 @@ func initMetaVolume(storageDisks []StorageAPI) error {
|
||||
default:
|
||||
errs[index] = err
|
||||
}
|
||||
return
|
||||
}
|
||||
err = disk.MakeVol(minioMetaTmpBucket)
|
||||
if err != nil {
|
||||
switch err {
|
||||
// Ignored errors.
|
||||
case errVolumeExists, errDiskNotFound, errFaultyDisk:
|
||||
default:
|
||||
errs[index] = err
|
||||
}
|
||||
return
|
||||
}
|
||||
}(index, disk)
|
||||
}
|
||||
|
@ -66,11 +66,11 @@ func TestHouseKeeping(t *testing.T) {
|
||||
if errs[index] != nil {
|
||||
return
|
||||
}
|
||||
errs[index] = store.MakeVol(pathJoin(minioMetaBucket, tmpMetaPrefix))
|
||||
errs[index] = store.MakeVol(minioMetaTmpBucket)
|
||||
if errs[index] != nil {
|
||||
return
|
||||
}
|
||||
errs[index] = store.AppendFile(pathJoin(minioMetaBucket, tmpMetaPrefix), "hello.txt", []byte("hello"))
|
||||
errs[index] = store.AppendFile(minioMetaTmpBucket, "hello.txt", []byte("hello"))
|
||||
}(i, store)
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -113,12 +113,12 @@ func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI)
|
||||
|
||||
// Write `uploads.json` to disk. First to tmp location and
|
||||
// then rename.
|
||||
if wErr = disk.AppendFile(minioMetaBucket, tmpPath, uplBytes); wErr != nil {
|
||||
if wErr = disk.AppendFile(minioMetaTmpBucket, tmpPath, uplBytes); wErr != nil {
|
||||
return traceError(wErr)
|
||||
}
|
||||
wErr = disk.RenameFile(minioMetaBucket, tmpPath, minioMetaBucket, uploadsPath)
|
||||
wErr = disk.RenameFile(minioMetaTmpBucket, tmpPath, minioMetaBucket, uploadsPath)
|
||||
if wErr != nil {
|
||||
if dErr := disk.DeleteFile(minioMetaBucket, tmpPath); dErr != nil {
|
||||
if dErr := disk.DeleteFile(minioMetaTmpBucket, tmpPath); dErr != nil {
|
||||
// we return the most recent error.
|
||||
return traceError(dErr)
|
||||
}
|
||||
|
@ -34,8 +34,8 @@ const (
|
||||
minioMetaBucket = ".minio.sys"
|
||||
// Multipart meta prefix.
|
||||
mpartMetaPrefix = "multipart"
|
||||
// Tmp meta prefix.
|
||||
tmpMetaPrefix = "tmp"
|
||||
// Minio Tmp meta prefix.
|
||||
minioMetaTmpBucket = minioMetaBucket + "/tmp"
|
||||
)
|
||||
|
||||
// validBucket regexp.
|
||||
|
@ -293,7 +293,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string) error {
|
||||
// Heal the part file.
|
||||
checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
|
||||
bucket, pathJoin(object, partName),
|
||||
minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
|
||||
minioMetaTmpBucket, pathJoin(tmpID, partName),
|
||||
partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -319,7 +319,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string) error {
|
||||
}
|
||||
|
||||
// Generate and write `xl.json` generated from other disks.
|
||||
err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
|
||||
err := writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks))
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -335,7 +335,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string) error {
|
||||
return traceError(err)
|
||||
}
|
||||
// Attempt a rename now from healed data to final location.
|
||||
err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
|
||||
err = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket, retainSlash(object))
|
||||
if err != nil {
|
||||
return traceError(err)
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error {
|
||||
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
||||
uniqueID := getUUID()
|
||||
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
||||
tmpUploadsPath := uniqueID
|
||||
|
||||
// slice to store errors from disks
|
||||
errs := make([]error, len(xl.storageDisks))
|
||||
@ -75,7 +75,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
||||
if !isDelete[index] {
|
||||
errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk)
|
||||
} else {
|
||||
wErr := disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
|
||||
wErr := disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaTmpBucket, tmpUploadsPath)
|
||||
if wErr != nil {
|
||||
errs[index] = traceError(wErr)
|
||||
}
|
||||
@ -115,7 +115,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
||||
)
|
||||
} else {
|
||||
_ = disk.RenameFile(
|
||||
minioMetaBucket, tmpUploadsPath,
|
||||
minioMetaTmpBucket, tmpUploadsPath,
|
||||
minioMetaBucket, uploadsPath,
|
||||
)
|
||||
}
|
||||
@ -135,7 +135,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
// isDelete[index] = true at this point.
|
||||
_ = disk.DeleteFile(minioMetaBucket, tmpUploadsPath)
|
||||
_ = disk.DeleteFile(minioMetaTmpBucket, tmpUploadsPath)
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
@ -236,10 +236,10 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
// Delete any dangling directories.
|
||||
defer disk.DeleteFile(minioMetaBucket, srcPrefix)
|
||||
defer disk.DeleteFile(minioMetaTmpBucket, srcPrefix)
|
||||
|
||||
// Renames `xl.json` from source prefix to destination prefix.
|
||||
rErr := disk.RenameFile(minioMetaBucket, srcJSONFile, minioMetaBucket, dstJSONFile)
|
||||
rErr := disk.RenameFile(minioMetaTmpBucket, srcJSONFile, minioMetaBucket, dstJSONFile)
|
||||
if rErr != nil {
|
||||
mErrs[index] = traceError(rErr)
|
||||
return
|
||||
|
@ -288,19 +288,19 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
|
||||
|
||||
uploadID := getUUID()
|
||||
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
||||
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
|
||||
tempUploadIDPath := uploadID
|
||||
// Write updated `xl.json` to all disks.
|
||||
if err := writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
|
||||
if err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
|
||||
}
|
||||
// delete the tmp path later in case we fail to rename (ignore
|
||||
// returned errors) - this will be a no-op in case of a rename
|
||||
// success.
|
||||
defer xl.deleteObject(minioMetaBucket, tempUploadIDPath)
|
||||
defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath)
|
||||
|
||||
// Attempt to rename temp upload object to actual upload path
|
||||
// object
|
||||
if rErr := renameObject(xl.storageDisks, minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath, xl.writeQuorum); rErr != nil {
|
||||
if rErr := renameObject(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath, xl.writeQuorum); rErr != nil {
|
||||
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
@ -391,7 +391,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
|
||||
partSuffix := fmt.Sprintf("part.%d", partID)
|
||||
tmpSuffix := getUUID()
|
||||
tmpPartPath := path.Join(tmpMetaPrefix, tmpSuffix)
|
||||
tmpPartPath := tmpSuffix
|
||||
|
||||
// Initialize md5 writer.
|
||||
md5Writer := md5.New()
|
||||
@ -421,19 +421,19 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
teeReader := io.TeeReader(lreader, mw)
|
||||
|
||||
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
|
||||
defer xl.deleteObject(minioMetaBucket, tmpPartPath)
|
||||
defer xl.deleteObject(minioMetaTmpBucket, tmpPartPath)
|
||||
|
||||
if size > 0 {
|
||||
for _, disk := range onlineDisks {
|
||||
if disk != nil {
|
||||
actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
|
||||
disk.PrepareFile(minioMetaBucket, tmpPartPath, actualSize)
|
||||
disk.PrepareFile(minioMetaTmpBucket, tmpPartPath, actualSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Erasure code data and write across all disks.
|
||||
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
|
||||
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -478,7 +478,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
|
||||
// Rename temporary part file to its final location.
|
||||
partPath := path.Join(uploadIDPath, partSuffix)
|
||||
err = renamePart(onlineDisks, minioMetaBucket, tmpPartPath, minioMetaBucket, partPath, xl.writeQuorum)
|
||||
err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaBucket, partPath, xl.writeQuorum)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, partPath)
|
||||
}
|
||||
@ -515,11 +515,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
|
||||
// Write all the checksum metadata.
|
||||
newUUID := getUUID()
|
||||
tempXLMetaPath := path.Join(tmpMetaPrefix, newUUID)
|
||||
tempXLMetaPath := newUUID
|
||||
|
||||
// Writes a unique `xl.json` each disk carrying new checksum related information.
|
||||
if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath)
|
||||
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath)
|
||||
}
|
||||
rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum)
|
||||
if rErr != nil {
|
||||
@ -751,7 +751,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// Save successfully calculated md5sum.
|
||||
xlMeta.Meta["md5Sum"] = s3MD5
|
||||
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
||||
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
|
||||
tempUploadIDPath := uploadID
|
||||
|
||||
// Update all xl metadata, make sure to not modify fields like
|
||||
// checksum which are different on each disks.
|
||||
@ -762,8 +762,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
}
|
||||
|
||||
// Write unique `xl.json` for each disk.
|
||||
if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
|
||||
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
|
||||
}
|
||||
rErr := commitXLMetadata(onlineDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum)
|
||||
if rErr != nil {
|
||||
@ -793,7 +793,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// NOTE: Do not use online disks slice here.
|
||||
// The reason is that existing object should be purged
|
||||
// regardless of `xl.json` status and rolled back in case of errors.
|
||||
err = renameObject(xl.storageDisks, bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID), xl.writeQuorum)
|
||||
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, uniqueID, xl.writeQuorum)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -818,7 +818,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
}
|
||||
|
||||
// Delete the previously successfully renamed object.
|
||||
xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID))
|
||||
xl.deleteObject(minioMetaTmpBucket, uniqueID)
|
||||
|
||||
// Hold the lock so that two parallel
|
||||
// complete-multipart-uploads do not leave a stale
|
||||
|
@ -383,8 +383,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
}
|
||||
|
||||
uniqueID := getUUID()
|
||||
tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "part.1")
|
||||
minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix)
|
||||
tempErasureObj := path.Join(uniqueID, "part.1")
|
||||
tempObj := uniqueID
|
||||
|
||||
// Initialize md5 writer.
|
||||
@ -449,15 +448,15 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
for _, disk := range onlineDisks {
|
||||
if disk != nil {
|
||||
actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
|
||||
disk.PrepareFile(minioMetaBucket, tempErasureObj, actualSize)
|
||||
disk.PrepareFile(minioMetaTmpBucket, tempErasureObj, actualSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Erasure code data and write across all disks.
|
||||
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
|
||||
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, tempErasureObj)
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempErasureObj)
|
||||
}
|
||||
// Should return IncompleteBody{} error when reader has fewer bytes
|
||||
// than specified in request header.
|
||||
|
Loading…
x
Reference in New Issue
Block a user