fix: return proper errors Get/HeadObject for deleteMarkers (#9957)

This commit is contained in:
Harshavardhana 2020-07-02 16:17:27 -07:00 committed by GitHub
parent 4c266df863
commit 810a4f0723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 105 additions and 34 deletions

View File

@ -629,7 +629,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
// Object might have been deleted, by the time heal // Object might have been deleted, by the time heal
// was attempted, we should ignore this object and // was attempted, we should ignore this object and
// return success. // return success.
if isErrObjectNotFound(res.err) { if isErrObjectNotFound(res.err) || isErrVersionNotFound(res.err) {
return nil return nil
} }
@ -657,7 +657,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
if res.err != nil { if res.err != nil {
// Object might have been deleted, by the time heal // Object might have been deleted, by the time heal
// was attempted, we should ignore this object and return success. // was attempted, we should ignore this object and return success.
if isErrObjectNotFound(res.err) { if isErrObjectNotFound(res.err) || isErrVersionNotFound(res.err) {
return nil return nil
} }
// Only report object error // Only report object error
@ -769,7 +769,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
}, madmin.HealItemBucketMetadata) }, madmin.HealItemBucketMetadata)
// Object might have been deleted, by the time heal // Object might have been deleted, by the time heal
// was attempted we ignore this object an move on. // was attempted we ignore this object an move on.
if isErrObjectNotFound(herr) { if isErrObjectNotFound(herr) || isErrVersionNotFound(herr) {
return nil return nil
} }
return herr return herr

View File

@ -41,7 +41,7 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
// checkUploadIDExists - verify if a given uploadID exists and is valid. // checkUploadIDExists - verify if a given uploadID exists and is valid.
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error { func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error {
_, err := er.getObjectInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{}) _, _, _, err := er.getObjectFileInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{})
return err return err
} }

View File

