From d46386246fb6db5f823df54d932b6f7274d46059 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 4 Apr 2021 13:32:31 -0700 Subject: [PATCH] api: Introduce metadata update APIs to update only metadata (#11962) Current implementation heavily relies on readAllFileInfo but with the advent of xl.meta inlined with data, we cannot easily avoid reading data when we are only interested is updating metadata, this leads to invariably write amplification during metadata updates, repeatedly reading data when we are only interested in updating metadata. This PR ensures that we implement a metadata only update API at storage layer, that handles updates to metadata alone for any given version - given the version is valid and present. This helps reduce the chattiness for following calls.. - PutObjectTags - DeleteObjectTags - PutObjectLegalHold - PutObjectRetention - ReplicateObject (updates metadata on replication status) --- cmd/bucket-replication.go | 21 ++- cmd/config-current.go | 2 +- cmd/config/storageclass/storage-class.go | 17 +++ cmd/erasure-object.go | 180 ++++++++++------------- cmd/erasure-server-pool.go | 16 ++ cmd/erasure-sets.go | 6 + cmd/gateway-unsupported.go | 6 + cmd/metacache-set.go | 9 +- cmd/naughty-disk_test.go | 7 + cmd/object-api-interface.go | 3 + cmd/object-handlers.go | 60 +++++--- cmd/storage-interface.go | 1 + cmd/storage-rest-client.go | 15 ++ cmd/storage-rest-common.go | 3 +- cmd/storage-rest-server.go | 28 ++++ cmd/storagemetric_string.go | 11 +- cmd/xl-storage-disk-id-check.go | 17 +++ cmd/xl-storage-format-v2.go | 69 +++++++++ cmd/xl-storage.go | 43 +++++- pkg/lsync/lrwmutex.go | 17 ++- 20 files changed, 378 insertions(+), 153 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 92dcd74dd..7ceb5a792 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -571,11 +571,6 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) { - z, ok := objectAPI.(*erasureServerPools) - if !ok { - return - } - bucket := objInfo.Bucket object := objInfo.Name @@ -734,17 +729,27 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa eventName = event.ObjectReplicationFailed } + z, ok := objectAPI.(*erasureServerPools) + if !ok { + return + } + // This lower level implementation is necessary to avoid write locks from CopyObject. poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } else { - if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, objInfo.UserDefined, ObjectOptions{ - VersionID: objInfo.VersionID, - }); err != nil { + fi := FileInfo{} + fi.VersionID = objInfo.VersionID + fi.Metadata = make(map[string]string, len(objInfo.UserDefined)) + for k, v := range objInfo.UserDefined { + fi.Metadata[k] = v + } + if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } } + opType := replication.MetadataReplicationType if rtype == replicateAll { opType = replication.ObjectReplicationType diff --git a/cmd/config-current.go b/cmd/config-current.go index e6b3ba987..7207fe254 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -452,7 +452,7 @@ func lookupConfigs(s config.Config, setDriveCounts []int) { // if we validated all setDriveCounts and it was successful // proceed to store the correct storage class globally. if i == len(setDriveCounts)-1 { - globalStorageClass = sc + globalStorageClass.Update(sc) } } } diff --git a/cmd/config/storageclass/storage-class.go b/cmd/config/storageclass/storage-class.go index d1244b6bb..7dc6ad5e4 100644 --- a/cmd/config/storageclass/storage-class.go +++ b/cmd/config/storageclass/storage-class.go @@ -22,6 +22,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/minio/minio/cmd/config" "github.com/minio/minio/pkg/env" @@ -90,6 +91,9 @@ type StorageClass struct { Parity int } +// ConfigLock is a global lock for storage-class config +var ConfigLock = sync.RWMutex{} + // Config storage class configuration type Config struct { Standard StorageClass `json:"standard"` @@ -232,6 +236,8 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) { // is returned, the caller is expected to choose the right parity // at that point. func (sCfg Config) GetParityForSC(sc string) (parity int) { + ConfigLock.RLock() + defer ConfigLock.RUnlock() switch strings.TrimSpace(sc) { case RRS: // set the rrs parity if available @@ -244,8 +250,19 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) { } } +// Update update storage-class with new config +func (sCfg Config) Update(newCfg Config) { + ConfigLock.Lock() + defer ConfigLock.Unlock() + sCfg.RRS = newCfg.RRS + sCfg.DMA = newCfg.DMA + sCfg.Standard = newCfg.Standard +} + // GetDMA - returns DMA configuration. func (sCfg Config) GetDMA() string { + ConfigLock.RLock() + defer ConfigLock.RUnlock() return sCfg.DMA } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 6b96c4648..17c015a49 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -96,7 +96,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d } onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi) - versionID := srcInfo.VersionID if srcInfo.versionOnly { versionID = dstOpts.VersionID @@ -1201,6 +1200,56 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) { } } +func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + var err error + // Lock the object before updating tags. + lk := er.NewNSLock(bucket, object) + ctx, err = lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + defer lk.Unlock() + + disks := er.getDisks() + + // Read metadata associated with the object from all disks. + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) + + readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) + if err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + + // List all online disks. + _, modTime := listOnlineDisks(disks, metaArr, errs) + + // Pick latest valid metadata. + fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + if err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + if fi.Deleted { + if opts.VersionID == "" { + return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object) + } + return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object) + } + + for k, v := range opts.UserDefined { + fi.Metadata[k] = v + } + fi.ModTime = opts.MTime + fi.VersionID = opts.VersionID + + if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + + objInfo := fi.ToObjectInfo(bucket, object) + return objInfo, nil + +} + // PutObjectTags - replace or add tags to an existing object func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { var err error @@ -1215,15 +1264,15 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) - readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) + readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } // List all online disks. - onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs) + _, modTime := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) @@ -1237,118 +1286,43 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object) } - onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi) - for i, metaFi := range metaArr { - if metaFi.IsValid() { - // clean fi.Meta of tag key, before updating the new tags - delete(metaFi.Metadata, xhttp.AmzObjectTagging) - // Don't update for empty tags - if tags != "" { - metaFi.Metadata[xhttp.AmzObjectTagging] = tags - } - for k, v := range opts.UserDefined { - metaFi.Metadata[k] = v - } - metaArr[i].Metadata = metaFi.Metadata - } - } - - tempObj := mustGetUUID() - - var online int - // Cleanup in case of xl.meta writing failure - defer func() { - if online != len(onlineDisks) { - er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum) - } - }() - - // Write unique `xl.meta` for each disk. - if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - - // Atomically rename metadata from tmp location to destination for each disk. - if onlineDisks, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - - online = countOnlineDisks(onlineDisks) - - objInfo := fi.ToObjectInfo(bucket, object) - objInfo.UserTags = tags - - return objInfo, nil -} - -// updateObjectMeta will update the metadata of a file. -func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, meta map[string]string, opts ObjectOptions) error { - if len(meta) == 0 { - return nil - } - disks := er.getDisks() - - // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true) - - readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) - if err != nil { - return toObjectErr(err, bucket, object) - } - - // List all online disks. - _, modTime := listOnlineDisks(disks, metaArr, errs) - - // Pick latest valid metadata. - fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) - if err != nil { - return toObjectErr(err, bucket, object) - } - - // Update metadata - for k, v := range meta { + fi.Metadata[xhttp.AmzObjectTagging] = tags + for k, v := range opts.UserDefined { fi.Metadata[k] = v } - if fi.Deleted { - if opts.VersionID == "" { - return toObjectErr(errFileNotFound, bucket, object) - } - return toObjectErr(errMethodNotAllowed, bucket, object) + if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) } - for i := range metaArr { - if errs[i] != nil { - // Avoid disks where loading metadata fail - continue - } + return fi.ToObjectInfo(bucket, object), nil +} - metaArr[i].Metadata = fi.Metadata +// updateObjectMeta will update the metadata of a file. +func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo) error { + if len(fi.Metadata) == 0 { + return nil } - tempObj := mustGetUUID() + disks := er.getDisks() - var online int - // Cleanup in case of xl.meta writing failure - defer func() { - if online != len(disks) { - er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum) - } - }() + g := errgroup.WithNErrs(len(disks)) - // Write unique `xl.meta` for each disk. - if disks, err = writeUniqueFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil { - return toObjectErr(err, bucket, object) + // Start writing `xl.meta` to all disks in parallel. + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + return disks[index].UpdateMetadata(ctx, bucket, object, fi) + }, index) } - // Atomically rename metadata from tmp location to destination for each disk. - if disks, err = renameFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { - logger.LogIf(ctx, err) - return toObjectErr(err, bucket, object) - } + // Wait for all the routines. + mErrs := g.Wait() - online = countOnlineDisks(disks) - return nil + return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, getWriteQuorum(len(disks))) } // DeleteObjectTags - delete object tags from an existing object diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 3658d6c1f..85c8a784c 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1778,6 +1778,22 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea } } +// PutObjectMetadata - replace or add tags to an existing object +func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + object = encodeDirObject(object) + if z.SinglePool() { + return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts) + } + + // We don't know the size here set 1GiB atleast. + idx, err := z.getPoolIdxExisting(ctx, bucket, object) + if err != nil { + return ObjectInfo{}, err + } + + return z.serverPools[idx].PutObjectMetadata(ctx, bucket, object, opts) +} + // PutObjectTags - replace or add tags to an existing object func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { object = encodeDirObject(object) diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 57af94b04..ddc736d8a 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -1318,6 +1318,12 @@ func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts) } +// PutObjectMetadata - replace or add metadata to an existing object/version +func (s *erasureSets) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + er := s.getHashedSet(object) + return er.PutObjectMetadata(ctx, bucket, object, opts) +} + // PutObjectTags - replace or add tags to an existing object func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { er := s.getHashedSet(object) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 08d8b766d..24652854b 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -52,6 +52,12 @@ func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, upda return NotImplemented{} } +// PutObjectMetadata - not implemented for gateway. +func (a GatewayUnsupported) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + logger.CriticalIf(ctx, errors.New("not implemented")) + return ObjectInfo{}, NotImplemented{} +} + // NewNSLock is a dummy stub for gateway. func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker { logger.CriticalIf(context.Background(), errors.New("not implemented")) diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index c1ab114e3..b7cbb67a0 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -686,7 +686,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr // Update block 0 metadata. var retries int for { - err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{}) + meta := b.headerKV() + fi := FileInfo{ + Metadata: make(map[string]string, len(meta)), + } + for k, v := range meta { + fi.Metadata[k] = v + } + err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi) if err == nil { break } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 5c201fdab..52ee3df80 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -244,6 +244,13 @@ func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi return d.disk.WriteMetadata(ctx, volume, path, fi) } +func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { + if err := d.calcError(); err != nil { + return err + } + return d.disk.UpdateMetadata(ctx, volume, path, fi) +} + func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { if err := d.calcError(); err != nil { return err diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 469c0b872..bb6bd0af5 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -161,6 +161,9 @@ type ObjectLayer interface { Health(ctx context.Context, opts HealthOptions) HealthResult ReadHealth(ctx context.Context) bool + // Metadata operations + PutObjectMetadata(context.Context, string, string, ObjectOptions) (ObjectInfo, error) + // ObjectTagging operations PutObjectTags(context.Context, string, string, string, ObjectOptions) (ObjectInfo, error) GetObjectTags(context.Context, string, string, ObjectOptions) (*tags.Tags, error) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d7784ab97..6b967d345 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3274,22 +3274,28 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) - if objInfo.UserTags != "" { - objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + if objInfo.DeleteMarker { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) + return } + objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } - - objInfo.metadataOnly = true - if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ - VersionID: opts.VersionID, - }, ObjectOptions{ - VersionID: opts.VersionID, - MTime: opts.MTime, - }); err != nil { + // if version-id is not specified retention is supposed to be set on the latest object. + if opts.VersionID == "" { + opts.VersionID = objInfo.VersionID + } + popts := ObjectOptions{ + MTime: opts.MTime, + VersionID: opts.VersionID, + UserDefined: make(map[string]string, len(objInfo.UserDefined)), + } + for k, v := range objInfo.UserDefined { + popts.UserDefined[k] = v + } + if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } @@ -3441,28 +3447,34 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - + if objInfo.DeleteMarker { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) + return + } if objRetention.Mode.Valid() { objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode) objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = objRetention.RetainUntilDate.UTC().Format(time.RFC3339) } else { - delete(objInfo.UserDefined, strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)) - delete(objInfo.UserDefined, strings.ToLower(xhttp.AmzObjectLockMode)) - } - if objInfo.UserTags != "" { - objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = "" + objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = "" } replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } - objInfo.metadataOnly = true // Perform only metadata updates. - if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ - VersionID: opts.VersionID, - }, ObjectOptions{ - VersionID: opts.VersionID, - MTime: opts.MTime, - }); err != nil { + // if version-id is not specified retention is supposed to be set on the latest object. + if opts.VersionID == "" { + opts.VersionID = objInfo.VersionID + } + popts := ObjectOptions{ + MTime: opts.MTime, + VersionID: opts.VersionID, + UserDefined: make(map[string]string, len(objInfo.UserDefined)), + } + for k, v := range objInfo.UserDefined { + popts.UserDefined[k] = v + } + if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 5ca2f84d7..c7c13eaa6 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -55,6 +55,7 @@ type StorageAPI interface { DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error + UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index b610f8505..81c732c0b 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -360,6 +360,21 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path return err } +func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + + var reader bytes.Buffer + if err := msgp.Encode(&reader, &fi); err != nil { + return err + } + + respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1) + defer http.DrainBody(respBody) + return err +} + func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error { values := make(url.Values) values.Set(storageRESTVolume, volume) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 7db5666fe..4e90ef4f2 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v29" // Removed WalkVersions() + storageRESTVersion = "v30" // Added UpdateMetadata() storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -36,6 +36,7 @@ const ( storageRESTMethodCreateFile = "/createfile" storageRESTMethodWriteAll = "/writeall" storageRESTMethodWriteMetadata = "/writemetadata" + storageRESTMethodUpdateMetadata = "/updatemetadata" storageRESTMethodDeleteVersion = "/deleteversion" storageRESTMethodReadVersion = "/readversion" storageRESTMethodRenameData = "/renamedata" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index b161c5c86..8ecb6423b 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -373,6 +373,32 @@ func (s *storageRESTServer) WriteMetadataHandler(w http.ResponseWriter, r *http. } } +// UpdateMetadata update new updated metadata. +func (s *storageRESTServer) UpdateMetadataHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + var fi FileInfo + if err := msgp.Decode(r.Body, &fi); err != nil { + s.writeErrorResponse(w, err) + return + } + + err := s.storage.UpdateMetadata(r.Context(), volume, filePath, fi) + if err != nil { + s.writeErrorResponse(w, err) + } +} + // WriteAllHandler - write to file all content. func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1018,6 +1044,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteMetadata).HandlerFunc(httpTraceHdrs(server.WriteMetadataHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodUpdateMetadata).HandlerFunc(httpTraceHdrs(server.UpdateMetadataHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTForceDelMarker)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). diff --git a/cmd/storagemetric_string.go b/cmd/storagemetric_string.go index b8702d64d..8026aabef 100644 --- a/cmd/storagemetric_string.go +++ b/cmd/storagemetric_string.go @@ -29,14 +29,15 @@ func _() { _ = x[storageMetricWriteAll-18] _ = x[storageMetricDeleteVersion-19] _ = x[storageMetricWriteMetadata-20] - _ = x[storageMetricReadVersion-21] - _ = x[storageMetricReadAll-22] - _ = x[storageMetricLast-23] + _ = x[storageMetricUpdateMetadata-21] + _ = x[storageMetricReadVersion-22] + _ = x[storageMetricReadAll-23] + _ = x[storageMetricLast-24] } -const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataReadVersionReadAllLast" +const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllLast" -var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 212, 219, 223} +var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 237} func (i storageMetric) String() string { if i >= storageMetric(len(_storageMetric_index)-1) { diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 20b2f5831..f370b0bb6 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -54,6 +54,7 @@ const ( storageMetricWriteAll storageMetricDeleteVersion storageMetricWriteMetadata + storageMetricUpdateMetadata storageMetricReadVersion storageMetricReadAll @@ -541,6 +542,22 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) } +func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { + defer p.updateStorageMetrics(storageMetricUpdateMetadata, volume, path)() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err = p.checkDiskStale(); err != nil { + return err + } + + return p.storage.UpdateMetadata(ctx, volume, path, fi) +} + func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { defer p.updateStorageMetrics(storageMetricWriteMetadata, volume, path)() diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index ac11b98c7..c29cbd4b6 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -624,6 +624,75 @@ func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { return append(dst, z.data...), nil } +// UpdateObjectVersion updates metadata and modTime for a given +// versionID, NOTE: versionID must be valid and should exist - +// and must not be a DeleteMarker or legacy object, if no +// versionID is specified 'null' versionID is updated instead. +// +// It is callers responsibility to set correct versionID, this +// function shouldn't be further extended to update immutable +// values such as ErasureInfo, ChecksumInfo. +// +// Metadata is only updated to new values, existing values +// stay as is, if you wish to update all values you should +// update all metadata freshly before calling this function +// in-case you wish to clear existing metadata. +func (z *xlMetaV2) UpdateObjectVersion(fi FileInfo) error { + if fi.VersionID == "" { + // this means versioning is not yet + // enabled or suspend i.e all versions + // are basically default value i.e "null" + fi.VersionID = nullVersionID + } + + var uv uuid.UUID + var err error + if fi.VersionID != "" && fi.VersionID != nullVersionID { + uv, err = uuid.Parse(fi.VersionID) + if err != nil { + return err + } + } + + for i, version := range z.Versions { + if !version.Valid() { + return errFileCorrupt + } + switch version.Type { + case LegacyType: + if version.ObjectV1.VersionID == fi.VersionID { + return errMethodNotAllowed + } + case ObjectType: + if version.ObjectV2.VersionID == uv { + for k, v := range fi.Metadata { + if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + if v == "" { + delete(z.Versions[i].ObjectV2.MetaSys, k) + } else { + z.Versions[i].ObjectV2.MetaSys[k] = []byte(v) + } + } else { + if v == "" { + delete(z.Versions[i].ObjectV2.MetaUser, k) + } else { + z.Versions[i].ObjectV2.MetaUser[k] = v + } + } + } + if !fi.ModTime.IsZero() { + z.Versions[i].ObjectV2.ModTime = fi.ModTime.UnixNano() + } + return nil + } + case DeleteType: + return errMethodNotAllowed + } + } + + return errFileVersionNotFound +} + // AddVersion adds a new version func (z *xlMetaV2) AddVersion(fi FileInfo) error { if fi.VersionID == "" { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 7af109167..aff52c74b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -897,6 +897,44 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return err } +// Updates only metadata for a given version. +func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error { + if len(fi.Metadata) == 0 { + return errInvalidArgument + } + + buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) + if err != nil { + if err == errFileNotFound { + if fi.VersionID != "" { + return errFileVersionNotFound + } + } + return err + } + + if !isXL2V1Format(buf) { + return errFileVersionNotFound + } + + var xlMeta xlMetaV2 + if err = xlMeta.Load(buf); err != nil { + logger.LogIf(ctx, err) + return err + } + + if err = xlMeta.UpdateObjectVersion(fi); err != nil { + return err + } + + buf, err = xlMeta.AppendTo(nil) + if err != nil { + return err + } + + return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) +} + // WriteMetadata - writes FileInfo metadata for path at `xl.meta` func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error { buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) @@ -936,7 +974,6 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F } return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) - } func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) { @@ -1044,7 +1081,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str // Reading data for small objects when // - object has not yet transitioned - // - object size lesser than 32KiB + // - object size lesser than 128KiB // - object has maximum of 1 parts if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 { // Enable O_DIRECT optionally only if drive supports it. @@ -1891,7 +1928,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, if isXL2V1Format(dstBuf) { if err = xlMeta.Load(dstBuf); err != nil { logger.LogIf(s.ctx, err) - return errFileCorrupt + return err } } else { // This code-path is to preserve the legacy data. diff --git a/pkg/lsync/lrwmutex.go b/pkg/lsync/lrwmutex.go index 72719cd30..ddeea13e4 100644 --- a/pkg/lsync/lrwmutex.go +++ b/pkg/lsync/lrwmutex.go @@ -30,7 +30,7 @@ type LRWMutex struct { source string isWriteLock bool ref int - m sync.Mutex // Mutex to prevent multiple simultaneous locks + mu sync.Mutex // Mutex to prevent multiple simultaneous locks } // NewLRWMutex - initializes a new lsync RW mutex. @@ -73,7 +73,9 @@ func (lm *LRWMutex) GetRLock(ctx context.Context, id string, source string, time } func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) { - lm.m.Lock() + lm.mu.Lock() + defer lm.mu.Unlock() + lm.id = id lm.source = source if isWriteLock { @@ -88,7 +90,6 @@ func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) { locked = true } } - lm.m.Unlock() return locked } @@ -147,7 +148,8 @@ func (lm *LRWMutex) RUnlock() { } func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) { - lm.m.Lock() + lm.mu.Lock() + defer lm.mu.Unlock() // Try to release lock. if isWriteLock { @@ -165,16 +167,17 @@ func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) { } } - lm.m.Unlock() return unlocked } // ForceUnlock will forcefully clear a write or read lock. func (lm *LRWMutex) ForceUnlock() { - lm.m.Lock() + lm.mu.Lock() + defer lm.mu.Unlock() + lm.ref = 0 lm.isWriteLock = false - lm.m.Unlock() + } // DRLocker returns a sync.Locker interface that implements