// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"net/url"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/minio/madmin-go/v3"
	"github.com/minio/minio/internal/bucket/replication"
	"github.com/minio/minio/internal/crypto"
	xhttp "github.com/minio/minio/internal/http"
)

//go:generate msgp -file=$GOFILE

// replicatedTargetInfo holds the replication info recorded for a single
// target, identified by Arn. An entry whose Arn is empty is treated as
// "no target" (see Empty).
type replicatedTargetInfo struct {
	Arn                   string
	Size                  int64
	Duration              time.Duration
	ReplicationAction     replicationAction // full or metadata only
	OpType                replication.Type  // whether incoming replication, existing object, healing etc..
	ReplicationStatus     replication.StatusType
	PrevReplicationStatus replication.StatusType
	VersionPurgeStatus    VersionPurgeStatusType
	ResyncTimestamp       string
	ReplicationResynced   bool // true only if resync attempted for this target
	endpoint              string
	secure                bool
	Err                   error // replication error if any
}

// Empty reports whether this entry carries no target, i.e. its Arn is unset.
func (rt replicatedTargetInfo) Empty() bool {
	return len(rt.Arn) == 0
}

// replicatedInfos groups the per-target replication results for one object
// version together with the timestamp of the replication activity.
type replicatedInfos struct {
	ReplicationTimeStamp time.Time
	Targets              []replicatedTargetInfo
}

// CompletedSize sums Size over every non-empty target whose status is
// Completed now but was not already Completed before this attempt.
func (ri replicatedInfos) CompletedSize() (sz int64) {
	for i := range ri.Targets {
		tgt := &ri.Targets[i]
		if tgt.Empty() {
			continue
		}
		newlyCompleted := tgt.ReplicationStatus == replication.Completed &&
			tgt.PrevReplicationStatus != replication.Completed
		if newlyCompleted {
			sz += tgt.Size
		}
	}
	return sz
}

// ReplicationResynced returns true if a resync was attempted on at least one
// non-empty target for the object version queued.
func (ri replicatedInfos) ReplicationResynced() bool {
	for _, tgt := range ri.Targets {
		if !tgt.Empty() && tgt.ReplicationResynced {
			return true
		}
	}
	return false
}

// ReplicationStatusInternal serializes the replication status of every
// non-empty target as a sequence of "<arn>=<status>;" entries.
func (ri replicatedInfos) ReplicationStatusInternal() string {
	var buf bytes.Buffer
	for _, tgt := range ri.Targets {
		if !tgt.Empty() {
			fmt.Fprintf(&buf, "%s=%s;", tgt.Arn, tgt.ReplicationStatus.String())
		}
	}
	return buf.String()
}

// ReplicationStatus folds the per-target statuses into a single status:
// empty when there are no targets, Failed as soon as any target failed,
// Completed when every target completed, and Pending otherwise.
func (ri replicatedInfos) ReplicationStatus() replication.StatusType {
	total := len(ri.Targets)
	if total == 0 {
		return replication.StatusType("")
	}
	done := 0
	for i := range ri.Targets {
		st := ri.Targets[i].ReplicationStatus
		if st == replication.Failed {
			return replication.Failed
		}
		if st == replication.Completed {
			done++
		}
	}
	if done != total {
		return replication.Pending
	}
	return replication.Completed
}

// VersionPurgeStatus folds the per-target version purge statuses into one:
// empty when there are no targets, Failed as soon as any target failed,
// Complete when every target is complete, and Pending otherwise.
func (ri replicatedInfos) VersionPurgeStatus() VersionPurgeStatusType {
	total := len(ri.Targets)
	if total == 0 {
		return VersionPurgeStatusType("")
	}
	done := 0
	for i := range ri.Targets {
		st := ri.Targets[i].VersionPurgeStatus
		if st == Failed {
			return Failed
		}
		if st == Complete {
			done++
		}
	}
	if done != total {
		return Pending
	}
	return Complete
}

// VersionPurgeStatusInternal serializes the version purge status of every
// non-empty target that has a purge status set, as "<arn>=<status>;" entries.
func (ri replicatedInfos) VersionPurgeStatusInternal() string {
	var buf bytes.Buffer
	for _, tgt := range ri.Targets {
		if tgt.Empty() || tgt.VersionPurgeStatus.Empty() {
			continue
		}
		fmt.Fprintf(&buf, "%s=%s;", tgt.Arn, tgt.VersionPurgeStatus)
	}
	return buf.String()
}

