// Copyright (c) 2015-2021 MinIO, Inc. // // This file is part of MinIO Object Storage stack // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package replication import ( "encoding/xml" "io" "sort" "strconv" "strings" ) var ( errReplicationTooManyRules = Errorf("Replication configuration allows a maximum of 1000 rules") errReplicationNoRule = Errorf("Replication configuration should have at least one rule") errReplicationUniquePriority = Errorf("Replication configuration has duplicate priority") errRoleArnMissingLegacy = Errorf("Missing required parameter `Role` in ReplicationConfiguration") errDestinationArnMissing = Errorf("Missing required parameter `Destination` in Replication rule") errInvalidSourceSelectionCriteria = Errorf("Invalid ReplicaModification status") errRoleArnPresentForMultipleTargets = Errorf("`Role` should be empty in ReplicationConfiguration for multiple targets") ) // Config - replication configuration specified in // https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html type Config struct { XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"` Rules []Rule `xml:"Rule" json:"Rules"` // RoleArn is being reused for MinIO replication ARN RoleArn string `xml:"Role" json:"Role"` } // Maximum 2MiB size per replication config. const maxReplicationConfigSize = 2 << 20 // ParseConfig parses ReplicationConfiguration from xml func ParseConfig(reader io.Reader) (*Config, error) { config := Config{} if err := xml.NewDecoder(io.LimitReader(reader, maxReplicationConfigSize)).Decode(&config); err != nil { return nil, err } // By default, set replica modification to enabled if unset. for i := range config.Rules { if len(config.Rules[i].SourceSelectionCriteria.ReplicaModifications.Status) == 0 { config.Rules[i].SourceSelectionCriteria = SourceSelectionCriteria{ ReplicaModifications: ReplicaModifications{ Status: Enabled, }, } } } return &config, nil } // Validate - validates the replication configuration func (c Config) Validate(bucket string, sameTarget bool) error { // replication config can't have more than 1000 rules if len(c.Rules) > 1000 { return errReplicationTooManyRules } // replication config should have at least one rule if len(c.Rules) == 0 { return errReplicationNoRule } // Validate all the rules in the replication config targetMap := make(map[string]struct{}) priorityMap := make(map[string]struct{}) var legacyArn bool for _, r := range c.Rules { if _, ok := targetMap[r.Destination.Bucket]; !ok { targetMap[r.Destination.Bucket] = struct{}{} } if err := r.Validate(bucket, sameTarget); err != nil { return err } if _, ok := priorityMap[strconv.Itoa(r.Priority)]; ok { return errReplicationUniquePriority } priorityMap[strconv.Itoa(r.Priority)] = struct{}{} if r.Destination.LegacyArn() { legacyArn = true } if c.RoleArn == "" && !r.Destination.TargetArn() { return errDestinationArnMissing } } // disallow combining old replication configuration which used RoleArn as target ARN with multiple // destination replication if c.RoleArn != "" && len(targetMap) > 1 { return errRoleArnPresentForMultipleTargets } // validate RoleArn if destination used legacy ARN format. if c.RoleArn == "" && legacyArn { return errRoleArnMissingLegacy } return nil } // Type - replication type enum type Type int // Types of replication const ( UnsetReplicationType Type = 0 + iota ObjectReplicationType DeleteReplicationType MetadataReplicationType HealReplicationType ExistingObjectReplicationType ResyncReplicationType ) // Valid returns true if replication type is set func (t Type) Valid() bool { return t > 0 } // ObjectOpts provides information to deduce whether replication // can be triggered on the resultant object. type ObjectOpts struct { Name string UserTags string VersionID string DeleteMarker bool SSEC bool OpType Type Replica bool ExistingObject bool TargetArn string } // FilterActionableRules returns the rules actions that need to be executed // after evaluating prefix/tag filtering func (c Config) FilterActionableRules(obj ObjectOpts) []Rule { if obj.Name == "" && obj.OpType != ResyncReplicationType { return nil } var rules []Rule for _, rule := range c.Rules { if rule.Status == Disabled { continue } if obj.TargetArn != "" && rule.Destination.ARN != obj.TargetArn && c.RoleArn != obj.TargetArn { continue } // Ignore other object level and prefix filters for resyncing target if obj.OpType == ResyncReplicationType { rules = append(rules, rule) continue } if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled { continue } if !strings.HasPrefix(obj.Name, rule.Prefix()) { continue } if rule.Filter.TestTags(strings.Split(obj.UserTags, "&")) { rules = append(rules, rule) } } sort.Slice(rules[:], func(i, j int) bool { return rules[i].Priority > rules[j].Priority && rules[i].Destination.String() == rules[j].Destination.String() }) return rules } // GetDestination returns destination bucket and storage class. func (c Config) GetDestination() Destination { if len(c.Rules) > 0 { return c.Rules[0].Destination } return Destination{} } // Replicate returns true if the object should be replicated. func (c Config) Replicate(obj ObjectOpts) bool { if obj.SSEC { return false } for _, rule := range c.FilterActionableRules(obj) { if rule.Status == Disabled { continue } if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled { return false } if obj.OpType == DeleteReplicationType { switch { case obj.VersionID != "": // check MinIO extension for versioned deletes return rule.DeleteReplication.Status == Enabled default: return rule.DeleteMarkerReplication.Status == Enabled } } // regular object/metadata replication return rule.MetadataReplicate(obj) } return false } // HasActiveRules - returns whether replication policy has active rules // Optionally a prefix can be supplied. // If recursive is specified the function will also return true if any level below the // prefix has active rules. If no prefix is specified recursive is effectively true. func (c Config) HasActiveRules(prefix string, recursive bool) bool { if len(c.Rules) == 0 { return false } for _, rule := range c.Rules { if rule.Status == Disabled { continue } if len(prefix) > 0 && len(rule.Filter.Prefix) > 0 { // incoming prefix must be in rule prefix if !recursive && !strings.HasPrefix(prefix, rule.Filter.Prefix) { continue } // If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix // does not match if recursive && !strings.HasPrefix(rule.Prefix(), prefix) && !strings.HasPrefix(prefix, rule.Prefix()) { continue } } return true } return false } // FilterTargetArns returns a slice of distinct target arns in the config func (c Config) FilterTargetArns(obj ObjectOpts) []string { var arns []string tgtsMap := make(map[string]struct{}) rules := c.FilterActionableRules(obj) for _, rule := range rules { if rule.Status == Disabled { continue } if c.RoleArn != "" { arns = append(arns, c.RoleArn) // use legacy RoleArn if present return arns } if _, ok := tgtsMap[rule.Destination.ARN]; !ok { tgtsMap[rule.Destination.ARN] = struct{}{} } } for k := range tgtsMap { arns = append(arns, k) } return arns }