do not use large buffers if not necessary (#11220)
Without this change there is a performance regression for small-object GETs; this change brings overall speed back to pre-'59d363' levels.
parent cb7fc99368
commit d0027c3c41
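The heart of the fix is visible in the ReadFileStreamHandler hunk further below: only transfers of at least readBlockSize borrow a large buffer from the pool, while small reads fall through to io.Copy and its default 32 KiB buffer. A minimal, self-contained sketch of that pattern follows; the 4 MiB threshold and the pool setup are illustrative assumptions, not MinIO's actual values.

package sketch

import (
    "io"
    "sync"
)

// Assumed threshold; the server defines its own readBlockSize.
const readBlockSize = 4 << 20

var bufPool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, readBlockSize)
        return &b
    },
}

// copySized reserves the pooled buffer for large transfers only; for
// small ones, io.Copy's internal 32 KiB buffer is cheaper than
// checking out a multi-megabyte slab just to return it immediately.
func copySized(dst io.Writer, src io.Reader, length int64) (int64, error) {
    if length >= readBlockSize {
        bufp := bufPool.Get().(*[]byte)
        defer bufPool.Put(bufp)
        return io.CopyBuffer(dst, src, *bufp)
    }
    return io.Copy(dst, src)
}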
@@ -91,7 +91,6 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
 // ReadAt() implementation which verifies the bitrot hash available as part of the stream.
 type streamingBitrotReader struct {
     disk StorageAPI
-    data []byte
     rc io.Reader
     volume string
     filePath string
@@ -123,14 +122,10 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
         // For the first ReadAt() call we need to open the stream for reading.
         b.currOffset = offset
         streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset
-        if len(b.data) == 0 {
         b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset)
         if err != nil {
             return 0, err
         }
-        } else {
-            b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset)
-        }
     }

     if offset != b.currOffset {
@@ -159,11 +154,10 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
 }

 // Returns streaming bitrot reader implementation.
-func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader {
+func newStreamingBitrotReader(disk StorageAPI, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader {
     h := algo.New()
     return &streamingBitrotReader{
         disk,
-        data,
         nil,
         volume,
         filePath,
@@ -103,9 +103,9 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg
     return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize)
 }

-func newBitrotReader(disk StorageAPI, data []byte, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt {
+func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt {
     if algo == HighwayHash256S {
-        return newStreamingBitrotReader(disk, data, bucket, filePath, tillOffset, algo, shardSize)
+        return newStreamingBitrotReader(disk, bucket, filePath, tillOffset, algo, shardSize)
     }
     return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum)
 }
@@ -62,7 +62,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) {
     }
     writer.(io.Closer).Close()

-    reader := newBitrotReader(disk, nil, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10)
+    reader := newBitrotReader(disk, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10)
     b := make([]byte, 10)
     if _, err = reader.ReadAt(b, 0); err != nil {
         log.Fatal(err)
@@ -51,7 +51,7 @@ func init() {
     logger.RegisterError(config.FmtError)

     rand.Seed(time.Now().UTC().UnixNano())
-    globalDNSCache = xhttp.NewDNSCache(10*time.Second, 3*time.Second)
+    globalDNSCache = xhttp.NewDNSCache(10*time.Second, 10*time.Second)

     initGlobalContext()

@@ -134,7 +134,7 @@ func TestErasureDecode(t *testing.T) {
         }
         tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data)

-        bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize())
+        bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize())
     }

     writer := bytes.NewBuffer(nil)
@@ -164,7 +164,7 @@ func TestErasureDecode(t *testing.T) {
             continue
         }
         tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data)
-        bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize())
+        bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize())
     }
     for j := range disks[:test.offDisks] {
         if bitrotReaders[j] == nil {
@@ -270,7 +270,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) {
             continue
         }
         tillOffset := erasure.ShardFileOffset(offset, readLen, length)
-        bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
+        bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
     }
     err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil)
     closeBitrotReaders(bitrotReaders)
@@ -332,7 +332,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64,
             continue
         }
         tillOffset := erasure.ShardFileOffset(0, size, size)
-        bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
+        bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
     }
     if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil {
         panic(err)
@@ -99,7 +99,7 @@ func TestErasureHeal(t *testing.T) {
     readers := make([]io.ReaderAt, len(disks))
     for i, disk := range disks {
         shardFilesize := erasure.ShardFileSize(test.size)
-        readers[i] = newBitrotReader(disk, nil, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize())
+        readers[i] = newBitrotReader(disk, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize())
     }

     // setup stale disks for the test case
@@ -186,7 +186,7 @@ func TestListOnlineDisks(t *testing.T) {
         t.Fatalf("Failed to putObject %v", err)
     }

-    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "")
     fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo %v", err)
@@ -287,7 +287,7 @@ func TestDisksWithAllParts(t *testing.T) {
         t.Fatalf("Failed to putObject %v", err)
     }

-    _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "")
     readQuorum := len(erasureDisks) / 2
     if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
         t.Fatalf("Failed to read xl meta data %v", reducedErr)
@@ -295,7 +295,7 @@ func TestDisksWithAllParts(t *testing.T) {

     // Test that all disks are returned without any failures with
     // unmodified meta data
-    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "")
     if err != nil {
         t.Fatalf("Failed to read xl meta data %v", err)
     }
@@ -395,7 +395,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
         if latestMeta.XLV1 {
             partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber))
         }
-        readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
+        readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
     }
     writers := make([]io.Writer, len(outDatedDisks))
     for i, disk := range outDatedDisks {
@@ -811,7 +811,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
     storageEndpoints := er.getEndpoints()

     // Read metadata files from all the disks
-    partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
+    partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID)

     if isAllNotFound(errs) {
         err = toObjectErr(errFileNotFound, bucket, object)
@@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
     }

     disk := er.getDisks()[0]
-    fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
     if err != nil {
         t.Fatal(err)
     }
@@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
         t.Fatal(err)
     }

-    fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
     if err != nil {
         t.Fatal(err)
     }
@@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
         t.Fatal(err)
     }

-    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "")
     if err != nil {
         t.Fatal(err)
     }
@@ -214,7 +214,7 @@ func TestHealObjectCorrupted(t *testing.T) {
         t.Fatalf("Failed to heal object - %v", err)
     }

-    fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "")
     fi, err := getLatestFileInfo(ctx, fileInfos, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -239,7 +239,7 @@ func TestHealObjectCorrupted(t *testing.T) {
         t.Errorf("Expected nil but received %v", err)
     }

-    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "")
     nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -265,7 +265,7 @@ func TestHealObjectCorrupted(t *testing.T) {
         t.Errorf("Expected nil but received %v", err)
     }

-    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "")
     nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -115,7 +115,7 @@ func hashOrder(key string, cardinality int) []int {

 // Reads all `xl.meta` metadata as a FileInfo slice.
 // Returns error slice indicating the failed metadata reads.
-func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) {
+func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
     metadataArray := make([]FileInfo, len(disks))

     g := errgroup.WithNErrs(len(disks))
@@ -126,7 +126,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
             if disks[index] == nil {
                 return errDiskNotFound
             }
-            metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData)
+            metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID)
             if err != nil {
                 if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) {
                     logger.LogOnceIf(ctx, err, disks[index].String())
@@ -46,7 +46,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
     disks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false)
+    metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")

     readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
     if err != nil {
@@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
     }
     for _, uploadIDDir := range uploadIDDirs {
         uploadIDPath := pathJoin(shaDir, uploadIDDir)
-        fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
+        fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "")
         if err != nil {
             continue
         }
@@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
         return
     }
     for _, tmpDir := range tmpDirs {
-        fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
+        fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "")
         if err != nil {
             continue
         }
@@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
         if populatedUploadIds.Contains(uploadID) {
             continue
         }
-        fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
+        fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
         if err != nil {
             return result, toObjectErr(err, bucket, object)
         }
@@ -371,7 +371,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo

     // Read metadata associated with the object from all disks.
     partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
-        uploadIDPath, "", false)
+        uploadIDPath, "")

     // get Quorum for this object
     _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@@ -474,7 +474,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
     }

     // Read metadata again because it might be updated with parallel upload of another part.
-    partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
+    partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "")
     reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
     if reducedErr == errErasureWriteQuorum {
         return pi, toObjectErr(reducedErr, bucket, object)
@@ -552,7 +552,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
     storageDisks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false)
+    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID)

     // get Quorum for this object
     readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@@ -600,7 +600,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
     storageDisks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")

     // get Quorum for this object
     _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@@ -704,7 +704,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     storageDisks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")

     // get Quorum for this object
     _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@@ -889,7 +889,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
     uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)

     // Read metadata associated with the object from all disks.
-    partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "")

     // get Quorum for this object
     _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@@ -58,7 +58,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d

     // Read metadata associated with the object from all disks.
     storageDisks := er.getDisks()
-    metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false)
+    metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID)

     // get Quorum for this object
     readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
@@ -157,7 +157,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
         unlockOnDefer = true
     }

-    fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, true, opts)
+    fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts)
     if err != nil {
         return nil, toObjectErr(err, bucket, object)
     }
@@ -298,7 +298,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
         }
         checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
         partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber))
-        readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
+        readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset,
             checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())

         // Prefer local disks
@@ -337,7 +337,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje

 // getObject wrapper for erasure GetObject
 func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error {
-    fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, true, opts)
+    fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts)
     if err != nil {
         return toObjectErr(err, bucket, object)
     }
@@ -364,11 +364,11 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
     return er.getObjectInfo(ctx, bucket, object, opts)
 }

-func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, readData bool, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
+func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
     disks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
+    metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)

     readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
     if err != nil {
@@ -410,7 +410,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s

 // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
 func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
-    fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, false, opts)
+    fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts)
     if err != nil {
         return objInfo, toObjectErr(err, bucket, object)
     }
@@ -1073,7 +1073,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
     disks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
+    metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)

     readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
     if err != nil {
@@ -1134,7 +1134,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
     disks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
+    metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)

     readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
     if err != nil {
@@ -339,18 +339,11 @@ func TestGetObjectNoQuorum(t *testing.T) {
             return erasureDisks
         }
         z.serverPools[0].erasureDisksMu.Unlock()
-        switch f {
-        case 0, 2:
         // Fetch object from store.
         err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
         if err != toObjectErr(errErasureReadQuorum, bucket, object) {
             t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
         }
-        case 1:
-            if err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts); err != nil {
-                t.Errorf("Expected GetObject to succeed, but failed with %v", err)
-            }
-        }
     }

 }
@@ -528,7 +521,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false)
+    parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "")

     parts1SC := globalStorageClass

@@ -541,7 +534,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false)
+    parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "")
     parts2SC := globalStorageClass

     // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class
@@ -553,7 +546,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false)
+    parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "")
     parts3SC := globalStorageClass

     // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class
@@ -571,7 +564,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false)
+    parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "")
     parts4SC := storageclass.Config{
         Standard: storageclass.StorageClass{
             Parity: 6,
@@ -594,7 +587,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false)
+    parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "")
     parts5SC := storageclass.Config{
         RRS: storageclass.StorageClass{
             Parity: 2,
@@ -616,7 +609,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false)
+    parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "")
     parts6SC := storageclass.Config{
         RRS: storageclass.StorageClass{
             Parity: 2,
@@ -639,7 +632,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }

-    parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false)
+    parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "")
     parts7SC := storageclass.Config{
         Standard: storageclass.StorageClass{
             Parity: 5,
@@ -17,6 +17,7 @@
 package cmd

 import (
+    "bytes"
     "context"
     "io"

@@ -79,22 +80,19 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data

         // We have written all the blocks, write the last remaining block.
         if write < int64(len(block)) {
-            n, err := dst.Write(block[:write])
+            n, err := io.Copy(dst, bytes.NewReader(block[:write]))
             if err != nil {
                 if err != io.ErrClosedPipe {
                     logger.LogIf(ctx, err)
                 }
                 return 0, err
             }
-            if int64(n) != write {
-                return 0, io.ErrShortWrite
-            }
-            totalWritten += int64(n)
+            totalWritten += n
             break
         }

         // Copy the block.
-        n, err := dst.Write(block)
+        n, err := io.Copy(dst, bytes.NewReader(block))
         if err != nil {
             // The writer will be closed incase of range queries, which will emit ErrClosedPipe.
             if err != io.ErrClosedPipe {
@@ -103,15 +101,11 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
             return 0, err
         }

-        if n != len(block) {
-            return 0, io.ErrShortWrite
-        }
-
         // Decrement output size.
-        write -= int64(n)
+        write -= n

         // Increment written.
-        totalWritten += int64(n)
+        totalWritten += n
     }

     // Success.
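Why the two writeDataBlocks hunks above can drop the short-write handling: io.Copy returns only after the source is drained or an error occurs, and it surfaces io.ErrShortWrite itself when a Write consumes fewer bytes than given without reporting an error; since it also returns an int64 count, the int64(n) conversions disappear too. A minimal sketch of the resulting pattern, under the same assumptions:

package sketch

import (
    "bytes"
    "io"
)

// writeBlock mirrors the new pattern: io.Copy drains the whole block
// or fails, propagating io.ErrShortWrite on an under-reporting
// destination, so no manual length check is needed afterwards.
func writeBlock(dst io.Writer, block []byte) (int64, error) {
    return io.Copy(dst, bytes.NewReader(block))
}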
@@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
             continue
         }

-        _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
+        _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "")
         if err != nil {
             time.Sleep(retryDelay)
             retries++
@@ -397,7 +397,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
         }

         // Read metadata associated with the object from all disks.
-        fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), false, ObjectOptions{})
+        fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{})
         if err != nil {
             switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
             case ObjectNotFound:
@@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
             continue
         }

-        _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
+        _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "")
         if err != nil {
             time.Sleep(retryDelay)
             retries++
@@ -471,7 +471,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
             }
         }
         // Load first part metadata...
-        fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), false, ObjectOptions{})
+        fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{})
         if err != nil {
             time.Sleep(retryDelay)
             retries++
@@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
     return d.disk.DeleteVersion(ctx, volume, path, fi)
 }

-func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
     if err := d.calcError(); err != nil {
         return FileInfo{}, err
     }
-    return d.disk.ReadVersion(ctx, volume, path, versionID, readData)
+    return d.disk.ReadVersion(ctx, volume, path, versionID)
 }

 func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
@@ -151,9 +151,6 @@ type FileInfo struct {
     MarkDeleted bool // mark this version as deleted
     DeleteMarkerReplicationStatus string
     VersionPurgeStatus VersionPurgeStatusType
-
-    // Data (actual data for the object)
-    Data []byte
 }

 // VersionPurgeStatusKey denotes purge status in metadata
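The storage-datatypes_gen.go hunks that follow are the mechanical consequence of deleting the Data field above: FileInfo now serializes as a 17-element msgpack array instead of 18, presumably regenerated with msgp rather than edited by hand. The three bytes in the Append calls are just the msgpack array16 header, as this small runnable sketch shows:

package main

import "fmt"

func main() {
    // msgpack array16 header: 0xdc marker, then a big-endian uint16
    // element count. Removing FileInfo.Data drops the count from
    // 18 (0x12) to 17 (0x11).
    fmt.Printf("% x\n", []byte{0xdc, 0x00, 0x12}) // dc 00 12 -> 18 elements
    fmt.Printf("% x\n", []byte{0xdc, 0x00, 0x11}) // dc 00 11 -> 17 elements
}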
@@ -245,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
         err = msgp.WrapError(err)
         return
     }
-    if zb0001 != 18 {
-        err = msgp.ArrayError{Wanted: 18, Got: zb0001}
+    if zb0001 != 17 {
+        err = msgp.ArrayError{Wanted: 17, Got: zb0001}
         return
     }
     z.Volume, err = dc.ReadString()
@@ -375,18 +375,13 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
         }
         z.VersionPurgeStatus = VersionPurgeStatusType(zb0004)
     }
-    z.Data, err = dc.ReadBytes(z.Data)
-    if err != nil {
-        err = msgp.WrapError(err, "Data")
-        return
-    }
     return
 }

 // EncodeMsg implements msgp.Encodable
 func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
-    // array header, size 18
-    err = en.Append(0xdc, 0x0, 0x12)
+    // array header, size 17
+    err = en.Append(0xdc, 0x0, 0x11)
     if err != nil {
         return
     }
@@ -494,19 +489,14 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
         err = msgp.WrapError(err, "VersionPurgeStatus")
         return
     }
-    err = en.WriteBytes(z.Data)
-    if err != nil {
-        err = msgp.WrapError(err, "Data")
-        return
-    }
     return
 }

 // MarshalMsg implements msgp.Marshaler
 func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
     o = msgp.Require(b, z.Msgsize())
-    // array header, size 18
-    o = append(o, 0xdc, 0x0, 0x12)
+    // array header, size 17
+    o = append(o, 0xdc, 0x0, 0x11)
     o = msgp.AppendString(o, z.Volume)
     o = msgp.AppendString(o, z.Name)
     o = msgp.AppendString(o, z.VersionID)
@@ -539,7 +529,6 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
     o = msgp.AppendBool(o, z.MarkDeleted)
     o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus)
     o = msgp.AppendString(o, string(z.VersionPurgeStatus))
-    o = msgp.AppendBytes(o, z.Data)
     return
 }

@@ -551,8 +540,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
         err = msgp.WrapError(err)
         return
     }
-    if zb0001 != 18 {
-        err = msgp.ArrayError{Wanted: 18, Got: zb0001}
+    if zb0001 != 17 {
+        err = msgp.ArrayError{Wanted: 17, Got: zb0001}
         return
     }
     z.Volume, bts, err = msgp.ReadStringBytes(bts)
@@ -681,11 +670,6 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
         }
         z.VersionPurgeStatus = VersionPurgeStatusType(zb0004)
     }
-    z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data)
-    if err != nil {
-        err = msgp.WrapError(err, "Data")
-        return
-    }
     o = bts
     return
 }
@@ -703,7 +687,7 @@ func (z *FileInfo) Msgsize() (s int) {
     for za0003 := range z.Parts {
         s += z.Parts[za0003].Msgsize()
     }
-    s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data)
+    s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus))
     return
 }

@@ -58,7 +58,7 @@ type StorageAPI interface {
     DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
     DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
     WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
-    ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
+    ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error)
     RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error

     // File operations.
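With readData gone from the interface above, callers return to the two-step pattern: ReadVersion for metadata, then a separate ReadFileStream for the payload. A hedged sketch of such a caller inside the cmd package; readObjectPart is hypothetical, the "part.1" path construction is illustrative only (real callers derive it from FileInfo.Parts, as in the healing hunks above), and context/ioutil imports are assumed:

// readObjectPart is a sketch, not MinIO code: fetch metadata first,
// then stream the first part's bytes separately.
func readObjectPart(ctx context.Context, disk StorageAPI, volume, path string) ([]byte, error) {
    fi, err := disk.ReadVersion(ctx, volume, path, "")
    if err != nil {
        return nil, err
    }
    // Without the inlined FileInfo.Data, the payload is always
    // streamed from the part file on disk.
    rc, err := disk.ReadFileStream(ctx, volume, pathJoin(path, fi.DataDir, "part.1"), 0, fi.Size)
    if err != nil {
        return nil, err
    }
    defer rc.Close()
    return ioutil.ReadAll(rc)
}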
@@ -398,12 +398,11 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
     return err
 }

-func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
     values := make(url.Values)
     values.Set(storageRESTVolume, volume)
     values.Set(storageRESTFilePath, path)
     values.Set(storageRESTVersionID, versionID)
-    values.Set(storageRESTReadData, strconv.FormatBool(readData))

     respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
     if err != nil {
@@ -59,7 +59,6 @@ const (
     storageRESTDirPath = "dir-path"
     storageRESTFilePath = "file-path"
     storageRESTVersionID = "version-id"
-    storageRESTReadData = "read-data"
     storageRESTTotalVersions = "total-versions"
     storageRESTSrcVolume = "source-volume"
     storageRESTSrcPath = "source-path"
@@ -327,12 +327,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
     volume := vars[storageRESTVolume]
     filePath := vars[storageRESTFilePath]
     versionID := vars[storageRESTVersionID]
-    readData, err := strconv.ParseBool(vars[storageRESTReadData])
-    if err != nil {
-        s.writeErrorResponse(w, err)
-        return
-    }
-    fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData)
+    fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
     if err != nil {
         s.writeErrorResponse(w, err)
         return
@@ -521,10 +516,14 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http

     w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))

+    if length >= readBlockSize {
         bufp := s.storage.pool.Get().(*[]byte)
         defer s.storage.pool.Put(bufp)

         io.CopyBuffer(w, rc, *bufp)
+    } else {
+        io.Copy(w, rc)
+    }
     w.(http.Flusher).Flush()
 }

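Note the asymmetry this hunk introduces in ReadFileStreamHandler, which is where the commit title comes from: large responses still amortize their copies through the pooled buffer, while small ones skip the pool round-trip entirely, the overhead behind the small-object GET regression. The sketch after the commit message above shows the same size-gated pattern in isolation.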
@@ -1042,7 +1041,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
     subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
         Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
     subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
-        Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...)
+        Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...)
     subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
         Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
             storageRESTDstVolume, storageRESTDstPath)...)
@@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
     return p.storage.WriteMetadata(ctx, volume, path, fi)
 }

-func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
     if err = p.checkDiskStale(); err != nil {
         return fi, err
     }

-    return p.storage.ReadVersion(ctx, volume, path, versionID, readData)
+    return p.storage.ReadVersion(ctx, volume, path, versionID)
 }

 func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
@@ -64,9 +64,6 @@ const (
     // Size of each buffer.
     readAheadBufSize = 1 << 20

-    // Small file threshold below which the metadata accompanies the data.
-    smallFileThreshold = 32 * humanize.KiByte
-
     // XL metadata file carries per object metadata.
     xlStorageFormatFile = "xl.meta"
 )
@@ -1127,12 +1124,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
 // ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
 // for all objects less than `128KiB` this call returns data as well
 // along with metadata.
-func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
-    volumeDir, err := s.getVolDir(volume)
-    if err != nil {
-        return fi, err
-    }
-
+func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
     buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
     if err != nil {
         if err == errFileNotFound {
@@ -1167,62 +1159,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
         return fi, errFileNotFound
     }

-    fi, err = getFileInfo(buf, volume, path, versionID)
-    if err != nil {
-        return fi, err
-    }
-
-    if readData {
-        // Reading data for small objects when
-        // - object has not yet transitioned
-        // - object size lesser than 32KiB
-        // - object has maximum of 1 parts
-        if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size < smallFileThreshold && len(fi.Parts) == 1 {
-            fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number)))
-            if err != nil {
-                return fi, err
-            }
-        }
-    }
-
-    return fi, nil
-}
-
-func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err error) {
-    var file *os.File
-    if globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
-        file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666)
-    } else {
-        file, err = os.Open(filePath)
-    }
-    if err != nil {
-        if osIsNotExist(err) {
-            // Check if the object doesn't exist because its bucket
-            // is missing in order to return the correct error.
-            _, err = os.Stat(volumeDir)
-            if err != nil && osIsNotExist(err) {
-                return nil, errVolumeNotFound
-            }
-            return nil, errFileNotFound
-        } else if osIsPermission(err) {
-            return nil, errFileAccessDenied
-        } else if isSysErrNotDir(err) || isSysErrIsDir(err) {
-            return nil, errFileNotFound
-        } else if isSysErrHandleInvalid(err) {
-            // This case is special and needs to be handled for windows.
-            return nil, errFileNotFound
-        } else if isSysErrIO(err) {
-            return nil, errFaultyDisk
-        } else if isSysErrTooManyFiles(err) {
-            return nil, errTooManyOpenFiles
-        } else if isSysErrInvalidArg(err) {
-            return nil, errFileNotFound
-        }
-        return nil, err
-    }
-    defer file.Close()
-
-    return ioutil.ReadAll(file)
+    return getFileInfo(buf, volume, path, versionID)
 }

 // ReadAll reads from r until an error or EOF and returns the data it read.