// Action returns the replication action of the first non-empty target that
// was not already Completed before this attempt (i.e. one that actually
// performed replication now), or replicateNone when there is no such target.
func (ri replicatedInfos) Action() replicationAction {
	for _, tgt := range ri.Targets {
		if !tgt.Empty() && tgt.PrevReplicationStatus != replication.Completed {
			return tgt.ReplicationAction
		}
	}
	return replicateNone
}

// replStatusRegex extracts "<key>=<value>;" pairs from the stringified
// internal status representations (e.g. "arn1=PENDING;arn2=COMPLETED;").
// Submatch 1 is the key (the target ARN), submatch 2 is the status value.
var replStatusRegex = regexp.MustCompile(`([^=].*?)=([^,].*?);`)

// TargetReplicationStatus looks up arn in the stringified internal
// replication status and returns that target's status. The result is empty
// when the target is not present.
func (ri ReplicateObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) {
	for _, m := range replStatusRegex.FindAllStringSubmatch(ri.ReplicationStatusInternal, -1) {
		switch {
		case len(m) != 3:
			return status
		case m[1] == arn:
			return replication.StatusType(m[2])
		}
	}
	return status
}

// TargetReplicationStatus looks up arn in the object's stringified internal
// replication status and returns that target's status. The result is empty
// when the target is not present.
func (o ObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) {
	matches := replStatusRegex.FindAllStringSubmatch(o.ReplicationStatusInternal, -1)
	for i := range matches {
		fields := matches[i]
		if len(fields) != 3 {
			break
		}
		if fields[1] == arn {
			status = replication.StatusType(fields[2])
			break
		}
	}
	return status
}

// replicateTargetDecision is the replication decision taken for one target.
type replicateTargetDecision struct {
	Replicate   bool   // Replicate to this target
	Synchronous bool   // Synchronous replication configured.
	Arn         string // ARN of replication target
	ID          string
}

// String renders the decision as "<replicate>;<synchronous>;<arn>;<id>",
// the form that parseReplicateDecision splits back on ';'.
func (t *replicateTargetDecision) String() string {
	const layout = "%t;%t;%s;%s"
	return fmt.Sprintf(layout, t.Replicate, t.Synchronous, t.Arn, t.ID)
}

// newReplicateTargetDecision builds a decision for the target arn with the
// given replicate and sync flags; ID is left empty.
func newReplicateTargetDecision(arn string, replicate bool, sync bool) replicateTargetDecision {
	return replicateTargetDecision{
		Arn:         arn,
		Replicate:   replicate,
		Synchronous: sync,
	}
}

// ReplicateDecision represents replication decision for each target.
// targetsMap is keyed by the target ARN (see Set).
type ReplicateDecision struct {
	targetsMap map[string]replicateTargetDecision
}

// ReplicateAny reports whether at least one target qualifies for replication.
func (d ReplicateDecision) ReplicateAny() bool {
	for arn := range d.targetsMap {
		if d.targetsMap[arn].Replicate {
			return true
		}
	}
	return false
}

// Synchronous reports whether at least one target is configured for
// synchronous replication.
func (d ReplicateDecision) Synchronous() bool {
	for arn := range d.targetsMap {
		if d.targetsMap[arn].Synchronous {
			return true
		}
	}
	return false
}

// String renders the decision as comma-separated "<key>=<target decision>"
// entries with no trailing comma. Entries follow map iteration order, so
// their relative order is not stable between calls.
func (d ReplicateDecision) String() string {
	var buf bytes.Buffer
	for arn, tgt := range d.targetsMap {
		fmt.Fprintf(&buf, "%s=%s,", arn, tgt.String())
	}
	return strings.TrimSuffix(buf.String(), ",")
}

// Set records t as the decision for its target, keyed by t.Arn, allocating
// the underlying map on first use.
func (d *ReplicateDecision) Set(t replicateTargetDecision) {
	if d.targetsMap != nil {
		d.targetsMap[t.Arn] = t
		return
	}
	d.targetsMap = map[string]replicateTargetDecision{t.Arn: t}
}

// PendingStatus returns the stringified internal replication status in which
// every target selected for replication is marked as `PENDING`. Targets that
// are not to be replicated are omitted.
func (d ReplicateDecision) PendingStatus() string {
	var buf bytes.Buffer
	for _, tgt := range d.targetsMap {
		if !tgt.Replicate {
			continue
		}
		fmt.Fprintf(&buf, "%s=%s;", tgt.Arn, replication.Pending.String())
	}
	return buf.String()
}

