mirror of
https://github.com/minio/minio.git
synced 2024-12-23 21:55:53 -05:00
fix: consistent replies for incorrect range requests on replicated buckets (#14345)
Propagate error from replication proxy target correctly to the client if range GET is unsatisfiable.
This commit is contained in:
parent
80ef1ae51c
commit
1e39ca39c3
@ -20,6 +20,7 @@ package cmd
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -27,6 +28,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/bucket/replication"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
)
|
||||
|
||||
//go:generate msgp -file=$GOFILE
|
||||
@ -699,3 +701,26 @@ func newBucketResyncStatus(bucket string) BucketReplicationResyncStatus {
|
||||
Version: resyncMetaVersion,
|
||||
}
|
||||
}
|
||||
|
||||
var contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\\*)`)
|
||||
|
||||
// parse size from content-range header
|
||||
func parseSizeFromContentRange(h http.Header) (sz int64, err error) {
|
||||
cr := h.Get(xhttp.ContentRange)
|
||||
if cr == "" {
|
||||
return sz, fmt.Errorf("Content-Range not set")
|
||||
}
|
||||
parts := contentRangeRegexp.FindStringSubmatch(cr)
|
||||
if len(parts) != 4 {
|
||||
return sz, fmt.Errorf("invalid Content-Range header %s", cr)
|
||||
}
|
||||
if parts[3] == "*" {
|
||||
return -1, nil
|
||||
}
|
||||
var usz uint64
|
||||
usz, err = strconv.ParseUint(parts[3], 10, 64)
|
||||
if err != nil {
|
||||
return sz, err
|
||||
}
|
||||
return int64(usz), nil
|
||||
}
|
||||
|
@ -1531,16 +1531,21 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
|
||||
go globalReplicationStats.loadInitialReplicationMetrics(ctx)
|
||||
}
|
||||
|
||||
type proxyResult struct {
|
||||
Proxy bool
|
||||
Err error
|
||||
}
|
||||
|
||||
// get Reader from replication target if active-active replication is in place and
|
||||
// this node returns a 404
|
||||
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (gr *GetObjectReader, proxy bool) {
|
||||
tgt, oi, proxy := proxyHeadToRepTarget(ctx, bucket, object, opts, proxyTargets)
|
||||
if !proxy {
|
||||
return nil, false
|
||||
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (gr *GetObjectReader, proxy proxyResult, err error) {
|
||||
tgt, oi, proxy := proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
|
||||
if !proxy.Proxy {
|
||||
return nil, proxy, nil
|
||||
}
|
||||
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
|
||||
fn, _, _, err := NewGetObjectReader(nil, oi, opts)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
return nil, proxy, err
|
||||
}
|
||||
gopts := miniogo.GetObjectOptions{
|
||||
VersionID: opts.VersionID,
|
||||
@ -1548,30 +1553,40 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs
|
||||
Internal: miniogo.AdvancedGetOptions{
|
||||
ReplicationProxyRequest: "true",
|
||||
},
|
||||
PartNumber: opts.PartNumber,
|
||||
}
|
||||
// get correct offsets for encrypted object
|
||||
if off >= 0 && length >= 0 {
|
||||
if err := gopts.SetRange(off, off+length-1); err != nil {
|
||||
return nil, false
|
||||
if rs != nil {
|
||||
h, err := rs.ToHeader()
|
||||
if err != nil {
|
||||
return nil, proxy, err
|
||||
}
|
||||
gopts.Set(xhttp.Range, h)
|
||||
}
|
||||
// Make sure to match ETag when proxying.
|
||||
if err = gopts.SetMatchETag(oi.ETag); err != nil {
|
||||
return nil, false
|
||||
return nil, proxy, err
|
||||
}
|
||||
c := miniogo.Core{Client: tgt.Client}
|
||||
obj, _, _, err := c.GetObject(ctx, bucket, object, gopts)
|
||||
obj, _, h, err := c.GetObject(ctx, bucket, object, gopts)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
return nil, proxy, err
|
||||
}
|
||||
closeReader := func() { obj.Close() }
|
||||
|
||||
reader, err := fn(obj, h, closeReader)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
return nil, proxy, err
|
||||
}
|
||||
reader.ObjInfo = oi.Clone()
|
||||
return reader, true
|
||||
if rs != nil {
|
||||
contentSize, err := parseSizeFromContentRange(h)
|
||||
if err != nil {
|
||||
return nil, proxy, err
|
||||
}
|
||||
reader.ObjInfo.Size = contentSize
|
||||
}
|
||||
|
||||
return reader, proxyResult{Proxy: true}, nil
|
||||
}
|
||||
|
||||
func getproxyTargets(ctx context.Context, bucket, object string, opts ObjectOptions) (tgts *madmin.BucketTargets) {
|
||||
@ -1590,11 +1605,11 @@ func getproxyTargets(ctx context.Context, bucket, object string, opts ObjectOpti
|
||||
return tgts
|
||||
}
|
||||
|
||||
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgt *TargetClient, oi ObjectInfo, proxy bool) {
|
||||
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgt *TargetClient, oi ObjectInfo, proxy proxyResult) {
|
||||
// this option is set when active-active replication is in place between site A -> B,
|
||||
// and site B does not have the object yet.
|
||||
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
|
||||
return nil, oi, false
|
||||
return nil, oi, proxy
|
||||
}
|
||||
for _, t := range proxyTargets.Targets {
|
||||
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, t.Arn)
|
||||
@ -1612,9 +1627,22 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec
|
||||
Internal: miniogo.AdvancedGetOptions{
|
||||
ReplicationProxyRequest: "true",
|
||||
},
|
||||
PartNumber: opts.PartNumber,
|
||||
}
|
||||
if rs != nil {
|
||||
h, err := rs.ToHeader()
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
|
||||
continue
|
||||
}
|
||||
gopts.Set(xhttp.Range, h)
|
||||
}
|
||||
|
||||
objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts)
|
||||
if err != nil {
|
||||
if isErrInvalidRange(ErrorRespToObjectError(err, bucket, object)) {
|
||||
return nil, oi, proxyResult{Err: err}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1645,15 +1673,15 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec
|
||||
if ok {
|
||||
oi.ContentEncoding = ce
|
||||
}
|
||||
return tgt, oi, true
|
||||
return tgt, oi, proxyResult{Proxy: true}
|
||||
}
|
||||
return nil, oi, false
|
||||
return nil, oi, proxy
|
||||
}
|
||||
|
||||
// get object info from replication target if active-active replication is in place and
|
||||
// this node returns a 404
|
||||
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (oi ObjectInfo, proxy bool) {
|
||||
_, oi, proxy = proxyHeadToRepTarget(ctx, bucket, object, opts, proxyTargets)
|
||||
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (oi ObjectInfo, proxy proxyResult) {
|
||||
_, oi, proxy = proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
|
||||
return oi, proxy
|
||||
}
|
||||
|
||||
|
@ -174,3 +174,26 @@ func (h *HTTPRangeSpec) String(resourceSize int64) string {
|
||||
}
|
||||
return fmt.Sprintf("%d-%d", off, off+length-1)
|
||||
}
|
||||
|
||||
// ToHeader returns the Range header value.
|
||||
func (h *HTTPRangeSpec) ToHeader() (string, error) {
|
||||
if h == nil {
|
||||
return "", nil
|
||||
}
|
||||
start := strconv.Itoa(int(h.Start))
|
||||
end := strconv.Itoa(int(h.End))
|
||||
switch {
|
||||
case h.Start >= 0 && h.End >= 0:
|
||||
if h.Start > h.End {
|
||||
return "", errInvalidRange
|
||||
}
|
||||
case h.IsSuffixLength:
|
||||
end = strconv.Itoa(int(h.Start * -1))
|
||||
start = ""
|
||||
case h.Start > -1:
|
||||
end = ""
|
||||
default:
|
||||
return "", fmt.Errorf("does not have valid range value")
|
||||
}
|
||||
return fmt.Sprintf("bytes=%s-%s", start, end), nil
|
||||
}
|
||||
|
@ -101,3 +101,46 @@ func TestHTTPRequestRangeSpec(t *testing.T) {
|
||||
t.Errorf("Case %d: Expected errInvalidRange but: %v %v %d %d %v", i, rs, err1, o, l, err2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPRequestRangeToHeader(t *testing.T) {
|
||||
validRangeSpecs := []struct {
|
||||
spec string
|
||||
errExpected bool
|
||||
}{
|
||||
{"bytes=0-", false},
|
||||
{"bytes=1-", false},
|
||||
|
||||
{"bytes=0-9", false},
|
||||
{"bytes=1-10", false},
|
||||
{"bytes=1-1", false},
|
||||
{"bytes=2-5", false},
|
||||
|
||||
{"bytes=-5", false},
|
||||
{"bytes=-1", false},
|
||||
{"bytes=-1000", false},
|
||||
{"bytes=", true},
|
||||
{"bytes= ", true},
|
||||
{"byte=", true},
|
||||
{"bytes=A-B", true},
|
||||
{"bytes=1-B", true},
|
||||
{"bytes=B-1", true},
|
||||
{"bytes=-1-1", true},
|
||||
}
|
||||
for i, testCase := range validRangeSpecs {
|
||||
rs, err := parseRequestRangeSpec(testCase.spec)
|
||||
if err != nil {
|
||||
if !testCase.errExpected || err == nil && testCase.errExpected {
|
||||
t.Errorf("unexpected err: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
h, err := rs.ToHeader()
|
||||
if err != nil && !testCase.errExpected || err == nil && testCase.errExpected {
|
||||
t.Errorf("expected error with invalid range: %v", err)
|
||||
}
|
||||
if h != testCase.spec {
|
||||
t.Errorf("Case %d: translated to incorrect header: %s expected: %s",
|
||||
i, h, testCase.spec)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -701,3 +701,7 @@ func isErrMethodNotAllowed(err error) bool {
|
||||
var methodNotAllowed MethodNotAllowed
|
||||
return errors.As(err, &methodNotAllowed)
|
||||
}
|
||||
|
||||
func isErrInvalidRange(err error) bool {
|
||||
return errors.As(err, &errInvalidRange)
|
||||
}
|
||||
|
@ -412,20 +412,28 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
|
||||
if err != nil {
|
||||
var (
|
||||
reader *GetObjectReader
|
||||
proxy bool
|
||||
proxy proxyResult
|
||||
)
|
||||
proxytgts := getproxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
// proxy to replication target if active-active replication is in place.
|
||||
reader, proxy = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts)
|
||||
if reader != nil && proxy {
|
||||
reader, proxy, err = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts)
|
||||
if err != nil && !isErrObjectNotFound(ErrorRespToObjectError(err, bucket, object)) &&
|
||||
!isErrVersionNotFound(ErrorRespToObjectError(err, bucket, object)) {
|
||||
logger.LogIf(ctx, fmt.Errorf("Replication proxy failed for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
|
||||
}
|
||||
if reader != nil && proxy.Proxy && err == nil {
|
||||
gr = reader
|
||||
}
|
||||
}
|
||||
if reader == nil || !proxy {
|
||||
if reader == nil || !proxy.Proxy {
|
||||
if isErrPreconditionFailed(err) {
|
||||
return
|
||||
}
|
||||
if proxy.Err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, proxy.Err), r.URL)
|
||||
return
|
||||
}
|
||||
if globalBucketVersioningSys.Enabled(bucket) && gr != nil {
|
||||
if !gr.ObjInfo.VersionPurgeStatus.Empty() {
|
||||
// Shows the replication status of a permanent delete of a version
|
||||
@ -631,22 +639,28 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
|
||||
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
|
||||
return
|
||||
}
|
||||
// Get request range.
|
||||
var rs *HTTPRangeSpec
|
||||
rangeHeader := r.Header.Get(xhttp.Range)
|
||||
|
||||
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
var (
|
||||
proxy bool
|
||||
proxy proxyResult
|
||||
oi ObjectInfo
|
||||
)
|
||||
// proxy HEAD to replication target if active-active replication configured on bucket
|
||||
proxytgts := getproxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, opts, proxytgts)
|
||||
if proxy {
|
||||
if rangeHeader != "" {
|
||||
rs, _ = parseRequestRangeSpec(rangeHeader)
|
||||
}
|
||||
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
|
||||
if proxy.Proxy {
|
||||
objInfo = oi
|
||||
}
|
||||
}
|
||||
if !proxy {
|
||||
if !proxy.Proxy {
|
||||
if globalBucketVersioningSys.Enabled(bucket) {
|
||||
switch {
|
||||
case !objInfo.VersionPurgeStatus.Empty():
|
||||
@ -703,9 +717,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
|
||||
return
|
||||
}
|
||||
|
||||
// Get request range.
|
||||
var rs *HTTPRangeSpec
|
||||
rangeHeader := r.Header.Get(xhttp.Range)
|
||||
if rangeHeader != "" {
|
||||
// Both 'Range' and 'partNumber' cannot be specified at the same time
|
||||
if opts.PartNumber > 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user