diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 14ec3b356..4cb313c34 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc. + * MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -142,20 +142,19 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts) } - var objInfo ObjectInfo - objInfo, err = xl.getObjectInfo(ctx, bucket, object, opts) + meta, metaArr, onlineDisks, err := xl.getObjectXLMeta(ctx, bucket, object, opts) if err != nil { return nil, toObjectErr(err, bucket, object) } - fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts) + fn, off, length, nErr := NewGetObjectReader(rs, meta.ToObjectInfo(bucket, object), opts) if nErr != nil { return nil, nErr } pr, pw := io.Pipe() go func() { - err := xl.getObject(ctx, bucket, object, off, length, pw, "", opts) + err := xl.getObjectWithXLMeta(ctx, bucket, object, off, length, pw, "", opts, meta, metaArr, onlineDisks) pw.CloseWithError(err) }() @@ -173,12 +172,6 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (xl xlObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { - return xl.getObject(ctx, bucket, object, startOffset, length, writer, etag, opts) -} - -// getObject wrapper for xl GetObject -func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { - if err := checkGetObjArgs(ctx, bucket, object); err != nil { return err } @@ -202,28 +195,10 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO return toObjectErr(err, bucket, object) } - // Read metadata associated with the object from all disks. - metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object) - - // get Quorum for this object - readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs) - if err != nil { - return toObjectErr(err, bucket, object) - } - - if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - return toObjectErr(reducedErr, bucket, object) - } - - // List all online disks. - onlineDisks, modTime := listOnlineDisks(xl.getDisks(), metaArr, errs) - - // Pick latest valid metadata. - xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) - if err != nil { - return err - } + return xl.getObject(ctx, bucket, object, startOffset, length, writer, etag, opts) +} +func (xl xlObjects) getObjectWithXLMeta(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions, xlMeta xlMetaV1, metaArr []xlMetaV1, onlineDisks []StorageAPI) error { // Reorder online disks based on erasure distribution order. onlineDisks = shuffleDisks(onlineDisks, xlMeta.Erasure.Distribution) @@ -330,6 +305,15 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO return nil } +// getObject wrapper for xl GetObject +func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { + xlMeta, metaArr, onlineDisks, err := xl.getObjectXLMeta(ctx, bucket, object, opts) + if err != nil { + return toObjectErr(err, bucket, object) + } + return xl.getObjectWithXLMeta(ctx, bucket, object, startOffset, length, writer, etag, opts, xlMeta, metaArr, onlineDisks) +} + // getObjectInfoDir - This getObjectInfo is specific to object directory lookup. func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (ObjectInfo, error) { storageDisks := xl.getDisks() @@ -383,8 +367,7 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op return info, nil } -// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. -func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string, opt ObjectOptions) (objInfo ObjectInfo, err error) { +func (xl xlObjects) getObjectXLMeta(ctx context.Context, bucket, object string, opt ObjectOptions) (xlMeta xlMetaV1, metaArr []xlMetaV1, onlineDisks []StorageAPI, err error) { disks := xl.getDisks() // Read metadata associated with the object from all disks. @@ -392,21 +375,30 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string, op readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs) if err != nil { - return objInfo, err + return xlMeta, nil, nil, err } - // List all the file commit ids from parts metadata. - modTimes := listObjectModtimes(metaArr, errs) - - // Reduce list of UUIDs to a single common value. - modTime, _ := commonTime(modTimes) + if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { + return xlMeta, nil, nil, toObjectErr(reducedErr, bucket, object) + } + // List all online disks. + onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. - xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) + xlMeta, err = pickValidXLMeta(ctx, metaArr, modTime, readQuorum) + if err != nil { + return xlMeta, nil, nil, err + } + + return xlMeta, metaArr, onlineDisks, nil +} + +// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. +func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string, opt ObjectOptions) (objInfo ObjectInfo, err error) { + xlMeta, _, _, err := xl.getObjectXLMeta(ctx, bucket, object, opt) if err != nil { return objInfo, err } - return xlMeta.ToObjectInfo(bucket, object), nil }