// ResyncDecision holds the individual resync decision of each target,
// keyed by target ARN.
type ResyncDecision struct {
	targets map[string]ResyncTargetDecision
}

// Empty reports whether no resync decisions were recorded at all, i.e. the
// targets map was never allocated.
func (r ResyncDecision) Empty() bool {
	return r.targets == nil
}

// mustResync reports whether at least one target needs to be resynced.
func (r ResyncDecision) mustResync() bool {
	for arn := range r.targets {
		if r.targets[arn].Replicate {
			return true
		}
	}
	return false
}

// mustResyncTarget reports whether the target tgtArn is known and needs to
// be resynced.
func (r ResyncDecision) mustResyncTarget(tgtArn string) bool {
	// Indexing a nil map yields ok == false, so no separate nil check is needed.
	if dec, found := r.targets[tgtArn]; found {
		return dec.Replicate
	}
	return false
}

// ResyncTargetDecision is the resync decision for a single target.
type ResyncTargetDecision struct {
	Replicate       bool
	ResetID         string
	ResetBeforeDate time.Time
}

// errInvalidReplicateDecisionFormat is returned by parseReplicateDecision
// when the input does not follow the expected layout.
var errInvalidReplicateDecisionFormat = fmt.Errorf("ReplicateDecision has invalid format")

// parseReplicateDecision parses s, a ','-delimited list of
// `<key>=<replicate>;<synchronous>;<arn>;<id>` entries (the value may be
// wrapped in double quotes), into a ReplicateDecision. Empty entries are
// skipped. Any entry without exactly one '=' or without exactly four
// ';'-separated fields yields errInvalidReplicateDecisionFormat together
// with the entries parsed so far. ctx and bucket are currently unused.
func parseReplicateDecision(ctx context.Context, bucket, s string) (r ReplicateDecision, err error) {
	r.targetsMap = make(map[string]replicateTargetDecision)
	if s == "" {
		return r, nil
	}
	for _, entry := range strings.Split(s, ",") {
		if len(entry) == 0 {
			continue
		}
		kv := strings.Split(entry, "=")
		if len(kv) != 2 {
			return r, errInvalidReplicateDecisionFormat
		}
		key, val := kv[0], kv[1]
		val = strings.TrimPrefix(val, `"`)
		val = strings.TrimSuffix(val, `"`)
		fields := strings.Split(val, ";")
		if len(fields) != 4 {
			return r, errInvalidReplicateDecisionFormat
		}
		r.targetsMap[key] = replicateTargetDecision{
			Replicate:   fields[0] == "true",
			Synchronous: fields[1] == "true",
			Arn:         fields[2],
			ID:          fields[3],
		}
	}
	return r, nil
}

// ReplicationState represents internal replication state
type ReplicationState struct {
	ReplicaTimeStamp          time.Time              // timestamp when last replica update was received
	ReplicaStatus             replication.StatusType // replica status
	DeleteMarker              bool                   // represents DeleteMarker replication state
	ReplicationTimeStamp      time.Time              // timestamp when last replication activity happened
	ReplicationStatusInternal string                 // stringified representation of all replication activity
	// VersionPurgeStatusInternal is internally in the format "arn1=PENDING;arn2=COMPLETE;"
	VersionPurgeStatusInternal string                            // stringified representation of all version purge statuses
	ReplicateDecisionStr       string                            // stringified representation of replication decision for each target
	Targets                    map[string]replication.StatusType // map of ARN->replication status for ongoing replication activity
	PurgeTargets               map[string]VersionPurgeStatusType // map of ARN->VersionPurgeStatus for all the targets
	ResetStatusesMap           map[string]string                 // map of ARN-> stringified reset id and timestamp for all the targets
}

// Equal reports whether rs and o agree on the replica status, the internal
// replication status string and the internal version purge status string.
// No other fields are compared.
func (rs *ReplicationState) Equal(o ReplicationState) bool {
	if rs.ReplicaStatus != o.ReplicaStatus {
		return false
	}
	if rs.ReplicationStatusInternal != o.ReplicationStatusInternal {
		return false
	}
	return rs.VersionPurgeStatusInternal == o.VersionPurgeStatusInternal
}

