feat: add API to return list of objects waiting to be replicated (#15091)

This commit is contained in:
Poorna 2022-07-21 11:05:44 -07:00 committed by GitHub
parent be8c4cb24a
commit cab8d3d568
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1423 additions and 18 deletions

View File

@ -383,6 +383,7 @@ func (a adminAPIHandlers) ExportBucketMetadataHandler(w http.ResponseWriter, r *
if objectAPI == nil {
return
}
var (
buckets []BucketInfo
err error
@ -1060,3 +1061,79 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
}
}
}
// ReplicationDiffHandler - POST returns info on unreplicated versions for a remote target ARN
// to the connected HTTP client. This is a MinIO only extension
func (a adminAPIHandlers) ReplicationDiffHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ReplicationDiff")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
// check if user has permissions to perform this operation
if s3Error := checkRequestAuthType(ctx, r, policy.ListBucketVersionsAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
opts := extractReplicateDiffOpts(r.Form)
if opts.ARN != "" {
tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, opts.ARN)
if tgt.Empty() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, fmt.Errorf("invalid arn : '%s'", opts.ARN)), r.URL)
return
}
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
diffCh, err := getReplicationDiff(ctx, objectAPI, bucket, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
enc := json.NewEncoder(w)
for {
select {
case entry, ok := <-diffCh:
if !ok {
return
}
if err := enc.Encode(entry); err != nil {
return
}
if len(diffCh) == 0 {
// Flush if nothing is queued
w.(http.Flusher).Flush()
}
case <-keepAliveTicker.C:
if len(diffCh) > 0 {
continue
}
if _, err := w.Write([]byte(" ")); err != nil {
return
}
w.(http.Flusher).Flush()
case <-ctx.Done():
return
}
}
}

View File

@ -202,6 +202,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
// RemoveRemoteTargetHandler
adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-remote-target").HandlerFunc(
gz(httpTraceHdrs(adminAPI.RemoveRemoteTargetHandler))).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}")
// ReplicationDiff - MinIO extension API
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/replication/diff").HandlerFunc(
gz(httpTraceHdrs(adminAPI.ReplicationDiffHandler))).Queries("bucket", "{bucket:.*}")
// Bucket migration operations
// ExportBucketMetaHandler

View File

@ -21,12 +21,14 @@ import (
"bytes"
"fmt"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
)
@ -503,6 +505,8 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
}
var dsc ReplicateDecision
var tgtStatuses map[string]replication.StatusType
var purgeStatuses map[string]VersionPurgeStatusType
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
dsc = checkReplicateDelete(GlobalContext, oi.Bucket, ObjectToDelete{
ObjectV: ObjectV{
@ -516,15 +520,17 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
}, replication.HealReplicationType, ObjectOptions{}))
}
tgtStatuses = replicationStatusesMap(oi.ReplicationStatusInternal)
purgeStatuses = versionPurgeStatusesMap(oi.VersionPurgeStatusInternal)
existingObjResync := rcfg.Resync(GlobalContext, oi, &dsc, tgtStatuses)
tm, _ := time.Parse(time.RFC3339Nano, oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp])
return ReplicateObjectInfo{
ObjectInfo: oi,
OpType: replication.HealReplicationType,
Dsc: dsc,
ExistingObjResync: existingObjResync,
TargetStatuses: tgtStatuses,
TargetPurgeStatuses: purgeStatuses,
ReplicationTimestamp: tm,
}
}
@ -724,3 +730,10 @@ func parseSizeFromContentRange(h http.Header) (sz int64, err error) {
}
return int64(usz), nil
}
func extractReplicateDiffOpts(q url.Values) (opts madmin.ReplDiffOpts) {
opts.Verbose = q.Get("verbose") == "true"
opts.ARN = q.Get("arn")
opts.Prefix = q.Get("prefix")
return
}

View File

