minio/cmd/bucket-targets.go
Krishnan Parthasarathi c829e3a13b Support for remote tier management (#12090)
With this change, MinIO's ILM supports transitioning objects to a remote tier.
This change includes support for Azure Blob Storage, AWS S3 compatible object
storage incl. MinIO and Google Cloud Storage as remote tier storage backends.

Some new additions include:

 - Admin APIs for remote tier configuration management

 - Simple journal to track remote objects to be 'collected'
   This is used by object API handlers which 'mutate' object versions by
   overwriting/replacing content (Put/CopyObject) or removing the version
   itself (e.g. DeleteObjectVersion).

 - Rework of previous ILM transition to fit the new model
   In the new model, a storage class (a.k.a. remote tier) is defined by the
   'remote' object storage type (one of s3, azure, GCS), bucket name and a
   prefix.

* Fixed bugs, review comments, and more unit-tests

- Leverage inline small object feature
- Migrate legacy objects to the latest object format before transitioning
- Fix restore to particular version if specified
- Extend SharedDataDirCount to handle transitioned and restored objects
- Restore-object should accept version-id for version-suspended bucket (#12091)
- Check if remote tier creds have sufficient permissions
- Bonus minor fixes to existing error messages

Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
Co-authored-by: Krishna Srinivas <krishna@minio.io>
Signed-off-by: Harshavardhana <harsha@minio.io>
2021-04-23 11:58:53 -07:00

414 lines
12 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"net/http"
"sync"
"sync/atomic"
"time"
minio "github.com/minio/minio-go/v7"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio/cmd/crypto"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/versioning"
"github.com/minio/minio/pkg/madmin"
)
// defaultHealthCheckDuration is the health check interval given to a remote
// target client when its target configuration does not supply a usable one.
const defaultHealthCheckDuration = 100 * time.Second
// BucketTargetSys represents bucket targets subsystem. It keeps, in memory,
// the remote targets configured per source bucket together with one client
// per target ARN.
type BucketTargetSys struct {
// Embedded lock used to serialize access to the two maps below.
sync.RWMutex
// arnRemotesMap maps a target ARN to the client used to reach that remote.
arnRemotesMap map[string]*TargetClient
// targetsMap maps a source bucket name to its configured remote targets.
targetsMap map[string][]madmin.BucketTarget
}
// ListTargets returns clones of the configured bucket targets whose type
// matches arnType; an empty arnType matches every target.
//
// When bucket is non-empty only that bucket's targets are considered, and a
// failed lookup for the bucket yields an empty result rather than an error.
// When bucket is empty the targets of all buckets are scanned under the read
// lock.
func (sys *BucketTargetSys) ListTargets(ctx context.Context, bucket, arnType string) (targets []madmin.BucketTarget) {
	// accept reports whether a target of the given kind passes the filter.
	accept := func(kind string) bool {
		return arnType == "" || kind == arnType
	}

	if bucket == "" {
		sys.RLock()
		defer sys.RUnlock()
		for _, perBucket := range sys.targetsMap {
			for _, t := range perBucket {
				if accept(string(t.Type)) {
					targets = append(targets, t.Clone())
				}
			}
		}
		return targets
	}

	cfg, err := sys.ListBucketTargets(ctx, bucket)
	if err != nil {
		// Unknown bucket: nothing to list.
		return targets
	}
	for _, t := range cfg.Targets {
		if accept(string(t.Type)) {
			targets = append(targets, t.Clone())
		}
	}
	return targets
}
// ListBucketTargets returns the remote targets recorded for bucket.
//
// The returned Targets slice is the one held by the subsystem, not a copy.
// BucketRemoteTargetNotFound is returned when the bucket has no entry.
func (sys *BucketTargetSys) ListBucketTargets(ctx context.Context, bucket string) (*madmin.BucketTargets, error) {
	sys.RLock()
	defer sys.RUnlock()

	list, found := sys.targetsMap[bucket]
	if !found {
		return nil, BucketRemoteTargetNotFound{Bucket: bucket}
	}
	return &madmin.BucketTargets{Targets: list}, nil
}
// SetTarget registers tgt as a remote target of the source bucket and caches
// a client for it under tgt.Arn.
//
// It does nothing in gateway mode. Before anything is stored, the target is
// validated in this order:
//   - the target type must be valid unless this is an update;
//   - a client for the remote must be constructible;
//   - the remote bucket must be reachable with the supplied credentials;
//   - for replication targets: erasure mode is required, both the source and
//     the remote bucket must have versioning enabled, and synchronous
//     replication cannot be combined with a bandwidth limit.
//
// An existing target of the same type is replaced by tgt. When update is
// false, a target with the same ARN is rejected with
// BucketRemoteAlreadyExists, and tgt is appended if no target of its type
// exists yet.
func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *madmin.BucketTarget, update bool) error {
	if globalIsGateway {
		return nil
	}
	if !tgt.Type.IsValid() && !update {
		return BucketRemoteArnTypeInvalid{Bucket: bucket}
	}

	remote, err := sys.getRemoteTargetClient(tgt)
	if err != nil {
		return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
	}

	// validate if target credentials are ok
	if _, err := remote.BucketExists(ctx, tgt.TargetBucket); err != nil {
		if minio.ToErrorResponse(err).Code == "NoSuchBucket" {
			return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
		}
		return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
	}

	if tgt.Type == madmin.ReplicationService {
		if !globalIsErasure {
			return NotImplemented{Message: "Replication is not implemented in " + getMinioMode()}
		}
		if !globalBucketVersioningSys.Enabled(bucket) {
			return BucketReplicationSourceNotVersioned{Bucket: bucket}
		}
		remoteVersioning, err := remote.GetBucketVersioning(ctx, tgt.TargetBucket)
		if err != nil {
			return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
		}
		if remoteVersioning.Status != string(versioning.Enabled) {
			return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}
		}
		if tgt.ReplicationSync && tgt.BandwidthLimit > 0 {
			return NotImplemented{Message: "Synchronous replication does not support bandwidth limits"}
		}
	}

	sys.Lock()
	defer sys.Unlock()

	// Build a fresh slice so that slices handed out earlier are never mutated.
	existing := sys.targetsMap[bucket]
	updated := make([]madmin.BucketTarget, len(existing))
	replaced := false
	for i := range existing {
		cur := existing[i]
		if cur.Type != tgt.Type {
			updated[i] = cur
			continue
		}
		if cur.Arn == tgt.Arn && !update {
			return BucketRemoteAlreadyExists{Bucket: cur.TargetBucket}
		}
		updated[i] = *tgt
		replaced = true
	}
	if !replaced && !update {
		updated = append(updated, *tgt)
	}

	sys.targetsMap[bucket] = updated
	sys.arnRemotesMap[tgt.Arn] = remote
	return nil
}
// RemoveTarget removes the remote target identified by arnStr from the
// source bucket and drops the cached client for that ARN.
//
// It does nothing in gateway mode. It returns:
//   - BucketRemoteArnInvalid when arnStr is empty or cannot be parsed;
//   - NotImplemented for replication ARNs when not running in erasure mode;
//   - BucketRemoteRemoveDisallowed when the bucket's replication
//     configuration still references the ARN and a client for it is
//     registered;
//   - BucketRemoteTargetNotFound when the bucket has no targets or none of
//     them carries this ARN.
func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr string) error {
	if globalIsGateway {
		return nil
	}
	if arnStr == "" {
		return BucketRemoteArnInvalid{Bucket: bucket}
	}
	arn, err := madmin.ParseARN(arnStr)
	if err != nil {
		return BucketRemoteArnInvalid{Bucket: bucket}
	}

	if arn.Type == madmin.ReplicationService {
		if !globalIsErasure {
			return NotImplemented{Message: "Replication is not implemented in " + getMinioMode()}
		}
		// reject removal of remote target if replication configuration is present
		rcfg, err := getReplicationConfig(ctx, bucket)
		if err == nil && rcfg.RoleArn == arnStr {
			// arnRemotesMap is shared state: read it under the read lock.
			// The lock is held only for the lookup so that it is not held
			// while the replication config is being fetched.
			sys.RLock()
			_, registered := sys.arnRemotesMap[arnStr]
			sys.RUnlock()
			if registered {
				return BucketRemoteRemoveDisallowed{Bucket: bucket}
			}
		}
	}

	// delete ARN type from list of matching targets
	sys.Lock()
	defer sys.Unlock()

	tgts, ok := sys.targetsMap[bucket]
	if !ok {
		return BucketRemoteTargetNotFound{Bucket: bucket}
	}

	found := false
	targets := make([]madmin.BucketTarget, 0, len(tgts))
	for _, tgt := range tgts {
		if tgt.Arn == arnStr {
			found = true
			continue
		}
		targets = append(targets, tgt)
	}
	if !found {
		return BucketRemoteTargetNotFound{Bucket: bucket}
	}

	sys.targetsMap[bucket] = targets
	delete(sys.arnRemotesMap, arnStr)
	return nil
}
// GetRemoteTargetClient looks up the client registered for the given target
// ARN. The result is nil when no client is registered for arn.
func (sys *BucketTargetSys) GetRemoteTargetClient(ctx context.Context, arn string) *TargetClient {
	sys.RLock()
	client := sys.arnRemotesMap[arn]
	sys.RUnlock()
	return client
}
// NewBucketTargetSys returns a bucket targets subsystem whose ARN-to-client
// and bucket-to-targets maps are allocated and empty.
func NewBucketTargetSys() *BucketTargetSys {
	sys := new(BucketTargetSys)
	sys.arnRemotesMap = map[string]*TargetClient{}
	sys.targetsMap = map[string][]madmin.BucketTarget{}
	return sys
}
// Init starts loading the bucket targets of the given buckets.
//
// It returns errServerNotInitialized when objAPI is nil. In gateway mode,
// where bucket targets are not supported, it returns nil without loading
// anything. Otherwise loading happens in a background goroutine and Init
// returns immediately.
func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
	switch {
	case objAPI == nil:
		return errServerNotInitialized
	case globalIsGateway:
		// In gateway mode, bucket targets is not supported.
		return nil
	}

	// Load bucket targets once during boot in background.
	go sys.load(ctx, buckets, objAPI)
	return nil
}
// UpdateAllTargets replaces the in-memory targets of bucket with tgts so
// that the subsystem reflects a metadata update.
//
// A nil receiver is tolerated and ignored. When tgts is nil or empty, the
// bucket's entry is removed together with the cached clients of all targets
// it previously held. Otherwise a new client is created for every target
// and stored under its ARN; targets for which no client can be created are
// skipped but still recorded in the bucket's target list.
func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketTargets) {
	if sys == nil {
		return
	}

	sys.Lock()
	defer sys.Unlock()

	if tgts == nil || tgts.Empty() {
		// remove target and arn association
		for _, old := range sys.targetsMap[bucket] {
			delete(sys.arnRemotesMap, old.Arn)
		}
		delete(sys.targetsMap, bucket)
		return
	}

	for i := range tgts.Targets {
		cfg := &tgts.Targets[i]
		client, err := sys.getRemoteTargetClient(cfg)
		if err != nil {
			continue
		}
		sys.arnRemotesMap[cfg.Arn] = client
	}
	sys.targetsMap[bucket] = tgts.Targets
}
// load creates minio-go clients for buckets having remote targets and
// records each bucket's targets in the subsystem.
//
// Buckets whose targets configuration cannot be fetched are logged and
// skipped; buckets with no (or an empty) configuration are skipped silently.
// Targets for which no client can be created are logged and get no client,
// but remain part of the bucket's target list.
//
// load is started in its own goroutine by Init, so the shared maps are only
// modified while holding the write lock.
func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
	for _, bucket := range buckets {
		cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket.Name)
		if err != nil {
			logger.LogIf(ctx, err)
			continue
		}
		if cfg == nil || cfg.Empty() {
			continue
		}

		// Build the clients first so that logging and client construction
		// happen outside the critical section.
		clients := make(map[string]*TargetClient, len(cfg.Targets))
		for i := range cfg.Targets {
			tgt := &cfg.Targets[i]
			tgtClient, err := sys.getRemoteTargetClient(tgt)
			if err != nil {
				logger.LogIf(ctx, err)
				continue
			}
			clients[tgt.Arn] = tgtClient
		}

		sys.Lock()
		for arn, tgtClient := range clients {
			sys.arnRemotesMap[arn] = tgtClient
		}
		sys.targetsMap[bucket.Name] = cfg.Targets
		sys.Unlock()
	}
}
// Shared HTTP transport for remote target clients.
var (
	// getRemoteTargetInstanceTransport holds the singleton round tripper.
	getRemoteTargetInstanceTransport http.RoundTripper
	// getRemoteTargetInstanceTransportOnce guards its one-time creation.
	getRemoteTargetInstanceTransportOnce sync.Once
)
// getRemoteTargetClient returns a TargetClient configured to access the
// remote host described in the replication target config tcfg.
//
// The underlying minio-go client uses static V4 credentials taken from
// tcfg.Credentials and a shared HTTP transport that is created on first use.
// A health check goroutine is started for every client that is returned.
//
// tcfg.HealthCheckDuration is honoured only when it is at least one second;
// any shorter value (including zero) falls back to
// defaultHealthCheckDuration.
//
// An error is returned when the minio-go client cannot be constructed.
//
// NOTE(review): there is no per-client way to stop the health check
// goroutine, so clients that are later replaced or discarded keep probing.
func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*TargetClient, error) {
	config := tcfg.Credentials
	creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "")

	getRemoteTargetInstanceTransportOnce.Do(func() {
		getRemoteTargetInstanceTransport = NewRemoteTargetHTTPTransport()
	})

	api, err := minio.New(tcfg.Endpoint, &miniogo.Options{
		Creds:     creds,
		Secure:    tcfg.Secure,
		Region:    tcfg.Region,
		Transport: getRemoteTargetInstanceTransport,
	})
	if err != nil {
		return nil, err
	}

	// HealthCheckDuration is a time.Duration, i.e. a nanosecond count, so
	// the one second minimum has to be spelled as time.Second; comparing
	// against the bare literal 1 would accept anything from 1ns upwards.
	hcDuration := defaultHealthCheckDuration
	if tcfg.HealthCheckDuration >= time.Second {
		hcDuration = tcfg.HealthCheckDuration
	}

	tc := &TargetClient{
		Client:              api,
		healthCheckDuration: hcDuration,
		replicateSync:       tcfg.ReplicationSync,
		Bucket:              tcfg.TargetBucket,
		StorageClass:        tcfg.StorageClass,
	}
	go tc.healthCheck()
	return tc, nil
}
// getRemoteARN gets existing ARN for an endpoint or generates a new one.
//
// An existing ARN is reused when the bucket already has a target with the
// same type, target bucket and URL as target. Otherwise a new ARN is
// generated from target. The empty string is returned when target is nil or
// when no existing target matches and target's type is not valid.
//
// The bucket's targets are read under the read lock, so callers must not
// hold the subsystem's lock when calling this method.
func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget) string {
	if target == nil {
		return ""
	}

	sys.RLock()
	defer sys.RUnlock()

	for _, tgt := range sys.targetsMap[bucket] {
		if tgt.Type == target.Type && tgt.TargetBucket == target.TargetBucket && target.URL().String() == tgt.URL().String() {
			return tgt.Arn
		}
	}
	if !madmin.ServiceType(target.Type).IsValid() {
		return ""
	}
	return generateARN(target)
}
// generateARN builds the ARN string for target t. The ARN's ID is the
// hex-encoded SHA-256 digest of the target's type, region and target bucket,
// concatenated in that order; type, region and bucket are also carried as
// separate ARN fields.
func generateARN(t *madmin.BucketTarget) string {
	// Hashing the concatenation is the same as feeding the three parts to
	// the hash one after the other.
	material := append([]byte(t.Type), t.Region...)
	material = append(material, t.TargetBucket...)
	digest := sha256.Sum256(material)

	return madmin.ARN{
		Type:   t.Type,
		ID:     hex.EncodeToString(digest[:]),
		Region: t.Region,
		Bucket: t.TargetBucket,
	}.String()
}
// parseBucketTargetConfig decodes the JSON bucket targets configuration in
// cdata.
//
// cmetadata, when non-empty, is JSON metadata describing cdata; if it marks
// the data as encrypted, cdata is decrypted before being decoded. Empty
// cdata yields (nil, nil). Errors from decoding the metadata, decrypting the
// data or decoding the configuration are returned as-is.
func parseBucketTargetConfig(bucket string, cdata, cmetadata []byte) (*madmin.BucketTargets, error) {
	if len(cdata) == 0 {
		return nil, nil
	}

	payload := cdata
	if len(cmetadata) != 0 {
		var meta map[string]string
		if err := json.Unmarshal(cmetadata, &meta); err != nil {
			return nil, err
		}
		if crypto.S3.IsEncrypted(meta) {
			plain, err := decryptBucketMetadata(cdata, bucket, meta, crypto.Context{
				bucket:            bucket,
				bucketTargetsFile: bucketTargetsFile,
			})
			if err != nil {
				return nil, err
			}
			payload = plain
		}
	}

	parsed := new(madmin.BucketTargets)
	if err := json.Unmarshal(payload, parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}
// TargetClient is the struct for remote target client. It embeds the minio-go
// client used to talk to the remote and carries the target's health state
// and per-target settings.
type TargetClient struct {
*miniogo.Client
// up is accessed with sync/atomic: 0 means offline (see isOffline), 1 is
// stored after a successful health check.
up int32
// healthCheckDuration is the pause between two consecutive health checks.
healthCheckDuration time.Duration
Bucket string // remote bucket target
// replicateSync is copied from the target configuration's ReplicationSync.
replicateSync bool
StorageClass string // storage class on remote
}
// isOffline reports whether the target is currently marked as down, i.e.
// whether the atomically read up flag is zero. A client whose flag has never
// been set to a non-zero value is reported as offline.
func (tc *TargetClient) isOffline() bool {
	online := atomic.LoadInt32(&tc.up) != 0
	return !online
}
// healthCheck probes the remote bucket in a loop and records the outcome in
// tc.up: 1 when the BucketExists call returns no error, 0 otherwise. The
// first probe happens immediately and consecutive probes are spaced
// tc.healthCheckDuration apart.
//
// The loop ends as soon as GlobalContext is cancelled, so the goroutine
// running it does not keep probing after that.
func (tc *TargetClient) healthCheck() {
	for {
		var state int32
		if _, err := tc.BucketExists(GlobalContext, tc.Bucket); err == nil {
			state = 1
		}
		atomic.StoreInt32(&tc.up, state)

		// Wait for the next round, but wake up early on cancellation
		// instead of sleeping through it.
		timer := time.NewTimer(tc.healthCheckDuration)
		select {
		case <-GlobalContext.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}