// CompositeReplicationStatus returns the overall replication status for the
// object version being replicated.
//
// With no internal replication status, the replica status is returned (which
// may itself be empty). A bare legacy status value is returned as is. In all
// other cases the per-target statuses are combined; if that result is
// Completed and a replica update was received after the last replication
// activity, the replica status wins.
func (rs *ReplicationState) CompositeReplicationStatus() (st replication.StatusType) {
	if rs.ReplicationStatusInternal == "" {
		if rs.ReplicaStatus.Empty() {
			return st
		}
		return rs.ReplicaStatus
	}

	// for backward compatibility: the internal status may hold a bare status.
	legacy := replication.StatusType(rs.ReplicationStatusInternal)
	switch legacy {
	case replication.Pending, replication.Completed, replication.Failed, replication.Replica:
		return legacy
	}

	composite := getCompositeReplicationStatus(rs.Targets)
	// A sentinel or zero replica timestamp means no replica update to consider.
	replicaSeen := !rs.ReplicaTimeStamp.Equal(timeSentinel) && !rs.ReplicaTimeStamp.IsZero()
	if replicaSeen && composite == replication.Completed && rs.ReplicaTimeStamp.After(rs.ReplicationTimeStamp) {
		return rs.ReplicaStatus
	}
	return composite
}

// CompositeVersionPurgeStatus returns the overall purge status for the
// permanent delete being replicated. A bare legacy status stored in
// VersionPurgeStatusInternal is returned as is; otherwise the per-target
// purge statuses are combined.
func (rs *ReplicationState) CompositeVersionPurgeStatus() VersionPurgeStatusType {
	legacy := VersionPurgeStatusType(rs.VersionPurgeStatusInternal)
	if legacy == Pending || legacy == Complete || legacy == Failed {
		// for backward compatibility
		return legacy
	}
	return getCompositeVersionPurgeStatus(rs.PurgeTargets)
}

// targetState returns a replicatedTargetInfo for arn seeded with the state
// previously recorded for that target: its replication status (as the
// previous status), version purge status and resync timestamp.
func (rs *ReplicationState) targetState(arn string) (r replicatedTargetInfo) {
	r.Arn = arn
	r.PrevReplicationStatus = rs.Targets[arn]
	r.VersionPurgeStatus = rs.PurgeTargets[arn]
	r.ResyncTimestamp = rs.ResetStatusesMap[arn]
	return r
}