@ -479,6 +479,10 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
eventName = event.ObjectReplicationFailed
}
drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
if replicationStatus != prevStatus {
drs.ReplicationTimeStamp = UTCNow()
}
dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
VersionID: versionID,
MTime: dobj.DeleteMarkerMTime.Time,
@ -2295,3 +2299,89 @@ func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationR
configFile := path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName)
return saveConfig(ctx, objectAPI, configFile, buf)
}
// getReplicationDiff returns unreplicated objects in a channel
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (diffCh chan madmin.DiffInfo, err error) {
objInfoCh := make(chan ObjectInfo)
if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil {
logger.LogIf(ctx, err)
return diffCh, err
}
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
return diffCh, err
}
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
return diffCh, err
}
rcfg := replicationConfig{
Config: cfg,
remotes: tgts,
}
diffCh = make(chan madmin.DiffInfo, 4000)
go func() {
defer close(diffCh)
for obj := range objInfoCh {
// Ignore object prefixes which are excluded
// from versioning via the MinIO bucket versioning extension.
if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {
continue
}
roi := getHealReplicateObjectInfo(obj, rcfg)
switch roi.ReplicationStatus {
case replication.Completed, replication.Replica:
if !opts.Verbose {
continue
}
fallthrough
default:
// ignore pre-existing objects that don't satisfy replication rule(s)
if roi.ReplicationStatus.Empty() && !roi.ExistingObjResync.mustResync() {
continue
}
tgtsMap := make(map[string]madmin.TgtDiffInfo)
for arn, st := range roi.TargetStatuses {
if opts.ARN == "" || opts.ARN == arn {
if !opts.Verbose && (st == replication.Completed || st == replication.Replica) {
continue
}
tgtsMap[arn] = madmin.TgtDiffInfo{
ReplicationStatus: st.String(),
}
}
}
for arn, st := range roi.TargetPurgeStatuses {
if opts.ARN == "" || opts.ARN == arn {
if !opts.Verbose && st == Complete {
continue
}
t, ok := tgtsMap[arn]
if !ok {
t = madmin.TgtDiffInfo{}
}
t.DeleteReplicationStatus = string(st)
tgtsMap[arn] = t
}
}
select {
case diffCh <- madmin.DiffInfo{
Object: obj.Name,
VersionID: obj.VersionID,
LastModified: obj.ModTime,
IsDeleteMarker: obj.DeleteMarker,
ReplicationStatus: string(roi.ReplicationStatus),
DeleteReplicationStatus: string(roi.VersionPurgeStatus),
ReplicationTimestamp: roi.ReplicationTimestamp,
Targets: tgtsMap,
}:
case <-ctx.Done():
return
}
}
}
}()
return diffCh, nil
}

View File

@ -269,6 +269,8 @@ type ReplicateObjectInfo struct {
ExistingObjResync ResyncDecision
TargetArn string
TargetStatuses map[string]replication.StatusType
TargetPurgeStatuses map[string]VersionPurgeStatusType
ReplicationTimestamp time.Time
}
// MultipartInfo captures metadata information about the uploadId

View File

@ -448,8 +448,12 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error)
VersionID: versionID,
Deleted: true,
}
fi.ReplicationState = GetInternalReplicationState(j.MetaSys)
fi.Metadata = make(map[string]string, len(j.MetaSys))
for k, v := range j.MetaSys {
fi.Metadata[k] = string(v)
}
fi.ReplicationState = GetInternalReplicationState(j.MetaSys)
if j.FreeVersion() {
fi.SetTierFreeVersion()
fi.TransitionTier = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionTier])
@ -1220,10 +1224,10 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) {
switch fi.DeleteMarkerReplicationStatus() {
case replication.Replica:
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(string(fi.ReplicationState.ReplicaStatus))
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(http.TimeFormat))
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(time.RFC3339Nano))
default:
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationStatus] = []byte(fi.ReplicationState.ReplicationStatusInternal)
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(http.TimeFormat))
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(time.RFC3339Nano))
}
}
if !fi.VersionPurgeStatus().Empty() {

1216
go.sum

File diff suppressed because it is too large Load Diff