XL: Bring in some modularity into format verification and healing. (#1832)

This commit is contained in:
Harshavardhana
2016-06-02 16:34:15 -07:00
parent aa1d769b1e
commit fb95c1fad3
18 changed files with 684 additions and 145 deletions

View File

@@ -92,6 +92,10 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk
// Update `uploads.json` for all the disks.
for index, disk := range storageDisks {
if disk == nil {
errs[index] = errDiskNotFound
continue
}
wg.Add(1)
// Update `uploads.json` in routine.
go func(index int, disk StorageAPI) {
@@ -120,13 +124,41 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk
// Wait for all the routines to finish updating `uploads.json`
wg.Wait()
// For only single disk return first error.
if len(storageDisks) == 1 {
return errs[0]
} // else count all the errors for quorum validation.
var errCount = 0
// Return for first error.
for _, err := range errs {
if err != nil {
return err
errCount++
}
}
// Count all the errors and validate if we have write quorum.
if errCount > len(storageDisks)-len(storageDisks)/2+3 {
// Validate if we have read quorum return success.
if errCount > len(storageDisks)-len(storageDisks)/2+1 {
return nil
}
// Rename `uploads.json` left over back to tmp location.
for index, disk := range storageDisks {
if disk == nil {
continue
}
// Undo rename `uploads.json` in parallel.
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if errs[index] != nil {
return
}
_ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
}(index, disk)
}
wg.Wait()
return errXLWriteQuorum
}
return nil
}
@@ -149,6 +181,9 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
var uploadsJSON uploadsV1
for _, disk := range storageDisks {
if disk == nil {
continue
}
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
break
}
@@ -170,6 +205,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
// Update `uploads.json` on all disks.
for index, disk := range storageDisks {
if disk == nil {
errs[index] = errDiskNotFound
continue
}
wg.Add(1)
// Update `uploads.json` in a routine.
go func(index int, disk StorageAPI) {
@@ -205,13 +244,41 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
// Wait for all the writes to finish.
wg.Wait()
// Return for first error encountered.
for _, err = range errs {
// For only single disk return first error.
if len(storageDisks) == 1 {
return errs[0]
} // else count all the errors for quorum validation.
var errCount = 0
// Return for first error.
for _, err := range errs {
if err != nil {
return err
errCount++
}
}
// Count all the errors and validate if we have write quorum.
if errCount > len(storageDisks)-len(storageDisks)/2+3 {
// Validate if we have read quorum return success.
if errCount > len(storageDisks)-len(storageDisks)/2+1 {
return nil
}
// Rename `uploads.json` left over back to tmp location.
for index, disk := range storageDisks {
if disk == nil {
continue
}
// Undo rename `uploads.json` in parallel.
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if errs[index] != nil {
return
}
_ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
}(index, disk)
}
wg.Wait()
return errXLWriteQuorum
}
return nil
}
@@ -225,6 +292,10 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora
// Cleanup uploadID for all disks.
for index, disk := range storageDisks {
if disk == nil {
errs[index] = errDiskNotFound
continue
}
wg.Add(1)
// Cleanup each uploadID in a routine.
go func(index int, disk StorageAPI) {
@@ -287,6 +358,9 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count
// Returns if the prefix is a multipart upload.
func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
for _, disk := range xl.getLoadBalancedQuorumDisks() {
if disk == nil {
continue
}
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
if err != nil {
return false
@@ -299,6 +373,9 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
// listUploadsInfo - list all uploads info.
func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) {
for _, disk := range xl.getLoadBalancedQuorumDisks() {
if disk == nil {
continue
}
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
if err != nil {
@@ -324,6 +401,9 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string)
curpartPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName)
wg := sync.WaitGroup{}
for i, disk := range xl.storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()