// getReplicationState builds the new ReplicationState from the per-target
// results in rinfos, carrying over the replicate decision, reset statuses and
// replica timestamp/status from prevState.
//
// When prevState.ResetStatusesMap is non-nil it is reused (not copied), so
// resync timestamps recorded here are also visible through prevState. When it
// is nil a fresh map is allocated on demand; writing to the nil map would
// panic otherwise. vID is currently unused.
func getReplicationState(rinfos replicatedInfos, prevState ReplicationState, vID string) ReplicationState {
	rs := ReplicationState{
		ReplicateDecisionStr: prevState.ReplicateDecisionStr,
		ResetStatusesMap:     prevState.ResetStatusesMap,
		ReplicaTimeStamp:     prevState.ReplicaTimeStamp,
		ReplicaStatus:        prevState.ReplicaStatus,
	}

	replStatuses := rinfos.ReplicationStatusInternal()
	rs.Targets = replicationStatusesMap(replStatuses)
	rs.ReplicationStatusInternal = replStatuses
	rs.ReplicationTimeStamp = rinfos.ReplicationTimeStamp

	vpurgeStatuses := rinfos.VersionPurgeStatusInternal()
	rs.VersionPurgeStatusInternal = vpurgeStatuses
	rs.PurgeTargets = versionPurgeStatusesMap(vpurgeStatuses)

	for _, rinfo := range rinfos.Targets {
		if rinfo.ResyncTimestamp == "" {
			continue
		}
		if rs.ResetStatusesMap == nil {
			// prevState carried no reset statuses (e.g. a state that never
			// initialised the map); allocate before the first write.
			rs.ResetStatusesMap = make(map[string]string)
		}
		rs.ResetStatusesMap[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
	}
	return rs
}

// replicationStatusesMap parses a stringified status of the form
// "<arn>=<status>;..." into a map of ARN to replication status. The returned
// map is never nil.
func replicationStatusesMap(s string) map[string]replication.StatusType {
	matches := replStatusRegex.FindAllStringSubmatch(s, -1)
	targets := make(map[string]replication.StatusType)
	for _, m := range matches {
		if len(m) == 3 {
			targets[m[1]] = replication.StatusType(m[2])
		}
	}
	return targets
}

// versionPurgeStatusesMap parses a stringified status of the form
// "<arn>=<status>;..." into a map of ARN to version purge status. The
// returned map is never nil.
func versionPurgeStatusesMap(s string) map[string]VersionPurgeStatusType {
	matches := replStatusRegex.FindAllStringSubmatch(s, -1)
	targets := make(map[string]VersionPurgeStatusType)
	for _, m := range matches {
		if len(m) == 3 {
			targets[m[1]] = VersionPurgeStatusType(m[2])
		}
	}
	return targets
}

// getCompositeReplicationStatus combines the statuses of all targets in m:
// empty for no targets, Failed if any target failed, Completed if all
// targets completed, Pending otherwise.
func getCompositeReplicationStatus(m map[string]replication.StatusType) replication.StatusType {
	total := len(m)
	if total == 0 {
		return replication.StatusType("")
	}
	done := 0
	for _, st := range m {
		if st == replication.Failed {
			return replication.Failed
		}
		if st == replication.Completed {
			done++
		}
	}
	if done != total {
		return replication.Pending
	}
	return replication.Completed
}

// getCompositeVersionPurgeStatus combines the purge statuses of all targets
// in m: empty for no targets, Failed if any target failed, Complete if all
// targets are complete, Pending otherwise.
func getCompositeVersionPurgeStatus(m map[string]VersionPurgeStatusType) VersionPurgeStatusType {
	total := len(m)
	if total == 0 {
		return VersionPurgeStatusType("")
	}
	done := 0
	for _, st := range m {
		if st == Failed {
			return Failed
		}
		if st == Complete {
			done++
		}
	}
	if done != total {
		return Pending
	}
	return Complete
}

// getHealReplicateObjectInfo returns info needed by heal replication in ReplicateObjectInfo.
//
// oi is received by value, so the internal status fields rewritten below only
// affect the local copy. Its user-defined metadata is cloned before being
// modified.
func getHealReplicateObjectInfo(oi ObjectInfo, rcfg replicationConfig) ReplicateObjectInfo {
	userDefined := cloneMSS(oi.UserDefined)
	if rcfg.Config != nil && rcfg.Config.RoleArn != "" {
		// For backward compatibility of objects pending/failed replication.
		// Save replication related statuses in the new internal representation for
		// compatible behavior.
		if !oi.ReplicationStatus.Empty() {
			oi.ReplicationStatusInternal = fmt.Sprintf("%s=%s;", rcfg.Config.RoleArn, oi.ReplicationStatus)
		}
		if !oi.VersionPurgeStatus.Empty() {
			oi.VersionPurgeStatusInternal = fmt.Sprintf("%s=%s;", rcfg.Config.RoleArn, oi.VersionPurgeStatus)
		}
		// Move the reset entry stored under the generic reset key to the
		// target-specific key derived from RoleArn.
		for k, v := range userDefined {
			if strings.EqualFold(k, ReservedMetadataPrefixLower+ReplicationReset) {
				delete(userDefined, k)
				userDefined[targetResetHeader(rcfg.Config.RoleArn)] = v
			}
		}
	}

	// Delete markers and versions carrying a purge status are evaluated with
	// checkReplicateDelete; everything else with mustReplicate.
	var dsc ReplicateDecision
	if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
		dsc = checkReplicateDelete(GlobalContext, oi.Bucket, ObjectToDelete{
			ObjectV: ObjectV{
				ObjectName: oi.Name,
				VersionID:  oi.VersionID,
			},
		}, oi, ObjectOptions{
			Versioned:        globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name),
			VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name),
		}, nil)
	} else {
		dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(userDefined, oi.UserTags, "", replication.HealReplicationType, ObjectOptions{}))
	}

	tgtStatuses := replicationStatusesMap(oi.ReplicationStatusInternal)
	purgeStatuses := versionPurgeStatusesMap(oi.VersionPurgeStatusInternal)
	existingObjResync := rcfg.Resync(GlobalContext, oi, dsc, tgtStatuses)
	// Parse error is ignored: tm is the zero time when the timestamp is
	// missing or malformed.
	tm, _ := time.Parse(time.RFC3339Nano, userDefined[ReservedMetadataPrefixLower+ReplicationTimestamp])
	rstate := oi.ReplicationState()
	rstate.ReplicateDecisionStr = dsc.String()
	// NOTE(review): the error from GetActualSize is dropped; what asz holds in
	// that case depends on GetActualSize - confirm this is acceptable.
	asz, _ := oi.GetActualSize()

	return ReplicateObjectInfo{
		Name:                       oi.Name,
		Size:                       oi.Size,
		ActualSize:                 asz,
		Bucket:                     oi.Bucket,
		VersionID:                  oi.VersionID,
		ETag:                       oi.ETag,
		ModTime:                    oi.ModTime,
		ReplicationStatus:          oi.ReplicationStatus,
		ReplicationStatusInternal:  oi.ReplicationStatusInternal,
		DeleteMarker:               oi.DeleteMarker,
		VersionPurgeStatusInternal: oi.VersionPurgeStatusInternal,
		VersionPurgeStatus:         oi.VersionPurgeStatus,

		ReplicationState:     rstate,
		OpType:               replication.HealReplicationType,
		Dsc:                  dsc,
		ExistingObjResync:    existingObjResync,
		TargetStatuses:       tgtStatuses,
		TargetPurgeStatuses:  purgeStatuses,
		ReplicationTimestamp: tm,
		SSEC:                 crypto.SSEC.IsEncrypted(oi.UserDefined),
		UserTags:             oi.UserTags,
	}
}

