mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
api: Introduce metadata update APIs to update only metadata (#11962)
Current implementation heavily relies on readAllFileInfo but with the advent of xl.meta inlined with data, we cannot easily avoid reading data when we are only interested is updating metadata, this leads to invariably write amplification during metadata updates, repeatedly reading data when we are only interested in updating metadata. This PR ensures that we implement a metadata only update API at storage layer, that handles updates to metadata alone for any given version - given the version is valid and present. This helps reduce the chattiness for following calls.. - PutObjectTags - DeleteObjectTags - PutObjectLegalHold - PutObjectRetention - ReplicateObject (updates metadata on replication status)
This commit is contained in:
parent
8a9d15ace2
commit
d46386246f
@ -571,11 +571,6 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
|
||||
// replicateObject replicates the specified version of the object to destination bucket
|
||||
// The source object is then updated to reflect the replication status.
|
||||
func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
|
||||
z, ok := objectAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
bucket := objInfo.Bucket
|
||||
object := objInfo.Name
|
||||
|
||||
@ -734,17 +729,27 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
|
||||
eventName = event.ObjectReplicationFailed
|
||||
}
|
||||
|
||||
z, ok := objectAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// This lower level implementation is necessary to avoid write locks from CopyObject.
|
||||
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
} else {
|
||||
if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, objInfo.UserDefined, ObjectOptions{
|
||||
VersionID: objInfo.VersionID,
|
||||
}); err != nil {
|
||||
fi := FileInfo{}
|
||||
fi.VersionID = objInfo.VersionID
|
||||
fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
|
||||
for k, v := range objInfo.UserDefined {
|
||||
fi.Metadata[k] = v
|
||||
}
|
||||
if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
}
|
||||
}
|
||||
|
||||
opType := replication.MetadataReplicationType
|
||||
if rtype == replicateAll {
|
||||
opType = replication.ObjectReplicationType
|
||||
|
@ -452,7 +452,7 @@ func lookupConfigs(s config.Config, setDriveCounts []int) {
|
||||
// if we validated all setDriveCounts and it was successful
|
||||
// proceed to store the correct storage class globally.
|
||||
if i == len(setDriveCounts)-1 {
|
||||
globalStorageClass = sc
|
||||
globalStorageClass.Update(sc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/cmd/config"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
@ -90,6 +91,9 @@ type StorageClass struct {
|
||||
Parity int
|
||||
}
|
||||
|
||||
// ConfigLock is a global lock for storage-class config
|
||||
var ConfigLock = sync.RWMutex{}
|
||||
|
||||
// Config storage class configuration
|
||||
type Config struct {
|
||||
Standard StorageClass `json:"standard"`
|
||||
@ -232,6 +236,8 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) {
|
||||
// is returned, the caller is expected to choose the right parity
|
||||
// at that point.
|
||||
func (sCfg Config) GetParityForSC(sc string) (parity int) {
|
||||
ConfigLock.RLock()
|
||||
defer ConfigLock.RUnlock()
|
||||
switch strings.TrimSpace(sc) {
|
||||
case RRS:
|
||||
// set the rrs parity if available
|
||||
@ -244,8 +250,19 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) {
|
||||
}
|
||||
}
|
||||
|
||||
// Update update storage-class with new config
|
||||
func (sCfg Config) Update(newCfg Config) {
|
||||
ConfigLock.Lock()
|
||||
defer ConfigLock.Unlock()
|
||||
sCfg.RRS = newCfg.RRS
|
||||
sCfg.DMA = newCfg.DMA
|
||||
sCfg.Standard = newCfg.Standard
|
||||
}
|
||||
|
||||
// GetDMA - returns DMA configuration.
|
||||
func (sCfg Config) GetDMA() string {
|
||||
ConfigLock.RLock()
|
||||
defer ConfigLock.RUnlock()
|
||||
return sCfg.DMA
|
||||
}
|
||||
|
||||
|
@ -96,7 +96,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
||||
}
|
||||
|
||||
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
|
||||
|
||||
versionID := srcInfo.VersionID
|
||||
if srcInfo.versionOnly {
|
||||
versionID = dstOpts.VersionID
|
||||
@ -1201,6 +1200,56 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
var err error
|
||||
// Lock the object before updating tags.
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
|
||||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
|
||||
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||
|
||||
// Pick latest valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
if fi.Deleted {
|
||||
if opts.VersionID == "" {
|
||||
return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
}
|
||||
|
||||
for k, v := range opts.UserDefined {
|
||||
fi.Metadata[k] = v
|
||||
}
|
||||
fi.ModTime = opts.MTime
|
||||
fi.VersionID = opts.VersionID
|
||||
|
||||
if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
objInfo := fi.ToObjectInfo(bucket, object)
|
||||
return objInfo, nil
|
||||
|
||||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
var err error
|
||||
@ -1215,15 +1264,15 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
||||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true)
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
|
||||
|
||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||
|
||||
// Pick latest valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||
@ -1237,118 +1286,43 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
||||
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
}
|
||||
|
||||
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
|
||||
for i, metaFi := range metaArr {
|
||||
if metaFi.IsValid() {
|
||||
// clean fi.Meta of tag key, before updating the new tags
|
||||
delete(metaFi.Metadata, xhttp.AmzObjectTagging)
|
||||
// Don't update for empty tags
|
||||
if tags != "" {
|
||||
metaFi.Metadata[xhttp.AmzObjectTagging] = tags
|
||||
}
|
||||
for k, v := range opts.UserDefined {
|
||||
metaFi.Metadata[k] = v
|
||||
}
|
||||
metaArr[i].Metadata = metaFi.Metadata
|
||||
}
|
||||
}
|
||||
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
var online int
|
||||
// Cleanup in case of xl.meta writing failure
|
||||
defer func() {
|
||||
if online != len(onlineDisks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
}
|
||||
}()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if onlineDisks, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
online = countOnlineDisks(onlineDisks)
|
||||
|
||||
objInfo := fi.ToObjectInfo(bucket, object)
|
||||
objInfo.UserTags = tags
|
||||
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// updateObjectMeta will update the metadata of a file.
|
||||
func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, meta map[string]string, opts ObjectOptions) error {
|
||||
if len(meta) == 0 {
|
||||
return nil
|
||||
}
|
||||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true)
|
||||
|
||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||
|
||||
// Pick latest valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Update metadata
|
||||
for k, v := range meta {
|
||||
fi.Metadata[xhttp.AmzObjectTagging] = tags
|
||||
for k, v := range opts.UserDefined {
|
||||
fi.Metadata[k] = v
|
||||
}
|
||||
|
||||
if fi.Deleted {
|
||||
if opts.VersionID == "" {
|
||||
return toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
return toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
for i := range metaArr {
|
||||
if errs[i] != nil {
|
||||
// Avoid disks where loading metadata fail
|
||||
continue
|
||||
}
|
||||
return fi.ToObjectInfo(bucket, object), nil
|
||||
}
|
||||
|
||||
metaArr[i].Metadata = fi.Metadata
|
||||
// updateObjectMeta will update the metadata of a file.
|
||||
func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo) error {
|
||||
if len(fi.Metadata) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
tempObj := mustGetUUID()
|
||||
disks := er.getDisks()
|
||||
|
||||
var online int
|
||||
// Cleanup in case of xl.meta writing failure
|
||||
defer func() {
|
||||
if online != len(disks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
}
|
||||
}()
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if disks, err = writeUniqueFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
// Start writing `xl.meta` to all disks in parallel.
|
||||
for index := range disks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
return disks[index].UpdateMetadata(ctx, bucket, object, fi)
|
||||
}, index)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if disks, err = renameFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
// Wait for all the routines.
|
||||
mErrs := g.Wait()
|
||||
|
||||
online = countOnlineDisks(disks)
|
||||
return nil
|
||||
return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, getWriteQuorum(len(disks)))
|
||||
}
|
||||
|
||||
// DeleteObjectTags - delete object tags from an existing object
|
||||
|
@ -1778,6 +1778,22 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
|
||||
}
|
||||
}
|
||||
|
||||
// PutObjectMetadata - replace or add tags to an existing object
|
||||
func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
object = encodeDirObject(object)
|
||||
if z.SinglePool() {
|
||||
return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
// We don't know the size here set 1GiB atleast.
|
||||
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
return z.serverPools[idx].PutObjectMetadata(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
object = encodeDirObject(object)
|
||||
|
@ -1318,6 +1318,12 @@ func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID
|
||||
return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts)
|
||||
}
|
||||
|
||||
// PutObjectMetadata - replace or add metadata to an existing object/version
|
||||
func (s *erasureSets) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
er := s.getHashedSet(object)
|
||||
return er.PutObjectMetadata(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
// PutObjectTags - replace or add tags to an existing object
|
||||
func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
er := s.getHashedSet(object)
|
||||
|
@ -52,6 +52,12 @@ func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, upda
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
// PutObjectMetadata - not implemented for gateway.
|
||||
func (a GatewayUnsupported) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
logger.CriticalIf(ctx, errors.New("not implemented"))
|
||||
return ObjectInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// NewNSLock is a dummy stub for gateway.
|
||||
func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
logger.CriticalIf(context.Background(), errors.New("not implemented"))
|
||||
|
@ -686,7 +686,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
||||
// Update block 0 metadata.
|
||||
var retries int
|
||||
for {
|
||||
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
|
||||
meta := b.headerKV()
|
||||
fi := FileInfo{
|
||||
Metadata: make(map[string]string, len(meta)),
|
||||
}
|
||||
for k, v := range meta {
|
||||
fi.Metadata[k] = v
|
||||
}
|
||||
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -244,6 +244,13 @@ func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi
|
||||
return d.disk.WriteMetadata(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return err
|
||||
}
|
||||
return d.disk.UpdateMetadata(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return err
|
||||
|
@ -161,6 +161,9 @@ type ObjectLayer interface {
|
||||
Health(ctx context.Context, opts HealthOptions) HealthResult
|
||||
ReadHealth(ctx context.Context) bool
|
||||
|
||||
// Metadata operations
|
||||
PutObjectMetadata(context.Context, string, string, ObjectOptions) (ObjectInfo, error)
|
||||
|
||||
// ObjectTagging operations
|
||||
PutObjectTags(context.Context, string, string, string, ObjectOptions) (ObjectInfo, error)
|
||||
GetObjectTags(context.Context, string, string, ObjectOptions) (*tags.Tags, error)
|
||||
|
@ -3274,22 +3274,28 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
|
||||
if objInfo.UserTags != "" {
|
||||
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
||||
if objInfo.DeleteMarker {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
|
||||
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
|
||||
if replicate {
|
||||
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
|
||||
}
|
||||
|
||||
objInfo.metadataOnly = true
|
||||
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{
|
||||
VersionID: opts.VersionID,
|
||||
}, ObjectOptions{
|
||||
VersionID: opts.VersionID,
|
||||
MTime: opts.MTime,
|
||||
}); err != nil {
|
||||
// if version-id is not specified retention is supposed to be set on the latest object.
|
||||
if opts.VersionID == "" {
|
||||
opts.VersionID = objInfo.VersionID
|
||||
}
|
||||
popts := ObjectOptions{
|
||||
MTime: opts.MTime,
|
||||
VersionID: opts.VersionID,
|
||||
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
|
||||
}
|
||||
for k, v := range objInfo.UserDefined {
|
||||
popts.UserDefined[k] = v
|
||||
}
|
||||
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
@ -3441,28 +3447,34 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
if objInfo.DeleteMarker {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
if objRetention.Mode.Valid() {
|
||||
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode)
|
||||
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = objRetention.RetainUntilDate.UTC().Format(time.RFC3339)
|
||||
} else {
|
||||
delete(objInfo.UserDefined, strings.ToLower(xhttp.AmzObjectLockRetainUntilDate))
|
||||
delete(objInfo.UserDefined, strings.ToLower(xhttp.AmzObjectLockMode))
|
||||
}
|
||||
if objInfo.UserTags != "" {
|
||||
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
||||
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = ""
|
||||
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = ""
|
||||
}
|
||||
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
|
||||
if replicate {
|
||||
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
|
||||
}
|
||||
objInfo.metadataOnly = true // Perform only metadata updates.
|
||||
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{
|
||||
VersionID: opts.VersionID,
|
||||
}, ObjectOptions{
|
||||
VersionID: opts.VersionID,
|
||||
MTime: opts.MTime,
|
||||
}); err != nil {
|
||||
// if version-id is not specified retention is supposed to be set on the latest object.
|
||||
if opts.VersionID == "" {
|
||||
opts.VersionID = objInfo.VersionID
|
||||
}
|
||||
popts := ObjectOptions{
|
||||
MTime: opts.MTime,
|
||||
VersionID: opts.VersionID,
|
||||
UserDefined: make(map[string]string, len(objInfo.UserDefined)),
|
||||
}
|
||||
for k, v := range objInfo.UserDefined {
|
||||
popts.UserDefined[k] = v
|
||||
}
|
||||
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ type StorageAPI interface {
|
||||
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error
|
||||
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
||||
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
|
||||
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
|
||||
|
||||
|
@ -360,6 +360,21 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
|
||||
var reader bytes.Buffer
|
||||
if err := msgp.Encode(&reader, &fi); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
|
@ -17,7 +17,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v29" // Removed WalkVersions()
|
||||
storageRESTVersion = "v30" // Added UpdateMetadata()
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
@ -36,6 +36,7 @@ const (
|
||||
storageRESTMethodCreateFile = "/createfile"
|
||||
storageRESTMethodWriteAll = "/writeall"
|
||||
storageRESTMethodWriteMetadata = "/writemetadata"
|
||||
storageRESTMethodUpdateMetadata = "/updatemetadata"
|
||||
storageRESTMethodDeleteVersion = "/deleteversion"
|
||||
storageRESTMethodReadVersion = "/readversion"
|
||||
storageRESTMethodRenameData = "/renamedata"
|
||||
|
@ -373,6 +373,32 @@ func (s *storageRESTServer) WriteMetadataHandler(w http.ResponseWriter, r *http.
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateMetadata update new updated metadata.
|
||||
func (s *storageRESTServer) UpdateMetadataHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
vars := mux.Vars(r)
|
||||
volume := vars[storageRESTVolume]
|
||||
filePath := vars[storageRESTFilePath]
|
||||
|
||||
if r.ContentLength < 0 {
|
||||
s.writeErrorResponse(w, errInvalidArgument)
|
||||
return
|
||||
}
|
||||
|
||||
var fi FileInfo
|
||||
if err := msgp.Decode(r.Body, &fi); err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
err := s.storage.UpdateMetadata(r.Context(), volume, filePath, fi)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
}
|
||||
|
||||
// WriteAllHandler - write to file all content.
|
||||
func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
@ -1018,6 +1044,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteMetadata).HandlerFunc(httpTraceHdrs(server.WriteMetadataHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodUpdateMetadata).HandlerFunc(httpTraceHdrs(server.UpdateMetadataHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTForceDelMarker)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
|
||||
|
@ -29,14 +29,15 @@ func _() {
|
||||
_ = x[storageMetricWriteAll-18]
|
||||
_ = x[storageMetricDeleteVersion-19]
|
||||
_ = x[storageMetricWriteMetadata-20]
|
||||
_ = x[storageMetricReadVersion-21]
|
||||
_ = x[storageMetricReadAll-22]
|
||||
_ = x[storageMetricLast-23]
|
||||
_ = x[storageMetricUpdateMetadata-21]
|
||||
_ = x[storageMetricReadVersion-22]
|
||||
_ = x[storageMetricReadAll-23]
|
||||
_ = x[storageMetricLast-24]
|
||||
}
|
||||
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataReadVersionReadAllLast"
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllLast"
|
||||
|
||||
var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 212, 219, 223}
|
||||
var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 237}
|
||||
|
||||
func (i storageMetric) String() string {
|
||||
if i >= storageMetric(len(_storageMetric_index)-1) {
|
||||
|
@ -54,6 +54,7 @@ const (
|
||||
storageMetricWriteAll
|
||||
storageMetricDeleteVersion
|
||||
storageMetricWriteMetadata
|
||||
storageMetricUpdateMetadata
|
||||
storageMetricReadVersion
|
||||
storageMetricReadAll
|
||||
|
||||
@ -541,6 +542,22 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
|
||||
return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricUpdateMetadata, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.storage.UpdateMetadata(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricWriteMetadata, volume, path)()
|
||||
|
||||
|
@ -624,6 +624,75 @@ func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) {
|
||||
return append(dst, z.data...), nil
|
||||
}
|
||||
|
||||
// UpdateObjectVersion updates metadata and modTime for a given
|
||||
// versionID, NOTE: versionID must be valid and should exist -
|
||||
// and must not be a DeleteMarker or legacy object, if no
|
||||
// versionID is specified 'null' versionID is updated instead.
|
||||
//
|
||||
// It is callers responsibility to set correct versionID, this
|
||||
// function shouldn't be further extended to update immutable
|
||||
// values such as ErasureInfo, ChecksumInfo.
|
||||
//
|
||||
// Metadata is only updated to new values, existing values
|
||||
// stay as is, if you wish to update all values you should
|
||||
// update all metadata freshly before calling this function
|
||||
// in-case you wish to clear existing metadata.
|
||||
func (z *xlMetaV2) UpdateObjectVersion(fi FileInfo) error {
|
||||
if fi.VersionID == "" {
|
||||
// this means versioning is not yet
|
||||
// enabled or suspend i.e all versions
|
||||
// are basically default value i.e "null"
|
||||
fi.VersionID = nullVersionID
|
||||
}
|
||||
|
||||
var uv uuid.UUID
|
||||
var err error
|
||||
if fi.VersionID != "" && fi.VersionID != nullVersionID {
|
||||
uv, err = uuid.Parse(fi.VersionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for i, version := range z.Versions {
|
||||
if !version.Valid() {
|
||||
return errFileCorrupt
|
||||
}
|
||||
switch version.Type {
|
||||
case LegacyType:
|
||||
if version.ObjectV1.VersionID == fi.VersionID {
|
||||
return errMethodNotAllowed
|
||||
}
|
||||
case ObjectType:
|
||||
if version.ObjectV2.VersionID == uv {
|
||||
for k, v := range fi.Metadata {
|
||||
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
||||
if v == "" {
|
||||
delete(z.Versions[i].ObjectV2.MetaSys, k)
|
||||
} else {
|
||||
z.Versions[i].ObjectV2.MetaSys[k] = []byte(v)
|
||||
}
|
||||
} else {
|
||||
if v == "" {
|
||||
delete(z.Versions[i].ObjectV2.MetaUser, k)
|
||||
} else {
|
||||
z.Versions[i].ObjectV2.MetaUser[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
if !fi.ModTime.IsZero() {
|
||||
z.Versions[i].ObjectV2.ModTime = fi.ModTime.UnixNano()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
case DeleteType:
|
||||
return errMethodNotAllowed
|
||||
}
|
||||
}
|
||||
|
||||
return errFileVersionNotFound
|
||||
}
|
||||
|
||||
// AddVersion adds a new version
|
||||
func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
||||
if fi.VersionID == "" {
|
||||
|
@ -897,6 +897,44 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
||||
return err
|
||||
}
|
||||
|
||||
// Updates only metadata for a given version.
|
||||
func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error {
|
||||
if len(fi.Metadata) == 0 {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
if fi.VersionID != "" {
|
||||
return errFileVersionNotFound
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if !isXL2V1Format(buf) {
|
||||
return errFileVersionNotFound
|
||||
}
|
||||
|
||||
var xlMeta xlMetaV2
|
||||
if err = xlMeta.Load(buf); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = xlMeta.UpdateObjectVersion(fi); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf, err = xlMeta.AppendTo(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
|
||||
}
|
||||
|
||||
// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
|
||||
func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error {
|
||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
@ -936,7 +974,6 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F
|
||||
}
|
||||
|
||||
return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
|
||||
|
||||
}
|
||||
|
||||
func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
|
||||
@ -1044,7 +1081,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
|
||||
// Reading data for small objects when
|
||||
// - object has not yet transitioned
|
||||
// - object size lesser than 32KiB
|
||||
// - object size lesser than 128KiB
|
||||
// - object has maximum of 1 parts
|
||||
if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 {
|
||||
// Enable O_DIRECT optionally only if drive supports it.
|
||||
@ -1891,7 +1928,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
if isXL2V1Format(dstBuf) {
|
||||
if err = xlMeta.Load(dstBuf); err != nil {
|
||||
logger.LogIf(s.ctx, err)
|
||||
return errFileCorrupt
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// This code-path is to preserve the legacy data.
|
||||
|
@ -30,7 +30,7 @@ type LRWMutex struct {
|
||||
source string
|
||||
isWriteLock bool
|
||||
ref int
|
||||
m sync.Mutex // Mutex to prevent multiple simultaneous locks
|
||||
mu sync.Mutex // Mutex to prevent multiple simultaneous locks
|
||||
}
|
||||
|
||||
// NewLRWMutex - initializes a new lsync RW mutex.
|
||||
@ -73,7 +73,9 @@ func (lm *LRWMutex) GetRLock(ctx context.Context, id string, source string, time
|
||||
}
|
||||
|
||||
func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) {
|
||||
lm.m.Lock()
|
||||
lm.mu.Lock()
|
||||
defer lm.mu.Unlock()
|
||||
|
||||
lm.id = id
|
||||
lm.source = source
|
||||
if isWriteLock {
|
||||
@ -88,7 +90,6 @@ func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) {
|
||||
locked = true
|
||||
}
|
||||
}
|
||||
lm.m.Unlock()
|
||||
|
||||
return locked
|
||||
}
|
||||
@ -147,7 +148,8 @@ func (lm *LRWMutex) RUnlock() {
|
||||
}
|
||||
|
||||
func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) {
|
||||
lm.m.Lock()
|
||||
lm.mu.Lock()
|
||||
defer lm.mu.Unlock()
|
||||
|
||||
// Try to release lock.
|
||||
if isWriteLock {
|
||||
@ -165,16 +167,17 @@ func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) {
|
||||
}
|
||||
}
|
||||
|
||||
lm.m.Unlock()
|
||||
return unlocked
|
||||
}
|
||||
|
||||
// ForceUnlock will forcefully clear a write or read lock.
|
||||
func (lm *LRWMutex) ForceUnlock() {
|
||||
lm.m.Lock()
|
||||
lm.mu.Lock()
|
||||
defer lm.mu.Unlock()
|
||||
|
||||
lm.ref = 0
|
||||
lm.isWriteLock = false
|
||||
lm.m.Unlock()
|
||||
|
||||
}
|
||||
|
||||
// DRLocker returns a sync.Locker interface that implements
|
||||
|
Loading…
Reference in New Issue
Block a user