mirror of https://github.com/minio/minio.git
fix padding error and compatible with uploaded objects (#13803)
This commit is contained in:
parent
a7c430355a
commit
7460fb8349
|
@ -282,3 +282,51 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
|
|||
|
||||
return bytesWritten, derr
|
||||
}
|
||||
|
||||
// Heal reads from readers, reconstruct shards and writes the data to the writers.
|
||||
func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.ReaderAt, totalLength int64) (derr error) {
|
||||
if len(writers) != e.parityBlocks+e.dataBlocks {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
reader := newParallelReader(readers, e, 0, totalLength)
|
||||
|
||||
startBlock := int64(0)
|
||||
endBlock := totalLength / e.blockSize
|
||||
if totalLength%e.blockSize != 0 {
|
||||
endBlock++
|
||||
}
|
||||
|
||||
var bufs [][]byte
|
||||
for block := startBlock; block < endBlock; block++ {
|
||||
var err error
|
||||
bufs, err = reader.Read(bufs)
|
||||
if len(bufs) > 0 {
|
||||
if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) {
|
||||
if derr == nil {
|
||||
derr = err
|
||||
}
|
||||
}
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = e.DecodeDataAndParityBlocks(ctx, bufs); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
w := parallelWriter{
|
||||
writers: writers,
|
||||
writeQuorum: 1,
|
||||
errs: make([]error, len(writers)),
|
||||
}
|
||||
|
||||
err = w.Write(ctx, bufs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return derr
|
||||
}
|
||||
|
|
|
@ -24,9 +24,6 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
)
|
||||
|
||||
var erasureHealTests = []struct {
|
||||
|
@ -137,15 +134,8 @@ func TestErasureHeal(t *testing.T) {
|
|||
staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
// Number of buffers, max 2GB
|
||||
n := (2 * humanize.GiByte) / (int(test.blocksize) * 2)
|
||||
|
||||
// Initialize byte pool once for all sets, bpool size is set to
|
||||
// setCount * setDriveCount with each memory upto blockSizeV2.
|
||||
bp := bpool.NewBytePoolCap(n, int(test.blocksize), int(test.blocksize)*2)
|
||||
|
||||
// test case setup is complete - now call Heal()
|
||||
err = erasure.Heal(context.Background(), readers, staleWriters, test.size, bp)
|
||||
err = erasure.Heal(context.Background(), staleWriters, readers, test.size)
|
||||
closeBitrotReaders(readers)
|
||||
closeBitrotWriters(staleWriters)
|
||||
if err != nil && !test.shouldFail {
|
||||
|
|
|
@ -457,10 +457,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||
}
|
||||
|
||||
erasureInfo := latestMeta.Erasure
|
||||
bp := er.bp
|
||||
if erasureInfo.BlockSize == blockSizeV1 {
|
||||
bp = er.bpOld
|
||||
}
|
||||
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
|
||||
partSize := latestMeta.Parts[partIndex].Size
|
||||
partActualSize := latestMeta.Parts[partIndex].ActualSize
|
||||
|
@ -491,7 +487,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
}
|
||||
err = erasure.Heal(ctx, readers, writers, partSize, bp)
|
||||
err = erasure.Heal(ctx, writers, readers, partSize)
|
||||
closeBitrotReaders(readers)
|
||||
closeBitrotWriters(writers)
|
||||
if err != nil {
|
||||
|
|
|
@ -21,6 +21,8 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
|
@ -649,3 +651,126 @@ func TestHealEmptyDirectoryErasure(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealLastDataShard(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
dataSize int64
|
||||
}{
|
||||
{"4KiB", 4 * humanize.KiByte},
|
||||
{"64KiB", 64 * humanize.KiByte},
|
||||
{"128KiB", 128 * humanize.KiByte},
|
||||
{"1MiB", 1 * humanize.MiByte},
|
||||
{"5MiB", 5 * humanize.MiByte},
|
||||
{"10MiB", 10 * humanize.MiByte},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
nDisks := 16
|
||||
fsDirs, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
obj, _, err := initObjectLayer(ctx, mustGetPoolEndpoints(fsDirs...))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
bucket := "bucket"
|
||||
object := "object"
|
||||
|
||||
data := make([]byte, test.dataSize)
|
||||
_, err = rand.Read(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var opts ObjectOptions
|
||||
|
||||
err = obj.MakeBucketWithLocation(ctx, bucket, BucketOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a bucket - %v", err)
|
||||
}
|
||||
|
||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
actualH := sha256.New()
|
||||
_, err = io.Copy(actualH, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
actualSha256 := actualH.Sum(nil)
|
||||
|
||||
z := obj.(*erasureServerPools)
|
||||
er := z.serverPools[0].getHashedSet(object)
|
||||
|
||||
disks := er.getDisks()
|
||||
distribution := hashOrder(pathJoin(bucket, object), nDisks)
|
||||
shuffledDisks := shuffleDisks(disks, distribution)
|
||||
|
||||
// remove last data shard
|
||||
err = removeAll(pathJoin(shuffledDisks[11].String(), bucket, object))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to delete a file - %v", err)
|
||||
}
|
||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
firstGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
|
||||
defer firstGr.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
firstHealedH := sha256.New()
|
||||
_, err = io.Copy(firstHealedH, firstGr)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
}
|
||||
firstHealedDataSha256 := firstHealedH.Sum(nil)
|
||||
|
||||
if !bytes.Equal(actualSha256, firstHealedDataSha256) {
|
||||
t.Fatal("object healed wrong")
|
||||
}
|
||||
|
||||
// remove another data shard
|
||||
err = removeAll(pathJoin(shuffledDisks[1].String(), bucket, object))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to delete a file - %v", err)
|
||||
}
|
||||
|
||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
secondGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
|
||||
defer secondGr.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
secondHealedH := sha256.New()
|
||||
_, err = io.Copy(secondHealedH, secondGr)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
}
|
||||
secondHealedDataSha256 := secondHealedH.Sum(nil)
|
||||
|
||||
if !bytes.Equal(actualSha256, secondHealedDataSha256) {
|
||||
t.Fatal("object healed wrong")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
)
|
||||
|
||||
// Heal heals the shard files on non-nil writers. Note that the quorum passed is 1
|
||||
// as healing should continue even if it has been successful healing only one shard file.
|
||||
func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error {
|
||||
r, w := xioutil.WaitPipe()
|
||||
go func() {
|
||||
_, err := e.Decode(ctx, w, readers, 0, size, size, nil)
|
||||
w.CloseWithError(err)
|
||||
}()
|
||||
|
||||
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
|
||||
var buffer []byte
|
||||
switch {
|
||||
case size == 0:
|
||||
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
||||
case size >= e.blockSize:
|
||||
buffer = bp.Get()
|
||||
defer bp.Put(buffer)
|
||||
case size < e.blockSize:
|
||||
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
|
||||
buffer = make([]byte, size, 2*size+int64(e.parityBlocks+e.dataBlocks-1))
|
||||
}
|
||||
|
||||
// quorum is 1 because CreateFile should continue writing as long as we are writing to even 1 disk.
|
||||
n, err := e.Encode(ctx, r, writers, buffer, 1)
|
||||
if err == nil && n != size {
|
||||
logger.LogIf(ctx, errLessData)
|
||||
err = errLessData
|
||||
}
|
||||
r.CloseWithError(err)
|
||||
return err
|
||||
}
|
2
go.mod
2
go.mod
|
@ -40,7 +40,7 @@ require (
|
|||
github.com/klauspost/cpuid/v2 v2.0.9
|
||||
github.com/klauspost/pgzip v1.2.5
|
||||
github.com/klauspost/readahead v1.3.1
|
||||
github.com/klauspost/reedsolomon v1.9.13
|
||||
github.com/klauspost/reedsolomon v1.9.15
|
||||
github.com/lib/pq v1.9.0
|
||||
github.com/miekg/dns v1.1.43
|
||||
github.com/minio/cli v1.22.0
|
||||
|
|
4
go.sum
4
go.sum
|
@ -953,8 +953,8 @@ github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE
|
|||
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
|
||||
github.com/klauspost/readahead v1.3.1 h1:QqXNYvm+VvqYcbrRT4LojUciM0XrznFRIDrbHiJtu/0=
|
||||
github.com/klauspost/readahead v1.3.1/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg=
|
||||
github.com/klauspost/reedsolomon v1.9.13 h1:Xr0COKf7F0ACTXUNnz2ZFCWlUKlUTAUX3y7BODdUxqU=
|
||||
github.com/klauspost/reedsolomon v1.9.13/go.mod h1:eqPAcE7xar5CIzcdfwydOEdcmchAKAP/qs14y4GCBOk=
|
||||
github.com/klauspost/reedsolomon v1.9.15 h1:g2erWKD2M6rgnPf89fCji6jNlhMKMdXcuNHMW1SYCIo=
|
||||
github.com/klauspost/reedsolomon v1.9.15/go.mod h1:eqPAcE7xar5CIzcdfwydOEdcmchAKAP/qs14y4GCBOk=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
|
|
Loading…
Reference in New Issue