// ReplicationState builds the replication state of the object from its
// internal replication metadata: the stringified replication and version
// purge statuses (also parsed into per-target maps), the replication
// decision, and every user-defined entry whose key starts with the
// replication reset prefix (keyed by what remains after stripping
// "<prefix>-").
func (o ObjectInfo) ReplicationState() ReplicationState {
	resetPrefix := ReservedMetadataPrefixLower + ReplicationReset
	arnPrefix := fmt.Sprintf("%s-", resetPrefix)

	resets := make(map[string]string)
	for k, v := range o.UserDefined {
		if !strings.HasPrefix(k, resetPrefix) {
			continue
		}
		resets[strings.TrimPrefix(k, arnPrefix)] = v
	}

	return ReplicationState{
		ReplicationStatusInternal:  o.ReplicationStatusInternal,
		VersionPurgeStatusInternal: o.VersionPurgeStatusInternal,
		ReplicateDecisionStr:       o.replicationDecision,
		Targets:                    replicationStatusesMap(o.ReplicationStatusInternal),
		PurgeTargets:               versionPurgeStatusesMap(o.VersionPurgeStatusInternal),
		ResetStatusesMap:           resets,
	}
}

// ReplicationState builds the replication state from the internal
// replication metadata carried by ObjectToDelete: the delete marker
// replication status, the version purge statuses and the replicate decision.
// ResetStatusesMap is left unset.
func (o ObjectToDelete) ReplicationState() ReplicationState {
	return ReplicationState{
		ReplicationStatusInternal:  o.DeleteMarkerReplicationStatus,
		VersionPurgeStatusInternal: o.VersionPurgeStatuses,
		ReplicateDecisionStr:       o.ReplicateDecisionStr,
		Targets:                    replicationStatusesMap(o.DeleteMarkerReplicationStatus),
		PurgeTargets:               versionPurgeStatusesMap(o.VersionPurgeStatuses),
	}
}

// VersionPurgeStatus returns a composite version purge status across targets,
// as computed by CompositeVersionPurgeStatus on d.ReplicationState.
func (d *DeletedObject) VersionPurgeStatus() VersionPurgeStatusType {
	return d.ReplicationState.CompositeVersionPurgeStatus()
}

// DeleteMarkerReplicationStatus returns the composite replication status of
// the delete marker across targets, as computed by CompositeReplicationStatus
// on d.ReplicationState.
func (d *DeletedObject) DeleteMarkerReplicationStatus() replication.StatusType {
	return d.ReplicationState.CompositeReplicationStatus()
}

// ResyncTargetsInfo holds a slice of targets with resync info per target.
// The slice is serialized under the JSON key "target" and omitted when empty.
type ResyncTargetsInfo struct {
	Targets []ResyncTarget `json:"target,omitempty"`
}

// ResyncTarget is a struct representing the Target reset ID where target is identified by its Arn
type ResyncTarget struct {
	Arn       string    `json:"arn"`
	ResetID   string    `json:"resetid"`
	StartTime time.Time `json:"startTime"`
	EndTime   time.Time `json:"endTime"`
	// Status of resync operation
	ResyncStatus string `json:"resyncStatus,omitempty"`
	// Completed size in bytes
	ReplicatedSize int64 `json:"completedReplicationSize"`
	// Failed size in bytes
	FailedSize int64 `json:"failedReplicationSize"`
	// Total number of failed operations
	FailedCount int64 `json:"failedReplicationCount"`
	// Total number of replicated operations
	ReplicatedCount int64 `json:"replicationCount"`
	// Last bucket/object replicated.
	Bucket string `json:"bucket,omitempty"`
	Object string `json:"object,omitempty"`
}

