mirror of https://github.com/minio/minio.git
mark replication target offline if network timeouts seen (#17907)
A regular target liveness check every 5 seconds toggles the state back once the target returns online.
This commit is contained in:
parent a2f0771fd3
commit 4a6af93c83
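The commit message refers to a complementary liveness probe that brings a target back online; that probe is not part of this diff. Below is a minimal, illustrative Go sketch of how such a loop could flip the cached state back, reusing the epHealth/hc/hMutex shape visible in the BucketTargetSys hunk at the end of the diff. The targetHealth type, the plain TCP-dial probe, and the wiring are assumptions made for this example, not MinIO's actual implementation.

// Illustrative sketch, not code from this commit: a periodic liveness probe
// that flips a target's cached health back to online once its host answers
// again. epHealth, hc and hMutex mirror the fields visible in the
// BucketTargetSys hunk below; targetHealth and the TCP-dial probe are
// assumptions made for this example.
package main

import (
    "context"
    "net"
    "sync"
    "time"
)

type epHealth struct {
    Online bool
}

type targetHealth struct {
    hMutex sync.Mutex
    hc     map[string]epHealth // keyed by endpoint host, as in the diff
}

// heartBeat probes every tracked host on a fixed interval (the commit message
// says every 5 secs) so a target marked offline comes back automatically once
// it is reachable again.
func (t *targetHealth) heartBeat(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            t.hMutex.Lock()
            for host, h := range t.hc {
                // host is assumed to include a port, e.g. "replica:9000";
                // the real MinIO health check may probe differently.
                conn, err := net.DialTimeout("tcp", host, 3*time.Second)
                if conn != nil {
                    conn.Close()
                }
                h.Online = err == nil
                t.hc[host] = h
            }
            t.hMutex.Unlock()
        }
    }
}

func main() {
    t := &targetHealth{hc: map[string]epHealth{"replica.example.com:9000": {}}}
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    t.heartBeat(ctx, 5*time.Second) // runs until the context expires
}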
@@ -632,6 +632,9 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
             return
         }
     default:
+        if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
+            globalBucketTargetSys.markOffline(tgt.EndpointURL())
+        }
         // mark delete marker replication as failed if target cluster not ready to receive
         // this request yet (object version not replicated yet)
         if err != nil && !toi.ReplicationReady {
@@ -656,6 +659,9 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
             rinfo.VersionPurgeStatus = Failed
         }
         logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", tgt.Bucket, dobj.ObjectName, versionID, rmErr))
+        if rmErr != nil && minio.IsNetworkOrHostDown(rmErr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
+            globalBucketTargetSys.markOffline(tgt.EndpointURL())
+        }
     } else {
         if dobj.VersionID == "" {
             rinfo.ReplicationStatus = replication.Completed
@@ -1217,7 +1223,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
     }
     r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
     if objInfo.isMultipart() {
-        if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
+        if err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
             r, objInfo, putOpts); err != nil {
             if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
                 rinfo.ReplicationStatus = replication.Failed
@@ -1232,6 +1238,9 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
             }
         }
     }
+    if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
+        globalBucketTargetSys.markOffline(tgt.EndpointURL())
+    }
     return
 }
 
@@ -1375,6 +1384,10 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
     }
     // if target returns error other than NoSuchKey, defer replication attempt
     if cerr != nil {
+        if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
+            globalBucketTargetSys.markOffline(tgt.EndpointURL())
+        }
+
         errResp := minio.ToErrorResponse(cerr)
         switch errResp.Code {
         case "NoSuchKey", "NoSuchVersion", "SlowDownRead":
@@ -1446,7 +1459,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
     }
     r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
     if objInfo.isMultipart() {
-        if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
+        if err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
             r, objInfo, putOpts); err != nil {
             if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
                 rinfo.ReplicationStatus = replication.Failed
@@ -1465,6 +1478,9 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
                 }
             }
         }
+        if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
+            globalBucketTargetSys.markOffline(tgt.EndpointURL())
+        }
     }
     return
 }
@@ -69,6 +69,16 @@ func (sys *BucketTargetSys) isOffline(ep *url.URL) bool {
     return false
 }
 
+// markOffline sets endpoint to offline if network i/o timeout seen.
+func (sys *BucketTargetSys) markOffline(ep *url.URL) {
+    sys.hMutex.Lock()
+    defer sys.hMutex.Unlock()
+    if h, ok := sys.hc[ep.Host]; ok {
+        h.Online = false
+        sys.hc[ep.Host] = h
+    }
+}
+
 func (sys *BucketTargetSys) initHC(ep *url.URL) {
     sys.hMutex.Lock()
     sys.hc[ep.Host] = epHealth{
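A note on the guard used at each call site above: markOffline is only invoked when globalBucketTargetSys.isOffline(tgt.EndpointURL()) reports the target as still online, so an already-offline target is not re-marked on every subsequent failed request. markOffline itself only flips the cached epHealth entry for the host; per the commit message, it is the periodic liveness check that turns the entry back online once the target responds again.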