mirror of https://github.com/minio/minio.git
reduce logging in bucket replication in retry scenarios (#17820)
This commit is contained in:
parent b6b6d6e8d8
commit b732a673dc
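In short, the diff below replaces several logger.LogIf calls with logger.LogOnceIf keyed by a unique string (bucket, target ARN, or object+version), and it resolves the remote TargetClient once in parseReplicateDecision and caches it on replicateTargetDecision, so a stale or missing target is logged at most once instead of on every retry. As a rough illustration of the log-once idea only, here is a minimal, self-contained Go sketch; the dedupLogger type and its LogOnceIf method are hypothetical stand-ins and not MinIO's internal logger package:

package main

import (
	"fmt"
	"sync"
)

// dedupLogger is a hypothetical stand-in for a LogOnceIf-style helper:
// an error is emitted only the first time a given unique key is seen,
// so a retry loop hitting the same failure does not flood the log.
type dedupLogger struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDedupLogger() *dedupLogger {
	return &dedupLogger{seen: make(map[string]struct{})}
}

// LogOnceIf logs err keyed by id; later calls with the same id are dropped.
func (d *dedupLogger) LogOnceIf(err error, id string) {
	if err == nil {
		return
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return
	}
	d.seen[id] = struct{}{}
	fmt.Printf("ERROR: %v\n", err)
}

func main() {
	log := newDedupLogger()
	arn := "arn:minio:replication::example" // hypothetical target ARN
	// Simulate replication retries repeatedly failing to resolve the same target.
	for i := 0; i < 3; i++ {
		log.LogOnceIf(fmt.Errorf("failed to get target for bucket:%s arn:%s", "photos", arn), arn)
	}
	// The error above is printed exactly once despite three attempts.
}

In the actual change, the unique key passed to logger.LogOnceIf is chosen per call site (for example the target ARN, the bucket, or object+version), which is what bounds the log volume when the same failure repeats across retries.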
@@ -19,6 +19,7 @@ package cmd
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -31,6 +32,7 @@ import (
 	"github.com/minio/madmin-go/v3"
 	"github.com/minio/minio/internal/bucket/replication"
 	xhttp "github.com/minio/minio/internal/http"
+	"github.com/minio/minio/internal/logger"
 )
 
 //go:generate msgp -file=$GOFILE
@@ -180,6 +182,7 @@ type replicateTargetDecision struct {
 	Synchronous bool   // Synchronous replication configured.
 	Arn         string // ARN of replication target
 	ID          string
+	Tgt         *TargetClient
 }
 
 func (t *replicateTargetDecision) String() string {
@@ -288,7 +291,7 @@ var errInvalidReplicateDecisionFormat = fmt.Errorf("ReplicateDecision has invali
 
 // parse k-v pairs of target ARN to stringified ReplicateTargetDecision delimited by ',' into a
 // ReplicateDecision struct
-func parseReplicateDecision(s string) (r ReplicateDecision, err error) {
+func parseReplicateDecision(ctx context.Context, bucket, s string) (r ReplicateDecision, err error) {
 	r = ReplicateDecision{
 		targetsMap: make(map[string]replicateTargetDecision),
 	}
@@ -317,7 +320,13 @@ func parseReplicateDecision(s string) (r ReplicateDecision, err error) {
 		if err != nil {
 			return r, err
 		}
-		r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: replicate, Synchronous: sync, Arn: tgt[2], ID: tgt[3]}
+		tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, slc[0])
+		if tgtClnt == nil {
+			// Skip stale targets if any and log them to be missing atleast once.
+			logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, slc[0]), slc[0])
+			// We save the targetDecision even when its not configured or stale.
+		}
+		r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: replicate, Synchronous: sync, Arn: tgt[2], ID: tgt[3], Tgt: tgtClnt}
 	}
 	return
 }
@@ -18,6 +18,7 @@
 package cmd
 
 import (
+	"context"
 	"testing"
 
 	"github.com/minio/minio/internal/bucket/replication"
@@ -183,7 +184,7 @@ var parseReplicationDecisionTest = []struct {
 
 func TestParseReplicateDecision(t *testing.T) {
 	for i, test := range parseReplicationDecisionTest {
-		dsc, err := parseReplicateDecision(test.expDsc.String())
+		dsc, err := parseReplicateDecision(context.Background(), "bucket", test.expDsc.String())
 		if err != nil {
 			if test.expErr != err {
 				t.Errorf("Test%d (%s): Expected parse error got %t , want %t", i+1, test.name, err, test.expErr)
@@ -427,7 +427,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
 
 	rcfg, err := getReplicationConfig(ctx, bucket)
 	if err != nil || rcfg == nil {
-		logger.LogIf(ctx, err)
+		logger.LogOnceIf(ctx, fmt.Errorf("unable to obtain replication config for bucket: %s: err: %s", bucket, err), bucket)
 		sendEvent(eventArgs{
 			BucketName: bucket,
 			Object: ObjectInfo{
@@ -442,9 +442,10 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
 		})
 		return
 	}
-	dsc, err := parseReplicateDecision(dobj.ReplicationState.ReplicateDecisionStr)
+	dsc, err := parseReplicateDecision(ctx, bucket, dobj.ReplicationState.ReplicateDecisionStr)
 	if err != nil {
-		logger.LogIf(ctx, err)
+		logger.LogOnceIf(ctx, fmt.Errorf("unable to parse replication decision parameters for bucket: %s, err: %s, decision: %s",
+			bucket, err, dobj.ReplicationState.ReplicateDecisionStr), dobj.ReplicationState.ReplicateDecisionStr)
 		sendEvent(eventArgs{
 			BucketName: bucket,
 			Object: ObjectInfo{
@@ -466,7 +467,6 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
 	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
-		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, dobj.TargetArn))
 		sendEvent(eventArgs{
 			BucketName: bucket,
 			Object: ObjectInfo{
@@ -488,38 +488,23 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
 	var rinfos replicatedInfos
 	rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap))
 	idx := -1
-	for tgtArn := range dsc.targetsMap {
+	for _, tgtEntry := range dsc.targetsMap {
 		idx++
-		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
-		if tgt == nil {
-			logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
-			sendEvent(eventArgs{
-				BucketName: bucket,
-				Object: ObjectInfo{
-					Bucket:       bucket,
-					Name:         dobj.ObjectName,
-					VersionID:    versionID,
-					DeleteMarker: dobj.DeleteMarker,
-				},
-				UserAgent: "Internal: [Replication]",
-				Host:      globalLocalNodeName,
-				EventName: event.ObjectReplicationNotTracked,
-			})
+		if tgtEntry.Tgt == nil {
 			continue
 		}
-		if tgt := dsc.targetsMap[tgtArn]; !tgt.Replicate {
+		if !tgtEntry.Replicate {
 			continue
 		}
 		// if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
-		if dobj.TargetArn != "" && dobj.TargetArn != tgt.ARN {
+		if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn {
 			continue
 		}
 		wg.Add(1)
 		go func(index int, tgt *TargetClient) {
 			defer wg.Done()
-			rinfo := replicateDeleteToTarget(ctx, dobj, tgt)
-			rinfos.Targets[index] = rinfo
-		}(idx, tgt)
+			rinfos.Targets[index] = replicateDeleteToTarget(ctx, dobj, tgt)
+		}(idx, tgtEntry.Tgt)
 	}
 	wg.Wait()
 
@@ -1005,7 +990,6 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
 			Host:      globalLocalNodeName,
 		})
 		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
-		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, ri.TargetArn))
 		return
 	}
 	ctx = lkctx.Context()
@@ -1017,7 +1001,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
 	for i, tgtArn := range tgtArns {
 		tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn)
 		if tgt == nil {
-			logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn))
+			logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn)
 			sendEvent(eventArgs{
 				EventName:  event.ObjectReplicationNotTracked,
 				BucketName: bucket,
@@ -1156,7 +1140,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 			UserAgent: "Internal: [Replication]",
 			Host:      globalLocalNodeName,
 		})
-		logger.LogIf(ctx, fmt.Errorf("unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
+		logger.LogOnceIf(ctx, fmt.Errorf("unable to read source object %s/%s(%s): %w", bucket, object, objInfo.VersionID, err), object+":"+objInfo.VersionID)
 	}
 	return
 }