// VersionPurgeStatusType is the status of a versioned delete or permanent
// delete w.r.t bucket replication.
type VersionPurgeStatusType string

const (
	// Pending - versioned delete replication is pending.
	Pending VersionPurgeStatusType = "PENDING"

	// Complete - versioned delete replication is now complete, erase version on disk.
	Complete VersionPurgeStatusType = "COMPLETE"

	// Failed - versioned delete replication failed.
	Failed VersionPurgeStatusType = "FAILED"
)

// Empty reports whether no purge status was set.
func (v VersionPurgeStatusType) Empty() bool {
	return len(v) == 0
}

// Pending reports whether the version still awaits purge, which is the case
// for both the Pending and the Failed status.
func (v VersionPurgeStatusType) Pending() bool {
	switch v {
	case Pending, Failed:
		return true
	}
	return false
}

// replicationResyncer holds the resync status of each bucket along with the
// worker settings and channels used for resync.
// NOTE(review): the embedded RWMutex presumably guards statusMap - confirm
// against the methods that use it.
type replicationResyncer struct {
	// map of bucket to their resync status
	statusMap      map[string]BucketReplicationResyncStatus
	workerSize     int
	resyncCancelCh chan struct{}
	workerCh       chan struct{}
	sync.RWMutex
}

// Directory/file names and format/version numbers for resync metadata.
// resyncMetaVersion is the version stamped on new bucket resync statuses
// (see newBucketResyncStatus).
const (
	replicationDir      = ".replication"
	resyncFileName      = "resync.bin"
	resyncMetaFormat    = 1
	resyncMetaVersionV1 = 1
	resyncMetaVersion   = resyncMetaVersionV1
)

// resyncOpts groups the parameters of a resync: the bucket, the target arn,
// the resync ID and a cut-off time.
// NOTE(review): field semantics are inferred from their names; confirm
// against the callers.
type resyncOpts struct {
	bucket       string
	arn          string
	resyncID     string
	resyncBefore time.Time
}

// ResyncStatusType is the status of a resync operation.
type ResyncStatusType int

const (
	// NoResync - no resync in progress
	NoResync ResyncStatusType = iota
	// ResyncPending - resync pending
	ResyncPending
	// ResyncCanceled - resync canceled
	ResyncCanceled
	// ResyncStarted -  resync in progress
	ResyncStarted
	// ResyncCompleted -  resync finished
	ResyncCompleted
	// ResyncFailed -  resync failed
	ResyncFailed
)

// isValid reports whether rt is anything other than NoResync.
func (rt ResyncStatusType) isValid() bool {
	return !(rt == NoResync)
}

// String returns the display name of the status. NoResync and unknown
// values map to the empty string.
func (rt ResyncStatusType) String() string {
	var name string
	switch rt {
	case ResyncPending:
		name = "Pending"
	case ResyncCanceled:
		name = "Canceled"
	case ResyncStarted:
		name = "Ongoing"
	case ResyncCompleted:
		name = "Completed"
	case ResyncFailed:
		name = "Failed"
	}
	return name
}

// TargetReplicationResyncStatus status of resync of bucket for a specific target
type TargetReplicationResyncStatus struct {
	StartTime  time.Time `json:"startTime" msg:"st"`
	LastUpdate time.Time `json:"lastUpdated" msg:"lst"`
	// Resync ID assigned to this reset
	ResyncID string `json:"resyncID" msg:"id"`
	// ResyncBeforeDate - resync all objects created prior to this date
	ResyncBeforeDate time.Time `json:"resyncBeforeDate" msg:"rdt"`
	// Status of resync operation
	ResyncStatus ResyncStatusType `json:"resyncStatus" msg:"rst"`
	// Failed size in bytes
	FailedSize int64 `json:"failedReplicationSize"  msg:"fs"`
	// Total number of failed operations
	FailedCount int64 `json:"failedReplicationCount"  msg:"frc"`
	// Completed size in bytes
	ReplicatedSize int64 `json:"completedReplicationSize"  msg:"rs"`
	// Total number of replicated operations
	ReplicatedCount int64 `json:"replicationCount"  msg:"rrc"`
	// Last bucket/object replicated.
	Bucket string `json:"bucket" msg:"bkt"`
	Object string `json:"object" msg:"obj"`
	Error  error  `json:"-" msg:"-"`
}

