mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
298 lines
8.9 KiB
Go
298 lines
8.9 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package replication
|
|
|
|
import (
|
|
"encoding/xml"
|
|
"io"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
var (
|
|
errReplicationTooManyRules = Errorf("Replication configuration allows a maximum of 1000 rules")
|
|
errReplicationNoRule = Errorf("Replication configuration should have at least one rule")
|
|
errReplicationUniquePriority = Errorf("Replication configuration has duplicate priority")
|
|
errRoleArnMissingLegacy = Errorf("Missing required parameter `Role` in ReplicationConfiguration")
|
|
errDestinationArnMissing = Errorf("Missing required parameter `Destination` in Replication rule")
|
|
errInvalidSourceSelectionCriteria = Errorf("Invalid ReplicaModification status")
|
|
errRoleArnPresentForMultipleTargets = Errorf("`Role` should be empty in ReplicationConfiguration for multiple targets")
|
|
)
|
|
|
|
// Config - replication configuration specified in
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
|
|
type Config struct {
|
|
XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
|
|
Rules []Rule `xml:"Rule" json:"Rules"`
|
|
// RoleArn is being reused for MinIO replication ARN
|
|
RoleArn string `xml:"Role" json:"Role"`
|
|
}
|
|
|
|
// maxReplicationConfigSize caps the number of bytes read while parsing a
// replication configuration: 2 MiB.
const maxReplicationConfigSize = 2 * (1 << 20)
|
|
|
|
// ParseConfig parses ReplicationConfiguration from xml
|
|
func ParseConfig(reader io.Reader) (*Config, error) {
|
|
config := Config{}
|
|
if err := xml.NewDecoder(io.LimitReader(reader, maxReplicationConfigSize)).Decode(&config); err != nil {
|
|
return nil, err
|
|
}
|
|
// By default, set replica modification to enabled if unset.
|
|
for i := range config.Rules {
|
|
if len(config.Rules[i].SourceSelectionCriteria.ReplicaModifications.Status) == 0 {
|
|
config.Rules[i].SourceSelectionCriteria = SourceSelectionCriteria{
|
|
ReplicaModifications: ReplicaModifications{
|
|
Status: Enabled,
|
|
},
|
|
}
|
|
}
|
|
// Default DeleteReplication to disabled if unset.
|
|
if len(config.Rules[i].DeleteReplication.Status) == 0 {
|
|
config.Rules[i].DeleteReplication = DeleteReplication{
|
|
Status: Disabled,
|
|
}
|
|
}
|
|
}
|
|
return &config, nil
|
|
}
|
|
|
|
// Validate - validates the replication configuration
|
|
func (c Config) Validate(bucket string, sameTarget bool) error {
|
|
// replication config can't have more than 1000 rules
|
|
if len(c.Rules) > 1000 {
|
|
return errReplicationTooManyRules
|
|
}
|
|
// replication config should have at least one rule
|
|
if len(c.Rules) == 0 {
|
|
return errReplicationNoRule
|
|
}
|
|
|
|
// Validate all the rules in the replication config
|
|
targetMap := make(map[string]struct{})
|
|
priorityMap := make(map[string]struct{})
|
|
var legacyArn bool
|
|
for _, r := range c.Rules {
|
|
if _, ok := targetMap[r.Destination.Bucket]; !ok {
|
|
targetMap[r.Destination.Bucket] = struct{}{}
|
|
}
|
|
if err := r.Validate(bucket, sameTarget); err != nil {
|
|
return err
|
|
}
|
|
if _, ok := priorityMap[strconv.Itoa(r.Priority)]; ok {
|
|
return errReplicationUniquePriority
|
|
}
|
|
priorityMap[strconv.Itoa(r.Priority)] = struct{}{}
|
|
|
|
if r.Destination.LegacyArn() {
|
|
legacyArn = true
|
|
}
|
|
if c.RoleArn == "" && !r.Destination.TargetArn() {
|
|
return errDestinationArnMissing
|
|
}
|
|
}
|
|
// disallow combining old replication configuration which used RoleArn as target ARN with multiple
|
|
// destination replication
|
|
if c.RoleArn != "" && len(targetMap) > 1 {
|
|
return errRoleArnPresentForMultipleTargets
|
|
}
|
|
// validate RoleArn if destination used legacy ARN format.
|
|
if c.RoleArn == "" && legacyArn {
|
|
return errRoleArnMissingLegacy
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Type enumerates the kinds of replication operation.
type Type int

// Replication operation types. The zero value, UnsetReplicationType, means
// that no type has been assigned.
const (
	UnsetReplicationType Type = iota
	ObjectReplicationType
	DeleteReplicationType
	MetadataReplicationType
	HealReplicationType
	ExistingObjectReplicationType
	ResyncReplicationType
	AllReplicationType
)
|
|
|
|
// Valid returns true if replication type is set
|
|
func (t Type) Valid() bool {
|
|
return t > 0
|
|
}
|
|
|
|
// IsDataReplication returns true if content being replicated
|
|
func (t Type) IsDataReplication() bool {
|
|
switch t {
|
|
case ObjectReplicationType, HealReplicationType, ExistingObjectReplicationType:
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ObjectOpts provides information to deduce whether replication
|
|
// can be triggered on the resultant object.
|
|
type ObjectOpts struct {
|
|
Name string
|
|
UserTags string
|
|
VersionID string
|
|
DeleteMarker bool
|
|
SSEC bool
|
|
OpType Type
|
|
Replica bool
|
|
ExistingObject bool
|
|
TargetArn string
|
|
}
|
|
|
|
// HasExistingObjectReplication returns true if any of the rule returns 'ExistingObjects' replication.
|
|
func (c Config) HasExistingObjectReplication(arn string) bool {
|
|
for _, rule := range c.Rules {
|
|
if rule.Destination.ARN == arn || c.RoleArn == arn {
|
|
if rule.ExistingObjectReplication.Status == Enabled {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// FilterActionableRules returns the rules actions that need to be executed
|
|
// after evaluating prefix/tag filtering
|
|
func (c Config) FilterActionableRules(obj ObjectOpts) []Rule {
|
|
if obj.Name == "" && !(obj.OpType == ResyncReplicationType || obj.OpType == AllReplicationType) {
|
|
return nil
|
|
}
|
|
var rules []Rule
|
|
for _, rule := range c.Rules {
|
|
if rule.Status == Disabled {
|
|
continue
|
|
}
|
|
|
|
if obj.TargetArn != "" && rule.Destination.ARN != obj.TargetArn && c.RoleArn != obj.TargetArn {
|
|
continue
|
|
}
|
|
// Ignore other object level and prefix filters for resyncing target/listing bucket targets
|
|
if obj.OpType == ResyncReplicationType || obj.OpType == AllReplicationType {
|
|
rules = append(rules, rule)
|
|
continue
|
|
}
|
|
if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled {
|
|
continue
|
|
}
|
|
if !strings.HasPrefix(obj.Name, rule.Prefix()) {
|
|
continue
|
|
}
|
|
if rule.Filter.TestTags(obj.UserTags) {
|
|
rules = append(rules, rule)
|
|
}
|
|
}
|
|
sort.Slice(rules, func(i, j int) bool {
|
|
return rules[i].Priority > rules[j].Priority && rules[i].Destination.String() == rules[j].Destination.String()
|
|
})
|
|
|
|
return rules
|
|
}
|
|
|
|
// GetDestination returns destination bucket and storage class.
|
|
func (c Config) GetDestination() Destination {
|
|
if len(c.Rules) > 0 {
|
|
return c.Rules[0].Destination
|
|
}
|
|
return Destination{}
|
|
}
|
|
|
|
// Replicate returns true if the object should be replicated.
|
|
func (c Config) Replicate(obj ObjectOpts) bool {
|
|
if obj.SSEC {
|
|
return false
|
|
}
|
|
for _, rule := range c.FilterActionableRules(obj) {
|
|
if rule.Status == Disabled {
|
|
continue
|
|
}
|
|
if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled {
|
|
return false
|
|
}
|
|
if obj.OpType == DeleteReplicationType {
|
|
switch {
|
|
case obj.VersionID != "":
|
|
// check MinIO extension for versioned deletes
|
|
return rule.DeleteReplication.Status == Enabled
|
|
default:
|
|
return rule.DeleteMarkerReplication.Status == Enabled
|
|
}
|
|
} // regular object/metadata replication
|
|
return rule.MetadataReplicate(obj)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// HasActiveRules - returns whether replication policy has active rules
|
|
// Optionally a prefix can be supplied.
|
|
// If recursive is specified the function will also return true if any level below the
|
|
// prefix has active rules. If no prefix is specified recursive is effectively true.
|
|
func (c Config) HasActiveRules(prefix string, recursive bool) bool {
|
|
if len(c.Rules) == 0 {
|
|
return false
|
|
}
|
|
for _, rule := range c.Rules {
|
|
if rule.Status == Disabled {
|
|
continue
|
|
}
|
|
if len(prefix) > 0 && len(rule.Filter.Prefix) > 0 {
|
|
// incoming prefix must be in rule prefix
|
|
if !recursive && !strings.HasPrefix(prefix, rule.Filter.Prefix) {
|
|
continue
|
|
}
|
|
// If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix
|
|
// does not match
|
|
if recursive && !strings.HasPrefix(rule.Prefix(), prefix) && !strings.HasPrefix(prefix, rule.Prefix()) {
|
|
continue
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// FilterTargetArns returns a slice of distinct target arns in the config
|
|
func (c Config) FilterTargetArns(obj ObjectOpts) []string {
|
|
var arns []string
|
|
|
|
tgtsMap := make(map[string]struct{})
|
|
rules := c.FilterActionableRules(obj)
|
|
for _, rule := range rules {
|
|
if rule.Status == Disabled {
|
|
continue
|
|
}
|
|
if c.RoleArn != "" {
|
|
arns = append(arns, c.RoleArn) // use legacy RoleArn if present
|
|
return arns
|
|
}
|
|
if _, ok := tgtsMap[rule.Destination.ARN]; !ok {
|
|
tgtsMap[rule.Destination.ARN] = struct{}{}
|
|
}
|
|
}
|
|
for k := range tgtsMap {
|
|
arns = append(arns, k)
|
|
}
|
|
return arns
|
|
}
|