From fba883839d8719f649bfda65b6561a642ec21a95 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 22 Nov 2023 13:46:17 -0800 Subject: [PATCH] feat: bring new HDD related performance enhancements (#18239) Optionally allows customers to enable - Enable an external cache to catch GET/HEAD responses - Enable skipping disks that are slow to respond in GET/HEAD when we have already achieved a quorum --- cmd/api-response.go | 64 +- cmd/bucket-handlers.go | 39 +- cmd/config-current.go | 20 + cmd/erasure-healing-common.go | 1 + cmd/erasure-metadata-utils.go | 45 +- cmd/erasure-metadata.go | 1 + cmd/erasure-object.go | 369 +++++--- cmd/globals.go | 4 + cmd/object-api-datatypes.go | 13 + cmd/object-api-interface.go | 2 + cmd/object-api-utils.go | 6 +- cmd/object-handlers-common.go | 89 +- cmd/object-handlers.go | 351 +++++++- cmd/object-multipart-handlers.go | 17 + cmd/server-main.go | 4 +- cmd/storage-datatypes.go | 10 + cmd/storage-errors.go | 2 + cmd/storage-rest-common.go | 1 + cmd/xl-storage.go | 75 +- .../setup_2site_existing_replication.sh | 2 +- internal/config/cache/cache.go | 238 ++++++ internal/config/cache/help.go | 48 ++ internal/config/cache/remote.go | 111 +++ internal/config/cache/remote_gen.go | 795 ++++++++++++++++++ internal/config/cache/remote_gen_test.go | 236 ++++++ internal/config/config.go | 2 + 26 files changed, 2294 insertions(+), 251 deletions(-) create mode 100644 internal/config/cache/cache.go create mode 100644 internal/config/cache/help.go create mode 100644 internal/config/cache/remote.go create mode 100644 internal/config/cache/remote_gen.go create mode 100644 internal/config/cache/remote_gen_test.go diff --git a/cmd/api-response.go b/cmd/api-response.go index 14ccb666e..d713fc5f8 100644 --- a/cmd/api-response.go +++ b/cmd/api-response.go @@ -502,6 +502,47 @@ func generateListBucketsResponse(buckets []BucketInfo) ListBucketsResponse { return data } +func cleanReservedKeys(metadata map[string]string) map[string]string { + m := cloneMSS(metadata) + + switch kind, _ := crypto.IsEncrypted(metadata); kind { + case crypto.S3: + m[xhttp.AmzServerSideEncryption] = xhttp.AmzEncryptionAES + case crypto.S3KMS: + m[xhttp.AmzServerSideEncryption] = xhttp.AmzEncryptionKMS + m[xhttp.AmzServerSideEncryptionKmsID] = kmsKeyIDFromMetadata(metadata) + if kmsCtx, ok := metadata[crypto.MetaContext]; ok { + m[xhttp.AmzServerSideEncryptionKmsContext] = kmsCtx + } + case crypto.SSEC: + m[xhttp.AmzServerSideEncryptionCustomerAlgorithm] = xhttp.AmzEncryptionAES + + } + + var toRemove []string + for k := range cleanMinioInternalMetadataKeys(m) { + if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) { + // Do not need to send any internal metadata + // values to client. + toRemove = append(toRemove, k) + continue + } + + // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w + if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { + toRemove = append(toRemove, k) + continue + } + } + + for _, k := range toRemove { + delete(m, k) + delete(m, strings.ToLower(k)) + } + + return m +} + // generates an ListBucketVersions response for the said bucket with other enumerated options. 
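All of the response-metadata scrubbing below funnels through the new cleanReservedKeys helper. A minimal sketch of the call pattern the list handlers switch to (grounded in the hunks that follow):

	// cleanReservedKeys drops x-minio-internal-* keys and the unencrypted
	// content-length/content-md5 keys (GHSA-76wf-9vgp-pj7w), and normalizes
	// SSE response headers, so the result is safe to echo back to clients.
	for k, v := range cleanReservedKeys(object.UserDefined) {
		content.UserMetadata.Set(k, v)
	}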
func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delimiter, encodingType string, maxKeys int, resp ListObjectVersionsInfo, metadata metaCheckFn) ListVersionsResponse { versions := make([]ObjectVersion, 0, len(resp.Objects)) @@ -549,18 +590,10 @@ func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delim case crypto.SSEC: content.UserMetadata.Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, xhttp.AmzEncryptionAES) } - for k, v := range cleanMinioInternalMetadataKeys(object.UserDefined) { - if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) { - // Do not need to send any internal metadata - // values to client. - continue - } - // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w - if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { - continue - } + for k, v := range cleanReservedKeys(object.UserDefined) { content.UserMetadata.Set(k, v) } + content.UserMetadata.Set("expires", object.Expires.Format(http.TimeFormat)) content.Internal = &ObjectInternalInfo{ K: object.DataBlocks, @@ -693,16 +726,7 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter, case crypto.SSEC: content.UserMetadata.Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, xhttp.AmzEncryptionAES) } - for k, v := range cleanMinioInternalMetadataKeys(object.UserDefined) { - if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) { - // Do not need to send any internal metadata - // values to client. - continue - } - // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w - if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { - continue - } + for k, v := range cleanReservedKeys(object.UserDefined) { content.UserMetadata.Set(k, v) } content.UserMetadata.Set("expires", object.Expires.Format(http.TimeFormat)) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 4b9ddd18d..8169d5cd9 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -51,6 +51,7 @@ import ( sse "github.com/minio/minio/internal/bucket/encryption" objectlock "github.com/minio/minio/internal/bucket/object/lock" "github.com/minio/minio/internal/bucket/replication" + "github.com/minio/minio/internal/config/cache" "github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/event" @@ -668,6 +669,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, continue } + defer globalCacheConfig.Delete(bucket, dobj.ObjectName) + if replicateDeletes && (dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == Pending) { // copy so we can re-add null ID. 
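Several handlers touched by this patch (PostPolicy, PutObject, CopyObject, CompleteMultipartUpload, GET/HEAD) backfill the external cache with the same write-through block; it is shown once here for reference, with fields per the new internal/config/cache package:

	asize, err := objInfo.GetActualSize()
	if err != nil {
		// Fall back to the stored size when the actual (pre-compression,
		// pre-encryption) size cannot be computed.
		asize = objInfo.Size
	}
	globalCacheConfig.Set(&cache.ObjectInfo{
		Key:          objInfo.Name,
		Bucket:       objInfo.Bucket,
		ETag:         objInfo.ETag,
		ModTime:      objInfo.ModTime,
		Expires:      objInfo.ExpiresStr(),
		CacheControl: objInfo.CacheControl,
		Metadata:     cleanReservedKeys(objInfo.UserDefined),
		Size:         asize,
	})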
dobj := dobj @@ -1343,6 +1346,22 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h continue } + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: getDecryptedETag(formValues, objInfo, false), + ModTime: objInfo.ModTime, + Expires: objInfo.Expires.UTC().Format(http.TimeFormat), + CacheControl: objInfo.CacheControl, + Metadata: cleanReservedKeys(objInfo.UserDefined), + Size: asize, + }) + fanOutResp = append(fanOutResp, minio.PutObjectFanOutResponse{ Key: objInfo.Name, ETag: getDecryptedETag(formValues, objInfo, false), @@ -1399,10 +1418,12 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h return } + etag := getDecryptedETag(formValues, objInfo, false) + // We must not use the http.Header().Set method here because some (broken) // clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive). // Therefore, we have to set the ETag directly as map entry. - w.Header()[xhttp.ETag] = []string{`"` + objInfo.ETag + `"`} + w.Header()[xhttp.ETag] = []string{`"` + etag + `"`} // Set the relevant version ID as part of the response header. if objInfo.VersionID != "" && objInfo.VersionID != nullVersionID { @@ -1413,6 +1434,22 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h w.Header().Set(xhttp.Location, obj) } + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + defer globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: etag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Metadata: cleanReservedKeys(objInfo.UserDefined), + Size: asize, + }) + // Notify object created event. defer sendEvent(eventArgs{ EventName: event.ObjectCreatedPost, diff --git a/cmd/config-current.go b/cmd/config-current.go index 105a32287..c3b9fdc3d 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -27,6 +27,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/api" + "github.com/minio/minio/internal/config/cache" "github.com/minio/minio/internal/config/callhome" "github.com/minio/minio/internal/config/compress" "github.com/minio/minio/internal/config/dns" @@ -68,6 +69,7 @@ func initHelp() { config.ScannerSubSys: scanner.DefaultKVS, config.SubnetSubSys: subnet.DefaultKVS, config.CallhomeSubSys: callhome.DefaultKVS, + config.CacheSubSys: cache.DefaultKVS, } for k, v := range notify.DefaultNotificationKVS { kvs[k] = v @@ -206,6 +208,12 @@ func initHelp() { Key: config.EtcdSubSys, Description: "persist IAM assets externally to etcd", }, + config.HelpKV{ + Key: config.CacheSubSys, + Type: "string", + Description: "enable various cache optimizations on MinIO for reads", + Optional: true, + }, } if globalIsErasure { @@ -250,6 +258,7 @@ func initHelp() { config.LambdaWebhookSubSys: lambda.HelpWebhook, config.SubnetSubSys: subnet.HelpSubnet, config.CallhomeSubSys: callhome.HelpCallhome, + config.CacheSubSys: cache.Help, } config.RegisterHelpSubSys(helpMap) @@ -357,6 +366,10 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o if cfg.Enabled() && !globalSubnetConfig.Registered() { return errors.New("Deployment is not registered with SUBNET. 
Please register the deployment via 'mc license register ALIAS'") } + case config.CacheSubSys: + if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport); err != nil { + return err + } case config.PolicyOPASubSys: // In case legacy OPA config is being set, we treat it as if the // AuthZPlugin is being set. @@ -632,6 +645,13 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf initCallhome(ctx, objAPI) } } + case config.CacheSubSys: + cacheCfg, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to load cache config: %w", err)) + } else { + globalCacheConfig.Update(cacheCfg) + } } globalServerConfigMu.Lock() defer globalServerConfigMu.Unlock() diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go index 7629047e4..0f576d0d9 100644 --- a/cmd/erasure-healing-common.go +++ b/cmd/erasure-healing-common.go @@ -286,6 +286,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad availableDisks := make([]StorageAPI, len(onlineDisks)) dataErrs := make([]error, len(onlineDisks)) + inconsistent := 0 for i, meta := range partsMetadata { if !meta.IsValid() { diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index ef3bc3f0f..7a858fa90 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -148,6 +148,26 @@ func hashOrder(key string, cardinality int) []int { return nums } +var readFileInfoIgnoredErrs = append(objectOpIgnoredErrs, + errFileNotFound, + errVolumeNotFound, + errFileVersionNotFound, + io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors + io.EOF, // some times we would read without locks, ignore these errors +) + +func readFileInfo(ctx context.Context, disk StorageAPI, bucket, object, versionID string, opts ReadOptions) (FileInfo, error) { + fi, err := disk.ReadVersion(ctx, bucket, object, versionID, opts) + + if err != nil && !IsErr(err, readFileInfoIgnoredErrs...) { + logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", + disk.String(), bucket, object, err), + disk.String()) + } + + return fi, err +} + // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData, healing bool) ([]FileInfo, []error) { @@ -166,33 +186,12 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, opts) + metadataArray[index], err = readFileInfo(ctx, disks[index], bucket, object, versionID, opts) return err }, index) } - ignoredErrs := []error{ - errFileNotFound, - errVolumeNotFound, - errFileVersionNotFound, - io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors - io.EOF, // some times we would read without locks, ignore these errors - } - ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...) - errs := g.Wait() - for index, err := range errs { - if err == nil { - continue - } - if !IsErr(err, ignoredErrs...) { - logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", - disks[index], bucket, object, err), - disks[index].String()) - } - } - - // Return all the metadata. 
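readFileInfo above owns the "log only unexpected errors" policy through readFileInfoIgnoredErrs and IsErr. A sketch of the IsErr-style filter this relies on, assuming it is a plain errors.Is loop (consistent with how sentinel storage errors are compared elsewhere in cmd):

	func IsErr(err error, errs ...error) bool {
		for _, candidate := range errs {
			if errors.Is(err, candidate) {
				return true
			}
		}
		return false
	}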
- return metadataArray, errs + return metadataArray, g.Wait() } // shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo() diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 5cd78ee96..693addc27 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -158,6 +158,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInf ContentEncoding: fi.Metadata["content-encoding"], NumVersions: fi.NumVersions, SuccessorModTime: fi.SuccessorModTime, + CacheControl: fi.Metadata["cache-control"], } if exp, ok := fi.Metadata["expires"]; ok { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 003792e22..edb429e41 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -50,7 +50,7 @@ import ( ) // list all errors which can be ignored in object operations. -var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnformattedDisk) +var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnformattedDisk, errDiskOngoingReq) // Object Operations @@ -545,66 +545,111 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st return m, err } -func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) { - metadataArray := make([]*xlMetaV2, len(disks)) - metaFileInfos := make([]FileInfo, len(metadataArray)) - metadataShallowVersions := make([][]xlMetaV2ShallowVersion, len(disks)) - var v2bufs [][]byte - if !readData { - v2bufs = make([][]byte, len(disks)) +var readRawFileInfoErrs = append(objectOpIgnoredErrs, + errFileNotFound, + errFileNameTooLong, + errVolumeNotFound, + errFileVersionNotFound, + io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors + io.EOF, // some times we would read without locks, ignore these errors + msgp.ErrShortBytes, + context.DeadlineExceeded, + context.Canceled, +) + +func readRawFileInfo(ctx context.Context, disk StorageAPI, bucket, object string, readData bool) (RawFileInfo, error) { + rf, err := disk.ReadXL(ctx, bucket, object, readData) + + if err != nil && !IsErr(err, readRawFileInfoErrs...) { + logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", + disk.String(), bucket, object, err), + disk.String()) + } + return rf, err +} + +func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVers, allParts bool) (FileInfo, error) { + var xl xlMetaV2 + if err := xl.LoadOrConvert(ri.Buf); err != nil { + return FileInfo{}, err } + fi, err := xl.ToFileInfo(bucket, object, "", inclFreeVers, allParts) + if err != nil { + return FileInfo{}, err + } + + if !fi.IsValid() { + return FileInfo{}, errCorruptedFormat + } + + versionID := fi.VersionID + if versionID == "" { + versionID = nullVersionID + } + + fileInfo, err := xl.ToFileInfo(bucket, object, versionID, inclFreeVers, allParts) + if err != nil { + return FileInfo{}, err + } + + if readData { + fileInfo.Data = xl.data.find(versionID) + } + + fileInfo.DiskMTime = ri.DiskMTime + return fileInfo, nil +} + +func readAllRawFileInfo(ctx context.Context, disks []StorageAPI, bucket, object string, readData bool) ([]RawFileInfo, []error) { + rawFileInfos := make([]RawFileInfo, len(disks)) g := errgroup.WithNErrs(len(disks)) - // Read `xl.meta` in parallel across disks. 
for index := range disks { index := index g.Go(func() (err error) { if disks[index] == nil { return errDiskNotFound } - rf, err := disks[index].ReadXL(ctx, bucket, object, readData) + rf, err := readRawFileInfo(ctx, disks[index], bucket, object, readData) if err != nil { return err } - if !readData { - // Save the buffer so we can reuse it. - v2bufs[index] = rf.Buf - } - - var xl xlMetaV2 - if err = xl.LoadOrConvert(rf.Buf); err != nil { - return err - } - metadataArray[index] = &xl - metaFileInfos[index] = FileInfo{ - DiskMTime: rf.DiskMTime, - } + rawFileInfos[index] = rf return nil }, index) } - ignoredErrs := []error{ - errFileNotFound, - errFileNameTooLong, - errVolumeNotFound, - errFileVersionNotFound, - io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors - io.EOF, // some times we would read without locks, ignore these errors - msgp.ErrShortBytes, - context.DeadlineExceeded, - context.Canceled, - } - ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...) + return rawFileInfos, g.Wait() +} - errs := g.Wait() - for index, err := range errs { - if err == nil { +func pickLatestQuorumFilesInfo(ctx context.Context, rawFileInfos []RawFileInfo, errs []error, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) { + metadataArray := make([]*xlMetaV2, len(rawFileInfos)) + metaFileInfos := make([]FileInfo, len(rawFileInfos)) + metadataShallowVersions := make([][]xlMetaV2ShallowVersion, len(rawFileInfos)) + var v2bufs [][]byte + if !readData { + v2bufs = make([][]byte, len(rawFileInfos)) + } + + // Read `xl.meta` in parallel across disks. + for index := range rawFileInfos { + rf := rawFileInfos[index] + if rf.Buf == nil { continue } - if !IsErr(err, ignoredErrs...) { - logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", - disks[index], bucket, object, err), - disks[index].String()) + if !readData { + // Save the buffer so we can reuse it. + v2bufs[index] = rf.Buf + } + + var xl xlMetaV2 + if err := xl.LoadOrConvert(rf.Buf); err != nil { + errs[index] = err + continue + } + metadataArray[index] = &xl + metaFileInfos[index] = FileInfo{ + DiskMTime: rf.DiskMTime, } } @@ -614,7 +659,7 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r } } - readQuorum := (len(disks) + 1) / 2 + readQuorum := (len(rawFileInfos) + 1) / 2 meta := &xlMetaV2{versions: mergeXLV2Versions(readQuorum, false, 1, metadataShallowVersions...)} lfi, err := meta.ToFileInfo(bucket, object, "", inclFreeVers, allParts) if err != nil { @@ -692,19 +737,177 @@ func shouldCheckForDangling(err error, errs []error, bucket string) bool { return false } -func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { - disks := er.getDisks() +func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) { + rawFileInfos, errs := readAllRawFileInfo(ctx, disks, bucket, object, readData) + return pickLatestQuorumFilesInfo(ctx, rawFileInfos, errs, bucket, object, readData, inclFreeVers, allParts) +} - var errs []error +func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (FileInfo, []FileInfo, []StorageAPI, error) { + var mu sync.Mutex - // Read metadata associated with the object from all disks. 
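pickLatestQuorumFilesInfo above trusts a merged "latest" version only when a simple majority of drives agree on it. The quorum arithmetic, as used in that function:

	// With 16 drives, readQuorum is 8: the candidate version must appear
	// in at least half of the per-drive xl.meta version histories before
	// mergeXLV2Versions reports it as the latest.
	readQuorum := (len(rawFileInfos) + 1) / 2
	meta := &xlMetaV2{versions: mergeXLV2Versions(readQuorum, false, 1, metadataShallowVersions...)}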
- if opts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData, false) - } else { - metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions, true) + rawArr := make([]RawFileInfo, er.setDriveCount) + metaArr := make([]FileInfo, er.setDriveCount) + errs := make([]error, er.setDriveCount) + for i := range errs { + errs[i] = errDiskOngoingReq + } + + done := make(chan bool, er.setDriveCount) + disks := er.getDisks() + + mrfCheck := make(chan FileInfo) + defer close(mrfCheck) + + ropts := ReadOptions{ + ReadData: readData, + Healing: false, + } + + // Ask for all disks first; + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + wg := sync.WaitGroup{} + for i, disk := range disks { + if disk == nil { + done <- false + continue + } + wg.Add(1) + go func(i int, disk StorageAPI) { + defer wg.Done() + var fi FileInfo + var err error + if opts.VersionID != "" { + // Read a specific version ID + fi, err = readFileInfo(ctx, disk, bucket, object, opts.VersionID, ropts) + mu.Lock() + metaArr[i], errs[i] = fi, err + mu.Unlock() + } else { + // Read the latest version + ri, err := readRawFileInfo(ctx, disk, bucket, object, readData) + mu.Lock() + rawArr[i], errs[i] = ri, err + mu.Unlock() + if err == nil { + fi, err = fileInfoFromRaw(ri, bucket, object, readData, opts.InclFreeVersions, true) + mu.Lock() + metaArr[i], errs[i] = fi, err + mu.Unlock() + } + } + done <- err == nil + }(i, disk) + } + + wg.Wait() + close(done) + + fi, ok := <-mrfCheck + if !ok { + return + } + if fi.Deleted { + return + } + // if one of the disk is offline, return right here no need + // to attempt a heal on the object. + if countErrs(errs, errDiskNotFound) > 0 { + return + } + var missingBlocks int + for i := range errs { + if errors.Is(errs[i], errFileNotFound) { + missingBlocks++ + } + } + // if missing metadata can be reconstructed, attempt to reconstruct. + // additionally do not heal delete markers inline, let them be + // healed upon regular heal process. 
+ if missingBlocks > 0 && missingBlocks < fi.Erasure.DataBlocks { + globalMRFState.addPartialOp(partialOperation{ + bucket: fi.Volume, + object: fi.Name, + versionID: fi.VersionID, + queued: time.Now(), + setIndex: er.setIndex, + poolIndex: er.poolIndex, + }) + } + + return + }() + + validResp := 0 + totalResp := 0 + + // minDisks value is only to reduce the number of calls + // to the disks; this value is not accurate because we do + // not know the storage class of the object yet + minDisks := 0 + if p := globalStorageClass.GetParityForSC(""); p > -1 { + minDisks = er.setDriveCount - p + } else { + minDisks = er.setDriveCount - er.defaultParityCount + } + + calcQuorum := func() (FileInfo, []FileInfo, []StorageAPI, time.Time, string, error) { + readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) + if err != nil { + return FileInfo{}, nil, nil, time.Time{}, "", err + } + err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) + if err != nil { + return FileInfo{}, nil, nil, time.Time{}, "", err + } + onlineDisks, modTime, etag := listOnlineDisks(disks, metaArr, errs, readQuorum) + fi, err := pickValidFileInfo(ctx, metaArr, modTime, etag, readQuorum) + if err != nil { + return FileInfo{}, nil, nil, time.Time{}, "", err + } + + onlineMeta := make([]FileInfo, len(metaArr)) + copy(onlineMeta, metaArr) + + return fi, onlineMeta, onlineDisks, modTime, etag, nil + } + + var ( + modTime time.Time + etag string + fi FileInfo + onlineMeta []FileInfo + onlineDisks []StorageAPI + err error + ) + + for success := range done { + totalResp++ + if success { + validResp++ + } + if totalResp < er.setDriveCount { + if !opts.FastGetObjInfo { + continue + } + if validResp < minDisks { + continue + } + } + if opts.VersionID == "" && totalResp == er.setDriveCount { + // Disks cannot agree about the latest version, pass this to a more advanced code + metaArr, errs = pickLatestQuorumFilesInfo(ctx, rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true) + } + mu.Lock() + fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum() + mu.Unlock() + if err == nil && fi.InlineData() { + break + } } - readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) if err != nil { if shouldCheckForDangling(err, errs, bucket) { _, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts) @@ -715,25 +918,6 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s return fi, nil, nil, toObjectErr(err, bucket, object) } - if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - if shouldCheckForDangling(err, errs, bucket) { - _, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts) - if derr != nil { - reducedErr = derr - } - } - return fi, nil, nil, toObjectErr(reducedErr, bucket, object) - } - - // List all online disks. - onlineDisks, modTime, etag := listOnlineDisks(disks, metaArr, errs, readQuorum) - - // Pick latest valid metadata. 
- fi, err = pickValidFileInfo(ctx, metaArr, modTime, etag, readQuorum) - if err != nil { - return fi, nil, nil, err - } - if !fi.Deleted && len(fi.Erasure.Distribution) != len(onlineDisks) { err := fmt.Errorf("unexpected file distribution (%v) from online disks (%v), looks like backend disks have been manually modified refusing to heal %s/%s(%s)", fi.Erasure.Distribution, onlineDisks, bucket, object, opts.VersionID) @@ -741,26 +925,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s return fi, nil, nil, toObjectErr(err, bucket, object, opts.VersionID) } - filterOnlineDisksInplace(fi, metaArr, onlineDisks) - - // if one of the disk is offline, return right here no need - // to attempt a heal on the object. - if countErrs(errs, errDiskNotFound) > 0 { - return fi, metaArr, onlineDisks, nil - } - - var missingBlocks int - for i, err := range errs { - if err != nil && errors.Is(err, errFileNotFound) { - missingBlocks++ - continue - } - + filterOnlineDisksInplace(fi, onlineMeta, onlineDisks) + for i := range onlineMeta { // verify metadata is valid, it has similar erasure info // as well as common modtime, if modtime is not possible // verify if it has common "etag" atleast. - if metaArr[i].IsValid() && metaArr[i].Erasure.Equal(fi.Erasure) { - ok := metaArr[i].ModTime.Equal(modTime) + if onlineMeta[i].IsValid() && onlineMeta[i].Erasure.Equal(fi.Erasure) { + ok := onlineMeta[i].ModTime.Equal(modTime) if modTime.IsZero() || modTime.Equal(timeSentinel) { ok = etag != "" && etag == fi.Metadata["etag"] } @@ -769,26 +940,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } } // in all other cases metadata is corrupt, do not read from it. - metaArr[i] = FileInfo{} + onlineMeta[i] = FileInfo{} onlineDisks[i] = nil - missingBlocks++ } - // if missing metadata can be reconstructed, attempt to reconstruct. - // additionally do not heal delete markers inline, let them be - // healed upon regular heal process. - if !fi.Deleted && missingBlocks > 0 && missingBlocks < readQuorum { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - versionID: fi.VersionID, - queued: time.Now(), - setIndex: er.setIndex, - poolIndex: er.poolIndex, - }) - } + mrfCheck <- fi.ShallowCopy() - return fi, metaArr, onlineDisks, nil + return fi, onlineMeta, onlineDisks, nil } // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. 
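The rewritten getObjectFileInfo above implements the "skip slow drives" enhancement from the commit message: every drive reports on the done channel, and with opts.FastGetObjInfo the loop attempts quorum as soon as validResp reaches minDisks rather than waiting for all setDriveCount responses. A condensed sketch of that control flow (bookkeeping simplified; the real loop also re-merges raw metadata once every drive has answered, and guards metaArr/errs with a mutex):

	for success := range done {
		totalResp++
		if success {
			validResp++
		}
		if totalResp < er.setDriveCount && (!opts.FastGetObjInfo || validResp < minDisks) {
			continue // not enough responses yet, keep waiting
		}
		fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum()
		if err == nil && fi.InlineData() {
			break // quorum reached and data is inline: ignore stragglers
		}
	}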
@@ -1637,6 +1795,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string auditObjectErasureSet(ctx, object, &er) if opts.DeletePrefix { + if globalCacheConfig.Enabled() { + return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object) + } return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object) } diff --git a/cmd/globals.go b/cmd/globals.go index e48c2a9bb..eb173efc3 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -38,6 +38,7 @@ import ( "github.com/dustin/go-humanize" "github.com/minio/minio/internal/auth" + "github.com/minio/minio/internal/config/cache" "github.com/minio/minio/internal/config/callhome" "github.com/minio/minio/internal/config/compress" "github.com/minio/minio/internal/config/dns" @@ -242,6 +243,9 @@ var ( // The global callhome config globalCallhomeConfig callhome.Config + // The global cache config + globalCacheConfig cache.Config + // Global server's network statistics globalConnStats = newConnStats() diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index ae9949a89..e12c213c5 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -20,6 +20,7 @@ package cmd import ( "io" "math" + "net/http" "time" "github.com/dustin/go-humanize" @@ -149,6 +150,9 @@ type ObjectInfo struct { // Date and time at which the object is no longer able to be cached Expires time.Time + // Cache-Control - Specifies caching behavior along the request/reply chain + CacheControl string + // Specify object storage class StorageClass string @@ -200,6 +204,15 @@ type ObjectInfo struct { ParityBlocks int } +// ExpiresStr returns a stringified version of Expires header in http.TimeFormat +func (o ObjectInfo) ExpiresStr() string { + var expires string + if !o.Expires.IsZero() { + expires = o.Expires.UTC().Format(http.TimeFormat) + } + return expires +} + // ArchiveInfo returns any saved zip archive meta information. // It will be decrypted if needed. func (o *ObjectInfo) ArchiveInfo() []byte { diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 63305fab5..8b17e82df 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -109,6 +109,8 @@ type ObjectOptions struct { MetadataChg bool // is true if it is a metadata update operation. EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject. + + FastGetObjInfo bool // Only for S3 Head/Get Object calls for now } // ExpirationOptions represents object options for object expiration at objectLayer. diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 043c3b76a..42279ae66 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -64,6 +64,7 @@ const ( minioMetaTmpBucket = minioMetaBucket + "/tmp" // MinIO tmp meta prefix for deleted objects. minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash" + // DNS separator (period), used for bucket name validation. dnsDelimiter = "." 
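ExpiresStr (added above) is the form the cache entries carry, so the Expires header survives the cache round trip as an RFC 7231 timestamp. A quick illustration:

	// http.TimeFormat is "Mon, 02 Jan 2006 15:04:05 GMT"; a zero Expires
	// yields "" so callers can skip emitting the header entirely.
	oi := ObjectInfo{Expires: time.Date(2024, 1, 2, 15, 4, 5, 0, time.UTC)}
	fmt.Println(oi.ExpiresStr()) // "Tue, 02 Jan 2024 15:04:05 GMT"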
// On compressed files bigger than this; @@ -697,7 +698,6 @@ type GetObjectReader struct { io.Reader ObjInfo ObjectInfo cleanUpFns []func() - opts ObjectOptions once sync.Once } @@ -722,7 +722,6 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions ObjInfo: oi, Reader: r, cleanUpFns: cleanupFns, - opts: opts, }, nil } @@ -859,7 +858,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( ObjInfo: oi, Reader: decReader, cleanUpFns: cFns, - opts: opts, } return r, nil } @@ -913,7 +911,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( ObjInfo: oi, Reader: decReader, cleanUpFns: cFns, - opts: opts, } return r, nil } @@ -928,7 +925,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( ObjInfo: oi, Reader: inputReader, cleanUpFns: cFns, - opts: opts, } return r, nil } diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index c53287f3e..8ff232f7e 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -202,6 +202,31 @@ func checkPreconditionsPUT(ctx context.Context, w http.ResponseWriter, r *http.R return false } +// Headers to be set of object content is not going to be written to the client. +func writeHeadersPrecondition(w http.ResponseWriter, objInfo ObjectInfo) { + // set common headers + setCommonHeaders(w) + + // set object-related metadata headers + w.Header().Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat)) + + if objInfo.ETag != "" { + w.Header()[xhttp.ETag] = []string{"\"" + objInfo.ETag + "\""} + } + + if objInfo.VersionID != "" { + w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID} + } + + if !objInfo.Expires.IsZero() { + w.Header().Set(xhttp.Expires, objInfo.Expires.UTC().Format(http.TimeFormat)) + } + + if objInfo.CacheControl != "" { + w.Header().Set(xhttp.CacheControl, objInfo.CacheControl) + } +} + // Validates the preconditions. Returns true if GET/HEAD operation should not proceed. // Preconditions supported are: // @@ -221,19 +246,6 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ return false } - // Headers to be set of object content is not going to be written to the client. - writeHeaders := func() { - // set common headers - setCommonHeaders(w) - - // set object-related metadata headers - w.Header().Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat)) - - if objInfo.ETag != "" { - w.Header()[xhttp.ETag] = []string{"\"" + objInfo.ETag + "\""} - } - } - // Check if the part number is correct. if opts.PartNumber > 1 && opts.PartNumber > len(objInfo.Parts) { // According to S3 we don't need to set any object information here. @@ -241,6 +253,18 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ return true } + // If-None-Match : Return the object only if its entity tag (ETag) is different from the + // one specified otherwise, return a 304 (not modified). + ifNoneMatchETagHeader := r.Header.Get(xhttp.IfNoneMatch) + if ifNoneMatchETagHeader != "" { + if isETagEqual(objInfo.ETag, ifNoneMatchETagHeader) { + // If the object ETag matches with the specified ETag. + writeHeadersPrecondition(w, objInfo) + w.WriteHeader(http.StatusNotModified) + return true + } + } + // If-Modified-Since : Return the object only if it has been modified since the specified time, // otherwise return a 304 (not modified). 
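writeHeadersPrecondition (introduced above) replaces the old per-call closure, so 304 and 412 responses now also carry x-amz-version-id, Expires and Cache-Control. Callers see it only through checkPreconditions:

	if checkPreconditions(ctx, w, r, objInfo, opts) {
		// A 304 or 412 response has already been written, including
		// Last-Modified, ETag, x-amz-version-id, Expires and Cache-Control.
		return
	}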
ifModifiedSinceHeader := r.Header.Get(xhttp.IfModifiedSince) @@ -248,50 +272,39 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ if givenTime, err := amztime.ParseHeader(ifModifiedSinceHeader); err == nil { if !ifModifiedSince(objInfo.ModTime, givenTime) { // If the object is not modified since the specified time. - writeHeaders() + writeHeadersPrecondition(w, objInfo) w.WriteHeader(http.StatusNotModified) return true } } } - // If-Unmodified-Since : Return the object only if it has not been modified since the specified - // time, otherwise return a 412 (precondition failed). - ifUnmodifiedSinceHeader := r.Header.Get(xhttp.IfUnmodifiedSince) - if ifUnmodifiedSinceHeader != "" { - if givenTime, err := amztime.ParseHeader(ifUnmodifiedSinceHeader); err == nil { - if ifModifiedSince(objInfo.ModTime, givenTime) { - // If the object is modified since the specified time. - writeHeaders() - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL) - return true - } - } - } - // If-Match : Return the object only if its entity tag (ETag) is the same as the one specified; // otherwise return a 412 (precondition failed). ifMatchETagHeader := r.Header.Get(xhttp.IfMatch) if ifMatchETagHeader != "" { if !isETagEqual(objInfo.ETag, ifMatchETagHeader) { // If the object ETag does not match with the specified ETag. - writeHeaders() + writeHeadersPrecondition(w, objInfo) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL) return true } } - // If-None-Match : Return the object only if its entity tag (ETag) is different from the - // one specified otherwise, return a 304 (not modified). - ifNoneMatchETagHeader := r.Header.Get(xhttp.IfNoneMatch) - if ifNoneMatchETagHeader != "" { - if isETagEqual(objInfo.ETag, ifNoneMatchETagHeader) { - // If the object ETag matches with the specified ETag. - writeHeaders() - w.WriteHeader(http.StatusNotModified) - return true + // If-Unmodified-Since : Return the object only if it has not been modified since the specified + // time, otherwise return a 412 (precondition failed). + ifUnmodifiedSinceHeader := r.Header.Get(xhttp.IfUnmodifiedSince) + if ifUnmodifiedSinceHeader != "" { + if givenTime, err := amztime.ParseHeader(ifUnmodifiedSinceHeader); err == nil { + if ifModifiedSince(objInfo.ModTime, givenTime) { + // If the object is modified since the specified time. + writeHeadersPrecondition(w, objInfo) + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL) + return true + } } } + // Object content should be written to http.ResponseWriter return false } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 181382f7c..1ec41a4f7 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. 
// // This file is part of MinIO Object Storage stack // @@ -19,6 +19,7 @@ package cmd import ( "archive/tar" + "bytes" "context" "encoding/hex" "encoding/xml" @@ -35,6 +36,7 @@ import ( "strings" "sync" "time" + "unicode" "github.com/google/uuid" "github.com/klauspost/compress/gzhttp" @@ -48,6 +50,7 @@ import ( "github.com/minio/minio/internal/bucket/lifecycle" objectlock "github.com/minio/minio/internal/bucket/object/lock" "github.com/minio/minio/internal/bucket/replication" + "github.com/minio/minio/internal/config/cache" "github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/crypto" @@ -62,6 +65,7 @@ import ( "github.com/minio/minio/internal/s3select" "github.com/minio/mux" "github.com/minio/pkg/v2/policy" + "github.com/valyala/bytebufferpool" ) // supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request. @@ -379,6 +383,92 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj } } + cachedResult := globalCacheConfig.Enabled() && opts.VersionID == "" + + var update bool + if cachedResult { + rc := &cache.CondCheck{} + h := r.Header.Clone() + if opts.PartNumber > 0 { + h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber)) + } + rc.Init(bucket, object, h) + + ci, err := globalCacheConfig.Get(rc) + if ci != nil { + tgs, ok := ci.Metadata[xhttp.AmzObjectTagging] + if ok { + // Set this such that authorization policies can be applied on the object tags. + r.Header.Set(xhttp.AmzObjectTagging, tgs) + } + + if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone { + writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error)) + return + } + + okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent || + ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified) + if okSt { + ci.WriteHeaders(w, func() { + // set common headers + setCommonHeaders(w) + }, func() { + okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent) + if okSt && len(ci.Data) > 0 { + for k, v := range ci.Metadata { + w.Header().Set(k, v) + } + + if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") { + w.Header()[xhttp.AmzMpPartsCount] = []string{ + strings.TrimLeftFunc(ci.ETag, func(r rune) bool { + return !unicode.IsNumber(r) + }), + } + } + + // For providing ranged content + start, rangeLen, err := rs.GetOffsetLength(ci.Size) + if err != nil { + start, rangeLen = 0, ci.Size + } + + // Set content length. + w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10)) + if rs != nil { + contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size) + w.Header().Set(xhttp.ContentRange, contentRange) + } + + io.Copy(w, bytes.NewReader(ci.Data)) + return + } + if ci.StatusCode == http.StatusPreconditionFailed { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL) + return + } else if ci.StatusCode == http.StatusNotModified { + w.WriteHeader(ci.StatusCode) + return + } + + // We did not satisfy any requirement from the cache, update the cache. + // this basically means that we do not have the Data for the object + // cached yet + update = true + }) + if !update { + // No update is needed means we have written already to the client just return here. + return + } + } + } + + if errors.Is(err, cache.ErrKeyMissing) { + update = true + } + } + // Validate pre-conditions if any. 
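The cached GET path above hinges on cache.CondCheck carrying the request's conditional headers to the remote cache, which answers with either a replayable response or cache.ErrKeyMissing. The skeleton of the lookup:

	rc := &cache.CondCheck{}
	rc.Init(bucket, object, r.Header.Clone())
	ci, err := globalCacheConfig.Get(rc)
	switch {
	case errors.Is(err, cache.ErrKeyMissing):
		update = true // miss: read from the drives, then backfill via Set()
	case ci != nil:
		// hit: ci carries the status code, response metadata and, for
		// sufficiently small objects, the object data itself.
	}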
opts.CheckPrecondFn = func(oi ObjectInfo) bool { if _, err := DecryptObjectInfo(&oi, r); err != nil { @@ -389,6 +479,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj return checkPreconditions(ctx, w, r, oi, opts) } + opts.FastGetObjInfo = true + var proxy proxyResult gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, opts) if err != nil { @@ -487,7 +579,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone) // Set encryption response headers - if kind, isEncrypted := crypto.IsEncrypted(objInfo.UserDefined); isEncrypted { switch kind { case crypto.S3: @@ -510,6 +601,39 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber)) } + var buf *bytebufferpool.ByteBuffer + if update { + if globalCacheConfig.MatchesSize(objInfo.Size) { + buf = bytebufferpool.Get() + defer bytebufferpool.Put(buf) + } + defer func() { + var data []byte + if buf != nil { + data = buf.Bytes() + } + + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: objInfo.ETag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Metadata: cleanReservedKeys(objInfo.UserDefined), + Range: rangeHeader, + PartNumber: opts.PartNumber, + Size: asize, + Data: data, + }) + }() + } + if err = setObjectHeaders(w, objInfo, rs, opts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -522,8 +646,14 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj setHeadGetRespHeaders(w, r.Form) + var iw io.Writer + iw = w + if buf != nil { + iw = io.MultiWriter(w, buf) + } + statusCodeWritten := false - httpWriter := xioutil.WriteOnClose(w) + httpWriter := xioutil.WriteOnClose(iw) if rs != nil || opts.PartNumber > 0 { statusCodeWritten = true w.WriteHeader(http.StatusPartialContent) @@ -644,6 +774,104 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob // Get request range. var rs *HTTPRangeSpec rangeHeader := r.Header.Get(xhttp.Range) + if rangeHeader != "" { + rs, _ = parseRequestRangeSpec(rangeHeader) + } + + if rangeHeader != "" { + // Both 'Range' and 'partNumber' cannot be specified at the same time + if opts.PartNumber > 0 { + writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber)) + return + } + + if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { + // Handle only errInvalidRange. Ignore other + // parse error and treat it as regular Get + // request like Amazon S3. + if errors.Is(err, errInvalidRange) { + writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange)) + return + } + } + } + + cachedResult := globalCacheConfig.Enabled() && opts.VersionID == "" + + var update bool + if cachedResult { + rc := &cache.CondCheck{} + h := r.Header.Clone() + if opts.PartNumber > 0 { + h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber)) + } + rc.Init(bucket, object, h) + + ci, err := globalCacheConfig.Get(rc) + if ci != nil { + tgs, ok := ci.Metadata[xhttp.AmzObjectTagging] + if ok { + // Set this such that authorization policies can be applied on the object tags. 
+ r.Header.Set(xhttp.AmzObjectTagging, tgs) + } + + if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone { + writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error)) + return + } + + okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent || + ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified) + if okSt { + ci.WriteHeaders(w, func() { + // set common headers + setCommonHeaders(w) + }, func() { + okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent) + if okSt { + for k, v := range ci.Metadata { + w.Header().Set(k, v) + } + + // For providing ranged content + start, rangeLen, err := rs.GetOffsetLength(ci.Size) + if err != nil { + start, rangeLen = 0, ci.Size + } + + if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") { + w.Header()[xhttp.AmzMpPartsCount] = []string{ + strings.TrimLeftFunc(ci.ETag, func(r rune) bool { + return !unicode.IsNumber(r) + }), + } + } + + // Set content length for the range. + w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10)) + if rs != nil { + contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size) + w.Header().Set(xhttp.ContentRange, contentRange) + } + + return + } + if ci.StatusCode == http.StatusPreconditionFailed { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL) + return + } + + w.WriteHeader(ci.StatusCode) + }) + return + } + } + if errors.Is(err, cache.ErrKeyMissing) { + update = true + } + } + + opts.FastGetObjInfo = true objInfo, err := getObjectInfo(ctx, bucket, object, opts) var proxy proxyResult @@ -651,9 +879,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob // proxy HEAD to replication target if active-active replication configured on bucket proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - if rangeHeader != "" { - rs, _ = parseRequestRangeSpec(rangeHeader) - } var oi ObjectInfo oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts) if proxy.Proxy { @@ -737,29 +962,29 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob return } + if update { + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + defer globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: objInfo.ETag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Size: asize, + Metadata: cleanReservedKeys(objInfo.UserDefined), + }) + } + // Validate pre-conditions if any. if checkPreconditions(ctx, w, r, objInfo, opts) { return } - if rangeHeader != "" { - // Both 'Range' and 'partNumber' cannot be specified at the same time - if opts.PartNumber > 0 { - writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber)) - return - } - - if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { - // Handle only errInvalidRange. Ignore other - // parse error and treat it as regular Get - // request like Amazon S3. 
- if errors.Is(err, errInvalidRange) { - writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange)) - return - } - } - } - // Set encryption response headers switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind { case crypto.S3: @@ -1531,6 +1756,22 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re Host: handlers.GetSourceIP(r), }) + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + defer globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: objInfo.ETag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Size: asize, + Metadata: cleanReservedKeys(objInfo.UserDefined), + }) + if !remoteCallRequired && !globalTierConfigMgr.Empty() { // Schedule object for immediate transition if eligible. objInfo.ETag = origETag @@ -1633,7 +1874,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req var ( md5hex = clientETag.String() sha256hex = "" - reader io.Reader = r.Body + rd io.Reader = r.Body s3Err APIErrorCode putObject = objectAPI.PutObject ) @@ -1647,14 +1888,14 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req switch rAuthType { case authTypeStreamingSigned, authTypeStreamingSignedTrailer: // Initialize stream signature verifier. - reader, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer) + rd, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer) if s3Err != ErrNone { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) return } case authTypeStreamingUnsignedTrailer: // Initialize stream chunked reader with optional trailers. - reader, s3Err = newUnsignedV4ChunkedReader(r, true) + rd, s3Err = newUnsignedV4ChunkedReader(r, true) if s3Err != ErrNone { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) return @@ -1702,6 +1943,18 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req AutoEncrypt: globalAutoEncryption, }) + var buf *bytebufferpool.ByteBuffer + if globalCacheConfig.MatchesSize(size) { + buf = bytebufferpool.Get() + defer bytebufferpool.Put(buf) + } + + var reader io.Reader + reader = rd + if buf != nil { + reader = io.TeeReader(rd, buf) + } + actualSize := size var idxCb func() []byte if isCompressible(r.Header, object) && size > minCompressibleSize { @@ -1904,6 +2157,30 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req setPutObjHeaders(w, objInfo, false) + defer func() { + var data []byte + if buf != nil { + data = buf.Bytes() + } + + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: objInfo.ETag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Size: asize, + Metadata: cleanReservedKeys(objInfo.UserDefined), + Data: data, + }) + }() + // Notify object created event. 
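Only objects within the configured object_size threshold are captured for the cache. PutObject does this by teeing the (possibly chunk-signature-wrapped) body into a pooled buffer, as in the hunk above:

	var buf *bytebufferpool.ByteBuffer
	if globalCacheConfig.MatchesSize(size) {
		buf = bytebufferpool.Get()
		defer bytebufferpool.Put(buf)
	}
	var reader io.Reader = rd
	if buf != nil {
		reader = io.TeeReader(rd, buf) // mirror every byte read into buf
	}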
evt := eventArgs{ EventName: event.ObjectCreatedPut, @@ -2234,10 +2511,28 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return err } + objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType) } + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + defer globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: objInfo.ETag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Size: asize, + Metadata: cleanReservedKeys(objInfo.UserDefined), + }) + // Notify object created event. evt := eventArgs{ EventName: event.ObjectCreatedPut, @@ -2413,6 +2708,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } + defer globalCacheConfig.Delete(bucket, object) + setPutObjHeaders(w, objInfo, true) writeSuccessNoContent(w) diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index 3dd05dcb4..9e52c828f 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -35,6 +35,7 @@ import ( sse "github.com/minio/minio/internal/bucket/encryption" objectlock "github.com/minio/minio/internal/bucket/object/lock" "github.com/minio/minio/internal/bucket/replication" + "github.com/minio/minio/internal/config/cache" "github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/crypto" @@ -1011,6 +1012,22 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } sendEvent(evt) + asize, err := objInfo.GetActualSize() + if err != nil { + asize = objInfo.Size + } + + defer globalCacheConfig.Set(&cache.ObjectInfo{ + Key: objInfo.Name, + Bucket: objInfo.Bucket, + ETag: objInfo.ETag, + ModTime: objInfo.ModTime, + Expires: objInfo.ExpiresStr(), + CacheControl: objInfo.CacheControl, + Size: asize, + Metadata: cleanReservedKeys(objInfo.UserDefined), + }) + if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold { evt.EventName = event.ObjectManyVersions sendEvent(evt) diff --git a/cmd/server-main.go b/cmd/server-main.go index f9735e61c..8bc9cf586 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -840,7 +840,9 @@ func serverMain(ctx *cli.Context) { // Initialize data scanner. bootstrapTrace("initDataScanner", func() { - initDataScanner(GlobalContext, newObject) + if v := env.Get("_MINIO_SCANNER", config.EnableOn); v == config.EnableOn { + initDataScanner(GlobalContext, newObject) + } }) // Initialize background replication diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index d9cf9ff6e..e235d9db4 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -239,6 +239,16 @@ type FileInfo struct { Versioned bool `msg:"vs"` } +// ShallowCopy - copies minimal information for READ MRF checks. 
+func (fi FileInfo) ShallowCopy() (n FileInfo) { + n.Volume = fi.Volume + n.Name = fi.Name + n.VersionID = fi.VersionID + n.Deleted = fi.Deleted + n.Erasure = fi.Erasure + return +} + // WriteQuorum returns expected write quorum for this FileInfo func (fi FileInfo) WriteQuorum(dquorum int) int { if fi.Deleted { diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index aa008b511..4f66ae5ae 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -51,6 +51,8 @@ var errDiskNotDir = StorageErr("drive is not directory or mountpoint") // errDiskNotFound - cannot find the underlying configured disk anymore. var errDiskNotFound = StorageErr("drive not found") +var errDiskOngoingReq = StorageErr("drive still did not complete the request") + // errFaultyRemoteDisk - remote disk is faulty. var errFaultyRemoteDisk = StorageErr("remote drive is faulty") diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 66b8be8ee..25047478a 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -47,6 +47,7 @@ const ( storageRESTMethodStatInfoFile = "/statfile" storageRESTMethodReadMultiple = "/readmultiple" storageRESTMethodCleanAbandoned = "/cleanabandoned" + storageRESTMethodLinkXL = "/linkxl" ) const ( diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 5ade78e6e..c78deb86f 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1100,7 +1100,7 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, force bool) error { targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID) if recursive { - if err := renameAll(filePath, targetPath, s.drivePath); err != nil { + if err := renameAll(filePath, targetPath, pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket)); err != nil { return err } } else { @@ -1127,8 +1127,19 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F }) } + volumeDir, err := s.getVolDir(volume) + if err != nil { + return err + } + + // Validate file path length, before reading. + filePath := pathJoin(volumeDir, path) + if err = checkPathLength(filePath); err != nil { + return err + } + var legacyJSON bool - buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) + buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), false) if err != nil { if !errors.Is(err, errFileNotFound) { return err @@ -1161,11 +1172,6 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return errFileNotFound } - volumeDir, err := s.getVolDir(volume) - if err != nil { - return err - } - if legacyJSON { // Delete the meta file, if there are no more versions the // top level parent is automatically removed. @@ -1217,12 +1223,6 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) } - // No more versions, this is the last version purge everything. 
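errDiskOngoingReq (defined above in cmd/storage-errors.go) is how the new read path marks drives that have not answered yet: getObjectFileInfo pre-fills its per-drive error slice with it, and since it now belongs to objectOpIgnoredErrs, reduceReadQuorumErrs treats a still-pending drive as ignorable rather than failed:

	errs := make([]error, er.setDriveCount)
	for i := range errs {
		errs[i] = errDiskOngoingReq // "drive still did not complete the request"
	}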
- filePath := pathJoin(volumeDir, path) - if err = checkPathLength(filePath); err != nil { - return err - } - return s.deleteFile(volumeDir, filePath, true, false) } @@ -1371,10 +1371,15 @@ func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) { } func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath string, readData bool) (buf []byte, dmTime time.Time, err error) { + if filePath == "" { + return nil, dmTime, errFileNotFound + } + + xlPath := pathJoin(filePath, xlStorageFormatFile) if readData { - buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), true) + buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, xlPath, true) } else { - buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile)) + buf, dmTime, err = s.readMetadataWithDMTime(ctx, xlPath) if err != nil { if osIsNotExist(err) { if !skipAccessChecks(volume) { @@ -1387,18 +1392,21 @@ func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath str } } - if err != nil { - if err == errFileNotFound { - buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true) - if err != nil { - return nil, time.Time{}, err - } - } else { + s.RLock() + legacy := s.formatLegacy + s.RUnlock() + + if err != nil && errors.Is(err, errFileNotFound) && legacy { + buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true) + if err != nil { return nil, time.Time{}, err } } if len(buf) == 0 { + if err != nil { + return nil, time.Time{}, err + } return nil, time.Time{}, errFileNotFound } @@ -1440,6 +1448,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str if err != nil { return fi, err } + // Validate file path length, before reading. filePath := pathJoin(volumeDir, path) if err = checkPathLength(filePath); err != nil { @@ -1532,6 +1541,10 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str } func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string, sync bool) (buf []byte, dmTime time.Time, err error) { + if filePath == "" { + return nil, dmTime, errFileNotFound + } + if contextCanceled(ctx) { return nil, time.Time{}, ctx.Err() } @@ -1593,7 +1606,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f // Get size for precise allocation. stat, err := f.Stat() if err != nil { - buf, err = io.ReadAll(diskHealthReader(ctx, r)) + buf, err = io.ReadAll(r) return buf, dmTime, osErrToFileErr(err) } if stat.IsDir() { @@ -1612,7 +1625,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f dr.SmallFile = sz <= xioutil.BlockSizeSmall*2 } // Read file... 
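readAllData stats the file first so the destination buffer can be sized exactly, then reads with io.ReadFull, which fails on short reads instead of silently returning truncated data. The shape of that read, with the allocation shown as an assumption (the pool-backed allocation itself is elided from this hunk):

	sz := stat.Size()
	buf := make([]byte, sz) // illustrative; the real code draws from a buffer pool
	_, err = io.ReadFull(r, buf)
	return buf, stat.ModTime().UTC(), osErrToFileErr(err)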
- _, err = io.ReadFull(diskHealthReader(ctx, r), buf)
+ _, err = io.ReadFull(r, buf)
 return buf, stat.ModTime().UTC(), osErrToFileErr(err)
 }
 
@@ -1641,7 +1654,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
 return nil, err
 }
 
- buf, _, err = s.readAllData(ctx, volume, volumeDir, filePath, true)
+ buf, _, err = s.readAllData(ctx, volume, volumeDir, filePath, false)
 return buf, err
 }
 
@@ -2225,10 +2238,10 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, dele
 func skipAccessChecks(volume string) (ok bool) {
 for _, prefix := range []string{
- minioMetaBucket,
- minioMetaMultipartBucket,
 minioMetaTmpDeletedBucket,
 minioMetaTmpBucket,
+ minioMetaMultipartBucket,
+ minioMetaBucket,
 } {
 if strings.HasPrefix(volume, prefix) {
 return true
@@ -2408,7 +2421,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 }
 
 // legacy data dir means its old content, honor system umask.
- if err = mkdirAll(legacyDataPath, 0o777, s.drivePath); err != nil {
+ if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
 // any failed mkdir-calls delete them.
 s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
 return 0, osErrToFileErr(err)
@@ -2526,7 +2539,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 // on a versioned bucket.
 s.moveToTrash(legacyDataPath, true, false)
 }
- if err = renameAll(srcDataPath, dstDataPath, s.drivePath); err != nil {
+ if err = renameAll(srcDataPath, dstDataPath, dstVolumeDir); err != nil {
 if legacyPreserved {
 // Any failed rename calls un-roll previous transaction.
 s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
@@ -2537,7 +2550,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 }
 
 // Commit meta-file
- if err = renameAll(srcFilePath, dstFilePath, s.drivePath); err != nil {
+ if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
 if legacyPreserved {
 // Any failed rename calls un-roll previous transaction.
 s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
@@ -2647,7 +2660,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
 }
 }
 
- if err = renameAll(srcFilePath, dstFilePath, s.drivePath); err != nil {
+ if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
 if isSysErrNotEmpty(err) || isSysErrNotDir(err) {
 return errFileAccessDenied
 }
diff --git a/docs/bucket/replication/setup_2site_existing_replication.sh b/docs/bucket/replication/setup_2site_existing_replication.sh
index b07e976a9..fe977e7c5 100755
--- a/docs/bucket/replication/setup_2site_existing_replication.sh
+++ b/docs/bucket/replication/setup_2site_existing_replication.sh
@@ -84,7 +84,7 @@ remote_arn=$(./mc replicate ls sitea/bucket --json | jq -r .rule.Destination.Buc
 sleep 1
 
 ./mc replicate resync start sitea/bucket/ --remote-bucket "${remote_arn}"
-sleep 10s ## sleep for 10s idea is that we give 100ms per object.
+sleep 30s ## sleep for 30s; the idea is that we give 300ms per object.
 
 ./mc ls -r --versions sitea/bucket >/tmp/sitea.txt
 ./mc ls -r --versions siteb/bucket >/tmp/siteb.txt
diff --git a/internal/config/cache/cache.go b/internal/config/cache/cache.go
new file mode 100644
index 000000000..602ed4218
--- /dev/null
+++ b/internal/config/cache/cache.go
@@ -0,0 +1,238 @@
+// Copyright (c) 2015-2023 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cache
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/minio/minio/internal/config"
+ xhttp "github.com/minio/minio/internal/http"
+ "github.com/minio/pkg/v2/env"
+ "github.com/tinylib/msgp/msgp"
+)
+
+// Cache related keys
+const (
+ Enable = "enable"
+ Endpoint = "endpoint"
+ ObjectSize = "object_size"
+
+ EnvEnable = "MINIO_CACHE_ENABLE"
+ EnvEndpoint = "MINIO_CACHE_ENDPOINT"
+ EnvObjectSize = "MINIO_CACHE_OBJECT_SIZE"
+)
+
+// DefaultKVS - default KV config for cache settings
+var DefaultKVS = config.KVS{
+ config.KV{
+ Key: Enable,
+ Value: "off",
+ },
+ config.KV{
+ Key: Endpoint,
+ Value: "",
+ },
+ config.KV{
+ Key: ObjectSize,
+ Value: "",
+ },
+}
+
+// Config represents the remote cache configuration
+type Config struct {
+ // Flag indicating whether cache is enabled.
+ Enable bool `json:"enable"`
+
+ // Endpoint of the remote mcache server used to
+ // store and retrieve pre-condition check entities such as
+ // the ETag and ModTime of an object + version
+ Endpoint string `json:"endpoint"`
+
+ // ObjectSize indicates the maximum object size at or below
+ // which data is cached and fetched remotely from DRAM.
+ ObjectSize int64
+
+ // clnt is the HTTP client used for communicating with the mcache server
+ clnt *http.Client
+}
+
+var configLock sync.RWMutex
+
+// Enabled - indicates if cache is enabled or not
+func (c *Config) Enabled() bool {
+ return c.Enable && c.Endpoint != ""
+}
+
+// MatchesSize verifies if the input 'size' falls under the cacheable threshold
+func (c Config) MatchesSize(size int64) bool {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ return c.Enable && c.ObjectSize > 0 && size <= c.ObjectSize
+}
+
+// Update applies a new cache configuration
+func (c *Config) Update(ncfg Config) {
+ configLock.Lock()
+ defer configLock.Unlock()
+
+ c.Enable = ncfg.Enable
+ c.Endpoint = ncfg.Endpoint
+ c.ObjectSize = ncfg.ObjectSize
+ c.clnt = ncfg.clnt
+}
+
+// cache related errors
+var (
+ ErrInvalidArgument = errors.New("invalid argument")
+ ErrKeyMissing = errors.New("key is missing")
+)
+
+const (
+ mcacheV1Check = "/_mcache/v1/check"
+ mcacheV1Update = "/_mcache/v1/update"
+ mcacheV1Delete = "/_mcache/v1/delete"
+)
+
+// Get performs a conditional check and returns the cached object info, if any.
+func (c Config) Get(r *CondCheck) (*ObjectInfo, error) {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ if !c.Enable {
+ return nil, nil
+ }
+
+ if c.Endpoint == "" {
+ // Endpoint not set, make this a no-op
+ return nil, nil
+ }
+
+ buf, err := r.MarshalMsg(nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // We do not want Gets to take too long; cut off anything
+ // beyond 250ms, since the remote cache is likely too busy
+ // already.
+ ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
+ defer cancel()
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.Endpoint+mcacheV1Check, bytes.NewReader(buf))
+ if err != nil {
+ return nil, err
+ }
+
+ resp, err := c.clnt.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer xhttp.DrainBody(resp.Body)
+
+ switch resp.StatusCode {
+ case http.StatusNotFound:
+ return nil, ErrKeyMissing
+ case http.StatusOK:
+ co := &ObjectInfo{}
+ return co, co.DecodeMsg(msgp.NewReader(resp.Body))
+ default:
+ return nil, ErrInvalidArgument
+ }
+}
+
+// Set sets the cache object info
+func (c Config) Set(ci *ObjectInfo) {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ if !c.Enable {
+ return
+ }
+
+ if c.Endpoint == "" {
+ // Endpoint not set, make this a no-op
+ return
+ }
+
+ buf, err := ci.MarshalMsg(nil)
+ if err != nil {
+ return
+ }
+
+ req, err := http.NewRequestWithContext(context.Background(), http.MethodPut, c.Endpoint+mcacheV1Update, bytes.NewReader(buf))
+ if err != nil {
+ return
+ }
+
+ resp, err := c.clnt.Do(req)
+ if err != nil {
+ return
+ }
+ defer xhttp.DrainBody(resp.Body)
+}
+
+// Delete deletes remote cached content for object and its version.
+func (c Config) Delete(bucket, key string) {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ if !c.Enable {
+ return
+ }
+
+ if c.Endpoint == "" {
+ return
+ }
+
+ req, err := http.NewRequestWithContext(context.Background(), http.MethodDelete, c.Endpoint+fmt.Sprintf("%s?bucket=%s&key=%s", mcacheV1Delete, bucket, key), nil)
+ if err != nil {
+ return
+ }
+
+ resp, err := c.clnt.Do(req)
+ if err != nil {
+ return
+ }
+ defer xhttp.DrainBody(resp.Body)
+}
+
+// LookupConfig - lookup config and override with valid environment settings if any.
+func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err error) {
+ cfg.Enable = env.Get(EnvEnable, kvs.GetWithDefault(Enable, DefaultKVS)) == config.EnableOn
+
+ if d := env.Get(EnvObjectSize, kvs.GetWithDefault(ObjectSize, DefaultKVS)); d != "" {
+ objectSize, err := humanize.ParseBytes(d)
+ if err != nil {
+ return cfg, err
+ }
+ cfg.ObjectSize = int64(objectSize)
+ }
+
+ cfg.Endpoint = env.Get(EnvEndpoint, kvs.GetWithDefault(Endpoint, DefaultKVS))
+ cfg.clnt = &http.Client{Transport: transport}
+
+ return cfg, nil
+}
diff --git a/internal/config/cache/help.go b/internal/config/cache/help.go
new file mode 100644
index 000000000..31ce940a6
--- /dev/null
+++ b/internal/config/cache/help.go
@@ -0,0 +1,48 @@
+// Copyright (c) 2015-2023 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cache
+
+import "github.com/minio/minio/internal/config"
+
+var (
+ defaultHelpPostfix = func(key string) string {
+ return config.DefaultHelpPostfix(DefaultKVS, key)
+ }
+
+ // Help - provides help for cache config
+ Help = config.HelpKVS{
+ config.HelpKV{
+ Key: Enable,
+ Type: "on|off",
+ Description: "set to enable various caching optimizations" + defaultHelpPostfix(Enable),
+ Optional: true,
+ },
+ config.HelpKV{
+ Key: Endpoint,
+ Type: "string",
+ Description: "remote endpoint where MinIO will cache GET/HEAD metadata values such as ETag, ModTime" + defaultHelpPostfix(Endpoint),
+ Optional: true,
+ },
+ config.HelpKV{
+ Key: ObjectSize,
+ Type: "string",
+ Description: "maximum object size below which data is cached and fetched remotely from DRAM if possible" + defaultHelpPostfix(ObjectSize),
+ Optional: true,
+ },
+ }
+)
diff --git a/internal/config/cache/remote.go b/internal/config/cache/remote.go
new file mode 100644
index 000000000..8db4e2f55
--- /dev/null
+++ b/internal/config/cache/remote.go
@@ -0,0 +1,111 @@
+package cache
+
+import (
+ "net/http"
+ "regexp"
+ "strconv"
+ "time"
+
+ "github.com/minio/minio/internal/amztime"
+ xhttp "github.com/minio/minio/internal/http"
+)
+
+//go:generate msgp -file=$GOFILE
+
+// ObjectInfo represents the object information cached remotely
+type ObjectInfo struct {
+ Key string `json:"key"`
+ Bucket string `json:"bucket"`
+ ETag string `json:"etag"`
+ ModTime time.Time `json:"modTime"`
+ StatusCode int `json:"statusCode"`
+
+ // Optional elements
+ CacheControl string `json:"cacheControl,omitempty" msg:",omitempty"`
+ Expires string `json:"expires,omitempty" msg:",omitempty"`
+ Metadata map[string]string `json:"metadata,omitempty" msg:",omitempty"`
+ Range string `json:"range,omitempty" msg:",omitempty"`
+ PartNumber int `json:"partNumber,omitempty" msg:",omitempty"`
+ Size int64 `json:"size,omitempty" msg:",omitempty"` // Full size of the object
+ Data []byte `json:"data,omitempty" msg:",omitempty"` // Data can contain the full object data or a partial range
+}
+
+// WriteHeaders writes the response headers for conditional requests
+func (oi ObjectInfo) WriteHeaders(w http.ResponseWriter, preamble, statusCode func()) {
+ preamble()
+
+ if !oi.ModTime.IsZero() {
+ w.Header().Set(xhttp.LastModified, oi.ModTime.UTC().Format(http.TimeFormat))
+ }
+
+ if oi.ETag != "" {
+ w.Header()[xhttp.ETag] = []string{"\"" + oi.ETag + "\""}
+ }
+
+ if oi.Expires != "" {
+ w.Header().Set(xhttp.Expires, oi.Expires)
+ }
+
+ if oi.CacheControl != "" {
+ w.Header().Set(xhttp.CacheControl, oi.CacheControl)
+ }
+
+ statusCode()
+}
+
+// CondCheck represents the conditional request made to the remote cache
+// for validation during GET/HEAD object requests.
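+//
+// A handler populates it via Init() from the incoming request headers and
+// sends it to the remote cache through Config.Get, which returns the cached
+// ObjectInfo on a hit, or ErrKeyMissing when the key is not cached.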
+type CondCheck struct {
+ ObjectInfo
+ IfMatch string `json:"ifMatch,omitempty" msg:",omitempty"`
+ IfNoneMatch string `json:"ifNoneMatch,omitempty" msg:",omitempty"`
+ IfModifiedSince *time.Time `json:"ifModSince,omitempty" msg:",omitempty"`
+ IfUnModifiedSince *time.Time `json:"ifUnmodSince,omitempty" msg:",omitempty"`
+ IfRange string `json:"ifRange,omitempty" msg:",omitempty"`
+ IfPartNumber int `json:"ifPartNumber,omitempty" msg:",omitempty"`
+}
+
+// IsSet reports whether any conditional check is present; when none is,
+// the cache lookup avoids sending a request.
+func (r *CondCheck) IsSet() bool {
+ if r == nil {
+ return false
+ }
+ return r.IfMatch != "" || r.IfNoneMatch != "" || r.IfModifiedSince != nil || r.IfUnModifiedSince != nil || r.IfRange != ""
+}
+
+var etagRegex = regexp.MustCompile("\"*?([^\"]*?)\"*?$")
+
+// canonicalizeETag returns the ETag with leading and trailing double-quotes
+// removed, if any are present
+func canonicalizeETag(etag string) string {
+ return etagRegex.ReplaceAllString(etag, "$1")
+}
+
+// Init populates the CondCheck from the incoming request headers,
+// initializing it before the request is sent to the remote cache.
+func (r *CondCheck) Init(bucket, object string, header http.Header) {
+ r.Key = object
+ r.Bucket = bucket
+
+ ifModifiedSinceHeader := header.Get(xhttp.IfModifiedSince)
+ if ifModifiedSinceHeader != "" {
+ if givenTime, err := amztime.ParseHeader(ifModifiedSinceHeader); err == nil {
+ r.IfModifiedSince = &givenTime
+ }
+ }
+ ifUnmodifiedSinceHeader := header.Get(xhttp.IfUnmodifiedSince)
+ if ifUnmodifiedSinceHeader != "" {
+ if givenTime, err := amztime.ParseHeader(ifUnmodifiedSinceHeader); err == nil {
+ r.IfUnModifiedSince = &givenTime
+ }
+ }
+ r.IfMatch = canonicalizeETag(header.Get(xhttp.IfMatch))
+ r.IfNoneMatch = canonicalizeETag(header.Get(xhttp.IfNoneMatch))
+ r.IfRange = header.Get(xhttp.Range)
+ ifPartNumberHeader := header.Get(xhttp.PartNumber)
+ if ifPartNumberHeader != "" {
+ if partNumber, err := strconv.Atoi(ifPartNumberHeader); err == nil {
+ r.IfPartNumber = partNumber
+ }
+ }
+}
diff --git a/internal/config/cache/remote_gen.go b/internal/config/cache/remote_gen.go
new file mode 100644
index 000000000..b6b7e7e62
--- /dev/null
+++ b/internal/config/cache/remote_gen.go
@@ -0,0 +1,795 @@
+package cache
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+ +import ( + "time" + + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *CondCheck) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ObjectInfo": + err = z.ObjectInfo.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "ObjectInfo") + return + } + case "IfMatch": + z.IfMatch, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "IfMatch") + return + } + case "IfNoneMatch": + z.IfNoneMatch, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "IfNoneMatch") + return + } + case "IfModifiedSince": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "IfModifiedSince") + return + } + z.IfModifiedSince = nil + } else { + if z.IfModifiedSince == nil { + z.IfModifiedSince = new(time.Time) + } + *z.IfModifiedSince, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "IfModifiedSince") + return + } + } + case "IfUnModifiedSince": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "IfUnModifiedSince") + return + } + z.IfUnModifiedSince = nil + } else { + if z.IfUnModifiedSince == nil { + z.IfUnModifiedSince = new(time.Time) + } + *z.IfUnModifiedSince, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "IfUnModifiedSince") + return + } + } + case "IfRange": + z.IfRange, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "IfRange") + return + } + case "IfPartNumber": + z.IfPartNumber, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "IfPartNumber") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *CondCheck) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 7 + // write "ObjectInfo" + err = en.Append(0x87, 0xaa, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x6e, 0x66, 0x6f) + if err != nil { + return + } + err = z.ObjectInfo.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "ObjectInfo") + return + } + // write "IfMatch" + err = en.Append(0xa7, 0x49, 0x66, 0x4d, 0x61, 0x74, 0x63, 0x68) + if err != nil { + return + } + err = en.WriteString(z.IfMatch) + if err != nil { + err = msgp.WrapError(err, "IfMatch") + return + } + // write "IfNoneMatch" + err = en.Append(0xab, 0x49, 0x66, 0x4e, 0x6f, 0x6e, 0x65, 0x4d, 0x61, 0x74, 0x63, 0x68) + if err != nil { + return + } + err = en.WriteString(z.IfNoneMatch) + if err != nil { + err = msgp.WrapError(err, "IfNoneMatch") + return + } + // write "IfModifiedSince" + err = en.Append(0xaf, 0x49, 0x66, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65) + if err != nil { + return + } + if z.IfModifiedSince == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteTime(*z.IfModifiedSince) + if err != nil { + err = msgp.WrapError(err, "IfModifiedSince") + return + } + } + // write "IfUnModifiedSince" + err = en.Append(0xb1, 0x49, 0x66, 0x55, 0x6e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65) + if err != nil { + return + } + if z.IfUnModifiedSince == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = 
en.WriteTime(*z.IfUnModifiedSince) + if err != nil { + err = msgp.WrapError(err, "IfUnModifiedSince") + return + } + } + // write "IfRange" + err = en.Append(0xa7, 0x49, 0x66, 0x52, 0x61, 0x6e, 0x67, 0x65) + if err != nil { + return + } + err = en.WriteString(z.IfRange) + if err != nil { + err = msgp.WrapError(err, "IfRange") + return + } + // write "IfPartNumber" + err = en.Append(0xac, 0x49, 0x66, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72) + if err != nil { + return + } + err = en.WriteInt(z.IfPartNumber) + if err != nil { + err = msgp.WrapError(err, "IfPartNumber") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *CondCheck) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 7 + // string "ObjectInfo" + o = append(o, 0x87, 0xaa, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x6e, 0x66, 0x6f) + o, err = z.ObjectInfo.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "ObjectInfo") + return + } + // string "IfMatch" + o = append(o, 0xa7, 0x49, 0x66, 0x4d, 0x61, 0x74, 0x63, 0x68) + o = msgp.AppendString(o, z.IfMatch) + // string "IfNoneMatch" + o = append(o, 0xab, 0x49, 0x66, 0x4e, 0x6f, 0x6e, 0x65, 0x4d, 0x61, 0x74, 0x63, 0x68) + o = msgp.AppendString(o, z.IfNoneMatch) + // string "IfModifiedSince" + o = append(o, 0xaf, 0x49, 0x66, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65) + if z.IfModifiedSince == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendTime(o, *z.IfModifiedSince) + } + // string "IfUnModifiedSince" + o = append(o, 0xb1, 0x49, 0x66, 0x55, 0x6e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65) + if z.IfUnModifiedSince == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendTime(o, *z.IfUnModifiedSince) + } + // string "IfRange" + o = append(o, 0xa7, 0x49, 0x66, 0x52, 0x61, 0x6e, 0x67, 0x65) + o = msgp.AppendString(o, z.IfRange) + // string "IfPartNumber" + o = append(o, 0xac, 0x49, 0x66, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72) + o = msgp.AppendInt(o, z.IfPartNumber) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *CondCheck) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ObjectInfo": + bts, err = z.ObjectInfo.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "ObjectInfo") + return + } + case "IfMatch": + z.IfMatch, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IfMatch") + return + } + case "IfNoneMatch": + z.IfNoneMatch, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IfNoneMatch") + return + } + case "IfModifiedSince": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.IfModifiedSince = nil + } else { + if z.IfModifiedSince == nil { + z.IfModifiedSince = new(time.Time) + } + *z.IfModifiedSince, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IfModifiedSince") + return + } + } + case "IfUnModifiedSince": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.IfUnModifiedSince = nil + } else { + if z.IfUnModifiedSince == nil { + 
z.IfUnModifiedSince = new(time.Time) + } + *z.IfUnModifiedSince, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IfUnModifiedSince") + return + } + } + case "IfRange": + z.IfRange, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IfRange") + return + } + case "IfPartNumber": + z.IfPartNumber, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IfPartNumber") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *CondCheck) Msgsize() (s int) { + s = 1 + 11 + z.ObjectInfo.Msgsize() + 8 + msgp.StringPrefixSize + len(z.IfMatch) + 12 + msgp.StringPrefixSize + len(z.IfNoneMatch) + 16 + if z.IfModifiedSince == nil { + s += msgp.NilSize + } else { + s += msgp.TimeSize + } + s += 18 + if z.IfUnModifiedSince == nil { + s += msgp.NilSize + } else { + s += msgp.TimeSize + } + s += 8 + msgp.StringPrefixSize + len(z.IfRange) + 13 + msgp.IntSize + return +} + +// DecodeMsg implements msgp.Decodable +func (z *ObjectInfo) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Key": + z.Key, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Key") + return + } + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "ETag": + z.ETag, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ETag") + return + } + case "ModTime": + z.ModTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "ModTime") + return + } + case "StatusCode": + z.StatusCode, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "StatusCode") + return + } + case "CacheControl": + z.CacheControl, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "CacheControl") + return + } + case "Expires": + z.Expires, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Expires") + return + } + case "Metadata": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + if z.Metadata == nil { + z.Metadata = make(map[string]string, zb0002) + } else if len(z.Metadata) > 0 { + for key := range z.Metadata { + delete(z.Metadata, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 string + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Metadata", za0001) + return + } + z.Metadata[za0001] = za0002 + } + case "Range": + z.Range, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Range") + return + } + case "PartNumber": + z.PartNumber, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "PartNumber") + return + } + case "Size": + z.Size, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + case "Data": + z.Data, err = dc.ReadBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + default: + err = 
dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ObjectInfo) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 12 + // write "Key" + err = en.Append(0x8c, 0xa3, 0x4b, 0x65, 0x79) + if err != nil { + return + } + err = en.WriteString(z.Key) + if err != nil { + err = msgp.WrapError(err, "Key") + return + } + // write "Bucket" + err = en.Append(0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "ETag" + err = en.Append(0xa4, 0x45, 0x54, 0x61, 0x67) + if err != nil { + return + } + err = en.WriteString(z.ETag) + if err != nil { + err = msgp.WrapError(err, "ETag") + return + } + // write "ModTime" + err = en.Append(0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.ModTime) + if err != nil { + err = msgp.WrapError(err, "ModTime") + return + } + // write "StatusCode" + err = en.Append(0xaa, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65) + if err != nil { + return + } + err = en.WriteInt(z.StatusCode) + if err != nil { + err = msgp.WrapError(err, "StatusCode") + return + } + // write "CacheControl" + err = en.Append(0xac, 0x43, 0x61, 0x63, 0x68, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c) + if err != nil { + return + } + err = en.WriteString(z.CacheControl) + if err != nil { + err = msgp.WrapError(err, "CacheControl") + return + } + // write "Expires" + err = en.Append(0xa7, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteString(z.Expires) + if err != nil { + err = msgp.WrapError(err, "Expires") + return + } + // write "Metadata" + err = en.Append(0xa8, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Metadata))) + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + for za0001, za0002 := range z.Metadata { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Metadata", za0001) + return + } + } + // write "Range" + err = en.Append(0xa5, 0x52, 0x61, 0x6e, 0x67, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Range) + if err != nil { + err = msgp.WrapError(err, "Range") + return + } + // write "PartNumber" + err = en.Append(0xaa, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72) + if err != nil { + return + } + err = en.WriteInt(z.PartNumber) + if err != nil { + err = msgp.WrapError(err, "PartNumber") + return + } + // write "Size" + err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.Size) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + // write "Data" + err = en.Append(0xa4, 0x44, 0x61, 0x74, 0x61) + if err != nil { + return + } + err = en.WriteBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ObjectInfo) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 12 + // string "Key" + o = append(o, 0x8c, 0xa3, 0x4b, 0x65, 0x79) + o = msgp.AppendString(o, z.Key) + // string "Bucket" + o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "ETag" + o = append(o, 0xa4, 
0x45, 0x54, 0x61, 0x67) + o = msgp.AppendString(o, z.ETag) + // string "ModTime" + o = append(o, 0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65) + o = msgp.AppendTime(o, z.ModTime) + // string "StatusCode" + o = append(o, 0xaa, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65) + o = msgp.AppendInt(o, z.StatusCode) + // string "CacheControl" + o = append(o, 0xac, 0x43, 0x61, 0x63, 0x68, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c) + o = msgp.AppendString(o, z.CacheControl) + // string "Expires" + o = append(o, 0xa7, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73) + o = msgp.AppendString(o, z.Expires) + // string "Metadata" + o = append(o, 0xa8, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61) + o = msgp.AppendMapHeader(o, uint32(len(z.Metadata))) + for za0001, za0002 := range z.Metadata { + o = msgp.AppendString(o, za0001) + o = msgp.AppendString(o, za0002) + } + // string "Range" + o = append(o, 0xa5, 0x52, 0x61, 0x6e, 0x67, 0x65) + o = msgp.AppendString(o, z.Range) + // string "PartNumber" + o = append(o, 0xaa, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72) + o = msgp.AppendInt(o, z.PartNumber) + // string "Size" + o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendInt64(o, z.Size) + // string "Data" + o = append(o, 0xa4, 0x44, 0x61, 0x74, 0x61) + o = msgp.AppendBytes(o, z.Data) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ObjectInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Key": + z.Key, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Key") + return + } + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "ETag": + z.ETag, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ETag") + return + } + case "ModTime": + z.ModTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ModTime") + return + } + case "StatusCode": + z.StatusCode, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "StatusCode") + return + } + case "CacheControl": + z.CacheControl, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "CacheControl") + return + } + case "Expires": + z.Expires, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Expires") + return + } + case "Metadata": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + if z.Metadata == nil { + z.Metadata = make(map[string]string, zb0002) + } else if len(z.Metadata) > 0 { + for key := range z.Metadata { + delete(z.Metadata, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 string + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Metadata", za0001) + return + } + z.Metadata[za0001] = za0002 + } + case "Range": + z.Range, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Range") + 
return + } + case "PartNumber": + z.PartNumber, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PartNumber") + return + } + case "Size": + z.Size, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + case "Data": + z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ObjectInfo) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.Key) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.ETag) + 8 + msgp.TimeSize + 11 + msgp.IntSize + 13 + msgp.StringPrefixSize + len(z.CacheControl) + 8 + msgp.StringPrefixSize + len(z.Expires) + 9 + msgp.MapHeaderSize + if z.Metadata != nil { + for za0001, za0002 := range z.Metadata { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + } + } + s += 6 + msgp.StringPrefixSize + len(z.Range) + 11 + msgp.IntSize + 5 + msgp.Int64Size + 5 + msgp.BytesPrefixSize + len(z.Data) + return +} diff --git a/internal/config/cache/remote_gen_test.go b/internal/config/cache/remote_gen_test.go new file mode 100644 index 000000000..86a1a2b92 --- /dev/null +++ b/internal/config/cache/remote_gen_test.go @@ -0,0 +1,236 @@ +package cache + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalCondCheck(t *testing.T) { + v := CondCheck{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgCondCheck(b *testing.B) { + v := CondCheck{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgCondCheck(b *testing.B) { + v := CondCheck{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalCondCheck(b *testing.B) { + v := CondCheck{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeCondCheck(t *testing.T) { + v := CondCheck{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeCondCheck Msgsize() is inaccurate") + } + + vn := CondCheck{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeCondCheck(b *testing.B) { + v := CondCheck{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + 
v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeCondCheck(b *testing.B) { + v := CondCheck{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalObjectInfo(t *testing.T) { + v := ObjectInfo{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgObjectInfo(b *testing.B) { + v := ObjectInfo{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgObjectInfo(b *testing.B) { + v := ObjectInfo{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalObjectInfo(b *testing.B) { + v := ObjectInfo{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeObjectInfo(t *testing.T) { + v := ObjectInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeObjectInfo Msgsize() is inaccurate") + } + + vn := ObjectInfo{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeObjectInfo(b *testing.B) { + v := ObjectInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeObjectInfo(b *testing.B) { + v := ObjectInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 080734d63..623fda661 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -183,6 +183,7 @@ var SubSystemsDynamic = set.CreateStringSet( AuditWebhookSubSys, AuditKafkaSubSys, StorageClassSubSys, + CacheSubSys, ) // SubSystemsSingleTargets - subsystems which only support single target. @@ -203,6 +204,7 @@ var SubSystemsSingleTargets = set.CreateStringSet( ScannerSubSys, SubnetSubSys, CallhomeSubSys, + CacheSubSys, ) // Constant separators
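
Taken together, the flow of the new cache subsystem is: LookupConfig builds the Config from KVS and environment values, a GET/HEAD handler fills a CondCheck from the request headers via Init, Get consults the remote mcache server (with the 250ms cutoff so a busy cache never stalls the request path), and Set/Delete keep the remote entries in sync after writes and deletes. Below is a minimal usage sketch of that flow, not part of the patch; it assumes caller code living inside the MinIO module (these are internal packages), and the bucket, object, and ETag values are hypothetical.

package main

import (
	"fmt"
	"net/http"

	"github.com/minio/minio/internal/config/cache"
)

func main() {
	// Resolve cache settings from defaults and the environment;
	// MINIO_CACHE_ENABLE=on and MINIO_CACHE_ENDPOINT must both be
	// set for the cache to be considered enabled.
	cfg, err := cache.LookupConfig(cache.DefaultKVS, http.DefaultTransport)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}

	// Build a conditional check from the request headers, the way a
	// GET/HEAD handler would before consulting the remote cache.
	hdr := http.Header{}
	hdr.Set("If-None-Match", `"d41d8cd98f00b204e9800998ecf8427e"`)

	check := &cache.CondCheck{}
	check.Init("mybucket", "myobject", hdr)

	if cfg.Enabled() && check.IsSet() {
		ci, err := cfg.Get(check)
		switch {
		case err == cache.ErrKeyMissing:
			// Cache miss: serve from the backend, then repopulate
			// small objects with cfg.Set(&cache.ObjectInfo{...}).
		case err == nil && ci != nil:
			fmt.Println("cache hit:", ci.ETag, ci.StatusCode)
		default:
			// Remote cache unreachable or busy; fall through to
			// the backend as if caching were disabled.
		}
	}
}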