mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
use object modTime for the event sequencer ID (#18285)
always set modTime after lock is acquired in completemultipart stage to make sure that the modTime is not racy.
This commit is contained in:
parent
aa703dc903
commit
bbfea29c2b
@ -1169,6 +1169,16 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
return oi, err
|
||||
}
|
||||
}
|
||||
|
||||
// Hold namespace to complete the transaction
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx)
|
||||
|
||||
if checksumType.IsSet() {
|
||||
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
|
||||
cs := hash.NewChecksumFromData(checksumType, checksumCombined)
|
||||
@ -1212,15 +1222,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
}
|
||||
}
|
||||
|
||||
// Hold namespace to complete the transaction
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx)
|
||||
|
||||
// Remove parts that weren't present in CompleteMultipartUpload request.
|
||||
for _, curpart := range currentFI.Parts {
|
||||
// Remove part.meta which is not needed anymore.
|
||||
|
@ -210,6 +210,9 @@ type eventArgs struct {
|
||||
func (args eventArgs) ToEvent(escape bool) event.Event {
|
||||
eventTime := UTCNow()
|
||||
uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
|
||||
if !args.Object.ModTime.IsZero() {
|
||||
uniqueID = fmt.Sprintf("%X", args.Object.ModTime.UnixNano())
|
||||
}
|
||||
|
||||
respElements := map[string]string{
|
||||
"x-amz-request-id": args.RespElements["requestId"],
|
||||
|
@ -194,8 +194,6 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts
|
||||
Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceMTime, err),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
opts.MTime = UTCNow()
|
||||
}
|
||||
return opts, nil
|
||||
}
|
||||
@ -225,7 +223,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
|
||||
}
|
||||
|
||||
mtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceMTime))
|
||||
mtime := UTCNow()
|
||||
var mtime time.Time
|
||||
if mtimeStr != "" {
|
||||
mtime, err = time.Parse(time.RFC3339Nano, mtimeStr)
|
||||
if err != nil {
|
||||
@ -237,7 +235,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
|
||||
}
|
||||
}
|
||||
retaintimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceObjectRetentionTimestamp))
|
||||
retaintimestmp := mtime
|
||||
var retaintimestmp time.Time
|
||||
if retaintimeStr != "" {
|
||||
retaintimestmp, err = time.Parse(time.RFC3339, retaintimeStr)
|
||||
if err != nil {
|
||||
@ -250,7 +248,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
|
||||
}
|
||||
|
||||
lholdtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceObjectLegalHoldTimestamp))
|
||||
lholdtimestmp := mtime
|
||||
var lholdtimestmp time.Time
|
||||
if lholdtimeStr != "" {
|
||||
lholdtimestmp, err = time.Parse(time.RFC3339, lholdtimeStr)
|
||||
if err != nil {
|
||||
@ -262,7 +260,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
|
||||
}
|
||||
}
|
||||
tagtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceTaggingTimestamp))
|
||||
taggingtimestmp := mtime
|
||||
var taggingtimestmp time.Time
|
||||
if tagtimeStr != "" {
|
||||
taggingtimestmp, err = time.Parse(time.RFC3339, tagtimeStr)
|
||||
if err != nil {
|
||||
@ -353,7 +351,7 @@ func copySrcOpts(ctx context.Context, r *http.Request, bucket, object string) (O
|
||||
// get ObjectOptions for CompleteMultipart calls
|
||||
func completeMultipartOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) {
|
||||
mtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceMTime))
|
||||
mtime := UTCNow()
|
||||
var mtime time.Time
|
||||
if mtimeStr != "" {
|
||||
mtime, err = time.Parse(time.RFC3339Nano, mtimeStr)
|
||||
if err != nil {
|
||||
|
@ -1375,7 +1375,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
srcTimestamp := dstOpts.ReplicationSourceLegalholdTimestamp
|
||||
if !srcTimestamp.IsZero() {
|
||||
ondiskTimestamp, err := time.Parse(lastLegalHoldTimestamp, time.RFC3339Nano)
|
||||
// update legalhold metadata only if replica timestamp is newer than what's on disk
|
||||
// update legalhold metadata only if replica timestamp is newer than what's on disk
|
||||
if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
|
||||
srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
|
||||
srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = srcTimestamp.Format(time.RFC3339Nano)
|
||||
|
Loading…
Reference in New Issue
Block a user