// BucketReplicationResyncStatus captures current replication resync status
// of a bucket, with one entry per remote target.
type BucketReplicationResyncStatus struct {
	Version int `json:"version" msg:"v"`
	// map of remote arn to their resync status for a bucket
	TargetsMap map[string]TargetReplicationResyncStatus `json:"resyncMap,omitempty" msg:"brs"`
	ID         int                                      `json:"id" msg:"id"`
	LastUpdate time.Time                                `json:"lastUpdate" msg:"lu"`
}

// cloneTgtStats returns a new map holding a copy of every entry in
// rs.TargetsMap. The result is never nil, even when TargetsMap is.
func (rs *BucketReplicationResyncStatus) cloneTgtStats() (m map[string]TargetReplicationResyncStatus) {
	m = make(map[string]TargetReplicationResyncStatus)
	for arn := range rs.TargetsMap {
		m[arn] = rs.TargetsMap[arn]
	}
	return m
}

// newBucketResyncStatus returns a resync status stamped with the current
// resync metadata version and an empty, non-nil targets map. The bucket
// argument is currently unused.
func newBucketResyncStatus(bucket string) BucketReplicationResyncStatus {
	var st BucketReplicationResyncStatus
	st.Version = resyncMetaVersion
	st.TargetsMap = make(map[string]TargetReplicationResyncStatus)
	return st
}

// contentRangeRegexp matches a Content-Range value of the form
// "bytes <first>-<last>/<size>", where <size> is either a decimal number or
// a literal "*" (total size unknown). Submatches 1-3 are first, last, size.
//
// The pattern lives in a raw string, so the asterisk is escaped with a single
// backslash; `\\*` would instead mean "zero or more backslashes" and never
// capture "*".
var contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\*)`)

// parseSizeFromContentRange extracts the total size from the Content-Range
// header in h.
//
// It returns -1 with a nil error when the size component is "*", and an error
// when the header is absent, does not match the expected layout, or carries a
// size that is not a valid int64.
func parseSizeFromContentRange(h http.Header) (sz int64, err error) {
	cr := h.Get(xhttp.ContentRange)
	if cr == "" {
		return sz, fmt.Errorf("Content-Range not set")
	}
	parts := contentRangeRegexp.FindStringSubmatch(cr)
	if len(parts) != 4 {
		return sz, fmt.Errorf("invalid Content-Range header %s", cr)
	}
	if parts[3] == "*" {
		return -1, nil
	}
	// The pattern never captures a sign, so ParseInt cannot yield a negative
	// size here. Unlike ParseUint followed by an int64 conversion, it reports
	// values above math.MaxInt64 as an error instead of silently wrapping
	// them to a negative number (possibly -1, the "unknown size" marker).
	sz, err = strconv.ParseInt(parts[3], 10, 64)
	if err != nil {
		return 0, err
	}
	return sz, nil
}

// extractReplicateDiffOpts builds the replication diff options from the query
// values q: "verbose" (true only for the exact value "true"), "arn" and
// "prefix".
func extractReplicateDiffOpts(q url.Values) (opts madmin.ReplDiffOpts) {
	verbose := q.Get("verbose")
	opts.ARN, opts.Prefix = q.Get("arn"), q.Get("prefix")
	opts.Verbose = verbose == "true"
	return opts
}

// Location and format/version numbers for MRF replication metadata.
// replicationMRFDir is the "mrf" directory under the replication directory
// inside the bucket metadata prefix.
const (
	replicationMRFDir = bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + "mrf"
	mrfMetaFormat     = 1
	mrfMetaVersionV1  = 1
	mrfMetaVersion    = mrfMetaVersionV1
)

// MRFReplicateEntry mrf entry to save to disk.
// versionID and sz are unexported and tagged `json:"-"`, so they are not part
// of the JSON representation.
type MRFReplicateEntry struct {
	Bucket     string `json:"bucket" msg:"b"`
	Object     string `json:"object" msg:"o"`
	versionID  string `json:"-"`
	RetryCount int    `json:"retryCount" msg:"rc"`
	sz         int64  `json:"-"`
}

// MRFReplicateEntries has the map of MRF entries to save to disk, together
// with a version number for the saved data.
type MRFReplicateEntries struct {
	Entries map[string]MRFReplicateEntry `json:"entries" msg:"e"`
	Version int                          `json:"version" msg:"v"`
}

// ToMRFEntry copies the fields MRF needs out of ri: bucket, object name,
// version ID, size and retry count.
func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry {
	var entry MRFReplicateEntry
	entry.Bucket = ri.Bucket
	entry.Object = ri.Name
	entry.versionID = ri.VersionID
	entry.RetryCount = int(ri.RetryCount)
	entry.sz = ri.Size
	return entry
}