fs: optimize multipart clean work (#4944)

This commit is contained in:
fangyuxiang 2017-09-28 23:09:28 +08:00 committed by Dee Koder
parent 3c836b5f34
commit a5fbe1e16c
2 changed files with 29 additions and 8 deletions

View File

@ -309,6 +309,7 @@ func fsRemoveUploadIDPath(basePath, uploadIDPath string) error {
} }
} }
fsRemoveDir(uploadIDPath)
return nil return nil
} }

View File

@ -62,11 +62,11 @@ func (fs fsObjects) deleteUploadsJSON(bucket, object, uploadID string) error {
// Removes the uploadID, called either by CompleteMultipart of AbortMultipart. If the resuling uploads // Removes the uploadID, called either by CompleteMultipart of AbortMultipart. If the resuling uploads
// slice is empty then we remove/purge the file. // slice is empty then we remove/purge the file.
func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.LockedFile) error { func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.LockedFile) (bool, error) {
uploadIDs := uploadsV1{} uploadIDs := uploadsV1{}
_, err := uploadIDs.ReadFrom(rwlk) _, err := uploadIDs.ReadFrom(rwlk)
if err != nil { if err != nil {
return err return false, err
} }
// Removes upload id from the uploads list. // Removes upload id from the uploads list.
@ -75,12 +75,12 @@ func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.L
// Check this is the last entry. // Check this is the last entry.
if uploadIDs.IsEmpty() { if uploadIDs.IsEmpty() {
// No more uploads left, so we delete `uploads.json` file. // No more uploads left, so we delete `uploads.json` file.
return fs.deleteUploadsJSON(bucket, object, uploadID) return true, fs.deleteUploadsJSON(bucket, object, uploadID)
} // else not empty } // else not empty
// Write update `uploads.json`. // Write update `uploads.json`.
_, err = uploadIDs.WriteTo(rwlk) _, err = uploadIDs.WriteTo(rwlk)
return err return false, err
} }
// Adds a new uploadID if no previous `uploads.json` is // Adds a new uploadID if no previous `uploads.json` is
@ -656,6 +656,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
uploadIDPath := pathJoin(bucket, object, uploadID) uploadIDPath := pathJoin(bucket, object, uploadID)
var removeObjectDir bool
// Hold the lock so that two parallel complete-multipart-uploads // Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind. // do not leave a stale uploads.json behind.
@ -663,7 +664,15 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return oi, err return oi, err
} }
defer objectMPartPathLock.Unlock()
defer func() {
if removeObjectDir {
basePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket)
derr := fsDeleteFile(basePath, pathJoin(basePath, object))
errorIf(derr, "unable to remove %s in %s", pathJoin(basePath, object), basePath)
}
objectMPartPathLock.Unlock()
}()
fsMetaPathMultipart := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) fsMetaPathMultipart := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
rlk, err := fs.rwPool.Open(fsMetaPathMultipart) rlk, err := fs.rwPool.Open(fsMetaPathMultipart)
@ -830,7 +839,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Remove entry from `uploads.json`. // Remove entry from `uploads.json`.
if err = fs.removeUploadID(bucket, object, uploadID, rwlk); err != nil { removeObjectDir, err = fs.removeUploadID(bucket, object, uploadID, rwlk)
if err != nil {
return oi, toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object)) return oi, toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object))
} }
@ -865,6 +875,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
} }
uploadIDPath := pathJoin(bucket, object, uploadID) uploadIDPath := pathJoin(bucket, object, uploadID)
var removeObjectDir bool
// Hold the lock so that two parallel complete-multipart-uploads // Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind. // do not leave a stale uploads.json behind.
@ -873,7 +884,15 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return err return err
} }
defer objectMPartPathLock.Unlock()
defer func() {
if removeObjectDir {
basePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket)
derr := fsDeleteFile(basePath, pathJoin(basePath, object))
errorIf(derr, "unable to remove %s in %s", pathJoin(basePath, object), basePath)
}
objectMPartPathLock.Unlock()
}()
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
if _, err := fs.rwPool.Open(fsMetaPath); err != nil { if _, err := fs.rwPool.Open(fsMetaPath); err != nil {
@ -910,7 +929,8 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
} }
// Remove entry from `uploads.json`. // Remove entry from `uploads.json`.
if err = fs.removeUploadID(bucket, object, uploadID, rwlk); err != nil { removeObjectDir, err = fs.removeUploadID(bucket, object, uploadID, rwlk)
if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }