support proxying of tagging requests in replication (#18649)

support proxying of tagging requests in active-active replication

Note: even if proxying is successful, PutObjectTagging/DeleteObjectTagging
will continue to report a 404 since the object is not present locally.
This commit is contained in:
Poorna
2024-01-12 23:51:33 -08:00
committed by GitHub
parent cba3dd276b
commit b2b26d9c95
3 changed files with 200 additions and 3 deletions

View File

@@ -2255,7 +2255,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa
if rs != nil {
h, err := rs.ToHeader()
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
logger.LogIf(ctx, fmt.Errorf("invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
continue
}
gopts.Set(xhttp.Range, h)
@@ -2348,6 +2348,127 @@ func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc
}
}
// proxyTaggingToRepTarget proxies tagging requests to remote targets for
// active-active replicated setups
func proxyTaggingToRepTarget(ctx context.Context, bucket, object string, tags *tags.Tags, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (proxy proxyResult) {
// this option is set when active-active replication is in place between site A -> B,
// and request hits site B that does not have the object yet.
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
return proxy
}
var wg sync.WaitGroup
errs := make([]error, len(proxyTargets.Targets))
for idx, t := range proxyTargets.Targets {
tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
continue
}
// if proxying explicitly disabled on remote target
if tgt.disableProxy {
continue
}
idx := idx
wg.Add(1)
go func(idx int, tgt *TargetClient) {
defer wg.Done()
var err error
if tags != nil {
popts := minio.PutObjectTaggingOptions{
VersionID: opts.VersionID,
Internal: minio.AdvancedObjectTaggingOptions{
ReplicationProxyRequest: "true",
},
}
err = tgt.PutObjectTagging(ctx, tgt.Bucket, object, tags, popts)
} else {
dopts := minio.RemoveObjectTaggingOptions{
VersionID: opts.VersionID,
Internal: minio.AdvancedObjectTaggingOptions{
ReplicationProxyRequest: "true",
},
}
err = tgt.RemoveObjectTagging(ctx, tgt.Bucket, object, dopts)
}
if err != nil {
errs[idx] = err
}
}(idx, tgt)
}
wg.Wait()
var (
terr error
taggedCount int
)
for _, err := range errs {
if err == nil {
taggedCount++
continue
}
errCode := minio.ToErrorResponse(err).Code
if errCode != "NoSuchKey" && errCode != "NoSuchVersion" {
terr = err
}
}
// don't return error if at least one target was tagged successfully
if taggedCount == 0 && terr != nil {
proxy.Err = terr
}
return proxy
}
// proxyGetTaggingToRepTarget proxies get tagging requests to remote targets for
// active-active replicated setups
func proxyGetTaggingToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgs *tags.Tags, proxy proxyResult) {
// this option is set when active-active replication is in place between site A -> B,
// and request hits site B that does not have the object yet.
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
return nil, proxy
}
var wg sync.WaitGroup
errs := make([]error, len(proxyTargets.Targets))
tagSlc := make([]map[string]string, len(proxyTargets.Targets))
for idx, t := range proxyTargets.Targets {
tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
continue
}
// if proxying explicitly disabled on remote target
if tgt.disableProxy {
continue
}
idx := idx
wg.Add(1)
go func(idx int, tgt *TargetClient) {
defer wg.Done()
var err error
gopts := minio.GetObjectTaggingOptions{
VersionID: opts.VersionID,
Internal: minio.AdvancedObjectTaggingOptions{
ReplicationProxyRequest: "true",
},
}
tgs, err = tgt.GetObjectTagging(ctx, tgt.Bucket, object, gopts)
if err != nil {
errs[idx] = err
} else {
tagSlc[idx] = tgs.ToMap()
}
}(idx, tgt)
}
wg.Wait()
for idx, err := range errs {
errCode := minio.ToErrorResponse(err).Code
if err != nil && errCode != "NoSuchKey" && errCode != "NoSuchVersion" {
return nil, proxyResult{Err: err}
}
if err == nil {
tgs, _ = tags.MapToObjectTags(tagSlc[idx])
}
}
return tgs, proxy
}
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
globalReplicationPool.queueReplicaDeleteTask(dv)
for arn := range dv.ReplicationState.Targets {