mirror of
https://github.com/minio/minio.git
synced 2024-12-26 07:05:55 -05:00
ensure metadata updates go to same pool where version exists (#17451)
This PR also returns the replication status in proxy calls and defers replication attempt if HEAD on object version returned a error different from NoSuchKey
This commit is contained in:
parent
142a5b0dcd
commit
c4d0c49a5f
@ -1384,6 +1384,18 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// if target returns error other than NoSuchKey, defer replication attempt
|
||||||
|
if cerr != nil && minio.ToErrorResponse(cerr).Code != "NoSuchKey" && minio.ToErrorResponse(cerr).Code != "NoSuchVersion" {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("unable to replicate %s/%s (%s). Target returned %s error on HEAD", bucket, object, objInfo.VersionID, cerr))
|
||||||
|
sendEvent(eventArgs{
|
||||||
|
EventName: event.ObjectReplicationNotTracked,
|
||||||
|
BucketName: bucket,
|
||||||
|
Object: objInfo,
|
||||||
|
UserAgent: "Internal: [Replication]",
|
||||||
|
Host: globalLocalNodeName,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
rinfo.ReplicationStatus = replication.Completed
|
rinfo.ReplicationStatus = replication.Completed
|
||||||
rinfo.Size = size
|
rinfo.Size = size
|
||||||
rinfo.ReplicationAction = rAction
|
rinfo.ReplicationAction = rAction
|
||||||
@ -2094,6 +2106,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa
|
|||||||
StorageClass: objInfo.StorageClass,
|
StorageClass: objInfo.StorageClass,
|
||||||
ReplicationStatusInternal: objInfo.ReplicationStatus,
|
ReplicationStatusInternal: objInfo.ReplicationStatus,
|
||||||
UserTags: tags.String(),
|
UserTags: tags.String(),
|
||||||
|
ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus),
|
||||||
}
|
}
|
||||||
oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
|
oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
|
||||||
for k, v := range objInfo.Metadata {
|
for k, v := range objInfo.Metadata {
|
||||||
|
@ -411,7 +411,9 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
|
|||||||
}
|
}
|
||||||
// do not remove this check as it can lead to inconsistencies
|
// do not remove this check as it can lead to inconsistencies
|
||||||
// for all callers of bucket replication.
|
// for all callers of bucket replication.
|
||||||
|
if !opts.MetadataChg {
|
||||||
opts.VersionID = ""
|
opts.VersionID = ""
|
||||||
|
}
|
||||||
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts)
|
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts)
|
||||||
poolObjInfos[i] = pinfo
|
poolObjInfos[i] = pinfo
|
||||||
}(i, pool, poolOpts[i])
|
}(i, pool, poolOpts[i])
|
||||||
@ -443,7 +445,7 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
|
|||||||
// found a pool
|
// found a pool
|
||||||
return pinfo, nil
|
return pinfo, nil
|
||||||
}
|
}
|
||||||
if isErrReadQuorum(pinfo.Err) {
|
if isErrReadQuorum(pinfo.Err) && !opts.MetadataChg {
|
||||||
// read quorum is returned when the object is visibly
|
// read quorum is returned when the object is visibly
|
||||||
// present but its unreadable, we simply ask the writes to
|
// present but its unreadable, we simply ask the writes to
|
||||||
// schedule to this pool instead. If there is no quorum
|
// schedule to this pool instead. If there is no quorum
|
||||||
@ -2268,6 +2270,7 @@ func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, obje
|
|||||||
return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts)
|
return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts.MetadataChg = true
|
||||||
// We don't know the size here set 1GiB atleast.
|
// We don't know the size here set 1GiB atleast.
|
||||||
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2284,6 +2287,8 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s
|
|||||||
return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts)
|
return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts.MetadataChg = true
|
||||||
|
|
||||||
// We don't know the size here set 1GiB atleast.
|
// We don't know the size here set 1GiB atleast.
|
||||||
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2300,6 +2305,8 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec
|
|||||||
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
|
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts.MetadataChg = true
|
||||||
|
|
||||||
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
|
@ -99,6 +99,8 @@ type ObjectOptions struct {
|
|||||||
IndexCB func() []byte
|
IndexCB func() []byte
|
||||||
|
|
||||||
InclFreeVersions bool
|
InclFreeVersions bool
|
||||||
|
|
||||||
|
MetadataChg bool // is true if it is a metadata update operation.
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExpirationOptions represents object options for object expiration at objectLayer.
|
// ExpirationOptions represents object options for object expiration at objectLayer.
|
||||||
|
Loading…
Reference in New Issue
Block a user