minio/internal/bucket/replication/replication.go
Poorna 6b9fd256e1
Persist in-memory replication stats to disk (#15594)
to avoid relying on scanner-calculated replication metrics.
This will improve the accuracy of the replication stats reported.

This PR also adds on to #15556 by handing replication
traffic that could not be queued by available workers to the 
MRF queue so that entries in `PENDING` status are healed faster.
2022-09-12 12:40:02 -07:00

298 lines
8.9 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package replication
import (
"encoding/xml"
"io"
"sort"
"strconv"
"strings"
)
var (
errReplicationTooManyRules = Errorf("Replication configuration allows a maximum of 1000 rules")
errReplicationNoRule = Errorf("Replication configuration should have at least one rule")
errReplicationUniquePriority = Errorf("Replication configuration has duplicate priority")
errRoleArnMissingLegacy = Errorf("Missing required parameter `Role` in ReplicationConfiguration")
errDestinationArnMissing = Errorf("Missing required parameter `Destination` in Replication rule")
errInvalidSourceSelectionCriteria = Errorf("Invalid ReplicaModification status")
errRoleArnPresentForMultipleTargets = Errorf("`Role` should be empty in ReplicationConfiguration for multiple targets")
)
// Config - replication configuration specified in
// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
type Config struct {
XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
Rules []Rule `xml:"Rule" json:"Rules"`
// RoleArn is being reused for MinIO replication ARN
RoleArn string `xml:"Role" json:"Role"`
}
// Maximum 2MiB size per replication config.
const maxReplicationConfigSize = 2 << 20
// ParseConfig parses ReplicationConfiguration from xml
func ParseConfig(reader io.Reader) (*Config, error) {
config := Config{}
if err := xml.NewDecoder(io.LimitReader(reader, maxReplicationConfigSize)).Decode(&config); err != nil {
return nil, err
}
// By default, set replica modification to enabled if unset.
for i := range config.Rules {
if len(config.Rules[i].SourceSelectionCriteria.ReplicaModifications.Status) == 0 {
config.Rules[i].SourceSelectionCriteria = SourceSelectionCriteria{
ReplicaModifications: ReplicaModifications{
Status: Enabled,
},
}
}
// Default DeleteReplication to disabled if unset.
if len(config.Rules[i].DeleteReplication.Status) == 0 {
config.Rules[i].DeleteReplication = DeleteReplication{
Status: Disabled,
}
}
}
return &config, nil
}
// Validate - validates the replication configuration
func (c Config) Validate(bucket string, sameTarget bool) error {
// replication config can't have more than 1000 rules
if len(c.Rules) > 1000 {
return errReplicationTooManyRules
}
// replication config should have at least one rule
if len(c.Rules) == 0 {
return errReplicationNoRule
}
// Validate all the rules in the replication config
targetMap := make(map[string]struct{})
priorityMap := make(map[string]struct{})
var legacyArn bool
for _, r := range c.Rules {
if _, ok := targetMap[r.Destination.Bucket]; !ok {
targetMap[r.Destination.Bucket] = struct{}{}
}
if err := r.Validate(bucket, sameTarget); err != nil {
return err
}
if _, ok := priorityMap[strconv.Itoa(r.Priority)]; ok {
return errReplicationUniquePriority
}
priorityMap[strconv.Itoa(r.Priority)] = struct{}{}
if r.Destination.LegacyArn() {
legacyArn = true
}
if c.RoleArn == "" && !r.Destination.TargetArn() {
return errDestinationArnMissing
}
}
// disallow combining old replication configuration which used RoleArn as target ARN with multiple
// destination replication
if c.RoleArn != "" && len(targetMap) > 1 {
return errRoleArnPresentForMultipleTargets
}
// validate RoleArn if destination used legacy ARN format.
if c.RoleArn == "" && legacyArn {
return errRoleArnMissingLegacy
}
return nil
}
// Type - replication type enum
type Type int
// Types of replication
const (
UnsetReplicationType Type = 0 + iota
ObjectReplicationType
DeleteReplicationType
MetadataReplicationType
HealReplicationType
ExistingObjectReplicationType
ResyncReplicationType
AllReplicationType
)
// Valid returns true if replication type is set
func (t Type) Valid() bool {
return t > 0
}
// IsDataReplication returns true if content being replicated
func (t Type) IsDataReplication() bool {
switch t {
case ObjectReplicationType, HealReplicationType, ExistingObjectReplicationType:
return true
}
return false
}
// ObjectOpts provides information to deduce whether replication
// can be triggered on the resultant object.
type ObjectOpts struct {
Name string
UserTags string
VersionID string
DeleteMarker bool
SSEC bool
OpType Type
Replica bool
ExistingObject bool
TargetArn string
}
// HasExistingObjectReplication returns true if any of the rule returns 'ExistingObjects' replication.
func (c Config) HasExistingObjectReplication(arn string) bool {
for _, rule := range c.Rules {
if rule.Destination.ARN == arn || c.RoleArn == arn {
if rule.ExistingObjectReplication.Status == Enabled {
return true
}
}
}
return false
}
// FilterActionableRules returns the rules actions that need to be executed
// after evaluating prefix/tag filtering
func (c Config) FilterActionableRules(obj ObjectOpts) []Rule {
if obj.Name == "" && !(obj.OpType == ResyncReplicationType || obj.OpType == AllReplicationType) {
return nil
}
var rules []Rule
for _, rule := range c.Rules {
if rule.Status == Disabled {
continue
}
if obj.TargetArn != "" && rule.Destination.ARN != obj.TargetArn && c.RoleArn != obj.TargetArn {
continue
}
// Ignore other object level and prefix filters for resyncing target/listing bucket targets
if obj.OpType == ResyncReplicationType || obj.OpType == AllReplicationType {
rules = append(rules, rule)
continue
}
if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled {
continue
}
if !strings.HasPrefix(obj.Name, rule.Prefix()) {
continue
}
if rule.Filter.TestTags(strings.Split(obj.UserTags, "&")) {
rules = append(rules, rule)
}
}
sort.Slice(rules, func(i, j int) bool {
return rules[i].Priority > rules[j].Priority && rules[i].Destination.String() == rules[j].Destination.String()
})
return rules
}
// GetDestination returns destination bucket and storage class.
func (c Config) GetDestination() Destination {
if len(c.Rules) > 0 {
return c.Rules[0].Destination
}
return Destination{}
}
// Replicate returns true if the object should be replicated.
func (c Config) Replicate(obj ObjectOpts) bool {
if obj.SSEC {
return false
}
for _, rule := range c.FilterActionableRules(obj) {
if rule.Status == Disabled {
continue
}
if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled {
return false
}
if obj.OpType == DeleteReplicationType {
switch {
case obj.VersionID != "":
// check MinIO extension for versioned deletes
return rule.DeleteReplication.Status == Enabled
default:
return rule.DeleteMarkerReplication.Status == Enabled
}
} // regular object/metadata replication
return rule.MetadataReplicate(obj)
}
return false
}
// HasActiveRules - returns whether replication policy has active rules
// Optionally a prefix can be supplied.
// If recursive is specified the function will also return true if any level below the
// prefix has active rules. If no prefix is specified recursive is effectively true.
func (c Config) HasActiveRules(prefix string, recursive bool) bool {
if len(c.Rules) == 0 {
return false
}
for _, rule := range c.Rules {
if rule.Status == Disabled {
continue
}
if len(prefix) > 0 && len(rule.Filter.Prefix) > 0 {
// incoming prefix must be in rule prefix
if !recursive && !strings.HasPrefix(prefix, rule.Filter.Prefix) {
continue
}
// If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix
// does not match
if recursive && !strings.HasPrefix(rule.Prefix(), prefix) && !strings.HasPrefix(prefix, rule.Prefix()) {
continue
}
}
return true
}
return false
}
// FilterTargetArns returns a slice of distinct target arns in the config
func (c Config) FilterTargetArns(obj ObjectOpts) []string {
var arns []string
tgtsMap := make(map[string]struct{})
rules := c.FilterActionableRules(obj)
for _, rule := range rules {
if rule.Status == Disabled {
continue
}
if c.RoleArn != "" {
arns = append(arns, c.RoleArn) // use legacy RoleArn if present
return arns
}
if _, ok := tgtsMap[rule.Destination.ARN]; !ok {
tgtsMap[rule.Destination.ARN] = struct{}{}
}
}
for k := range tgtsMap {
arns = append(arns, k)
}
return arns
}