diff --git a/cmd/api-response.go b/cmd/api-response.go
index 14ccb666e..d713fc5f8 100644
--- a/cmd/api-response.go
+++ b/cmd/api-response.go
@@ -502,6 +502,47 @@ func generateListBucketsResponse(buckets []BucketInfo) ListBucketsResponse {
return data
}
+func cleanReservedKeys(metadata map[string]string) map[string]string {
+ m := cloneMSS(metadata)
+
+ switch kind, _ := crypto.IsEncrypted(metadata); kind {
+ case crypto.S3:
+ m[xhttp.AmzServerSideEncryption] = xhttp.AmzEncryptionAES
+ case crypto.S3KMS:
+ m[xhttp.AmzServerSideEncryption] = xhttp.AmzEncryptionKMS
+ m[xhttp.AmzServerSideEncryptionKmsID] = kmsKeyIDFromMetadata(metadata)
+ if kmsCtx, ok := metadata[crypto.MetaContext]; ok {
+ m[xhttp.AmzServerSideEncryptionKmsContext] = kmsCtx
+ }
+ case crypto.SSEC:
+ m[xhttp.AmzServerSideEncryptionCustomerAlgorithm] = xhttp.AmzEncryptionAES
+
+ }
+
+ var toRemove []string
+ for k := range cleanMinioInternalMetadataKeys(m) {
+ if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
+ // Do not need to send any internal metadata
+ // values to client.
+ toRemove = append(toRemove, k)
+ continue
+ }
+
+ // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
+ if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
+ toRemove = append(toRemove, k)
+ continue
+ }
+ }
+
+ for _, k := range toRemove {
+ delete(m, k)
+ delete(m, strings.ToLower(k))
+ }
+
+ return m
+}
+
// generates an ListBucketVersions response for the said bucket with other enumerated options.
func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delimiter, encodingType string, maxKeys int, resp ListObjectVersionsInfo, metadata metaCheckFn) ListVersionsResponse {
versions := make([]ObjectVersion, 0, len(resp.Objects))
@@ -549,18 +590,10 @@ func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delim
case crypto.SSEC:
content.UserMetadata.Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, xhttp.AmzEncryptionAES)
}
- for k, v := range cleanMinioInternalMetadataKeys(object.UserDefined) {
- if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
- // Do not need to send any internal metadata
- // values to client.
- continue
- }
- // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
- if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
- continue
- }
+ for k, v := range cleanReservedKeys(object.UserDefined) {
content.UserMetadata.Set(k, v)
}
+
content.UserMetadata.Set("expires", object.Expires.Format(http.TimeFormat))
content.Internal = &ObjectInternalInfo{
K: object.DataBlocks,
@@ -693,16 +726,7 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter,
case crypto.SSEC:
content.UserMetadata.Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, xhttp.AmzEncryptionAES)
}
- for k, v := range cleanMinioInternalMetadataKeys(object.UserDefined) {
- if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
- // Do not need to send any internal metadata
- // values to client.
- continue
- }
- // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
- if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
- continue
- }
+ for k, v := range cleanReservedKeys(object.UserDefined) {
content.UserMetadata.Set(k, v)
}
content.UserMetadata.Set("expires", object.Expires.Format(http.TimeFormat))
diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go
index 4b9ddd18d..8169d5cd9 100644
--- a/cmd/bucket-handlers.go
+++ b/cmd/bucket-handlers.go
@@ -51,6 +51,7 @@ import (
sse "github.com/minio/minio/internal/bucket/encryption"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
+ "github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
@@ -668,6 +669,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
continue
}
+ defer globalCacheConfig.Delete(bucket, dobj.ObjectName)
+
if replicateDeletes && (dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == Pending) {
// copy so we can re-add null ID.
dobj := dobj
@@ -1343,6 +1346,22 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
continue
}
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: getDecryptedETag(formValues, objInfo, false),
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.Expires.UTC().Format(http.TimeFormat),
+ CacheControl: objInfo.CacheControl,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ Size: asize,
+ })
+
fanOutResp = append(fanOutResp, minio.PutObjectFanOutResponse{
Key: objInfo.Name,
ETag: getDecryptedETag(formValues, objInfo, false),
@@ -1399,10 +1418,12 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
+ etag := getDecryptedETag(formValues, objInfo, false)
+
// We must not use the http.Header().Set method here because some (broken)
// clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive).
// Therefore, we have to set the ETag directly as map entry.
- w.Header()[xhttp.ETag] = []string{`"` + objInfo.ETag + `"`}
+ w.Header()[xhttp.ETag] = []string{`"` + etag + `"`}
// Set the relevant version ID as part of the response header.
if objInfo.VersionID != "" && objInfo.VersionID != nullVersionID {
@@ -1413,6 +1434,22 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
w.Header().Set(xhttp.Location, obj)
}
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ defer globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: etag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ Size: asize,
+ })
+
// Notify object created event.
defer sendEvent(eventArgs{
EventName: event.ObjectCreatedPost,
diff --git a/cmd/config-current.go b/cmd/config-current.go
index 105a32287..c3b9fdc3d 100644
--- a/cmd/config-current.go
+++ b/cmd/config-current.go
@@ -27,6 +27,7 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/api"
+ "github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns"
@@ -68,6 +69,7 @@ func initHelp() {
config.ScannerSubSys: scanner.DefaultKVS,
config.SubnetSubSys: subnet.DefaultKVS,
config.CallhomeSubSys: callhome.DefaultKVS,
+ config.CacheSubSys: cache.DefaultKVS,
}
for k, v := range notify.DefaultNotificationKVS {
kvs[k] = v
@@ -206,6 +208,12 @@ func initHelp() {
Key: config.EtcdSubSys,
Description: "persist IAM assets externally to etcd",
},
+ config.HelpKV{
+ Key: config.CacheSubSys,
+ Type: "string",
+ Description: "enable various cache optimizations on MinIO for reads",
+ Optional: true,
+ },
}
if globalIsErasure {
@@ -250,6 +258,7 @@ func initHelp() {
config.LambdaWebhookSubSys: lambda.HelpWebhook,
config.SubnetSubSys: subnet.HelpSubnet,
config.CallhomeSubSys: callhome.HelpCallhome,
+ config.CacheSubSys: cache.Help,
}
config.RegisterHelpSubSys(helpMap)
@@ -357,6 +366,10 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o
if cfg.Enabled() && !globalSubnetConfig.Registered() {
return errors.New("Deployment is not registered with SUBNET. Please register the deployment via 'mc license register ALIAS'")
}
+ case config.CacheSubSys:
+ if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport); err != nil {
+ return err
+ }
case config.PolicyOPASubSys:
// In case legacy OPA config is being set, we treat it as if the
// AuthZPlugin is being set.
@@ -632,6 +645,13 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
initCallhome(ctx, objAPI)
}
}
+ case config.CacheSubSys:
+ cacheCfg, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport)
+ if err != nil {
+ logger.LogIf(ctx, fmt.Errorf("Unable to load cache config: %w", err))
+ } else {
+ globalCacheConfig.Update(cacheCfg)
+ }
}
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()
diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go
index 7629047e4..0f576d0d9 100644
--- a/cmd/erasure-healing-common.go
+++ b/cmd/erasure-healing-common.go
@@ -286,6 +286,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
availableDisks := make([]StorageAPI, len(onlineDisks))
dataErrs := make([]error, len(onlineDisks))
+
inconsistent := 0
for i, meta := range partsMetadata {
if !meta.IsValid() {
diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go
index ef3bc3f0f..7a858fa90 100644
--- a/cmd/erasure-metadata-utils.go
+++ b/cmd/erasure-metadata-utils.go
@@ -148,6 +148,26 @@ func hashOrder(key string, cardinality int) []int {
return nums
}
+var readFileInfoIgnoredErrs = append(objectOpIgnoredErrs,
+ errFileNotFound,
+ errVolumeNotFound,
+ errFileVersionNotFound,
+ io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
+ io.EOF, // some times we would read without locks, ignore these errors
+)
+
+func readFileInfo(ctx context.Context, disk StorageAPI, bucket, object, versionID string, opts ReadOptions) (FileInfo, error) {
+ fi, err := disk.ReadVersion(ctx, bucket, object, versionID, opts)
+
+ if err != nil && !IsErr(err, readFileInfoIgnoredErrs...) {
+ logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
+ disk.String(), bucket, object, err),
+ disk.String())
+ }
+
+ return fi, err
+}
+
// Reads all `xl.meta` metadata as a FileInfo slice.
// Returns error slice indicating the failed metadata reads.
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData, healing bool) ([]FileInfo, []error) {
@@ -166,33 +186,12 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
if disks[index] == nil {
return errDiskNotFound
}
- metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, opts)
+ metadataArray[index], err = readFileInfo(ctx, disks[index], bucket, object, versionID, opts)
return err
}, index)
}
- ignoredErrs := []error{
- errFileNotFound,
- errVolumeNotFound,
- errFileVersionNotFound,
- io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
- io.EOF, // some times we would read without locks, ignore these errors
- }
- ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
- errs := g.Wait()
- for index, err := range errs {
- if err == nil {
- continue
- }
- if !IsErr(err, ignoredErrs...) {
- logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
- disks[index], bucket, object, err),
- disks[index].String())
- }
- }
-
- // Return all the metadata.
- return metadataArray, errs
+ return metadataArray, g.Wait()
}
// shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo()
diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go
index 5cd78ee96..693addc27 100644
--- a/cmd/erasure-metadata.go
+++ b/cmd/erasure-metadata.go
@@ -158,6 +158,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInf
ContentEncoding: fi.Metadata["content-encoding"],
NumVersions: fi.NumVersions,
SuccessorModTime: fi.SuccessorModTime,
+ CacheControl: fi.Metadata["cache-control"],
}
if exp, ok := fi.Metadata["expires"]; ok {
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 003792e22..edb429e41 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -50,7 +50,7 @@ import (
)
// list all errors which can be ignored in object operations.
-var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnformattedDisk)
+var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnformattedDisk, errDiskOngoingReq)
// Object Operations
@@ -545,66 +545,111 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
return m, err
}
-func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) {
- metadataArray := make([]*xlMetaV2, len(disks))
- metaFileInfos := make([]FileInfo, len(metadataArray))
- metadataShallowVersions := make([][]xlMetaV2ShallowVersion, len(disks))
- var v2bufs [][]byte
- if !readData {
- v2bufs = make([][]byte, len(disks))
+var readRawFileInfoErrs = append(objectOpIgnoredErrs,
+ errFileNotFound,
+ errFileNameTooLong,
+ errVolumeNotFound,
+ errFileVersionNotFound,
+ io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
+ io.EOF, // some times we would read without locks, ignore these errors
+ msgp.ErrShortBytes,
+ context.DeadlineExceeded,
+ context.Canceled,
+)
+
+func readRawFileInfo(ctx context.Context, disk StorageAPI, bucket, object string, readData bool) (RawFileInfo, error) {
+ rf, err := disk.ReadXL(ctx, bucket, object, readData)
+
+ if err != nil && !IsErr(err, readRawFileInfoErrs...) {
+ logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
+ disk.String(), bucket, object, err),
+ disk.String())
+ }
+ return rf, err
+}
+
+func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVers, allParts bool) (FileInfo, error) {
+ var xl xlMetaV2
+ if err := xl.LoadOrConvert(ri.Buf); err != nil {
+ return FileInfo{}, err
}
+ fi, err := xl.ToFileInfo(bucket, object, "", inclFreeVers, allParts)
+ if err != nil {
+ return FileInfo{}, err
+ }
+
+ if !fi.IsValid() {
+ return FileInfo{}, errCorruptedFormat
+ }
+
+ versionID := fi.VersionID
+ if versionID == "" {
+ versionID = nullVersionID
+ }
+
+ fileInfo, err := xl.ToFileInfo(bucket, object, versionID, inclFreeVers, allParts)
+ if err != nil {
+ return FileInfo{}, err
+ }
+
+ if readData {
+ fileInfo.Data = xl.data.find(versionID)
+ }
+
+ fileInfo.DiskMTime = ri.DiskMTime
+ return fileInfo, nil
+}
+
+func readAllRawFileInfo(ctx context.Context, disks []StorageAPI, bucket, object string, readData bool) ([]RawFileInfo, []error) {
+ rawFileInfos := make([]RawFileInfo, len(disks))
g := errgroup.WithNErrs(len(disks))
- // Read `xl.meta` in parallel across disks.
for index := range disks {
index := index
g.Go(func() (err error) {
if disks[index] == nil {
return errDiskNotFound
}
- rf, err := disks[index].ReadXL(ctx, bucket, object, readData)
+ rf, err := readRawFileInfo(ctx, disks[index], bucket, object, readData)
if err != nil {
return err
}
- if !readData {
- // Save the buffer so we can reuse it.
- v2bufs[index] = rf.Buf
- }
-
- var xl xlMetaV2
- if err = xl.LoadOrConvert(rf.Buf); err != nil {
- return err
- }
- metadataArray[index] = &xl
- metaFileInfos[index] = FileInfo{
- DiskMTime: rf.DiskMTime,
- }
+ rawFileInfos[index] = rf
return nil
}, index)
}
- ignoredErrs := []error{
- errFileNotFound,
- errFileNameTooLong,
- errVolumeNotFound,
- errFileVersionNotFound,
- io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
- io.EOF, // some times we would read without locks, ignore these errors
- msgp.ErrShortBytes,
- context.DeadlineExceeded,
- context.Canceled,
- }
- ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
+ return rawFileInfos, g.Wait()
+}
- errs := g.Wait()
- for index, err := range errs {
- if err == nil {
+func pickLatestQuorumFilesInfo(ctx context.Context, rawFileInfos []RawFileInfo, errs []error, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) {
+ metadataArray := make([]*xlMetaV2, len(rawFileInfos))
+ metaFileInfos := make([]FileInfo, len(rawFileInfos))
+ metadataShallowVersions := make([][]xlMetaV2ShallowVersion, len(rawFileInfos))
+ var v2bufs [][]byte
+ if !readData {
+ v2bufs = make([][]byte, len(rawFileInfos))
+ }
+
+ // Read `xl.meta` in parallel across disks.
+ for index := range rawFileInfos {
+ rf := rawFileInfos[index]
+ if rf.Buf == nil {
continue
}
- if !IsErr(err, ignoredErrs...) {
- logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
- disks[index], bucket, object, err),
- disks[index].String())
+ if !readData {
+ // Save the buffer so we can reuse it.
+ v2bufs[index] = rf.Buf
+ }
+
+ var xl xlMetaV2
+ if err := xl.LoadOrConvert(rf.Buf); err != nil {
+ errs[index] = err
+ continue
+ }
+ metadataArray[index] = &xl
+ metaFileInfos[index] = FileInfo{
+ DiskMTime: rf.DiskMTime,
}
}
@@ -614,7 +659,7 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
}
}
- readQuorum := (len(disks) + 1) / 2
+ readQuorum := (len(rawFileInfos) + 1) / 2
meta := &xlMetaV2{versions: mergeXLV2Versions(readQuorum, false, 1, metadataShallowVersions...)}
lfi, err := meta.ToFileInfo(bucket, object, "", inclFreeVers, allParts)
if err != nil {
@@ -692,19 +737,177 @@ func shouldCheckForDangling(err error, errs []error, bucket string) bool {
return false
}
-func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
- disks := er.getDisks()
+func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) {
+ rawFileInfos, errs := readAllRawFileInfo(ctx, disks, bucket, object, readData)
+ return pickLatestQuorumFilesInfo(ctx, rawFileInfos, errs, bucket, object, readData, inclFreeVers, allParts)
+}
- var errs []error
+func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (FileInfo, []FileInfo, []StorageAPI, error) {
+ var mu sync.Mutex
- // Read metadata associated with the object from all disks.
- if opts.VersionID != "" {
- metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData, false)
- } else {
- metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions, true)
+ rawArr := make([]RawFileInfo, er.setDriveCount)
+ metaArr := make([]FileInfo, er.setDriveCount)
+ errs := make([]error, er.setDriveCount)
+ for i := range errs {
+ errs[i] = errDiskOngoingReq
+ }
+
+ done := make(chan bool, er.setDriveCount)
+ disks := er.getDisks()
+
+ mrfCheck := make(chan FileInfo)
+ defer close(mrfCheck)
+
+ ropts := ReadOptions{
+ ReadData: readData,
+ Healing: false,
+ }
+
+ // Ask for all disks first;
+ go func() {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ wg := sync.WaitGroup{}
+ for i, disk := range disks {
+ if disk == nil {
+ done <- false
+ continue
+ }
+ wg.Add(1)
+ go func(i int, disk StorageAPI) {
+ defer wg.Done()
+ var fi FileInfo
+ var err error
+ if opts.VersionID != "" {
+ // Read a specific version ID
+ fi, err = readFileInfo(ctx, disk, bucket, object, opts.VersionID, ropts)
+ mu.Lock()
+ metaArr[i], errs[i] = fi, err
+ mu.Unlock()
+ } else {
+ // Read the latest version
+ ri, err := readRawFileInfo(ctx, disk, bucket, object, readData)
+ mu.Lock()
+ rawArr[i], errs[i] = ri, err
+ mu.Unlock()
+ if err == nil {
+ fi, err = fileInfoFromRaw(ri, bucket, object, readData, opts.InclFreeVersions, true)
+ mu.Lock()
+ metaArr[i], errs[i] = fi, err
+ mu.Unlock()
+ }
+ }
+ done <- err == nil
+ }(i, disk)
+ }
+
+ wg.Wait()
+ close(done)
+
+ fi, ok := <-mrfCheck
+ if !ok {
+ return
+ }
+ if fi.Deleted {
+ return
+ }
+ // if one of the disk is offline, return right here no need
+ // to attempt a heal on the object.
+ if countErrs(errs, errDiskNotFound) > 0 {
+ return
+ }
+ var missingBlocks int
+ for i := range errs {
+ if errors.Is(errs[i], errFileNotFound) {
+ missingBlocks++
+ }
+ }
+ // if missing metadata can be reconstructed, attempt to reconstruct.
+ // additionally do not heal delete markers inline, let them be
+ // healed upon regular heal process.
+ if missingBlocks > 0 && missingBlocks < fi.Erasure.DataBlocks {
+ globalMRFState.addPartialOp(partialOperation{
+ bucket: fi.Volume,
+ object: fi.Name,
+ versionID: fi.VersionID,
+ queued: time.Now(),
+ setIndex: er.setIndex,
+ poolIndex: er.poolIndex,
+ })
+ }
+
+ return
+ }()
+
+ validResp := 0
+ totalResp := 0
+
+ // minDisks value is only to reduce the number of calls
+ // to the disks; this value is not accurate because we do
+ // not know the storage class of the object yet
+ minDisks := 0
+ if p := globalStorageClass.GetParityForSC(""); p > -1 {
+ minDisks = er.setDriveCount - p
+ } else {
+ minDisks = er.setDriveCount - er.defaultParityCount
+ }
+
+ calcQuorum := func() (FileInfo, []FileInfo, []StorageAPI, time.Time, string, error) {
+ readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
+ if err != nil {
+ return FileInfo{}, nil, nil, time.Time{}, "", err
+ }
+ err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
+ if err != nil {
+ return FileInfo{}, nil, nil, time.Time{}, "", err
+ }
+ onlineDisks, modTime, etag := listOnlineDisks(disks, metaArr, errs, readQuorum)
+ fi, err := pickValidFileInfo(ctx, metaArr, modTime, etag, readQuorum)
+ if err != nil {
+ return FileInfo{}, nil, nil, time.Time{}, "", err
+ }
+
+ onlineMeta := make([]FileInfo, len(metaArr))
+ copy(onlineMeta, metaArr)
+
+ return fi, onlineMeta, onlineDisks, modTime, etag, nil
+ }
+
+ var (
+ modTime time.Time
+ etag string
+ fi FileInfo
+ onlineMeta []FileInfo
+ onlineDisks []StorageAPI
+ err error
+ )
+
+ for success := range done {
+ totalResp++
+ if success {
+ validResp++
+ }
+ if totalResp < er.setDriveCount {
+ if !opts.FastGetObjInfo {
+ continue
+ }
+ if validResp < minDisks {
+ continue
+ }
+ }
+ if opts.VersionID == "" && totalResp == er.setDriveCount {
+ // Disks cannot agree about the latest version, pass this to a more advanced code
+ metaArr, errs = pickLatestQuorumFilesInfo(ctx, rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true)
+ }
+ mu.Lock()
+ fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum()
+ mu.Unlock()
+ if err == nil && fi.InlineData() {
+ break
+ }
}
- readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {
if shouldCheckForDangling(err, errs, bucket) {
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
@@ -715,25 +918,6 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
return fi, nil, nil, toObjectErr(err, bucket, object)
}
- if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
- if shouldCheckForDangling(err, errs, bucket) {
- _, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
- if derr != nil {
- reducedErr = derr
- }
- }
- return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
- }
-
- // List all online disks.
- onlineDisks, modTime, etag := listOnlineDisks(disks, metaArr, errs, readQuorum)
-
- // Pick latest valid metadata.
- fi, err = pickValidFileInfo(ctx, metaArr, modTime, etag, readQuorum)
- if err != nil {
- return fi, nil, nil, err
- }
-
if !fi.Deleted && len(fi.Erasure.Distribution) != len(onlineDisks) {
err := fmt.Errorf("unexpected file distribution (%v) from online disks (%v), looks like backend disks have been manually modified refusing to heal %s/%s(%s)",
fi.Erasure.Distribution, onlineDisks, bucket, object, opts.VersionID)
@@ -741,26 +925,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
return fi, nil, nil, toObjectErr(err, bucket, object, opts.VersionID)
}
- filterOnlineDisksInplace(fi, metaArr, onlineDisks)
-
- // if one of the disk is offline, return right here no need
- // to attempt a heal on the object.
- if countErrs(errs, errDiskNotFound) > 0 {
- return fi, metaArr, onlineDisks, nil
- }
-
- var missingBlocks int
- for i, err := range errs {
- if err != nil && errors.Is(err, errFileNotFound) {
- missingBlocks++
- continue
- }
-
+ filterOnlineDisksInplace(fi, onlineMeta, onlineDisks)
+ for i := range onlineMeta {
// verify metadata is valid, it has similar erasure info
// as well as common modtime, if modtime is not possible
// verify if it has common "etag" atleast.
- if metaArr[i].IsValid() && metaArr[i].Erasure.Equal(fi.Erasure) {
- ok := metaArr[i].ModTime.Equal(modTime)
+ if onlineMeta[i].IsValid() && onlineMeta[i].Erasure.Equal(fi.Erasure) {
+ ok := onlineMeta[i].ModTime.Equal(modTime)
if modTime.IsZero() || modTime.Equal(timeSentinel) {
ok = etag != "" && etag == fi.Metadata["etag"]
}
@@ -769,26 +940,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
} // in all other cases metadata is corrupt, do not read from it.
- metaArr[i] = FileInfo{}
+ onlineMeta[i] = FileInfo{}
onlineDisks[i] = nil
- missingBlocks++
}
- // if missing metadata can be reconstructed, attempt to reconstruct.
- // additionally do not heal delete markers inline, let them be
- // healed upon regular heal process.
- if !fi.Deleted && missingBlocks > 0 && missingBlocks < readQuorum {
- globalMRFState.addPartialOp(partialOperation{
- bucket: bucket,
- object: object,
- versionID: fi.VersionID,
- queued: time.Now(),
- setIndex: er.setIndex,
- poolIndex: er.poolIndex,
- })
- }
+ mrfCheck <- fi.ShallowCopy()
- return fi, metaArr, onlineDisks, nil
+ return fi, onlineMeta, onlineDisks, nil
}
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
@@ -1637,6 +1795,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
auditObjectErasureSet(ctx, object, &er)
if opts.DeletePrefix {
+ if globalCacheConfig.Enabled() {
+ return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
+ }
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
}
diff --git a/cmd/globals.go b/cmd/globals.go
index e48c2a9bb..eb173efc3 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -38,6 +38,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/auth"
+ "github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns"
@@ -242,6 +243,9 @@ var (
// The global callhome config
globalCallhomeConfig callhome.Config
+ // The global cache config
+ globalCacheConfig cache.Config
+
// Global server's network statistics
globalConnStats = newConnStats()
diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go
index ae9949a89..e12c213c5 100644
--- a/cmd/object-api-datatypes.go
+++ b/cmd/object-api-datatypes.go
@@ -20,6 +20,7 @@ package cmd
import (
"io"
"math"
+ "net/http"
"time"
"github.com/dustin/go-humanize"
@@ -149,6 +150,9 @@ type ObjectInfo struct {
// Date and time at which the object is no longer able to be cached
Expires time.Time
+ // Cache-Control - Specifies caching behavior along the request/reply chain
+ CacheControl string
+
// Specify object storage class
StorageClass string
@@ -200,6 +204,15 @@ type ObjectInfo struct {
ParityBlocks int
}
+// ExpiresStr returns a stringified version of Expires header in http.TimeFormat
+func (o ObjectInfo) ExpiresStr() string {
+ var expires string
+ if !o.Expires.IsZero() {
+ expires = o.Expires.UTC().Format(http.TimeFormat)
+ }
+ return expires
+}
+
// ArchiveInfo returns any saved zip archive meta information.
// It will be decrypted if needed.
func (o *ObjectInfo) ArchiveInfo() []byte {
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 63305fab5..8b17e82df 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -109,6 +109,8 @@ type ObjectOptions struct {
MetadataChg bool // is true if it is a metadata update operation.
EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject.
+
+ FastGetObjInfo bool // Only for S3 Head/Get Object calls for now
}
// ExpirationOptions represents object options for object expiration at objectLayer.
diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go
index 043c3b76a..42279ae66 100644
--- a/cmd/object-api-utils.go
+++ b/cmd/object-api-utils.go
@@ -64,6 +64,7 @@ const (
minioMetaTmpBucket = minioMetaBucket + "/tmp"
// MinIO tmp meta prefix for deleted objects.
minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
+
// DNS separator (period), used for bucket name validation.
dnsDelimiter = "."
// On compressed files bigger than this;
@@ -697,7 +698,6 @@ type GetObjectReader struct {
io.Reader
ObjInfo ObjectInfo
cleanUpFns []func()
- opts ObjectOptions
once sync.Once
}
@@ -722,7 +722,6 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions
ObjInfo: oi,
Reader: r,
cleanUpFns: cleanupFns,
- opts: opts,
}, nil
}
@@ -859,7 +858,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
ObjInfo: oi,
Reader: decReader,
cleanUpFns: cFns,
- opts: opts,
}
return r, nil
}
@@ -913,7 +911,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
ObjInfo: oi,
Reader: decReader,
cleanUpFns: cFns,
- opts: opts,
}
return r, nil
}
@@ -928,7 +925,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
ObjInfo: oi,
Reader: inputReader,
cleanUpFns: cFns,
- opts: opts,
}
return r, nil
}
diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go
index c53287f3e..8ff232f7e 100644
--- a/cmd/object-handlers-common.go
+++ b/cmd/object-handlers-common.go
@@ -202,6 +202,31 @@ func checkPreconditionsPUT(ctx context.Context, w http.ResponseWriter, r *http.R
return false
}
+// Headers to be set of object content is not going to be written to the client.
+func writeHeadersPrecondition(w http.ResponseWriter, objInfo ObjectInfo) {
+ // set common headers
+ setCommonHeaders(w)
+
+ // set object-related metadata headers
+ w.Header().Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat))
+
+ if objInfo.ETag != "" {
+ w.Header()[xhttp.ETag] = []string{"\"" + objInfo.ETag + "\""}
+ }
+
+ if objInfo.VersionID != "" {
+ w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
+ }
+
+ if !objInfo.Expires.IsZero() {
+ w.Header().Set(xhttp.Expires, objInfo.Expires.UTC().Format(http.TimeFormat))
+ }
+
+ if objInfo.CacheControl != "" {
+ w.Header().Set(xhttp.CacheControl, objInfo.CacheControl)
+ }
+}
+
// Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
// Preconditions supported are:
//
@@ -221,19 +246,6 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ
return false
}
- // Headers to be set of object content is not going to be written to the client.
- writeHeaders := func() {
- // set common headers
- setCommonHeaders(w)
-
- // set object-related metadata headers
- w.Header().Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat))
-
- if objInfo.ETag != "" {
- w.Header()[xhttp.ETag] = []string{"\"" + objInfo.ETag + "\""}
- }
- }
-
// Check if the part number is correct.
if opts.PartNumber > 1 && opts.PartNumber > len(objInfo.Parts) {
// According to S3 we don't need to set any object information here.
@@ -241,6 +253,18 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ
return true
}
+ // If-None-Match : Return the object only if its entity tag (ETag) is different from the
+ // one specified otherwise, return a 304 (not modified).
+ ifNoneMatchETagHeader := r.Header.Get(xhttp.IfNoneMatch)
+ if ifNoneMatchETagHeader != "" {
+ if isETagEqual(objInfo.ETag, ifNoneMatchETagHeader) {
+ // If the object ETag matches with the specified ETag.
+ writeHeadersPrecondition(w, objInfo)
+ w.WriteHeader(http.StatusNotModified)
+ return true
+ }
+ }
+
// If-Modified-Since : Return the object only if it has been modified since the specified time,
// otherwise return a 304 (not modified).
ifModifiedSinceHeader := r.Header.Get(xhttp.IfModifiedSince)
@@ -248,50 +272,39 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ
if givenTime, err := amztime.ParseHeader(ifModifiedSinceHeader); err == nil {
if !ifModifiedSince(objInfo.ModTime, givenTime) {
// If the object is not modified since the specified time.
- writeHeaders()
+ writeHeadersPrecondition(w, objInfo)
w.WriteHeader(http.StatusNotModified)
return true
}
}
}
- // If-Unmodified-Since : Return the object only if it has not been modified since the specified
- // time, otherwise return a 412 (precondition failed).
- ifUnmodifiedSinceHeader := r.Header.Get(xhttp.IfUnmodifiedSince)
- if ifUnmodifiedSinceHeader != "" {
- if givenTime, err := amztime.ParseHeader(ifUnmodifiedSinceHeader); err == nil {
- if ifModifiedSince(objInfo.ModTime, givenTime) {
- // If the object is modified since the specified time.
- writeHeaders()
- writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
- return true
- }
- }
- }
-
// If-Match : Return the object only if its entity tag (ETag) is the same as the one specified;
// otherwise return a 412 (precondition failed).
ifMatchETagHeader := r.Header.Get(xhttp.IfMatch)
if ifMatchETagHeader != "" {
if !isETagEqual(objInfo.ETag, ifMatchETagHeader) {
// If the object ETag does not match with the specified ETag.
- writeHeaders()
+ writeHeadersPrecondition(w, objInfo)
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
return true
}
}
- // If-None-Match : Return the object only if its entity tag (ETag) is different from the
- // one specified otherwise, return a 304 (not modified).
- ifNoneMatchETagHeader := r.Header.Get(xhttp.IfNoneMatch)
- if ifNoneMatchETagHeader != "" {
- if isETagEqual(objInfo.ETag, ifNoneMatchETagHeader) {
- // If the object ETag matches with the specified ETag.
- writeHeaders()
- w.WriteHeader(http.StatusNotModified)
- return true
+ // If-Unmodified-Since : Return the object only if it has not been modified since the specified
+ // time, otherwise return a 412 (precondition failed).
+ ifUnmodifiedSinceHeader := r.Header.Get(xhttp.IfUnmodifiedSince)
+ if ifUnmodifiedSinceHeader != "" {
+ if givenTime, err := amztime.ParseHeader(ifUnmodifiedSinceHeader); err == nil {
+ if ifModifiedSince(objInfo.ModTime, givenTime) {
+ // If the object is modified since the specified time.
+ writeHeadersPrecondition(w, objInfo)
+ writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
+ return true
+ }
}
}
+
// Object content should be written to http.ResponseWriter
return false
}
diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go
index 181382f7c..1ec41a4f7 100644
--- a/cmd/object-handlers.go
+++ b/cmd/object-handlers.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -19,6 +19,7 @@ package cmd
import (
"archive/tar"
+ "bytes"
"context"
"encoding/hex"
"encoding/xml"
@@ -35,6 +36,7 @@ import (
"strings"
"sync"
"time"
+ "unicode"
"github.com/google/uuid"
"github.com/klauspost/compress/gzhttp"
@@ -48,6 +50,7 @@ import (
"github.com/minio/minio/internal/bucket/lifecycle"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
+ "github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
@@ -62,6 +65,7 @@ import (
"github.com/minio/minio/internal/s3select"
"github.com/minio/mux"
"github.com/minio/pkg/v2/policy"
+ "github.com/valyala/bytebufferpool"
)
// supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request.
@@ -379,6 +383,92 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
}
}
+ cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""
+
+ var update bool
+ if cachedResult {
+ rc := &cache.CondCheck{}
+ h := r.Header.Clone()
+ if opts.PartNumber > 0 {
+ h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
+ }
+ rc.Init(bucket, object, h)
+
+ ci, err := globalCacheConfig.Get(rc)
+ if ci != nil {
+ tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
+ if ok {
+ // Set this such that authorization policies can be applied on the object tags.
+ r.Header.Set(xhttp.AmzObjectTagging, tgs)
+ }
+
+ if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
+ writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
+ return
+ }
+
+ okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
+ ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
+ if okSt {
+ ci.WriteHeaders(w, func() {
+ // set common headers
+ setCommonHeaders(w)
+ }, func() {
+ okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
+ if okSt && len(ci.Data) > 0 {
+ for k, v := range ci.Metadata {
+ w.Header().Set(k, v)
+ }
+
+ if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
+ w.Header()[xhttp.AmzMpPartsCount] = []string{
+ strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
+ return !unicode.IsNumber(r)
+ }),
+ }
+ }
+
+ // For providing ranged content
+ start, rangeLen, err := rs.GetOffsetLength(ci.Size)
+ if err != nil {
+ start, rangeLen = 0, ci.Size
+ }
+
+ // Set content length.
+ w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
+ if rs != nil {
+ contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
+ w.Header().Set(xhttp.ContentRange, contentRange)
+ }
+
+ io.Copy(w, bytes.NewReader(ci.Data))
+ return
+ }
+ if ci.StatusCode == http.StatusPreconditionFailed {
+ writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
+ return
+ } else if ci.StatusCode == http.StatusNotModified {
+ w.WriteHeader(ci.StatusCode)
+ return
+ }
+
+ // We did not satisfy any requirement from the cache, update the cache.
+ // this basically means that we do not have the Data for the object
+ // cached yet
+ update = true
+ })
+ if !update {
+ // No update is needed means we have written already to the client just return here.
+ return
+ }
+ }
+ }
+
+ if errors.Is(err, cache.ErrKeyMissing) {
+ update = true
+ }
+ }
+
// Validate pre-conditions if any.
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
@@ -389,6 +479,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
return checkPreconditions(ctx, w, r, oi, opts)
}
+ opts.FastGetObjInfo = true
+
var proxy proxyResult
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
if err != nil {
@@ -487,7 +579,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
// Set encryption response headers
-
if kind, isEncrypted := crypto.IsEncrypted(objInfo.UserDefined); isEncrypted {
switch kind {
case crypto.S3:
@@ -510,6 +601,39 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber))
}
+ var buf *bytebufferpool.ByteBuffer
+ if update {
+ if globalCacheConfig.MatchesSize(objInfo.Size) {
+ buf = bytebufferpool.Get()
+ defer bytebufferpool.Put(buf)
+ }
+ defer func() {
+ var data []byte
+ if buf != nil {
+ data = buf.Bytes()
+ }
+
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: objInfo.ETag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ Range: rangeHeader,
+ PartNumber: opts.PartNumber,
+ Size: asize,
+ Data: data,
+ })
+ }()
+ }
+
if err = setObjectHeaders(w, objInfo, rs, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -522,8 +646,14 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
setHeadGetRespHeaders(w, r.Form)
+ var iw io.Writer
+ iw = w
+ if buf != nil {
+ iw = io.MultiWriter(w, buf)
+ }
+
statusCodeWritten := false
- httpWriter := xioutil.WriteOnClose(w)
+ httpWriter := xioutil.WriteOnClose(iw)
if rs != nil || opts.PartNumber > 0 {
statusCodeWritten = true
w.WriteHeader(http.StatusPartialContent)
@@ -644,6 +774,104 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
// Get request range.
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get(xhttp.Range)
+ if rangeHeader != "" {
+ rs, _ = parseRequestRangeSpec(rangeHeader)
+ }
+
+ if rangeHeader != "" {
+ // Both 'Range' and 'partNumber' cannot be specified at the same time
+ if opts.PartNumber > 0 {
+ writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
+ return
+ }
+
+ if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
+ // Handle only errInvalidRange. Ignore other
+ // parse error and treat it as regular Get
+ // request like Amazon S3.
+ if errors.Is(err, errInvalidRange) {
+ writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange))
+ return
+ }
+ }
+ }
+
+ cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""
+
+ var update bool
+ if cachedResult {
+ rc := &cache.CondCheck{}
+ h := r.Header.Clone()
+ if opts.PartNumber > 0 {
+ h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
+ }
+ rc.Init(bucket, object, h)
+
+ ci, err := globalCacheConfig.Get(rc)
+ if ci != nil {
+ tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
+ if ok {
+ // Set this such that authorization policies can be applied on the object tags.
+ r.Header.Set(xhttp.AmzObjectTagging, tgs)
+ }
+
+ if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
+ writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
+ return
+ }
+
+ okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
+ ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
+ if okSt {
+ ci.WriteHeaders(w, func() {
+ // set common headers
+ setCommonHeaders(w)
+ }, func() {
+ okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
+ if okSt {
+ for k, v := range ci.Metadata {
+ w.Header().Set(k, v)
+ }
+
+ // For providing ranged content
+ start, rangeLen, err := rs.GetOffsetLength(ci.Size)
+ if err != nil {
+ start, rangeLen = 0, ci.Size
+ }
+
+ if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
+ w.Header()[xhttp.AmzMpPartsCount] = []string{
+ strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
+ return !unicode.IsNumber(r)
+ }),
+ }
+ }
+
+ // Set content length for the range.
+ w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
+ if rs != nil {
+ contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
+ w.Header().Set(xhttp.ContentRange, contentRange)
+ }
+
+ return
+ }
+ if ci.StatusCode == http.StatusPreconditionFailed {
+ writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
+ return
+ }
+
+ w.WriteHeader(ci.StatusCode)
+ })
+ return
+ }
+ }
+ if errors.Is(err, cache.ErrKeyMissing) {
+ update = true
+ }
+ }
+
+ opts.FastGetObjInfo = true
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
var proxy proxyResult
@@ -651,9 +879,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
// proxy HEAD to replication target if active-active replication configured on bucket
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
- if rangeHeader != "" {
- rs, _ = parseRequestRangeSpec(rangeHeader)
- }
var oi ObjectInfo
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
if proxy.Proxy {
@@ -737,29 +962,29 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
return
}
+ if update {
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ defer globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: objInfo.ETag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Size: asize,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ })
+ }
+
// Validate pre-conditions if any.
if checkPreconditions(ctx, w, r, objInfo, opts) {
return
}
- if rangeHeader != "" {
- // Both 'Range' and 'partNumber' cannot be specified at the same time
- if opts.PartNumber > 0 {
- writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
- return
- }
-
- if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
- // Handle only errInvalidRange. Ignore other
- // parse error and treat it as regular Get
- // request like Amazon S3.
- if errors.Is(err, errInvalidRange) {
- writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange))
- return
- }
- }
- }
-
// Set encryption response headers
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
case crypto.S3:
@@ -1531,6 +1756,22 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
Host: handlers.GetSourceIP(r),
})
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ defer globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: objInfo.ETag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Size: asize,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ })
+
if !remoteCallRequired && !globalTierConfigMgr.Empty() {
// Schedule object for immediate transition if eligible.
objInfo.ETag = origETag
@@ -1633,7 +1874,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
var (
md5hex = clientETag.String()
sha256hex = ""
- reader io.Reader = r.Body
+ rd io.Reader = r.Body
s3Err APIErrorCode
putObject = objectAPI.PutObject
)
@@ -1647,14 +1888,14 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
switch rAuthType {
case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
// Initialize stream signature verifier.
- reader, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
+ rd, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
case authTypeStreamingUnsignedTrailer:
// Initialize stream chunked reader with optional trailers.
- reader, s3Err = newUnsignedV4ChunkedReader(r, true)
+ rd, s3Err = newUnsignedV4ChunkedReader(r, true)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
@@ -1702,6 +1943,18 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
AutoEncrypt: globalAutoEncryption,
})
+ var buf *bytebufferpool.ByteBuffer
+ if globalCacheConfig.MatchesSize(size) {
+ buf = bytebufferpool.Get()
+ defer bytebufferpool.Put(buf)
+ }
+
+ var reader io.Reader
+ reader = rd
+ if buf != nil {
+ reader = io.TeeReader(rd, buf)
+ }
+
actualSize := size
var idxCb func() []byte
if isCompressible(r.Header, object) && size > minCompressibleSize {
@@ -1904,6 +2157,30 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
setPutObjHeaders(w, objInfo, false)
+ defer func() {
+ var data []byte
+ if buf != nil {
+ data = buf.Bytes()
+ }
+
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: objInfo.ETag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Size: asize,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ Data: data,
+ })
+ }()
+
// Notify object created event.
evt := eventArgs{
EventName: event.ObjectCreatedPut,
@@ -2234,10 +2511,28 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
return err
}
+ objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
+
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
}
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ defer globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: objInfo.ETag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Size: asize,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ })
+
// Notify object created event.
evt := eventArgs{
EventName: event.ObjectCreatedPut,
@@ -2413,6 +2708,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
return
}
+ defer globalCacheConfig.Delete(bucket, object)
+
setPutObjHeaders(w, objInfo, true)
writeSuccessNoContent(w)
diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go
index 3dd05dcb4..9e52c828f 100644
--- a/cmd/object-multipart-handlers.go
+++ b/cmd/object-multipart-handlers.go
@@ -35,6 +35,7 @@ import (
sse "github.com/minio/minio/internal/bucket/encryption"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
+ "github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
@@ -1011,6 +1012,22 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
sendEvent(evt)
+ asize, err := objInfo.GetActualSize()
+ if err != nil {
+ asize = objInfo.Size
+ }
+
+ defer globalCacheConfig.Set(&cache.ObjectInfo{
+ Key: objInfo.Name,
+ Bucket: objInfo.Bucket,
+ ETag: objInfo.ETag,
+ ModTime: objInfo.ModTime,
+ Expires: objInfo.ExpiresStr(),
+ CacheControl: objInfo.CacheControl,
+ Size: asize,
+ Metadata: cleanReservedKeys(objInfo.UserDefined),
+ })
+
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
evt.EventName = event.ObjectManyVersions
sendEvent(evt)
diff --git a/cmd/server-main.go b/cmd/server-main.go
index f9735e61c..8bc9cf586 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -840,7 +840,9 @@ func serverMain(ctx *cli.Context) {
// Initialize data scanner.
bootstrapTrace("initDataScanner", func() {
- initDataScanner(GlobalContext, newObject)
+ if v := env.Get("_MINIO_SCANNER", config.EnableOn); v == config.EnableOn {
+ initDataScanner(GlobalContext, newObject)
+ }
})
// Initialize background replication
diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go
index d9cf9ff6e..e235d9db4 100644
--- a/cmd/storage-datatypes.go
+++ b/cmd/storage-datatypes.go
@@ -239,6 +239,16 @@ type FileInfo struct {
Versioned bool `msg:"vs"`
}
+// ShallowCopy - copies minimal information for READ MRF checks.
+func (fi FileInfo) ShallowCopy() (n FileInfo) {
+ n.Volume = fi.Volume
+ n.Name = fi.Name
+ n.VersionID = fi.VersionID
+ n.Deleted = fi.Deleted
+ n.Erasure = fi.Erasure
+ return
+}
+
// WriteQuorum returns expected write quorum for this FileInfo
func (fi FileInfo) WriteQuorum(dquorum int) int {
if fi.Deleted {
diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go
index aa008b511..4f66ae5ae 100644
--- a/cmd/storage-errors.go
+++ b/cmd/storage-errors.go
@@ -51,6 +51,8 @@ var errDiskNotDir = StorageErr("drive is not directory or mountpoint")
// errDiskNotFound - cannot find the underlying configured disk anymore.
var errDiskNotFound = StorageErr("drive not found")
+var errDiskOngoingReq = StorageErr("drive still did not complete the request")
+
// errFaultyRemoteDisk - remote disk is faulty.
var errFaultyRemoteDisk = StorageErr("remote drive is faulty")
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index 66b8be8ee..25047478a 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -47,6 +47,7 @@ const (
storageRESTMethodStatInfoFile = "/statfile"
storageRESTMethodReadMultiple = "/readmultiple"
storageRESTMethodCleanAbandoned = "/cleanabandoned"
+ storageRESTMethodLinkXL = "/linkxl"
)
const (
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 5ade78e6e..c78deb86f 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -1100,7 +1100,7 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, force bool) error {
targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID)
if recursive {
- if err := renameAll(filePath, targetPath, s.drivePath); err != nil {
+ if err := renameAll(filePath, targetPath, pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket)); err != nil {
return err
}
} else {
@@ -1127,8 +1127,19 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
})
}
+ volumeDir, err := s.getVolDir(volume)
+ if err != nil {
+ return err
+ }
+
+ // Validate file path length, before reading.
+ filePath := pathJoin(volumeDir, path)
+ if err = checkPathLength(filePath); err != nil {
+ return err
+ }
+
var legacyJSON bool
- buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
+ buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), false)
if err != nil {
if !errors.Is(err, errFileNotFound) {
return err
@@ -1161,11 +1172,6 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return errFileNotFound
}
- volumeDir, err := s.getVolDir(volume)
- if err != nil {
- return err
- }
-
if legacyJSON {
// Delete the meta file, if there are no more versions the
// top level parent is automatically removed.
@@ -1217,12 +1223,6 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
}
- // No more versions, this is the last version purge everything.
- filePath := pathJoin(volumeDir, path)
- if err = checkPathLength(filePath); err != nil {
- return err
- }
-
return s.deleteFile(volumeDir, filePath, true, false)
}
@@ -1371,10 +1371,15 @@ func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
}
func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath string, readData bool) (buf []byte, dmTime time.Time, err error) {
+ if filePath == "" {
+ return nil, dmTime, errFileNotFound
+ }
+
+ xlPath := pathJoin(filePath, xlStorageFormatFile)
if readData {
- buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), true)
+ buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, xlPath, true)
} else {
- buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile))
+ buf, dmTime, err = s.readMetadataWithDMTime(ctx, xlPath)
if err != nil {
if osIsNotExist(err) {
if !skipAccessChecks(volume) {
@@ -1387,18 +1392,21 @@ func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath str
}
}
- if err != nil {
- if err == errFileNotFound {
- buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true)
- if err != nil {
- return nil, time.Time{}, err
- }
- } else {
+ s.RLock()
+ legacy := s.formatLegacy
+ s.RUnlock()
+
+ if err != nil && errors.Is(err, errFileNotFound) && legacy {
+ buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true)
+ if err != nil {
return nil, time.Time{}, err
}
}
if len(buf) == 0 {
+ if err != nil {
+ return nil, time.Time{}, err
+ }
return nil, time.Time{}, errFileNotFound
}
@@ -1440,6 +1448,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
if err != nil {
return fi, err
}
+
// Validate file path length, before reading.
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
@@ -1532,6 +1541,10 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
}
func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string, sync bool) (buf []byte, dmTime time.Time, err error) {
+ if filePath == "" {
+ return nil, dmTime, errFileNotFound
+ }
+
if contextCanceled(ctx) {
return nil, time.Time{}, ctx.Err()
}
@@ -1593,7 +1606,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f
// Get size for precise allocation.
stat, err := f.Stat()
if err != nil {
- buf, err = io.ReadAll(diskHealthReader(ctx, r))
+ buf, err = io.ReadAll(r)
return buf, dmTime, osErrToFileErr(err)
}
if stat.IsDir() {
@@ -1612,7 +1625,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f
dr.SmallFile = sz <= xioutil.BlockSizeSmall*2
}
// Read file...
- _, err = io.ReadFull(diskHealthReader(ctx, r), buf)
+ _, err = io.ReadFull(r, buf)
return buf, stat.ModTime().UTC(), osErrToFileErr(err)
}
@@ -1641,7 +1654,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
return nil, err
}
- buf, _, err = s.readAllData(ctx, volume, volumeDir, filePath, true)
+ buf, _, err = s.readAllData(ctx, volume, volumeDir, filePath, false)
return buf, err
}
@@ -2225,10 +2238,10 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, dele
func skipAccessChecks(volume string) (ok bool) {
for _, prefix := range []string{
- minioMetaBucket,
- minioMetaMultipartBucket,
minioMetaTmpDeletedBucket,
minioMetaTmpBucket,
+ minioMetaMultipartBucket,
+ minioMetaBucket,
} {
if strings.HasPrefix(volume, prefix) {
return true
@@ -2408,7 +2421,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
}
// legacy data dir means its old content, honor system umask.
- if err = mkdirAll(legacyDataPath, 0o777, s.drivePath); err != nil {
+ if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
// any failed mkdir-calls delete them.
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
return 0, osErrToFileErr(err)
@@ -2526,7 +2539,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// on a versioned bucket.
s.moveToTrash(legacyDataPath, true, false)
}
- if err = renameAll(srcDataPath, dstDataPath, s.drivePath); err != nil {
+ if err = renameAll(srcDataPath, dstDataPath, dstVolumeDir); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
@@ -2537,7 +2550,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
}
// Commit meta-file
- if err = renameAll(srcFilePath, dstFilePath, s.drivePath); err != nil {
+ if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
@@ -2647,7 +2660,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
}
}
- if err = renameAll(srcFilePath, dstFilePath, s.drivePath); err != nil {
+ if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
if isSysErrNotEmpty(err) || isSysErrNotDir(err) {
return errFileAccessDenied
}
diff --git a/docs/bucket/replication/setup_2site_existing_replication.sh b/docs/bucket/replication/setup_2site_existing_replication.sh
index b07e976a9..fe977e7c5 100755
--- a/docs/bucket/replication/setup_2site_existing_replication.sh
+++ b/docs/bucket/replication/setup_2site_existing_replication.sh
@@ -84,7 +84,7 @@ remote_arn=$(./mc replicate ls sitea/bucket --json | jq -r .rule.Destination.Buc
sleep 1
./mc replicate resync start sitea/bucket/ --remote-bucket "${remote_arn}"
-sleep 10s ## sleep for 10s idea is that we give 100ms per object.
+sleep 30s ## sleep for 30s idea is that we give 300ms per object.
./mc ls -r --versions sitea/bucket >/tmp/sitea.txt
./mc ls -r --versions siteb/bucket >/tmp/siteb.txt
diff --git a/internal/config/cache/cache.go b/internal/config/cache/cache.go
new file mode 100644
index 000000000..602ed4218
--- /dev/null
+++ b/internal/config/cache/cache.go
@@ -0,0 +1,238 @@
+// Copyright (c) 2015-2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cache
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/minio/minio/internal/config"
+ xhttp "github.com/minio/minio/internal/http"
+ "github.com/minio/pkg/v2/env"
+ "github.com/tinylib/msgp/msgp"
+)
+
+// Cache related keys
+const (
+ Enable = "enable"
+ Endpoint = "endpoint"
+ ObjectSize = "object_size"
+
+ EnvEnable = "MINIO_CACHE_ENABLE"
+ EnvEndpoint = "MINIO_CACHE_ENDPOINT"
+ EnvObjectSize = "MINIO_CACHE_OBJECT_SIZE"
+)
+
+// DefaultKVS - default KV config for cache settings
+var DefaultKVS = config.KVS{
+ config.KV{
+ Key: Enable,
+ Value: "off",
+ },
+ config.KV{
+ Key: Endpoint,
+ Value: "",
+ },
+ config.KV{
+ Key: ObjectSize,
+ Value: "",
+ },
+}
+
+// Config represents the subnet related configuration
+type Config struct {
+ // Flag indicating whether cache is enabled.
+ Enable bool `json:"enable"`
+
+ // Endpoint for caching uses remote mcache server to
+ // store and retrieve pre-condition check entities such as
+ // Etag and ModTime of an object + version
+ Endpoint string `json:"endpoint"`
+
+ // ObjectSize indicates the maximum object size below which
+ // data is cached and fetched remotely from DRAM.
+ ObjectSize int64
+
+ // Is the HTTP client used for communicating with mcache server
+ clnt *http.Client
+}
+
+var configLock sync.RWMutex
+
+// Enabled - indicates if cache is enabled or not
+func (c *Config) Enabled() bool {
+ return c.Enable && c.Endpoint != ""
+}
+
+// MatchesSize verifies if input 'size' falls under cacheable threshold
+func (c Config) MatchesSize(size int64) bool {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ return c.Enable && c.ObjectSize > 0 && size <= c.ObjectSize
+}
+
+// Update updates new cache frequency
+func (c *Config) Update(ncfg Config) {
+ configLock.Lock()
+ defer configLock.Unlock()
+
+ c.Enable = ncfg.Enable
+ c.Endpoint = ncfg.Endpoint
+ c.ObjectSize = ncfg.ObjectSize
+ c.clnt = ncfg.clnt
+}
+
+// cache related errors
+var (
+ ErrInvalidArgument = errors.New("invalid argument")
+ ErrKeyMissing = errors.New("key is missing")
+)
+
+const (
+ mcacheV1Check = "/_mcache/v1/check"
+ mcacheV1Update = "/_mcache/v1/update"
+ mcacheV1Delete = "/_mcache/v1/delete"
+)
+
+// Get performs conditional check and returns the cached object info if any.
+func (c Config) Get(r *CondCheck) (*ObjectInfo, error) {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ if !c.Enable {
+ return nil, nil
+ }
+
+ if c.Endpoint == "" {
+ // Endpoint not set, make this a no-op
+ return nil, nil
+ }
+
+ buf, err := r.MarshalMsg(nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // We do not want Get's to take so much time, anything
+ // beyond 250ms we should cut it, remote cache is too
+ // busy already.
+ ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
+ defer cancel()
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.Endpoint+mcacheV1Check, bytes.NewReader(buf))
+ if err != nil {
+ return nil, err
+ }
+
+ resp, err := c.clnt.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer xhttp.DrainBody(resp.Body)
+
+ switch resp.StatusCode {
+ case http.StatusNotFound:
+ return nil, ErrKeyMissing
+ case http.StatusOK:
+ co := &ObjectInfo{}
+ return co, co.DecodeMsg(msgp.NewReader(resp.Body))
+ default:
+ return nil, ErrInvalidArgument
+ }
+}
+
+// Set sets the cache object info
+func (c Config) Set(ci *ObjectInfo) {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ if !c.Enable {
+ return
+ }
+
+ if c.Endpoint == "" {
+ // Endpoint not set, make this a no-op
+ return
+ }
+
+ buf, err := ci.MarshalMsg(nil)
+ if err != nil {
+ return
+ }
+
+ req, err := http.NewRequestWithContext(context.Background(), http.MethodPut, c.Endpoint+mcacheV1Update, bytes.NewReader(buf))
+ if err != nil {
+ return
+ }
+
+ resp, err := c.clnt.Do(req)
+ if err != nil {
+ return
+ }
+ defer xhttp.DrainBody(resp.Body)
+}
+
+// Delete deletes remote cached content for object and its version.
+func (c Config) Delete(bucket, key string) {
+ configLock.RLock()
+ defer configLock.RUnlock()
+
+ if !c.Enable {
+ return
+ }
+
+ if c.Endpoint == "" {
+ return
+ }
+
+ req, err := http.NewRequestWithContext(context.Background(), http.MethodDelete, c.Endpoint+fmt.Sprintf("%s?bucket=%s&key=%s", mcacheV1Delete, bucket, key), nil)
+ if err != nil {
+ return
+ }
+
+ resp, err := c.clnt.Do(req)
+ if err != nil {
+ return
+ }
+ defer xhttp.DrainBody(resp.Body)
+}
+
+// LookupConfig - lookup config and override with valid environment settings if any.
+func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err error) {
+ cfg.Enable = env.Get(EnvEnable, kvs.GetWithDefault(Enable, DefaultKVS)) == config.EnableOn
+
+ if d := env.Get(EnvObjectSize, kvs.GetWithDefault(ObjectSize, DefaultKVS)); d != "" {
+ objectSize, err := humanize.ParseBytes(d)
+ if err != nil {
+ return cfg, err
+ }
+ cfg.ObjectSize = int64(objectSize)
+ }
+
+ cfg.Endpoint = env.Get(EnvEndpoint, kvs.GetWithDefault(Endpoint, DefaultKVS))
+ cfg.clnt = &http.Client{Transport: transport}
+
+ return cfg, nil
+}
diff --git a/internal/config/cache/help.go b/internal/config/cache/help.go
new file mode 100644
index 000000000..31ce940a6
--- /dev/null
+++ b/internal/config/cache/help.go
@@ -0,0 +1,48 @@
+// Copyright (c) 2015-2023 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cache
+
+import "github.com/minio/minio/internal/config"
+
+var (
+ defaultHelpPostfix = func(key string) string {
+ return config.DefaultHelpPostfix(DefaultKVS, key)
+ }
+
+ // Help - provides help for cache config
+ Help = config.HelpKVS{
+ config.HelpKV{
+ Key: Enable,
+ Type: "on|off",
+ Description: "set to enable various caching optimizations" + defaultHelpPostfix(Enable),
+ Optional: true,
+ },
+ config.HelpKV{
+ Key: Endpoint,
+ Type: "string",
+ Description: "remote endpoint where MinIO will cache GET/HEAD metadata values such as ETag, ModTime" + defaultHelpPostfix(Endpoint),
+ Optional: true,
+ },
+ config.HelpKV{
+ Key: ObjectSize,
+ Type: "string",
+ Description: "maximum object size below which data is cached and fetched remotely from DRAM if possible" + defaultHelpPostfix(ObjectSize),
+ Optional: true,
+ },
+ }
+)
diff --git a/internal/config/cache/remote.go b/internal/config/cache/remote.go
new file mode 100644
index 000000000..8db4e2f55
--- /dev/null
+++ b/internal/config/cache/remote.go
@@ -0,0 +1,111 @@
+package cache
+
+import (
+ "net/http"
+ "regexp"
+ "strconv"
+ "time"
+
+ "github.com/minio/minio/internal/amztime"
+ xhttp "github.com/minio/minio/internal/http"
+)
+
+//go:generate msgp -file=$GOFILE
+
+// ObjectInfo represents the object information cached remotely
+type ObjectInfo struct {
+ Key string `json:"key"`
+ Bucket string `json:"bucket"`
+ ETag string `json:"etag"`
+ ModTime time.Time `json:"modTime"`
+ StatusCode int `json:"statusCode"`
+
+ // Optional elements
+ CacheControl string `json:"cacheControl,omitempty" msg:",omitempty"`
+ Expires string `json:"expires,omitempty" msg:",omitempty"`
+ Metadata map[string]string `json:"metadata,omitempty" msg:",omitempty"`
+ Range string `json:"range,omitempty" msg:",omitempty"`
+ PartNumber int `json:"partNumber,omitempty" msg:",omitempty"`
+ Size int64 `json:"size,omitempty" msg:",omitempty"` // Full size of the object
+ Data []byte `json:"data,omitempty" msg:",omitempty"` // Data can container full data of the object or partial
+}
+
+// WriteHeaders writes the response headers for conditional requests
+func (oi ObjectInfo) WriteHeaders(w http.ResponseWriter, preamble, statusCode func()) {
+ preamble()
+
+ if !oi.ModTime.IsZero() {
+ w.Header().Set(xhttp.LastModified, oi.ModTime.UTC().Format(http.TimeFormat))
+ }
+
+ if oi.ETag != "" {
+ w.Header()[xhttp.ETag] = []string{"\"" + oi.ETag + "\""}
+ }
+
+ if oi.Expires != "" {
+ w.Header().Set(xhttp.Expires, oi.Expires)
+ }
+
+ if oi.CacheControl != "" {
+ w.Header().Set(xhttp.CacheControl, oi.CacheControl)
+ }
+
+ statusCode()
+}
+
+// CondCheck represents the conditional request made to the remote cache
+// for validation during GET/HEAD object requests.
+type CondCheck struct {
+ ObjectInfo
+ IfMatch string `json:"ifMatch,omitempty" msg:",omitempty"`
+ IfNoneMatch string `json:"ifNoneMatch,omitempty" msg:",omitempty"`
+ IfModifiedSince *time.Time `json:"ifModSince,omitempty" msg:",omitempty"`
+ IfUnModifiedSince *time.Time `json:"ifUnmodSince,omitempty" msg:",omitempty"`
+ IfRange string `json:"ifRange,omitempty" msg:",omitempty"`
+ IfPartNumber int `json:"ifPartNumber,omitempty" msg:",omitempty"`
+}
+
+// IsSet tells the cache lookup to avoid sending a request
+func (r *CondCheck) IsSet() bool {
+ if r == nil {
+ return false
+ }
+ return r.IfMatch != "" || r.IfNoneMatch != "" || r.IfModifiedSince != nil || r.IfUnModifiedSince != nil || r.IfRange != ""
+}
+
+var etagRegex = regexp.MustCompile("\"*?([^\"]*?)\"*?$")
+
+// canonicalizeETag returns ETag with leading and trailing double-quotes removed,
+// if any present
+func canonicalizeETag(etag string) string {
+ return etagRegex.ReplaceAllString(etag, "$1")
+}
+
+// Init - populates the input values, initializes CondCheck
+// before sending the request remotely.
+func (r *CondCheck) Init(bucket, object string, header http.Header) {
+ r.Key = object
+ r.Bucket = bucket
+
+ ifModifiedSinceHeader := header.Get(xhttp.IfModifiedSince)
+ if ifModifiedSinceHeader != "" {
+ if givenTime, err := amztime.ParseHeader(ifModifiedSinceHeader); err == nil {
+ r.IfModifiedSince = &givenTime
+ }
+ }
+ ifUnmodifiedSinceHeader := header.Get(xhttp.IfUnmodifiedSince)
+ if ifUnmodifiedSinceHeader != "" {
+ if givenTime, err := amztime.ParseHeader(ifUnmodifiedSinceHeader); err == nil {
+ r.IfUnModifiedSince = &givenTime
+ }
+ }
+ r.IfMatch = canonicalizeETag(header.Get(xhttp.IfMatch))
+ r.IfNoneMatch = canonicalizeETag(header.Get(xhttp.IfNoneMatch))
+ r.IfRange = header.Get(xhttp.Range)
+ ifPartNumberHeader := header.Get(xhttp.PartNumber)
+ if ifPartNumberHeader != "" {
+ if partNumber, err := strconv.Atoi(ifPartNumberHeader); err == nil {
+ r.IfPartNumber = partNumber
+ }
+ }
+}
diff --git a/internal/config/cache/remote_gen.go b/internal/config/cache/remote_gen.go
new file mode 100644
index 000000000..b6b7e7e62
--- /dev/null
+++ b/internal/config/cache/remote_gen.go
@@ -0,0 +1,795 @@
+package cache
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+ "time"
+
+ "github.com/tinylib/msgp/msgp"
+)
+
+// DecodeMsg implements msgp.Decodable
+func (z *CondCheck) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "ObjectInfo":
+ err = z.ObjectInfo.DecodeMsg(dc)
+ if err != nil {
+ err = msgp.WrapError(err, "ObjectInfo")
+ return
+ }
+ case "IfMatch":
+ z.IfMatch, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "IfMatch")
+ return
+ }
+ case "IfNoneMatch":
+ z.IfNoneMatch, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "IfNoneMatch")
+ return
+ }
+ case "IfModifiedSince":
+ if dc.IsNil() {
+ err = dc.ReadNil()
+ if err != nil {
+ err = msgp.WrapError(err, "IfModifiedSince")
+ return
+ }
+ z.IfModifiedSince = nil
+ } else {
+ if z.IfModifiedSince == nil {
+ z.IfModifiedSince = new(time.Time)
+ }
+ *z.IfModifiedSince, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "IfModifiedSince")
+ return
+ }
+ }
+ case "IfUnModifiedSince":
+ if dc.IsNil() {
+ err = dc.ReadNil()
+ if err != nil {
+ err = msgp.WrapError(err, "IfUnModifiedSince")
+ return
+ }
+ z.IfUnModifiedSince = nil
+ } else {
+ if z.IfUnModifiedSince == nil {
+ z.IfUnModifiedSince = new(time.Time)
+ }
+ *z.IfUnModifiedSince, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "IfUnModifiedSince")
+ return
+ }
+ }
+ case "IfRange":
+ z.IfRange, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "IfRange")
+ return
+ }
+ case "IfPartNumber":
+ z.IfPartNumber, err = dc.ReadInt()
+ if err != nil {
+ err = msgp.WrapError(err, "IfPartNumber")
+ return
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *CondCheck) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 7
+ // write "ObjectInfo"
+ err = en.Append(0x87, 0xaa, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x6e, 0x66, 0x6f)
+ if err != nil {
+ return
+ }
+ err = z.ObjectInfo.EncodeMsg(en)
+ if err != nil {
+ err = msgp.WrapError(err, "ObjectInfo")
+ return
+ }
+ // write "IfMatch"
+ err = en.Append(0xa7, 0x49, 0x66, 0x4d, 0x61, 0x74, 0x63, 0x68)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.IfMatch)
+ if err != nil {
+ err = msgp.WrapError(err, "IfMatch")
+ return
+ }
+ // write "IfNoneMatch"
+ err = en.Append(0xab, 0x49, 0x66, 0x4e, 0x6f, 0x6e, 0x65, 0x4d, 0x61, 0x74, 0x63, 0x68)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.IfNoneMatch)
+ if err != nil {
+ err = msgp.WrapError(err, "IfNoneMatch")
+ return
+ }
+ // write "IfModifiedSince"
+ err = en.Append(0xaf, 0x49, 0x66, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65)
+ if err != nil {
+ return
+ }
+ if z.IfModifiedSince == nil {
+ err = en.WriteNil()
+ if err != nil {
+ return
+ }
+ } else {
+ err = en.WriteTime(*z.IfModifiedSince)
+ if err != nil {
+ err = msgp.WrapError(err, "IfModifiedSince")
+ return
+ }
+ }
+ // write "IfUnModifiedSince"
+ err = en.Append(0xb1, 0x49, 0x66, 0x55, 0x6e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65)
+ if err != nil {
+ return
+ }
+ if z.IfUnModifiedSince == nil {
+ err = en.WriteNil()
+ if err != nil {
+ return
+ }
+ } else {
+ err = en.WriteTime(*z.IfUnModifiedSince)
+ if err != nil {
+ err = msgp.WrapError(err, "IfUnModifiedSince")
+ return
+ }
+ }
+ // write "IfRange"
+ err = en.Append(0xa7, 0x49, 0x66, 0x52, 0x61, 0x6e, 0x67, 0x65)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.IfRange)
+ if err != nil {
+ err = msgp.WrapError(err, "IfRange")
+ return
+ }
+ // write "IfPartNumber"
+ err = en.Append(0xac, 0x49, 0x66, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt(z.IfPartNumber)
+ if err != nil {
+ err = msgp.WrapError(err, "IfPartNumber")
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *CondCheck) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 7
+ // string "ObjectInfo"
+ o = append(o, 0x87, 0xaa, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x6e, 0x66, 0x6f)
+ o, err = z.ObjectInfo.MarshalMsg(o)
+ if err != nil {
+ err = msgp.WrapError(err, "ObjectInfo")
+ return
+ }
+ // string "IfMatch"
+ o = append(o, 0xa7, 0x49, 0x66, 0x4d, 0x61, 0x74, 0x63, 0x68)
+ o = msgp.AppendString(o, z.IfMatch)
+ // string "IfNoneMatch"
+ o = append(o, 0xab, 0x49, 0x66, 0x4e, 0x6f, 0x6e, 0x65, 0x4d, 0x61, 0x74, 0x63, 0x68)
+ o = msgp.AppendString(o, z.IfNoneMatch)
+ // string "IfModifiedSince"
+ o = append(o, 0xaf, 0x49, 0x66, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65)
+ if z.IfModifiedSince == nil {
+ o = msgp.AppendNil(o)
+ } else {
+ o = msgp.AppendTime(o, *z.IfModifiedSince)
+ }
+ // string "IfUnModifiedSince"
+ o = append(o, 0xb1, 0x49, 0x66, 0x55, 0x6e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x53, 0x69, 0x6e, 0x63, 0x65)
+ if z.IfUnModifiedSince == nil {
+ o = msgp.AppendNil(o)
+ } else {
+ o = msgp.AppendTime(o, *z.IfUnModifiedSince)
+ }
+ // string "IfRange"
+ o = append(o, 0xa7, 0x49, 0x66, 0x52, 0x61, 0x6e, 0x67, 0x65)
+ o = msgp.AppendString(o, z.IfRange)
+ // string "IfPartNumber"
+ o = append(o, 0xac, 0x49, 0x66, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72)
+ o = msgp.AppendInt(o, z.IfPartNumber)
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *CondCheck) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "ObjectInfo":
+ bts, err = z.ObjectInfo.UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ObjectInfo")
+ return
+ }
+ case "IfMatch":
+ z.IfMatch, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "IfMatch")
+ return
+ }
+ case "IfNoneMatch":
+ z.IfNoneMatch, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "IfNoneMatch")
+ return
+ }
+ case "IfModifiedSince":
+ if msgp.IsNil(bts) {
+ bts, err = msgp.ReadNilBytes(bts)
+ if err != nil {
+ return
+ }
+ z.IfModifiedSince = nil
+ } else {
+ if z.IfModifiedSince == nil {
+ z.IfModifiedSince = new(time.Time)
+ }
+ *z.IfModifiedSince, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "IfModifiedSince")
+ return
+ }
+ }
+ case "IfUnModifiedSince":
+ if msgp.IsNil(bts) {
+ bts, err = msgp.ReadNilBytes(bts)
+ if err != nil {
+ return
+ }
+ z.IfUnModifiedSince = nil
+ } else {
+ if z.IfUnModifiedSince == nil {
+ z.IfUnModifiedSince = new(time.Time)
+ }
+ *z.IfUnModifiedSince, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "IfUnModifiedSince")
+ return
+ }
+ }
+ case "IfRange":
+ z.IfRange, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "IfRange")
+ return
+ }
+ case "IfPartNumber":
+ z.IfPartNumber, bts, err = msgp.ReadIntBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "IfPartNumber")
+ return
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *CondCheck) Msgsize() (s int) {
+ s = 1 + 11 + z.ObjectInfo.Msgsize() + 8 + msgp.StringPrefixSize + len(z.IfMatch) + 12 + msgp.StringPrefixSize + len(z.IfNoneMatch) + 16
+ if z.IfModifiedSince == nil {
+ s += msgp.NilSize
+ } else {
+ s += msgp.TimeSize
+ }
+ s += 18
+ if z.IfUnModifiedSince == nil {
+ s += msgp.NilSize
+ } else {
+ s += msgp.TimeSize
+ }
+ s += 8 + msgp.StringPrefixSize + len(z.IfRange) + 13 + msgp.IntSize
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *ObjectInfo) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "Key":
+ z.Key, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Key")
+ return
+ }
+ case "Bucket":
+ z.Bucket, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ case "ETag":
+ z.ETag, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "ETag")
+ return
+ }
+ case "ModTime":
+ z.ModTime, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "ModTime")
+ return
+ }
+ case "StatusCode":
+ z.StatusCode, err = dc.ReadInt()
+ if err != nil {
+ err = msgp.WrapError(err, "StatusCode")
+ return
+ }
+ case "CacheControl":
+ z.CacheControl, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "CacheControl")
+ return
+ }
+ case "Expires":
+ z.Expires, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Expires")
+ return
+ }
+ case "Metadata":
+ var zb0002 uint32
+ zb0002, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata")
+ return
+ }
+ if z.Metadata == nil {
+ z.Metadata = make(map[string]string, zb0002)
+ } else if len(z.Metadata) > 0 {
+ for key := range z.Metadata {
+ delete(z.Metadata, key)
+ }
+ }
+ for zb0002 > 0 {
+ zb0002--
+ var za0001 string
+ var za0002 string
+ za0001, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata")
+ return
+ }
+ za0002, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata", za0001)
+ return
+ }
+ z.Metadata[za0001] = za0002
+ }
+ case "Range":
+ z.Range, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Range")
+ return
+ }
+ case "PartNumber":
+ z.PartNumber, err = dc.ReadInt()
+ if err != nil {
+ err = msgp.WrapError(err, "PartNumber")
+ return
+ }
+ case "Size":
+ z.Size, err = dc.ReadInt64()
+ if err != nil {
+ err = msgp.WrapError(err, "Size")
+ return
+ }
+ case "Data":
+ z.Data, err = dc.ReadBytes(z.Data)
+ if err != nil {
+ err = msgp.WrapError(err, "Data")
+ return
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *ObjectInfo) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 12
+ // write "Key"
+ err = en.Append(0x8c, 0xa3, 0x4b, 0x65, 0x79)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Key)
+ if err != nil {
+ err = msgp.WrapError(err, "Key")
+ return
+ }
+ // write "Bucket"
+ err = en.Append(0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Bucket)
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ // write "ETag"
+ err = en.Append(0xa4, 0x45, 0x54, 0x61, 0x67)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.ETag)
+ if err != nil {
+ err = msgp.WrapError(err, "ETag")
+ return
+ }
+ // write "ModTime"
+ err = en.Append(0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65)
+ if err != nil {
+ return
+ }
+ err = en.WriteTime(z.ModTime)
+ if err != nil {
+ err = msgp.WrapError(err, "ModTime")
+ return
+ }
+ // write "StatusCode"
+ err = en.Append(0xaa, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt(z.StatusCode)
+ if err != nil {
+ err = msgp.WrapError(err, "StatusCode")
+ return
+ }
+ // write "CacheControl"
+ err = en.Append(0xac, 0x43, 0x61, 0x63, 0x68, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.CacheControl)
+ if err != nil {
+ err = msgp.WrapError(err, "CacheControl")
+ return
+ }
+ // write "Expires"
+ err = en.Append(0xa7, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Expires)
+ if err != nil {
+ err = msgp.WrapError(err, "Expires")
+ return
+ }
+ // write "Metadata"
+ err = en.Append(0xa8, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61)
+ if err != nil {
+ return
+ }
+ err = en.WriteMapHeader(uint32(len(z.Metadata)))
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata")
+ return
+ }
+ for za0001, za0002 := range z.Metadata {
+ err = en.WriteString(za0001)
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata")
+ return
+ }
+ err = en.WriteString(za0002)
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata", za0001)
+ return
+ }
+ }
+ // write "Range"
+ err = en.Append(0xa5, 0x52, 0x61, 0x6e, 0x67, 0x65)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Range)
+ if err != nil {
+ err = msgp.WrapError(err, "Range")
+ return
+ }
+ // write "PartNumber"
+ err = en.Append(0xaa, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt(z.PartNumber)
+ if err != nil {
+ err = msgp.WrapError(err, "PartNumber")
+ return
+ }
+ // write "Size"
+ err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt64(z.Size)
+ if err != nil {
+ err = msgp.WrapError(err, "Size")
+ return
+ }
+ // write "Data"
+ err = en.Append(0xa4, 0x44, 0x61, 0x74, 0x61)
+ if err != nil {
+ return
+ }
+ err = en.WriteBytes(z.Data)
+ if err != nil {
+ err = msgp.WrapError(err, "Data")
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *ObjectInfo) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 12
+ // string "Key"
+ o = append(o, 0x8c, 0xa3, 0x4b, 0x65, 0x79)
+ o = msgp.AppendString(o, z.Key)
+ // string "Bucket"
+ o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
+ o = msgp.AppendString(o, z.Bucket)
+ // string "ETag"
+ o = append(o, 0xa4, 0x45, 0x54, 0x61, 0x67)
+ o = msgp.AppendString(o, z.ETag)
+ // string "ModTime"
+ o = append(o, 0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65)
+ o = msgp.AppendTime(o, z.ModTime)
+ // string "StatusCode"
+ o = append(o, 0xaa, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65)
+ o = msgp.AppendInt(o, z.StatusCode)
+ // string "CacheControl"
+ o = append(o, 0xac, 0x43, 0x61, 0x63, 0x68, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c)
+ o = msgp.AppendString(o, z.CacheControl)
+ // string "Expires"
+ o = append(o, 0xa7, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73)
+ o = msgp.AppendString(o, z.Expires)
+ // string "Metadata"
+ o = append(o, 0xa8, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61)
+ o = msgp.AppendMapHeader(o, uint32(len(z.Metadata)))
+ for za0001, za0002 := range z.Metadata {
+ o = msgp.AppendString(o, za0001)
+ o = msgp.AppendString(o, za0002)
+ }
+ // string "Range"
+ o = append(o, 0xa5, 0x52, 0x61, 0x6e, 0x67, 0x65)
+ o = msgp.AppendString(o, z.Range)
+ // string "PartNumber"
+ o = append(o, 0xaa, 0x50, 0x61, 0x72, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72)
+ o = msgp.AppendInt(o, z.PartNumber)
+ // string "Size"
+ o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65)
+ o = msgp.AppendInt64(o, z.Size)
+ // string "Data"
+ o = append(o, 0xa4, 0x44, 0x61, 0x74, 0x61)
+ o = msgp.AppendBytes(o, z.Data)
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *ObjectInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "Key":
+ z.Key, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Key")
+ return
+ }
+ case "Bucket":
+ z.Bucket, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ case "ETag":
+ z.ETag, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ETag")
+ return
+ }
+ case "ModTime":
+ z.ModTime, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ModTime")
+ return
+ }
+ case "StatusCode":
+ z.StatusCode, bts, err = msgp.ReadIntBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "StatusCode")
+ return
+ }
+ case "CacheControl":
+ z.CacheControl, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "CacheControl")
+ return
+ }
+ case "Expires":
+ z.Expires, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Expires")
+ return
+ }
+ case "Metadata":
+ var zb0002 uint32
+ zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata")
+ return
+ }
+ if z.Metadata == nil {
+ z.Metadata = make(map[string]string, zb0002)
+ } else if len(z.Metadata) > 0 {
+ for key := range z.Metadata {
+ delete(z.Metadata, key)
+ }
+ }
+ for zb0002 > 0 {
+ var za0001 string
+ var za0002 string
+ zb0002--
+ za0001, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata")
+ return
+ }
+ za0002, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Metadata", za0001)
+ return
+ }
+ z.Metadata[za0001] = za0002
+ }
+ case "Range":
+ z.Range, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Range")
+ return
+ }
+ case "PartNumber":
+ z.PartNumber, bts, err = msgp.ReadIntBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "PartNumber")
+ return
+ }
+ case "Size":
+ z.Size, bts, err = msgp.ReadInt64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Size")
+ return
+ }
+ case "Data":
+ z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data)
+ if err != nil {
+ err = msgp.WrapError(err, "Data")
+ return
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *ObjectInfo) Msgsize() (s int) {
+ s = 1 + 4 + msgp.StringPrefixSize + len(z.Key) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.ETag) + 8 + msgp.TimeSize + 11 + msgp.IntSize + 13 + msgp.StringPrefixSize + len(z.CacheControl) + 8 + msgp.StringPrefixSize + len(z.Expires) + 9 + msgp.MapHeaderSize
+ if z.Metadata != nil {
+ for za0001, za0002 := range z.Metadata {
+ _ = za0002
+ s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
+ }
+ }
+ s += 6 + msgp.StringPrefixSize + len(z.Range) + 11 + msgp.IntSize + 5 + msgp.Int64Size + 5 + msgp.BytesPrefixSize + len(z.Data)
+ return
+}
diff --git a/internal/config/cache/remote_gen_test.go b/internal/config/cache/remote_gen_test.go
new file mode 100644
index 000000000..86a1a2b92
--- /dev/null
+++ b/internal/config/cache/remote_gen_test.go
@@ -0,0 +1,236 @@
+package cache
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/tinylib/msgp/msgp"
+)
+
+func TestMarshalUnmarshalCondCheck(t *testing.T) {
+ v := CondCheck{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgCondCheck(b *testing.B) {
+ v := CondCheck{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgCondCheck(b *testing.B) {
+ v := CondCheck{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalCondCheck(b *testing.B) {
+ v := CondCheck{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecodeCondCheck(t *testing.T) {
+ v := CondCheck{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecodeCondCheck Msgsize() is inaccurate")
+ }
+
+ vn := CondCheck{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncodeCondCheck(b *testing.B) {
+ v := CondCheck{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecodeCondCheck(b *testing.B) {
+ v := CondCheck{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalObjectInfo(t *testing.T) {
+ v := ObjectInfo{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgObjectInfo(b *testing.B) {
+ v := ObjectInfo{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgObjectInfo(b *testing.B) {
+ v := ObjectInfo{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalObjectInfo(b *testing.B) {
+ v := ObjectInfo{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecodeObjectInfo(t *testing.T) {
+ v := ObjectInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecodeObjectInfo Msgsize() is inaccurate")
+ }
+
+ vn := ObjectInfo{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncodeObjectInfo(b *testing.B) {
+ v := ObjectInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecodeObjectInfo(b *testing.B) {
+ v := ObjectInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index 080734d63..623fda661 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -183,6 +183,7 @@ var SubSystemsDynamic = set.CreateStringSet(
AuditWebhookSubSys,
AuditKafkaSubSys,
StorageClassSubSys,
+ CacheSubSys,
)
// SubSystemsSingleTargets - subsystems which only support single target.
@@ -203,6 +204,7 @@ var SubSystemsSingleTargets = set.CreateStringSet(
ScannerSubSys,
SubnetSubSys,
CallhomeSubSys,
+ CacheSubSys,
)
// Constant separators