diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 92dcd74dd..7ceb5a792 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -571,11 +571,6 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) { - z, ok := objectAPI.(*erasureServerPools) - if !ok { - return - } - bucket := objInfo.Bucket object := objInfo.Name @@ -734,17 +729,27 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa eventName = event.ObjectReplicationFailed } + z, ok := objectAPI.(*erasureServerPools) + if !ok { + return + } + // This lower level implementation is necessary to avoid write locks from CopyObject. poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } else { - if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, objInfo.UserDefined, ObjectOptions{ - VersionID: objInfo.VersionID, - }); err != nil { + fi := FileInfo{} + fi.VersionID = objInfo.VersionID + fi.Metadata = make(map[string]string, len(objInfo.UserDefined)) + for k, v := range objInfo.UserDefined { + fi.Metadata[k] = v + } + if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } } + opType := replication.MetadataReplicationType if rtype == replicateAll { opType = replication.ObjectReplicationType diff --git a/cmd/config-current.go b/cmd/config-current.go index e6b3ba987..7207fe254 100644 --- 
a/cmd/config-current.go +++ b/cmd/config-current.go @@ -452,7 +452,7 @@ func lookupConfigs(s config.Config, setDriveCounts []int) { // if we validated all setDriveCounts and it was successful // proceed to store the correct storage class globally. if i == len(setDriveCounts)-1 { - globalStorageClass = sc + globalStorageClass.Update(sc) } } } diff --git a/cmd/config/storageclass/storage-class.go b/cmd/config/storageclass/storage-class.go index d1244b6bb..7dc6ad5e4 100644 --- a/cmd/config/storageclass/storage-class.go +++ b/cmd/config/storageclass/storage-class.go @@ -22,6 +22,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/minio/minio/cmd/config" "github.com/minio/minio/pkg/env" @@ -90,6 +91,9 @@ type StorageClass struct { Parity int } +// ConfigLock is a global lock for storage-class config +var ConfigLock = sync.RWMutex{} + // Config storage class configuration type Config struct { Standard StorageClass `json:"standard"` @@ -232,6 +236,8 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) { // is returned, the caller is expected to choose the right parity // at that point. func (sCfg Config) GetParityForSC(sc string) (parity int) { + ConfigLock.RLock() + defer ConfigLock.RUnlock() switch strings.TrimSpace(sc) { case RRS: // set the rrs parity if available @@ -244,8 +250,19 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) { } } +// Update updates storage-class with a new config; pointer receiver is required so the assignments mutate the shared config instead of a copy. +func (sCfg *Config) Update(newCfg Config) { + ConfigLock.Lock() + defer ConfigLock.Unlock() + sCfg.RRS = newCfg.RRS + sCfg.DMA = newCfg.DMA + sCfg.Standard = newCfg.Standard +} + // GetDMA - returns DMA configuration. 
func (sCfg Config) GetDMA() string { + ConfigLock.RLock() + defer ConfigLock.RUnlock() return sCfg.DMA } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 6b96c4648..17c015a49 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -96,7 +96,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d } onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi) - versionID := srcInfo.VersionID if srcInfo.versionOnly { versionID = dstOpts.VersionID @@ -1201,6 +1200,56 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) { } } +func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + var err error + // Lock the object before updating tags. + lk := er.NewNSLock(bucket, object) + ctx, err = lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + defer lk.Unlock() + + disks := er.getDisks() + + // Read metadata associated with the object from all disks. + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) + + readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) + if err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + + // List all online disks. + _, modTime := listOnlineDisks(disks, metaArr, errs) + + // Pick latest valid metadata. 
+ fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + if err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + if fi.Deleted { + if opts.VersionID == "" { + return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object) + } + return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object) + } + + for k, v := range opts.UserDefined { + fi.Metadata[k] = v + } + fi.ModTime = opts.MTime + fi.VersionID = opts.VersionID + + if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + + objInfo := fi.ToObjectInfo(bucket, object) + return objInfo, nil + +} + // PutObjectTags - replace or add tags to an existing object func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { var err error @@ -1215,15 +1264,15 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) - readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) + readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } // List all online disks. - onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs) + _, modTime := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. 
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) @@ -1237,118 +1286,43 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object) } - onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi) - for i, metaFi := range metaArr { - if metaFi.IsValid() { - // clean fi.Meta of tag key, before updating the new tags - delete(metaFi.Metadata, xhttp.AmzObjectTagging) - // Don't update for empty tags - if tags != "" { - metaFi.Metadata[xhttp.AmzObjectTagging] = tags - } - for k, v := range opts.UserDefined { - metaFi.Metadata[k] = v - } - metaArr[i].Metadata = metaFi.Metadata - } - } - - tempObj := mustGetUUID() - - var online int - // Cleanup in case of xl.meta writing failure - defer func() { - if online != len(onlineDisks) { - er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum) - } - }() - - // Write unique `xl.meta` for each disk. - if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - - // Atomically rename metadata from tmp location to destination for each disk. - if onlineDisks, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - - online = countOnlineDisks(onlineDisks) - - objInfo := fi.ToObjectInfo(bucket, object) - objInfo.UserTags = tags - - return objInfo, nil -} - -// updateObjectMeta will update the metadata of a file. -func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, meta map[string]string, opts ObjectOptions) error { - if len(meta) == 0 { - return nil - } - disks := er.getDisks() - - // Read metadata associated with the object from all disks. 
- metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true) - - readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) - if err != nil { - return toObjectErr(err, bucket, object) - } - - // List all online disks. - _, modTime := listOnlineDisks(disks, metaArr, errs) - - // Pick latest valid metadata. - fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) - if err != nil { - return toObjectErr(err, bucket, object) - } - - // Update metadata - for k, v := range meta { + fi.Metadata[xhttp.AmzObjectTagging] = tags + for k, v := range opts.UserDefined { fi.Metadata[k] = v } - if fi.Deleted { - if opts.VersionID == "" { - return toObjectErr(errFileNotFound, bucket, object) - } - return toObjectErr(errMethodNotAllowed, bucket, object) + if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) } - for i := range metaArr { - if errs[i] != nil { - // Avoid disks where loading metadata fail - continue - } + return fi.ToObjectInfo(bucket, object), nil +} - metaArr[i].Metadata = fi.Metadata +// updateObjectMeta will update the metadata of a file. +func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo) error { + if len(fi.Metadata) == 0 { + return nil } - tempObj := mustGetUUID() + disks := er.getDisks() - var online int - // Cleanup in case of xl.meta writing failure - defer func() { - if online != len(disks) { - er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum) - } - }() + g := errgroup.WithNErrs(len(disks)) - // Write unique `xl.meta` for each disk. - if disks, err = writeUniqueFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil { - return toObjectErr(err, bucket, object) + // Start writing `xl.meta` to all disks in parallel. 
+ for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + return disks[index].UpdateMetadata(ctx, bucket, object, fi) + }, index) } - // Atomically rename metadata from tmp location to destination for each disk. - if disks, err = renameFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { - logger.LogIf(ctx, err) - return toObjectErr(err, bucket, object) - } + // Wait for all the routines. + mErrs := g.Wait() - online = countOnlineDisks(disks) - return nil + return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, getWriteQuorum(len(disks))) } // DeleteObjectTags - delete object tags from an existing object diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 3658d6c1f..85c8a784c 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1778,6 +1778,22 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea } } +// PutObjectMetadata - replace or add metadata to an existing object/version +func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + object = encodeDirObject(object) + if z.SinglePool() { + return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts) + } + + // Lookup the pool that holds the existing object/version. 
+ idx, err := z.getPoolIdxExisting(ctx, bucket, object) + if err != nil { + return ObjectInfo{}, err + } + + return z.serverPools[idx].PutObjectMetadata(ctx, bucket, object, opts) +} + // PutObjectTags - replace or add tags to an existing object func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { object = encodeDirObject(object) diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 57af94b04..ddc736d8a 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -1318,6 +1318,12 @@ func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts) } +// PutObjectMetadata - replace or add metadata to an existing object/version +func (s *erasureSets) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + er := s.getHashedSet(object) + return er.PutObjectMetadata(ctx, bucket, object, opts) +} + // PutObjectTags - replace or add tags to an existing object func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { er := s.getHashedSet(object) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 08d8b766d..24652854b 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -52,6 +52,12 @@ func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, upda return NotImplemented{} } +// PutObjectMetadata - not implemented for gateway. +func (a GatewayUnsupported) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { + logger.CriticalIf(ctx, errors.New("not implemented")) + return ObjectInfo{}, NotImplemented{} +} + // NewNSLock is a dummy stub for gateway. 
func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker { logger.CriticalIf(context.Background(), errors.New("not implemented")) diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index c1ab114e3..b7cbb67a0 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -686,7 +686,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr // Update block 0 metadata. var retries int for { - err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{}) + meta := b.headerKV() + fi := FileInfo{ + Metadata: make(map[string]string, len(meta)), + } + for k, v := range meta { + fi.Metadata[k] = v + } + err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi) if err == nil { break } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 5c201fdab..52ee3df80 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -244,6 +244,13 @@ func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi return d.disk.WriteMetadata(ctx, volume, path, fi) } +func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { + if err := d.calcError(); err != nil { + return err + } + return d.disk.UpdateMetadata(ctx, volume, path, fi) +} + func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { if err := d.calcError(); err != nil { return err diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 469c0b872..bb6bd0af5 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -161,6 +161,9 @@ type ObjectLayer interface { Health(ctx context.Context, opts HealthOptions) HealthResult ReadHealth(ctx context.Context) bool + // Metadata operations + PutObjectMetadata(context.Context, string, string, ObjectOptions) (ObjectInfo, error) + // ObjectTagging operations PutObjectTags(context.Context, 
string, string, string, ObjectOptions) (ObjectInfo, error) GetObjectTags(context.Context, string, string, ObjectOptions) (*tags.Tags, error) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d7784ab97..6b967d345 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3274,22 +3274,28 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) - if objInfo.UserTags != "" { - objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + if objInfo.DeleteMarker { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) + return } + objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } - - objInfo.metadataOnly = true - if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ - VersionID: opts.VersionID, - }, ObjectOptions{ - VersionID: opts.VersionID, - MTime: opts.MTime, - }); err != nil { + // if version-id is not specified legal hold is supposed to be set on the latest object. 
+ if opts.VersionID == "" { + opts.VersionID = objInfo.VersionID + } + popts := ObjectOptions{ + MTime: opts.MTime, + VersionID: opts.VersionID, + UserDefined: make(map[string]string, len(objInfo.UserDefined)), + } + for k, v := range objInfo.UserDefined { + popts.UserDefined[k] = v + } + if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } @@ -3441,28 +3447,34 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - + if objInfo.DeleteMarker { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) + return + } if objRetention.Mode.Valid() { objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode) objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = objRetention.RetainUntilDate.UTC().Format(time.RFC3339) } else { - delete(objInfo.UserDefined, strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)) - delete(objInfo.UserDefined, strings.ToLower(xhttp.AmzObjectLockMode)) - } - if objInfo.UserTags != "" { - objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = "" + objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = "" } replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } - objInfo.metadataOnly = true // Perform only metadata updates. 
- if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ - VersionID: opts.VersionID, - }, ObjectOptions{ - VersionID: opts.VersionID, - MTime: opts.MTime, - }); err != nil { + // if version-id is not specified retention is supposed to be set on the latest object. + if opts.VersionID == "" { + opts.VersionID = objInfo.VersionID + } + popts := ObjectOptions{ + MTime: opts.MTime, + VersionID: opts.VersionID, + UserDefined: make(map[string]string, len(objInfo.UserDefined)), + } + for k, v := range objInfo.UserDefined { + popts.UserDefined[k] = v + } + if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 5ca2f84d7..c7c13eaa6 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -55,6 +55,7 @@ type StorageAPI interface { DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error + UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index b610f8505..81c732c0b 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -360,6 +360,21 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path return err } +func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + + 
var reader bytes.Buffer + if err := msgp.Encode(&reader, &fi); err != nil { + return err + } + + respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1) + defer http.DrainBody(respBody) + return err +} + func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error { values := make(url.Values) values.Set(storageRESTVolume, volume) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 7db5666fe..4e90ef4f2 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v29" // Removed WalkVersions() + storageRESTVersion = "v30" // Added UpdateMetadata() storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -36,6 +36,7 @@ const ( storageRESTMethodCreateFile = "/createfile" storageRESTMethodWriteAll = "/writeall" storageRESTMethodWriteMetadata = "/writemetadata" + storageRESTMethodUpdateMetadata = "/updatemetadata" storageRESTMethodDeleteVersion = "/deleteversion" storageRESTMethodReadVersion = "/readversion" storageRESTMethodRenameData = "/renamedata" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index b161c5c86..8ecb6423b 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -373,6 +373,32 @@ func (s *storageRESTServer) WriteMetadataHandler(w http.ResponseWriter, r *http. } } +// UpdateMetadata update new updated metadata. 
+func (s *storageRESTServer) UpdateMetadataHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + var fi FileInfo + if err := msgp.Decode(r.Body, &fi); err != nil { + s.writeErrorResponse(w, err) + return + } + + err := s.storage.UpdateMetadata(r.Context(), volume, filePath, fi) + if err != nil { + s.writeErrorResponse(w, err) + } +} + // WriteAllHandler - write to file all content. func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1018,6 +1044,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteMetadata).HandlerFunc(httpTraceHdrs(server.WriteMetadataHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodUpdateMetadata).HandlerFunc(httpTraceHdrs(server.UpdateMetadataHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTForceDelMarker)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). 
diff --git a/cmd/storagemetric_string.go b/cmd/storagemetric_string.go index b8702d64d..8026aabef 100644 --- a/cmd/storagemetric_string.go +++ b/cmd/storagemetric_string.go @@ -29,14 +29,15 @@ func _() { _ = x[storageMetricWriteAll-18] _ = x[storageMetricDeleteVersion-19] _ = x[storageMetricWriteMetadata-20] - _ = x[storageMetricReadVersion-21] - _ = x[storageMetricReadAll-22] - _ = x[storageMetricLast-23] + _ = x[storageMetricUpdateMetadata-21] + _ = x[storageMetricReadVersion-22] + _ = x[storageMetricReadAll-23] + _ = x[storageMetricLast-24] } -const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataReadVersionReadAllLast" +const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllLast" -var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 212, 219, 223} +var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 237} func (i storageMetric) String() string { if i >= storageMetric(len(_storageMetric_index)-1) { diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 20b2f5831..f370b0bb6 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -54,6 +54,7 @@ const ( storageMetricWriteAll storageMetricDeleteVersion storageMetricWriteMetadata + storageMetricUpdateMetadata storageMetricReadVersion storageMetricReadAll @@ -541,6 +542,22 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s return p.storage.DeleteVersion(ctx, volume, path, fi, 
forceDelMarker) } +func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { + defer p.updateStorageMetrics(storageMetricUpdateMetadata, volume, path)() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err = p.checkDiskStale(); err != nil { + return err + } + + return p.storage.UpdateMetadata(ctx, volume, path, fi) +} + func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { defer p.updateStorageMetrics(storageMetricWriteMetadata, volume, path)() diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index ac11b98c7..c29cbd4b6 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -624,6 +624,75 @@ func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { return append(dst, z.data...), nil } +// UpdateObjectVersion updates metadata and modTime for a given +// versionID, NOTE: versionID must be valid and should exist - +// and must not be a DeleteMarker or legacy object, if no +// versionID is specified 'null' versionID is updated instead. +// +// It is callers responsibility to set correct versionID, this +// function shouldn't be further extended to update immutable +// values such as ErasureInfo, ChecksumInfo. +// +// Metadata is only updated to new values, existing values +// stay as is, if you wish to update all values you should +// update all metadata freshly before calling this function +// in-case you wish to clear existing metadata. 
+func (z *xlMetaV2) UpdateObjectVersion(fi FileInfo) error { + if fi.VersionID == "" { + // this means versioning is not yet + // enabled or suspend i.e all versions + // are basically default value i.e "null" + fi.VersionID = nullVersionID + } + + var uv uuid.UUID + var err error + if fi.VersionID != "" && fi.VersionID != nullVersionID { + uv, err = uuid.Parse(fi.VersionID) + if err != nil { + return err + } + } + + for i, version := range z.Versions { + if !version.Valid() { + return errFileCorrupt + } + switch version.Type { + case LegacyType: + if version.ObjectV1.VersionID == fi.VersionID { + return errMethodNotAllowed + } + case ObjectType: + if version.ObjectV2.VersionID == uv { + for k, v := range fi.Metadata { + if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + if v == "" { + delete(z.Versions[i].ObjectV2.MetaSys, k) + } else { + z.Versions[i].ObjectV2.MetaSys[k] = []byte(v) + } + } else { + if v == "" { + delete(z.Versions[i].ObjectV2.MetaUser, k) + } else { + z.Versions[i].ObjectV2.MetaUser[k] = v + } + } + } + if !fi.ModTime.IsZero() { + z.Versions[i].ObjectV2.ModTime = fi.ModTime.UnixNano() + } + return nil + } + case DeleteType: + return errMethodNotAllowed + } + } + + return errFileVersionNotFound +} + // AddVersion adds a new version func (z *xlMetaV2) AddVersion(fi FileInfo) error { if fi.VersionID == "" { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 7af109167..aff52c74b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -897,6 +897,44 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return err } +// Updates only metadata for a given version. 
+func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error { + if len(fi.Metadata) == 0 { + return errInvalidArgument + } + + buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) + if err != nil { + if err == errFileNotFound { + if fi.VersionID != "" { + return errFileVersionNotFound + } + } + return err + } + + if !isXL2V1Format(buf) { + return errFileVersionNotFound + } + + var xlMeta xlMetaV2 + if err = xlMeta.Load(buf); err != nil { + logger.LogIf(ctx, err) + return err + } + + if err = xlMeta.UpdateObjectVersion(fi); err != nil { + return err + } + + buf, err = xlMeta.AppendTo(nil) + if err != nil { + return err + } + + return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) +} + // WriteMetadata - writes FileInfo metadata for path at `xl.meta` func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error { buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) @@ -936,7 +974,6 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F } return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) - } func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) { @@ -1044,7 +1081,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str // Reading data for small objects when // - object has not yet transitioned - // - object size lesser than 32KiB + // - object size lesser than 128KiB // - object has maximum of 1 parts if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 { // Enable O_DIRECT optionally only if drive supports it. 
@@ -1891,7 +1928,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, if isXL2V1Format(dstBuf) { if err = xlMeta.Load(dstBuf); err != nil { logger.LogIf(s.ctx, err) - return errFileCorrupt + return err } } else { // This code-path is to preserve the legacy data. diff --git a/pkg/lsync/lrwmutex.go b/pkg/lsync/lrwmutex.go index 72719cd30..ddeea13e4 100644 --- a/pkg/lsync/lrwmutex.go +++ b/pkg/lsync/lrwmutex.go @@ -30,7 +30,7 @@ type LRWMutex struct { source string isWriteLock bool ref int - m sync.Mutex // Mutex to prevent multiple simultaneous locks + mu sync.Mutex // Mutex to prevent multiple simultaneous locks } // NewLRWMutex - initializes a new lsync RW mutex. @@ -73,7 +73,9 @@ func (lm *LRWMutex) GetRLock(ctx context.Context, id string, source string, time } func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) { - lm.m.Lock() + lm.mu.Lock() + defer lm.mu.Unlock() + lm.id = id lm.source = source if isWriteLock { @@ -88,7 +90,6 @@ func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) { locked = true } } - lm.m.Unlock() return locked } @@ -147,7 +148,8 @@ func (lm *LRWMutex) RUnlock() { } func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) { - lm.m.Lock() + lm.mu.Lock() + defer lm.mu.Unlock() // Try to release lock. if isWriteLock { @@ -165,16 +167,17 @@ func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) { } } - lm.m.Unlock() return unlocked } // ForceUnlock will forcefully clear a write or read lock. func (lm *LRWMutex) ForceUnlock() { - lm.m.Lock() + lm.mu.Lock() + defer lm.mu.Unlock() + lm.ref = 0 lm.isWriteLock = false - lm.m.Unlock() + } // DRLocker returns a sync.Locker interface that implements