mirror of
https://github.com/minio/minio.git
synced 2025-11-07 21:02:58 -05:00
Add support for multi site replication (#12880)
This commit is contained in:
committed by
GitHub
parent
0b8c5a6872
commit
c4373ef290
@@ -452,9 +452,10 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
||||
}
|
||||
|
||||
var (
|
||||
hasLockEnabled, replicateSync bool
|
||||
goi ObjectInfo
|
||||
gerr error
|
||||
hasLockEnabled bool
|
||||
dsc ReplicateDecision
|
||||
goi ObjectInfo
|
||||
gerr error
|
||||
)
|
||||
replicateDeletes := hasReplicationRules(ctx, bucket, deleteObjects.Objects)
|
||||
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled {
|
||||
@@ -514,17 +515,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
||||
}
|
||||
|
||||
if replicateDeletes {
|
||||
replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
|
||||
dsc = checkReplicateDelete(ctx, bucket, ObjectToDelete{
|
||||
ObjectName: object.ObjectName,
|
||||
VersionID: object.VersionID,
|
||||
}, goi, gerr)
|
||||
replicateSync = repsync
|
||||
if replicate {
|
||||
}, goi, opts, gerr)
|
||||
if dsc.ReplicateAny() {
|
||||
if object.VersionID != "" {
|
||||
object.VersionPurgeStatus = Pending
|
||||
object.VersionPurgeStatuses = dsc.PendingStatus()
|
||||
} else {
|
||||
object.DeleteMarkerReplicationStatus = string(replication.Pending)
|
||||
object.DeleteMarkerReplicationStatus = dsc.PendingStatus()
|
||||
}
|
||||
object.ReplicateDecisionStr = dsc.String()
|
||||
}
|
||||
}
|
||||
if object.VersionID != "" && hasLockEnabled {
|
||||
@@ -570,14 +572,15 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
||||
objToDel := ObjectToDelete{
|
||||
ObjectName: dObjects[i].ObjectName,
|
||||
VersionID: dObjects[i].VersionID,
|
||||
VersionPurgeStatus: dObjects[i].VersionPurgeStatus,
|
||||
DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus,
|
||||
VersionPurgeStatus: dObjects[i].VersionPurgeStatus(),
|
||||
VersionPurgeStatuses: dObjects[i].ReplicationState.VersionPurgeStatusInternal,
|
||||
DeleteMarkerReplicationStatus: dObjects[i].ReplicationState.ReplicationStatusInternal,
|
||||
ReplicateDecisionStr: dObjects[i].ReplicationState.ReplicateDecisionStr,
|
||||
}
|
||||
dindex := objectsToDelete[objToDel]
|
||||
if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) {
|
||||
if replicateDeletes {
|
||||
dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus
|
||||
dObjects[i].VersionPurgeStatus = deleteList[i].VersionPurgeStatus
|
||||
dObjects[i].ReplicationState = deleteList[i].ReplicationState()
|
||||
}
|
||||
deletedObjects[dindex] = dObjects[i]
|
||||
continue
|
||||
@@ -610,12 +613,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
||||
}
|
||||
|
||||
if replicateDeletes {
|
||||
if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending {
|
||||
if dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == Pending {
|
||||
dv := DeletedObjectReplicationInfo{
|
||||
DeletedObject: dobj,
|
||||
Bucket: bucket,
|
||||
}
|
||||
scheduleReplicationDelete(ctx, dv, objectAPI, replicateSync)
|
||||
scheduleReplicationDelete(ctx, dv, objectAPI)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1639,32 +1642,19 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
bucketStats := globalNotificationSys.GetClusterBucketStats(r.Context(), bucket)
|
||||
bucketReplStats := BucketReplicationStats{}
|
||||
// sum up metrics from each node in the cluster
|
||||
for _, bucketStat := range bucketStats {
|
||||
bucketReplStats.FailedCount += bucketStat.ReplicationStats.FailedCount
|
||||
bucketReplStats.FailedSize += bucketStat.ReplicationStats.FailedSize
|
||||
bucketReplStats.PendingCount += bucketStat.ReplicationStats.PendingCount
|
||||
bucketReplStats.PendingSize += bucketStat.ReplicationStats.PendingSize
|
||||
bucketReplStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
|
||||
bucketReplStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
|
||||
var usageInfo BucketUsageInfo
|
||||
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
||||
if err == nil && !dataUsageInfo.LastUpdate.IsZero() {
|
||||
usageInfo = dataUsageInfo.BucketsUsage[bucket]
|
||||
}
|
||||
// add initial usage from the time of cluster up
|
||||
usageStat := globalReplicationStats.GetInitialUsage(bucket)
|
||||
bucketReplStats.FailedCount += usageStat.FailedCount
|
||||
bucketReplStats.FailedSize += usageStat.FailedSize
|
||||
bucketReplStats.PendingCount += usageStat.PendingCount
|
||||
bucketReplStats.PendingSize += usageStat.PendingSize
|
||||
bucketReplStats.ReplicaSize += usageStat.ReplicaSize
|
||||
bucketReplStats.ReplicatedSize += usageStat.ReplicatedSize
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&bucketReplStats); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
bucketReplStats := getLatestReplicationStats(bucket, usageInfo)
|
||||
jsonData, err := json.Marshal(bucketReplStats)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
w.(http.Flusher).Flush()
|
||||
writeSuccessResponseJSON(w, jsonData)
|
||||
}
|
||||
|
||||
// ResetBucketReplicationStateHandler - starts a replication reset for all objects in a bucket which
|
||||
@@ -1673,12 +1663,16 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
|
||||
// remote target is entirely lost,and previously replicated objects need to be re-synced.
|
||||
func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ResetBucketReplicationState")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
durationStr := r.Form.Get("older-than")
|
||||
durationStr := r.URL.Query().Get("older-than")
|
||||
arn := r.URL.Query().Get("arn")
|
||||
resetID := r.URL.Query().Get("reset-id")
|
||||
if resetID == "" {
|
||||
resetID = mustGetUUID()
|
||||
}
|
||||
var (
|
||||
days time.Duration
|
||||
err error
|
||||
@@ -1719,9 +1713,31 @@ func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseW
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNoMatchingRuleError), r.URL)
|
||||
return
|
||||
}
|
||||
target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, config.RoleArn)
|
||||
tgtArns := config.FilterTargetArns(
|
||||
replication.ObjectOpts{
|
||||
OpType: replication.ResyncReplicationType,
|
||||
TargetArn: arn})
|
||||
|
||||
if len(tgtArns) == 0 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
||||
Bucket: bucket,
|
||||
Err: fmt.Errorf("Remote target ARN %s missing/not eligible for replication resync", arn),
|
||||
}), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if len(tgtArns) > 1 && arn == "" {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
||||
Bucket: bucket,
|
||||
Err: fmt.Errorf("ARN should be specified for replication reset"),
|
||||
}), r.URL)
|
||||
return
|
||||
}
|
||||
var rinfo ResyncTargetsInfo
|
||||
target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArns[0])
|
||||
target.ResetBeforeDate = UTCNow().AddDate(0, 0, -1*int(days/24))
|
||||
target.ResetID = mustGetUUID()
|
||||
target.ResetID = resetID
|
||||
rinfo.Targets = append(rinfo.Targets, ResyncTarget{Arn: tgtArns[0], ResetID: target.ResetID})
|
||||
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil {
|
||||
switch err.(type) {
|
||||
case BucketRemoteConnectionErr:
|
||||
@@ -1745,7 +1761,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseW
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
data, err := json.Marshal(target.ResetID)
|
||||
data, err := json.Marshal(rinfo)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user