fix: avoid metadata cache without data for all callers (#19935)

This commit is contained in:
Harshavardhana 2024-06-14 06:28:35 -07:00 committed by GitHub
parent c50b64027d
commit c91d1ec2e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 34 additions and 121 deletions

View File

@ -51,7 +51,6 @@ import (
sse "github.com/minio/minio/internal/bucket/encryption" sse "github.com/minio/minio/internal/bucket/encryption"
objectlock "github.com/minio/minio/internal/bucket/object/lock" objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
@ -1342,22 +1341,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
continue continue
} }
asize, err := objInfo.GetActualSize()
if err != nil {
asize = objInfo.Size
}
globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: getDecryptedETag(formValues, objInfo, false),
ModTime: objInfo.ModTime,
Expires: objInfo.Expires.UTC().Format(http.TimeFormat),
CacheControl: objInfo.CacheControl,
Metadata: cleanReservedKeys(objInfo.UserDefined),
Size: asize,
})
fanOutResp = append(fanOutResp, minio.PutObjectFanOutResponse{ fanOutResp = append(fanOutResp, minio.PutObjectFanOutResponse{
Key: objInfo.Name, Key: objInfo.Name,
ETag: getDecryptedETag(formValues, objInfo, false), ETag: getDecryptedETag(formValues, objInfo, false),
@ -1452,22 +1435,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
w.Header().Set(xhttp.Location, obj) w.Header().Set(xhttp.Location, obj)
} }
asize, err := objInfo.GetActualSize()
if err != nil {
asize = objInfo.Size
}
defer globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: etag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Metadata: cleanReservedKeys(objInfo.UserDefined),
Size: asize,
})
// Notify object created event. // Notify object created event.
defer sendEvent(eventArgs{ defer sendEvent(eventArgs{
EventName: event.ObjectCreatedPost, EventName: event.ObjectCreatedPost,

View File

@ -610,8 +610,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm)) w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5)) w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
} }
// Never store encrypted objects in cache.
update = false
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
} }
@ -620,39 +618,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header)) hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header))
} }
var buf *bytebufferpool.ByteBuffer
if update {
if globalCacheConfig.MatchesSize(objInfo.Size) {
buf = bytebufferpool.Get()
defer bytebufferpool.Put(buf)
}
defer func() {
var data []byte
if buf != nil {
data = buf.Bytes()
}
asize, err := objInfo.GetActualSize()
if err != nil {
asize = objInfo.Size
}
globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Metadata: cleanReservedKeys(objInfo.UserDefined),
Range: rangeHeader,
PartNumber: opts.PartNumber,
Size: asize,
Data: data,
})
}()
}
if err = setObjectHeaders(ctx, w, objInfo, rs, opts); err != nil { if err = setObjectHeaders(ctx, w, objInfo, rs, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return
@ -665,6 +630,12 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
setHeadGetRespHeaders(w, r.Form) setHeadGetRespHeaders(w, r.Form)
var buf *bytebufferpool.ByteBuffer
if update && globalCacheConfig.MatchesSize(objInfo.Size) {
buf = bytebufferpool.Get()
defer bytebufferpool.Put(buf)
}
var iw io.Writer var iw io.Writer
iw = w iw = w
if buf != nil { if buf != nil {
@ -696,6 +667,31 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
return return
} }
defer func() {
var data []byte
if buf != nil {
data = buf.Bytes()
}
if len(data) == 0 {
return
}
globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Metadata: cleanReservedKeys(objInfo.UserDefined),
Range: rangeHeader,
PartNumber: opts.PartNumber,
Size: int64(len(data)),
Data: data,
})
}()
// Notify object accessed via a GET request. // Notify object accessed via a GET request.
sendEvent(eventArgs{ sendEvent(eventArgs{
EventName: event.ObjectAccessedGet, EventName: event.ObjectAccessedGet,
@ -1939,22 +1935,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
Host: handlers.GetSourceIP(r), Host: handlers.GetSourceIP(r),
}) })
asize, err := objInfo.GetActualSize()
if err != nil {
asize = objInfo.Size
}
defer globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Size: asize,
Metadata: cleanReservedKeys(objInfo.UserDefined),
})
if !remoteCallRequired && !globalTierConfigMgr.Empty() { if !remoteCallRequired && !globalTierConfigMgr.Empty() {
// Schedule object for immediate transition if eligible. // Schedule object for immediate transition if eligible.
objInfo.ETag = origETag objInfo.ETag = origETag
@ -2343,9 +2323,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
data = buf.Bytes() data = buf.Bytes()
} }
asize, err := objInfo.GetActualSize() if len(data) == 0 {
if err != nil { return
asize = objInfo.Size
} }
globalCacheConfig.Set(&cache.ObjectInfo{ globalCacheConfig.Set(&cache.ObjectInfo{
@ -2355,7 +2334,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
ModTime: objInfo.ModTime, ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(), Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl, CacheControl: objInfo.CacheControl,
Size: asize, Size: int64(len(data)),
Metadata: cleanReservedKeys(objInfo.UserDefined), Metadata: cleanReservedKeys(objInfo.UserDefined),
Data: data, Data: data,
}) })
@ -2712,22 +2691,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType) scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
} }
asize, err := objInfo.GetActualSize()
if err != nil {
asize = objInfo.Size
}
defer globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Size: asize,
Metadata: cleanReservedKeys(objInfo.UserDefined),
})
// Notify object created event. // Notify object created event.
evt := eventArgs{ evt := eventArgs{
EventName: event.ObjectCreatedPut, EventName: event.ObjectCreatedPut,

View File

@ -36,7 +36,6 @@ import (
sse "github.com/minio/minio/internal/bucket/encryption" sse "github.com/minio/minio/internal/bucket/encryption"
objectlock "github.com/minio/minio/internal/bucket/object/lock" objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
@ -1054,22 +1053,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
} }
sendEvent(evt) sendEvent(evt)
asize, err := objInfo.GetActualSize()
if err != nil {
asize = objInfo.Size
}
defer globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Size: asize,
Metadata: cleanReservedKeys(objInfo.UserDefined),
})
if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) { if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) {
evt.EventName = event.ObjectManyVersions evt.EventName = event.ObjectManyVersions
sendEvent(evt) sendEvent(evt)