Use new gofumpt (#21613)
Update tinylib. Should fix CI. `gofumpt -w . && go generate ./...`
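The `go generate ./...` step re-runs the repository's code generators after the tinylib (`github.com/tinylib/msgp`) update. As a hedged sketch of how such a directive works — `Record` is a hypothetical type for illustration, not from the MinIO tree — a `//go:generate msgp` line next to a tagged struct makes `go generate` invoke msgp and rewrite the generated `*_gen.go` marshalers:

```go
// Package example sketches a msgp-driven generation setup.
package example

//go:generate msgp

// Record is a made-up type; running `go generate ./...` invokes msgp,
// which regenerates record_gen.go with MessagePack (de)serializers for it.
type Record struct {
	Bucket string `msg:"bucket"`
	Object string `msg:"object"`
}
```

Regenerating after a msgp upgrade keeps the checked-in `*_gen.go` files in sync with the new library version, which is presumably why CI broke until this was done.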
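Apart from regenerated code, the hunks below make one mechanical change: functions with named result parameters (`dsc`, `rinfo`, `r`, `rd`, `roi`, `putOpts`) no longer use naked `return` statements and instead name the value they return. A minimal sketch of the before/after pattern — `parity` is a made-up function, not from this codebase:

```go
package main

import "fmt"

// Before: with a named result, a naked `return` implicitly yields p.
func parityBefore(n int) (p int) {
	if n < 0 {
		return // implicitly returns p's current value, 0
	}
	p = n % 2
	return // implicitly returns p
}

// After: every return site names its value, mirroring the
// `return` -> `return dsc` / `return rinfo` rewrites below.
func parityAfter(n int) (p int) {
	if n < 0 {
		return p
	}
	p = n % 2
	return p
}

func main() {
	fmt.Println(parityBefore(5), parityAfter(5)) // 1 1: behavior is unchanged
}
```

Behavior is identical either way; the explicit form simply makes each early exit self-documenting.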
@@ -253,31 +253,31 @@ func getMustReplicateOptions(userDefined map[string]string, userTags string, sta
 func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (dsc ReplicateDecision) {
 	// object layer not initialized we return with no decision.
 	if newObjectLayerFn() == nil {
-		return
+		return dsc
 	}
 
 	// Disable server-side replication on object prefixes which are excluded
 	// from versioning via the MinIO bucket versioning extension.
 	if !globalBucketVersioningSys.PrefixEnabled(bucket, object) {
-		return
+		return dsc
 	}
 
 	replStatus := mopts.ReplicationStatus()
 	if replStatus == replication.Replica && !mopts.isMetadataReplication() {
-		return
+		return dsc
 	}
 
 	if mopts.replicationRequest { // incoming replication request on target cluster
-		return
+		return dsc
 	}
 
 	cfg, err := getReplicationConfig(ctx, bucket)
 	if err != nil {
 		replLogOnceIf(ctx, err, bucket)
-		return
+		return dsc
 	}
 	if cfg == nil {
-		return
+		return dsc
 	}
 
 	opts := replication.ObjectOpts{
@@ -348,16 +348,16 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
 	rcfg, err := getReplicationConfig(ctx, bucket)
 	if err != nil || rcfg == nil {
 		replLogOnceIf(ctx, err, bucket)
-		return
+		return dsc
 	}
 	// If incoming request is a replication request, it does not need to be re-replicated.
 	if delOpts.ReplicationRequest {
-		return
+		return dsc
 	}
 	// Skip replication if this object's prefix is excluded from being
 	// versioned.
 	if !delOpts.Versioned {
-		return
+		return dsc
 	}
 	opts := replication.ObjectOpts{
 		Name: dobj.ObjectName,
@@ -617,10 +617,10 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
 
 	if dobj.VersionID == "" && rinfo.PrevReplicationStatus == replication.Completed && dobj.OpType != replication.ExistingObjectReplicationType {
 		rinfo.ReplicationStatus = rinfo.PrevReplicationStatus
-		return
+		return rinfo
 	}
 	if dobj.VersionID != "" && rinfo.VersionPurgeStatus == replication.VersionPurgeComplete {
-		return
+		return rinfo
 	}
 	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
 		replLogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", dobj.Bucket, tgt.ARN), "replication-target-offline-delete-"+tgt.ARN)
@@ -641,7 +641,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
 		} else {
 			rinfo.VersionPurgeStatus = replication.VersionPurgeFailed
 		}
-		return
+		return rinfo
 	}
 	// early return if already replicated delete marker for existing object replication/ healing delete markers
 	if dobj.DeleteMarkerVersionID != "" {
@@ -658,13 +658,13 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
 			// delete marker already replicated
 			if dobj.VersionID == "" && rinfo.VersionPurgeStatus.Empty() {
 				rinfo.ReplicationStatus = replication.Completed
-				return
+				return rinfo
 			}
 		case isErrObjectNotFound(serr), isErrVersionNotFound(serr):
 			// version being purged is already not found on target.
 			if !rinfo.VersionPurgeStatus.Empty() {
 				rinfo.VersionPurgeStatus = replication.VersionPurgeComplete
-				return
+				return rinfo
 			}
 		case isErrReadQuorum(serr), isErrWriteQuorum(serr):
 			// destination has some quorum issues, perform removeObject() anyways
@@ -678,7 +678,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
 			if err != nil && !toi.ReplicationReady {
 				rinfo.ReplicationStatus = replication.Failed
 				rinfo.Err = err
-				return
+				return rinfo
 			}
 		}
 	}
@@ -709,7 +709,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
 			rinfo.VersionPurgeStatus = replication.VersionPurgeComplete
 		}
 	}
-	return
+	return rinfo
 }
 
 func getCopyObjMetadata(oi ObjectInfo, sc string) map[string]string {
@@ -910,7 +910,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (put
 		}
 		putOpts.ServerSideEncryption = sseEnc
 	}
-	return
+	return putOpts, isMP, err
 }
 
 type replicationAction string
@@ -1208,7 +1208,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 	if ri.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
 		rinfo.ReplicationStatus = replication.Completed
 		rinfo.ReplicationResynced = true
-		return
+		return rinfo
 	}
 
 	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
@@ -1220,7 +1220,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 			UserAgent: "Internal: [Replication]",
 			Host:      globalLocalNodeName,
 		})
-		return
+		return rinfo
 	}
 
 	versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
@@ -1244,7 +1244,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 			})
 			replLogOnceIf(ctx, fmt.Errorf("unable to read source object %s/%s(%s): %w", bucket, object, objInfo.VersionID, err), object+":"+objInfo.VersionID)
 		}
-		return
+		return rinfo
 	}
 	defer gr.Close()
 
@@ -1268,7 +1268,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 				UserAgent: "Internal: [Replication]",
 				Host:      globalLocalNodeName,
 			})
-			return
+			return rinfo
 		}
 	}
 
@@ -1307,7 +1307,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 			UserAgent: "Internal: [Replication]",
 			Host:      globalLocalNodeName,
 		})
-		return
+		return rinfo
 	}
 
 	var headerSize int
@@ -1344,7 +1344,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 			globalBucketTargetSys.markOffline(tgt.EndpointURL())
 		}
 	}
-	return
+	return rinfo
 }
 
 // replicateAll replicates metadata for specified version of the object to destination bucket
@@ -1380,7 +1380,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 			UserAgent: "Internal: [Replication]",
 			Host:      globalLocalNodeName,
 		})
-		return
+		return rinfo
 	}
 
 	versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
@@ -1405,7 +1405,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 			})
 			replLogIf(ctx, fmt.Errorf("unable to replicate to target %s for %s/%s(%s): %w", tgt.EndpointURL(), bucket, object, objInfo.VersionID, err))
 		}
-		return
+		return rinfo
 	}
 	defer gr.Close()
 
@@ -1418,7 +1418,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 	if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
 		rinfo.ReplicationStatus = replication.Completed
 		rinfo.ReplicationResynced = true
-		return
+		return rinfo
 	}
 
 	size, err := objInfo.GetActualSize()
@@ -1431,7 +1431,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 			UserAgent: "Internal: [Replication]",
 			Host:      globalLocalNodeName,
 		})
-		return
+		return rinfo
 	}
 
 	// Set the encrypted size for SSE-C objects
@@ -1494,7 +1494,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 				rinfo.ReplicationAction = rAction
 				rinfo.ReplicationStatus = replication.Completed
 			}
-			return
+			return rinfo
 		}
 	} else {
 		// SSEC objects will refuse HeadObject without the decryption key.
@@ -1528,7 +1528,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 				UserAgent: "Internal: [Replication]",
 				Host:      globalLocalNodeName,
 			})
-			return
+			return rinfo
 		}
 	}
 applyAction:
@@ -1594,7 +1594,7 @@ applyAction:
 			UserAgent: "Internal: [Replication]",
 			Host:      globalLocalNodeName,
 		})
-		return
+		return rinfo
 	}
 	var headerSize int
 	for k, v := range putOpts.Header() {
@@ -1631,7 +1631,7 @@ applyAction:
 			}
 		}
 	}
-	return
+	return rinfo
 }
 
 func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts minio.PutObjectOptions) (err error) {
@@ -2677,7 +2677,7 @@ func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
 // Resync returns true if replication reset is requested
 func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
 	if c.Empty() {
-		return
+		return r
 	}
 
 	// Now overlay existing object replication choices for target
@@ -2693,7 +2693,7 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc Replic
 	tgtArns := c.Config.FilterTargetArns(opts)
 	// indicates no matching target with Existing object replication enabled.
 	if len(tgtArns) == 0 {
-		return
+		return r
 	}
 	for _, t := range tgtArns {
 		opts.TargetArn = t
@@ -2719,7 +2719,7 @@ func (c replicationConfig) resync(oi ObjectInfo, dsc ReplicateDecision, tgtStatu
 		targets: make(map[string]ResyncTargetDecision, len(dsc.targetsMap)),
 	}
 	if c.remotes == nil {
-		return
+		return r
 	}
 	for _, tgt := range c.remotes.Targets {
 		d, ok := dsc.targetsMap[tgt.Arn]
@@ -2731,7 +2731,7 @@ func (c replicationConfig) resync(oi ObjectInfo, dsc ReplicateDecision, tgtStatu
 		}
 		r.targets[d.Arn] = resyncTarget(oi, tgt.Arn, tgt.ResetID, tgt.ResetBeforeDate, tgtStatuses[tgt.Arn])
 	}
-	return
+	return r
 }
 
 func targetResetHeader(arn string) string {
@@ -2750,28 +2750,28 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim
 	if !ok { // existing object replication is enabled and object version is unreplicated so far.
 		if resetID != "" && oi.ModTime.Before(resetBeforeDate) { // trigger replication if `mc replicate reset` requested
 			rd.Replicate = true
-			return
+			return rd
 		}
 		// For existing object reset - this condition is needed
 		rd.Replicate = tgtStatus == ""
-		return
+		return rd
 	}
 	if resetID == "" || resetBeforeDate.Equal(timeSentinel) { // no reset in progress
-		return
+		return rd
 	}
 
 	// if already replicated, return true if a new reset was requested.
 	splits := strings.SplitN(rs, ";", 2)
 	if len(splits) != 2 {
-		return
+		return rd
 	}
 	newReset := splits[1] != resetID
 	if !newReset && tgtStatus == replication.Completed {
 		// already replicated and no reset requested
-		return
+		return rd
 	}
 	rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate)
-	return
+	return rd
 }
 
 const resyncTimeInterval = time.Minute * 1
@@ -3422,12 +3422,12 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 	roi = getHealReplicateObjectInfo(oi, rcfg)
 	roi.RetryCount = uint32(retryCount)
 	if !roi.Dsc.ReplicateAny() {
-		return
+		return roi
 	}
 	// early return if replication already done, otherwise we need to determine if this
 	// version is an existing object that needs healing.
 	if oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty() && !roi.ExistingObjResync.mustResync() {
-		return
+		return roi
 	}
 
 	if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
@@ -3457,14 +3457,14 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 			roi.ReplicationStatus == replication.Failed ||
 			roi.VersionPurgeStatus == replication.VersionPurgeFailed || roi.VersionPurgeStatus == replication.VersionPurgePending {
 			globalReplicationPool.Get().queueReplicaDeleteTask(dv)
-			return
+			return roi
 		}
 		// if replication status is Complete on DeleteMarker and existing object resync required
 		if roi.ExistingObjResync.mustResync() && (roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()) {
			queueReplicateDeletesWrapper(dv, roi.ExistingObjResync)
-			return
+			return roi
 		}
-		return
+		return roi
 	}
 	if roi.ExistingObjResync.mustResync() {
 		roi.OpType = replication.ExistingObjectReplicationType
@@ -3473,13 +3473,13 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 	case replication.Pending, replication.Failed:
 		roi.EventType = ReplicateHeal
 		globalReplicationPool.Get().queueReplicaTask(roi)
-		return
+		return roi
 	}
 	if roi.ExistingObjResync.mustResync() {
 		roi.EventType = ReplicateExisting
 		globalReplicationPool.Get().queueReplicaTask(roi)
 	}
-	return
+	return roi
 }
 
 const (