mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
This PR fixes a regression introduced in https://github.com/minio/minio/pull/19797 by restoring the healing ability of transitioned objects Bonus: support for transitioned objects to carry original The object name is for future reverse lookups if necessary. Also fix parity calculation for tiered objects to n/2 for n/2 == (parity)
This commit is contained in:
@@ -392,8 +392,8 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
||||
if metaErrs[i] != nil {
|
||||
continue
|
||||
}
|
||||
meta := partsMetadata[i]
|
||||
|
||||
meta := partsMetadata[i]
|
||||
if meta.Deleted || meta.IsRemote() {
|
||||
continue
|
||||
}
|
||||
@@ -442,13 +442,17 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
||||
}
|
||||
|
||||
for i, onlineDisk := range onlineDisks {
|
||||
if metaErrs[i] == nil && !hasPartErr(dataErrsByDisk[i]) {
|
||||
// All parts verified, mark it as all data available.
|
||||
availableDisks[i] = onlineDisk
|
||||
} else {
|
||||
// upon errors just make that disk's fileinfo invalid
|
||||
partsMetadata[i] = FileInfo{}
|
||||
if metaErrs[i] == nil {
|
||||
meta := partsMetadata[i]
|
||||
if meta.Deleted || meta.IsRemote() || !hasPartErr(dataErrsByDisk[i]) {
|
||||
// All parts verified, mark it as all data available.
|
||||
availableDisks[i] = onlineDisk
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// upon errors just make that disk's fileinfo invalid
|
||||
partsMetadata[i] = FileInfo{}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -514,8 +514,10 @@ func listObjectParities(partsMetadata []FileInfo, errs []error) (parities []int)
|
||||
if metadata.Deleted || metadata.Size == 0 {
|
||||
parities[index] = totalShards / 2
|
||||
} else if metadata.TransitionStatus == lifecycle.TransitionComplete {
|
||||
// For tiered objects, read quorum is N/2+1 to ensure simple majority on xl.meta. It is not equal to EcM because the data integrity is entrusted with the warm tier.
|
||||
parities[index] = totalShards - (totalShards/2 + 1)
|
||||
// For tiered objects, read quorum is N/2+1 to ensure simple majority on xl.meta.
|
||||
// It is not equal to EcM because the data integrity is entrusted with the warm tier.
|
||||
// However, we never go below EcM, in case of a EcM=EcN setup.
|
||||
parities[index] = max(totalShards-(totalShards/2+1), metadata.Erasure.ParityBlocks)
|
||||
} else {
|
||||
parities[index] = metadata.Erasure.ParityBlocks
|
||||
}
|
||||
|
||||
@@ -2376,7 +2376,11 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
|
||||
}()
|
||||
|
||||
var rv remoteVersionID
|
||||
rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size)
|
||||
rv, err = tgtClient.PutWithMeta(ctx, destObj, pr, fi.Size, map[string]string{
|
||||
"name": object, // preserve the original name of the object on the remote tier object metadata.
|
||||
// this is just for future reverse lookup() purposes (applies only for new objects)
|
||||
// does not apply retro-actively on already transitioned objects.
|
||||
})
|
||||
pr.CloseWithError(err)
|
||||
if err != nil {
|
||||
traceFn(ILMTransition, nil, err)
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
@@ -59,10 +60,15 @@ func (az *warmBackendAzure) getDest(object string) string {
|
||||
return destObj
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
func (az *warmBackendAzure) PutWithMeta(ctx context.Context, object string, r io.Reader, length int64, meta map[string]string) (remoteVersionID, error) {
|
||||
azMeta := map[string]*string{}
|
||||
for k, v := range meta {
|
||||
azMeta[k] = to.Ptr(v)
|
||||
}
|
||||
resp, err := az.clnt.UploadStream(ctx, az.Bucket, az.getDest(object), io.LimitReader(r, length), &azblob.UploadStreamOptions{
|
||||
Concurrency: 4,
|
||||
AccessTier: az.tier(), // set tier if specified
|
||||
Metadata: azMeta,
|
||||
})
|
||||
if err != nil {
|
||||
return "", azureToObjectError(err, az.Bucket, az.getDest(object))
|
||||
@@ -74,6 +80,10 @@ func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader,
|
||||
return remoteVersionID(vid), nil
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
return az.PutWithMeta(ctx, object, r, length, map[string]string{})
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
|
||||
if opts.startOffset < 0 {
|
||||
return nil, InvalidRange{}
|
||||
|
||||
@@ -47,16 +47,17 @@ func (gcs *warmBackendGCS) getDest(object string) string {
|
||||
return destObj
|
||||
}
|
||||
|
||||
// FIXME: add support for remote version ID in GCS remote tier and remove this.
|
||||
// Currently it's a no-op.
|
||||
|
||||
func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) (remoteVersionID, error) {
|
||||
func (gcs *warmBackendGCS) PutWithMeta(ctx context.Context, key string, data io.Reader, length int64, meta map[string]string) (remoteVersionID, error) {
|
||||
object := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key))
|
||||
// TODO: set storage class
|
||||
w := object.NewWriter(ctx)
|
||||
if gcs.StorageClass != "" {
|
||||
w.ObjectAttrs.StorageClass = gcs.StorageClass
|
||||
}
|
||||
w.ObjectAttrs.Metadata = meta
|
||||
if _, err := xioutil.Copy(w, data); err != nil {
|
||||
return "", gcsToObjectError(err, gcs.Bucket, key)
|
||||
}
|
||||
|
||||
if _, err := xioutil.Copy(w, data); err != nil {
|
||||
return "", gcsToObjectError(err, gcs.Bucket, key)
|
||||
}
|
||||
@@ -64,6 +65,12 @@ func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader,
|
||||
return "", w.Close()
|
||||
}
|
||||
|
||||
// FIXME: add support for remote version ID in GCS remote tier and remove this.
|
||||
// Currently it's a no-op.
|
||||
func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) (remoteVersionID, error) {
|
||||
return gcs.PutWithMeta(ctx, key, data, length, map[string]string{})
|
||||
}
|
||||
|
||||
func (gcs *warmBackendGCS) Get(ctx context.Context, key string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
|
||||
// GCS storage decompresses a gzipped object by default and returns the data.
|
||||
// Refer to https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding
|
||||
|
||||
@@ -78,7 +78,7 @@ func optimalPartSize(objectSize int64) (partSize int64, err error) {
|
||||
return partSize, nil
|
||||
}
|
||||
|
||||
func (m *warmBackendMinIO) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
func (m *warmBackendMinIO) PutWithMeta(ctx context.Context, object string, r io.Reader, length int64, meta map[string]string) (remoteVersionID, error) {
|
||||
partSize, err := optimalPartSize(length)
|
||||
if err != nil {
|
||||
return remoteVersionID(""), err
|
||||
@@ -87,10 +87,15 @@ func (m *warmBackendMinIO) Put(ctx context.Context, object string, r io.Reader,
|
||||
StorageClass: m.StorageClass,
|
||||
PartSize: uint64(partSize),
|
||||
DisableContentSha256: true,
|
||||
UserMetadata: meta,
|
||||
})
|
||||
return remoteVersionID(res.VersionID), m.ToObjectError(err, object)
|
||||
}
|
||||
|
||||
func (m *warmBackendMinIO) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
return m.PutWithMeta(ctx, object, r, length, map[string]string{})
|
||||
}
|
||||
|
||||
func newWarmBackendMinIO(conf madmin.TierMinIO, tier string) (*warmBackendMinIO, error) {
|
||||
// Validation of credentials
|
||||
if conf.AccessKey == "" || conf.SecretKey == "" {
|
||||
|
||||
@@ -56,14 +56,19 @@ func (s3 *warmBackendS3) getDest(object string) string {
|
||||
return destObj
|
||||
}
|
||||
|
||||
func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
func (s3 *warmBackendS3) PutWithMeta(ctx context.Context, object string, r io.Reader, length int64, meta map[string]string) (remoteVersionID, error) {
|
||||
res, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{
|
||||
SendContentMd5: true,
|
||||
StorageClass: s3.StorageClass,
|
||||
UserMetadata: meta,
|
||||
})
|
||||
return remoteVersionID(res.VersionID), s3.ToObjectError(err, object)
|
||||
}
|
||||
|
||||
func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
return s3.PutWithMeta(ctx, object, r, length, map[string]string{})
|
||||
}
|
||||
|
||||
func (s3 *warmBackendS3) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error) {
|
||||
gopts := minio.GetObjectOptions{}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ type WarmBackendGetOpts struct {
|
||||
// WarmBackend provides interface to be implemented by remote tier backends
|
||||
type WarmBackend interface {
|
||||
Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error)
|
||||
PutWithMeta(ctx context.Context, object string, r io.Reader, length int64, meta map[string]string) (remoteVersionID, error)
|
||||
Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error)
|
||||
Remove(ctx context.Context, object string, rv remoteVersionID) error
|
||||
InUse(ctx context.Context) (bool, error)
|
||||
|
||||
Reference in New Issue
Block a user