remove double reads updating object metadata (#13542)

Removes RLock/RUnlock for updating metadata,
since we already take a write lock to update
metadata, this change removes reading of xl.meta
as well as an additional lock, the performance gain
should increase 3x theoretically for

- PutObjectRetention
- PutObjectLegalHold

This optimization is mainly for Veeam like
workloads that require a certain level of iops
from these API calls, we were losing iops.
This commit is contained in:
Harshavardhana
2021-10-30 08:22:04 -07:00
committed by GitHub
parent 2af5445309
commit 4ed0eb7012
7 changed files with 118 additions and 118 deletions

View File

@@ -3521,53 +3521,39 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
popts := ObjectOptions{
MTime: opts.MTime,
VersionID: opts.VersionID,
EvalMetadataFn: func(oi ObjectInfo) error {
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
return nil
},
}
objInfo, err := objectAPI.PutObjectMetadata(ctx, bucket, object, popts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if objInfo.DeleteMarker {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
objInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
objInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
// if version-id is not specified retention is supposed to be set on the latest object.
if opts.VersionID == "" {
opts.VersionID = objInfo.VersionID
}
popts := ObjectOptions{
MTime: opts.MTime,
VersionID: opts.VersionID,
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
}
for k, v := range objInfo.UserDefined {
popts.UserDefined[k] = v
}
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.MetadataReplicationType)
}
writeSuccessResponseHeadersOnly(w)
// Notify object event.
@@ -3703,50 +3689,37 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
objInfo, s3Err := enforceRetentionBypassForPut(ctx, r, bucket, object, getObjectInfo, objRetention, cred, owner)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
if objInfo.DeleteMarker {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
if objRetention.Mode.Valid() {
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode)
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = objRetention.RetainUntilDate.UTC().Format(time.RFC3339)
} else {
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = ""
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = ""
}
objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
objInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
objInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
// if version-id is not specified retention is supposed to be set on the latest object.
if opts.VersionID == "" {
opts.VersionID = objInfo.VersionID
}
popts := ObjectOptions{
MTime: opts.MTime,
VersionID: opts.VersionID,
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
MTime: opts.MTime,
VersionID: opts.VersionID,
EvalMetadataFn: func(oi ObjectInfo) error {
if err := enforceRetentionBypassForPut(ctx, r, oi, objRetention, cred, owner); err != nil {
return err
}
if objRetention.Mode.Valid() {
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode)
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = objRetention.RetainUntilDate.UTC().Format(time.RFC3339)
} else {
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = ""
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = ""
}
oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
return nil
},
}
for k, v := range objInfo.UserDefined {
popts.UserDefined[k] = v
}
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
objInfo, err := objectAPI.PutObjectMetadata(ctx, bucket, object, popts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.MetadataReplicationType)
}