xl: avoid sending Delete() remote call for fully successful runs

an optimization to avoid extra syscalls in PutObject(),
adds up to our PutObject response times.
This commit is contained in:
Harshavardhana
2021-03-24 14:19:52 -07:00
parent 906d68c356
commit d7f32ad649
5 changed files with 61 additions and 31 deletions

View File

@@ -42,6 +42,15 @@ var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform
/// Object Operations
func countOnlineDisks(onlineDisks []StorageAPI) (online int) {
for _, onlineDisk := range onlineDisks {
if onlineDisk != nil && onlineDisk.IsOnline() {
online++
}
}
return online
}
// CopyObject - copy object source object to destination object.
// if source object and destination object are same we only
// update metadata.
@@ -116,8 +125,13 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
tempObj := mustGetUUID()
var online int
// Cleanup in case of xl.meta writing failure
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
defer func() {
if online != len(onlineDisks) {
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
}
}()
// Write unique `xl.meta` for each disk.
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
@@ -129,6 +143,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return oi, toObjectErr(err, srcBucket, srcObject)
}
online = countOnlineDisks(onlineDisks)
return fi.ToObjectInfo(srcBucket, srcObject), nil
}
@@ -641,11 +657,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
writeQuorum++
}
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
// Validate input data size and it can never be less than zero.
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument, logger.Application)
@@ -713,6 +724,16 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
partName := "part.1"
tempErasureObj := pathJoin(uniqueID, fi.DataDir, partName)
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
var online int
defer func() {
if online != len(onlineDisks) {
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
}
}()
writers := make([]io.Writer, len(onlineDisks))
for i, disk := range onlineDisks {
if disk == nil {
@@ -806,6 +827,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
break
}
}
online = countOnlineDisks(onlineDisks)
return fi.ToObjectInfo(bucket, object), nil
}
@@ -859,8 +881,8 @@ func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string
var err error
defer ObjectPathUpdated(pathJoin(bucket, object))
tmpObj := mustGetUUID()
disks := er.getDisks()
tmpObj := mustGetUUID()
if bucket == minioMetaTmpBucket {
tmpObj = object
} else {