@ -145,7 +145,20 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
fn, off, length, nErr := NewGetObjectReader(rs, fi.ToObjectInfo(bucket, object), opts) objInfo := fi.ToObjectInfo(bucket, object)
if objInfo.DeleteMarker {
if opts.VersionID == "" {
return &GetObjectReader{
ObjInfo: objInfo,
}, toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return &GetObjectReader{
ObjInfo: objInfo,
}, toObjectErr(errMethodNotAllowed, bucket, object)
}
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts)
if nErr != nil { if nErr != nil {
return nil, nErr return nil, nErr
} }
@ -310,6 +323,14 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
if fi.Deleted {
if opts.VersionID == "" {
return toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return toObjectErr(errMethodNotAllowed, bucket, object)
}
return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, etag, opts, fi, metaArr, onlineDisks) return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, etag, opts, fi, metaArr, onlineDisks)
} }
@ -358,12 +379,7 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return info, nil return info, nil
} }
info, err = er.getObjectInfo(ctx, bucket, object, opts) return er.getObjectInfo(ctx, bucket, object, opts)
if err != nil {
return info, toObjectErr(err, bucket, object)
}
return info, nil
} }
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
@ -397,7 +413,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts) fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
return objInfo, err return objInfo, toObjectErr(err, bucket, object)
} }
if fi.Deleted { if fi.Deleted {

View File

@ -465,19 +465,22 @@ func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string
} }
for _, zone := range z.zones { for _, zone := range z.zones {
gr, err := zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) gr, err = zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
if err != nil { if err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
nsUnlocker() nsUnlocker()
return nil, err return gr, err
} }
gr.cleanUpFns = append(gr.cleanUpFns, nsUnlocker) gr.cleanUpFns = append(gr.cleanUpFns, nsUnlocker)
return gr, nil return gr, nil
} }
nsUnlocker() nsUnlocker()
return nil, ObjectNotFound{Bucket: bucket, Object: object} if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
return gr, ObjectNotFound{Bucket: bucket, Object: object}
} }
func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
@ -491,9 +494,10 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta
if z.SingleZone() { if z.SingleZone() {
return z.zones[0].GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts) return z.zones[0].GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
} }
for _, zone := range z.zones { for _, zone := range z.zones {
if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil { if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
return err return err
@ -503,7 +507,7 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta
return ObjectNotFound{Bucket: bucket, Object: object} return ObjectNotFound{Bucket: bucket, Object: object}
} }
func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
// Lock the object before reading. // Lock the object before reading.
lk := z.NewNSLock(ctx, bucket, object) lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalObjectTimeout); err != nil { if err := lk.GetRLock(globalObjectTimeout); err != nil {
@ -515,16 +519,19 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string,
return z.zones[0].GetObjectInfo(ctx, bucket, object, opts) return z.zones[0].GetObjectInfo(ctx, bucket, object, opts)
} }
for _, zone := range z.zones { for _, zone := range z.zones {
objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts) objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
return objInfo, err return objInfo, err
} }
return objInfo, nil return objInfo, nil
} }
return ObjectInfo{}, ObjectNotFound{Bucket: bucket, Object: object} if opts.VersionID != "" {
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
return objInfo, ObjectNotFound{Bucket: bucket, Object: object}
} }
// PutObject - writes an object to least used erasure zone. // PutObject - writes an object to least used erasure zone.
@ -565,7 +572,7 @@ func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object s
if err == nil { if err == nil {
return objInfo, nil return objInfo, nil
} }
if err != nil && !isErrObjectNotFound(err) { if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
break break
} }
} }
@ -595,7 +602,7 @@ func (z *erasureZones) DeleteObjects(ctx context.Context, bucket string, objects
deletedObjects, errs := zone.DeleteObjects(ctx, bucket, objects, opts) deletedObjects, errs := zone.DeleteObjects(ctx, bucket, objects, opts)
for i, derr := range errs { for i, derr := range errs {
if derrs[i] == nil { if derrs[i] == nil {
if derr != nil && !isErrObjectNotFound(derr) { if derr != nil && !isErrObjectNotFound(derr) && !isErrVersionNotFound(derr) {
derrs[i] = derr derrs[i] = derr
} }
} }
@ -1918,7 +1925,7 @@ func (z *erasureZones) HealObject(ctx context.Context, bucket, object, versionID
for _, zone := range z.zones { for _, zone := range z.zones {
result, err := zone.HealObject(ctx, bucket, object, versionID, opts) result, err := zone.HealObject(ctx, bucket, object, versionID, opts)
if err != nil { if err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
return result, err return result, err
@ -2022,7 +2029,7 @@ func (z *erasureZones) PutObjectTags(ctx context.Context, bucket, object string,
for _, zone := range z.zones { for _, zone := range z.zones {
err := zone.PutObjectTags(ctx, bucket, object, tags, opts) err := zone.PutObjectTags(ctx, bucket, object, tags, opts)
if err != nil { if err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
return err return err
@ -2050,7 +2057,7 @@ func (z *erasureZones) DeleteObjectTags(ctx context.Context, bucket, object stri
for _, zone := range z.zones { for _, zone := range z.zones {
err := zone.DeleteObjectTags(ctx, bucket, object, opts) err := zone.DeleteObjectTags(ctx, bucket, object, opts)
if err != nil { if err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
return err return err
@ -2078,7 +2085,7 @@ func (z *erasureZones) GetObjectTags(ctx context.Context, bucket, object string,
for _, zone := range z.zones { for _, zone := range z.zones {
tags, err := zone.GetObjectTags(ctx, bucket, object, opts) tags, err := zone.GetObjectTags(ctx, bucket, object, opts)
if err != nil { if err != nil {
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue continue
} }
return tags, err return tags, err

View File

@ -214,7 +214,8 @@ func guessIsHealthCheckReq(req *http.Request) bool {
aType := getRequestAuthType(req) aType := getRequestAuthType(req)
return aType == authTypeAnonymous && (req.Method == http.MethodGet || req.Method == http.MethodHead) && return aType == authTypeAnonymous && (req.Method == http.MethodGet || req.Method == http.MethodHead) &&
(req.URL.Path == healthCheckPathPrefix+healthCheckLivenessPath || (req.URL.Path == healthCheckPathPrefix+healthCheckLivenessPath ||
req.URL.Path == healthCheckPathPrefix+healthCheckReadinessPath) req.URL.Path == healthCheckPathPrefix+healthCheckReadinessPath ||
req.URL.Path == healthCheckPathPrefix+healthCheckClusterPath)
} }
// guessIsMetricsReq - returns true if incoming request looks // guessIsMetricsReq - returns true if incoming request looks

View File

@ -519,6 +519,12 @@ func isErrObjectNotFound(err error) bool {
return errors.As(err, &objNotFound) return errors.As(err, &objNotFound)
} }
// isErrVersionNotFound - Check if error type is VersionNotFound.
func isErrVersionNotFound(err error) bool {
var versionNotFound VersionNotFound
return errors.As(err, &versionNotFound)
}
// PreConditionFailed - Check if copy precondition failed // PreConditionFailed - Check if copy precondition failed
type PreConditionFailed struct{} type PreConditionFailed struct{}

View File

@ -116,7 +116,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
// get gateway encryption options // get gateway encryption options
opts, err := getOpts(ctx, r, bucket, object) opts, err := getOpts(ctx, r, bucket, object)
if err != nil { if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
@ -192,6 +192,14 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
objInfo, err := getObjectInfo(ctx, bucket, object, opts) objInfo, err := getObjectInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
if globalBucketVersioningSys.Enabled(bucket) {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if objInfo.VersionID != "" && objInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
@ -256,7 +264,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
case crypto.SSEC.IsEncrypted(objInfo.UserDefined): case crypto.SSEC.IsEncrypted(objInfo.UserDefined):
// Validate the SSE-C Key set in the header. // Validate the SSE-C Key set in the header.
if _, err = crypto.SSEC.UnsealObjectKey(r.Header, objInfo.UserDefined, bucket, object); err != nil { if _, err = crypto.SSEC.UnsealObjectKey(r.Header, objInfo.UserDefined, bucket, object); err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
w.Header().Set(crypto.SSECAlgorithm, r.Header.Get(crypto.SSECAlgorithm)) w.Header().Set(crypto.SSECAlgorithm, r.Header.Get(crypto.SSECAlgorithm))
@ -313,7 +321,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
// get gateway encryption options // get gateway encryption options
opts, err := getOpts(ctx, r, bucket, object) opts, err := getOpts(ctx, r, bucket, object)
if err != nil { if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
@ -379,6 +387,14 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts) gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
if err != nil { if err != nil {
if globalBucketVersioningSys.Enabled(bucket) && gr != nil {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
@ -435,6 +451,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
statusCodeWritten = true statusCodeWritten = true
w.WriteHeader(http.StatusPartialContent) w.WriteHeader(http.StatusPartialContent)
} }
// Write object content to response body // Write object content to response body
if _, err = io.Copy(httpWriter, gr); err != nil { if _, err = io.Copy(httpWriter, gr); err != nil {
if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet
@ -552,6 +569,14 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
objInfo, err := getObjectInfo(ctx, bucket, object, opts) objInfo, err := getObjectInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
if globalBucketVersioningSys.Enabled(bucket) {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if objInfo.VersionID != "" && objInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
}
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return return
} }
@ -861,6 +886,14 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if isErrPreconditionFailed(err) { if isErrPreconditionFailed(err) {
return return
} }
if globalBucketVersioningSys.Enabled(srcBucket) && gr != nil {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
@ -1408,7 +1441,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
var opts ObjectOptions var opts ObjectOptions
opts, err = putOpts(ctx, r, bucket, object, metadata) opts, err = putOpts(ctx, r, bucket, object, metadata)
if err != nil { if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
@ -1620,7 +1653,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
opts, err := putOpts(ctx, r, bucket, object, metadata) opts, err := putOpts(ctx, r, bucket, object, metadata)
if err != nil { if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
newMultipartUpload := objectAPI.NewMultipartUpload newMultipartUpload := objectAPI.NewMultipartUpload
@ -1771,6 +1804,14 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
if isErrPreconditionFailed(err) { if isErrPreconditionFailed(err) {
return return
} }
if globalBucketVersioningSys.Enabled(srcBucket) && gr != nil {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }

View File

@ -756,7 +756,7 @@ next:
} }
} }
if err != nil && !isErrObjectNotFound(err) { if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// Ignore object not found error. // Ignore object not found error.
return toJSONError(ctx, err, args.BucketName, "") return toJSONError(ctx, err, args.BucketName, "")
} }