mirror of
https://github.com/minio/minio.git
synced 2025-01-24 21:23:15 -05:00
285a94d2c0
Deletion of tmp files where xl metadata was saved before the commit operation doesn't change the error returned to the caller. So, it is to be ignored.
309 lines
8.6 KiB
Go
309 lines
8.6 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
|
|
func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) {
|
|
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
|
uniqueID := getUUID()
|
|
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
|
var errs = make([]error, len(xl.storageDisks))
|
|
var wg = &sync.WaitGroup{}
|
|
|
|
// Update `uploads.json` for all the disks.
|
|
for index, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
errs[index] = errDiskNotFound
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
// Update `uploads.json` in routine.
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
uploadsBytes, wErr := json.Marshal(uploadsJSON)
|
|
if wErr != nil {
|
|
errs[index] = wErr
|
|
return
|
|
}
|
|
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil {
|
|
errs[index] = wErr
|
|
return
|
|
}
|
|
if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
|
|
errs[index] = wErr
|
|
return
|
|
}
|
|
}(index, disk)
|
|
}
|
|
|
|
// Wait for all the routines to finish updating `uploads.json`
|
|
wg.Wait()
|
|
|
|
// Count all the errors and validate if we have write quorum.
|
|
if !isQuorum(errs, xl.writeQuorum) {
|
|
// Rename `uploads.json` left over back to tmp location.
|
|
for index, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
// Undo rename `uploads.json` in parallel.
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
if errs[index] != nil {
|
|
return
|
|
}
|
|
_ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
|
|
}(index, disk)
|
|
}
|
|
wg.Wait()
|
|
return errXLWriteQuorum
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeUploadJSON - create `uploads.json` or update it with new uploadID.
|
|
func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) {
|
|
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
|
uniqueID := getUUID()
|
|
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
|
|
|
var errs = make([]error, len(xl.storageDisks))
|
|
var wg = &sync.WaitGroup{}
|
|
|
|
var uploadsJSON uploadsV1
|
|
for _, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
|
|
break
|
|
}
|
|
if err != nil {
|
|
// For any other errors.
|
|
if err != errFileNotFound {
|
|
return err
|
|
}
|
|
// Set uploads format to `xl` otherwise.
|
|
uploadsJSON = newUploadsV1("xl")
|
|
}
|
|
// Add a new upload id.
|
|
uploadsJSON.AddUploadID(uploadID, initiated)
|
|
|
|
// Update `uploads.json` on all disks.
|
|
for index, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
errs[index] = errDiskNotFound
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
// Update `uploads.json` in a routine.
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
|
|
if wErr != nil {
|
|
errs[index] = wErr
|
|
return
|
|
}
|
|
// Write `uploads.json` to disk.
|
|
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
|
|
errs[index] = wErr
|
|
return
|
|
}
|
|
wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
|
|
if wErr != nil {
|
|
if dErr := disk.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {
|
|
errs[index] = dErr
|
|
return
|
|
}
|
|
errs[index] = wErr
|
|
return
|
|
}
|
|
errs[index] = nil
|
|
}(index, disk)
|
|
}
|
|
|
|
// Wait here for all the writes to finish.
|
|
wg.Wait()
|
|
|
|
// Count all the errors and validate if we have write quorum.
|
|
if !isQuorum(errs, xl.writeQuorum) {
|
|
// Rename `uploads.json` left over back to tmp location.
|
|
for index, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
// Undo rename `uploads.json` in parallel.
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
if errs[index] != nil {
|
|
return
|
|
}
|
|
_ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
|
|
}(index, disk)
|
|
}
|
|
wg.Wait()
|
|
return errXLWriteQuorum
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Returns if the prefix is a multipart upload.
|
|
func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
|
|
for _, disk := range xl.getLoadBalancedQuorumDisks() {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
|
|
if err != nil {
|
|
// For any reason disk was deleted or goes offline, continue
|
|
if err == errDiskNotFound || err == errFaultyDisk {
|
|
continue
|
|
}
|
|
return false
|
|
}
|
|
break
|
|
}
|
|
return true
|
|
}
|
|
|
|
// listUploadsInfo - list all uploads info.
|
|
func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) {
|
|
for _, disk := range xl.getLoadBalancedQuorumDisks() {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
|
|
var uploadsJSON uploadsV1
|
|
uploadsJSON, err = readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
|
|
if err != nil {
|
|
// For any reason disk was deleted or goes offline, continue
|
|
if err == errDiskNotFound || err == errFaultyDisk {
|
|
continue
|
|
}
|
|
if err == errFileNotFound {
|
|
return []uploadInfo{}, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
uploadsInfo = uploadsJSON.Uploads
|
|
break
|
|
}
|
|
return uploadsInfo, nil
|
|
}
|
|
|
|
// isUploadIDExists - verify if a given uploadID exists and is valid.
|
|
func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool {
|
|
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
|
return xl.isObject(minioMetaBucket, uploadIDPath)
|
|
}
|
|
|
|
// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
|
|
func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) {
|
|
curpartPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName)
|
|
wg := sync.WaitGroup{}
|
|
for i, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
|
|
// requests. xl.json is the authoritative source of truth on which parts constitute
|
|
// the object. The presence of parts that don't belong in the object doesn't affect correctness.
|
|
_ = disk.DeleteFile(minioMetaBucket, curpartPath)
|
|
}(i, disk)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// statPart - returns fileInfo structure for a successful stat on part file.
|
|
func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) {
|
|
partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName)
|
|
for _, disk := range xl.getLoadBalancedQuorumDisks() {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
fileInfo, err = disk.StatFile(minioMetaBucket, partNamePath)
|
|
if err != nil {
|
|
// For any reason disk was deleted or goes offline, continue
|
|
if err == errDiskNotFound {
|
|
continue
|
|
}
|
|
return FileInfo{}, err
|
|
}
|
|
break
|
|
}
|
|
return fileInfo, nil
|
|
}
|
|
|
|
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix.
|
|
func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error {
|
|
var wg = &sync.WaitGroup{}
|
|
var mErrs = make([]error, len(xl.storageDisks))
|
|
|
|
srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
|
|
dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
|
|
|
|
// Rename `xl.json` to all disks in parallel.
|
|
for index, disk := range xl.storageDisks {
|
|
if disk == nil {
|
|
mErrs[index] = errDiskNotFound
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
// Rename `xl.json` in a routine.
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
// Delete any dangling directories.
|
|
defer disk.DeleteFile(minioMetaBucket, srcPrefix)
|
|
|
|
// Renames `xl.json` from source prefix to destination prefix.
|
|
rErr := disk.RenameFile(minioMetaBucket, srcJSONFile, minioMetaBucket, dstJSONFile)
|
|
if rErr != nil {
|
|
mErrs[index] = rErr
|
|
return
|
|
}
|
|
mErrs[index] = nil
|
|
}(index, disk)
|
|
}
|
|
// Wait for all the routines.
|
|
wg.Wait()
|
|
|
|
// Do we have write quorum?.
|
|
if !isQuorum(mErrs, xl.writeQuorum) {
|
|
return errXLWriteQuorum
|
|
}
|
|
// For all other errors return.
|
|
for _, err := range mErrs {
|
|
if err != nil && err != errDiskNotFound {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|