Add support of fallocate for FS and XL backends (#3032)

Anis Elleuch 2016-10-29 20:44:44 +01:00 committed by Harshavardhana
parent 0b3282ac9f
commit a47ce7ab22
17 changed files with 415 additions and 50 deletions

cmd/fallocate.go (new file, 25 lines)

@@ -0,0 +1,25 @@
// +build !linux
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
// Fallocate is not POSIX and is not supported on non-Linux platforms such as Windows,
// so this stub always returns success.
func Fallocate(fd int, offset int64, len int64) error {
return nil
}

cmd/fallocate_linux.go (new file, 31 lines)

@@ -0,0 +1,31 @@
// +build linux
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "syscall"
// Fallocate uses the Linux fallocate syscall, which helps ensure that
// subsequent writes on a just-created file will not fail; in addition,
// the file allocation will be contiguous on the disk.
func Fallocate(fd int, offset int64, len int64) error {
return syscall.Fallocate(fd,
1, // FALLOC_FL_KEEP_SIZE
offset,
len)
}
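The mode argument 1 is FALLOC_FL_KEEP_SIZE: the kernel reserves the blocks but leaves the reported file size untouched, so AppendFile can still write from the current end of file. A standalone sketch of that behaviour follows (not part of the commit; the temp-file name and the modern //go:build tag are illustrative):

//go:build linux

package main

import (
	"fmt"
	"os"
	"syscall"
)

func main() {
	f, err := os.CreateTemp("", "fallocate-demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Reserve 1 MiB with FALLOC_FL_KEEP_SIZE (mode 1); the file size stays 0.
	if err := syscall.Fallocate(int(f.Fd()), 1, 0, 1<<20); err != nil {
		// ENOSYS/EOPNOTSUPP are expected on filesystems without fallocate support.
		fmt.Println("fallocate not supported here:", err)
		return
	}
	st, err := f.Stat()
	if err != nil {
		panic(err)
	}
	fmt.Println("size after fallocate:", st.Size()) // prints 0; blocks are reserved, size unchanged
}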


@@ -450,6 +450,15 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
bufSize = size
}
buf := make([]byte, int(bufSize))
if size > 0 {
// Prepare file to avoid disk fragmentation
err := fs.storage.PrepareFile(minioMetaBucket, tmpPartPath, size)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
}
}
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tmpPartPath)
if cErr != nil {
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
@@ -599,6 +608,18 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
}
func (fs fsObjects) totalObjectSize(fsMeta fsMetaV1, parts []completePart) (int64, error) {
objSize := int64(0)
for _, part := range parts {
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
return 0, InvalidPart{}
}
objSize += fsMeta.Parts[partIdx].Size
}
return objSize, nil
}
// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
@@ -668,6 +689,19 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// Allocate staging buffer.
var buf = make([]byte, readSizeV1)
var objSize int64
objSize, err = fs.totalObjectSize(fsMeta, parts)
if err != nil {
return "", traceError(err)
}
if objSize > 0 {
// Prepare file to avoid disk fragmentation
err = fs.storage.PrepareFile(minioMetaBucket, tempObj, objSize)
if err != nil {
return "", traceError(err)
}
}
// Loop through all parts, validate them and then commit to disk.
for i, part := range parts {
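The pre-allocation size for the completed object is simply the sum of the part sizes recorded in the upload metadata. A small self-contained sketch of that computation (the types here are illustrative stand-ins, not the real fsMetaV1/completePart definitions):

package main

import "fmt"

// recordedSizes maps a part number to the size stored in the upload's metadata.
type recordedSizes map[int]int64

func totalObjectSize(recorded recordedSizes, requestedParts []int) (int64, error) {
	var total int64
	for _, n := range requestedParts {
		size, ok := recorded[n]
		if !ok {
			return 0, fmt.Errorf("invalid part %d", n)
		}
		total += size
	}
	return total, nil
}

func main() {
	recorded := recordedSizes{1: 5 << 20, 2: 5 << 20, 3: 1 << 20}
	total, err := totalObjectSize(recorded, []int{1, 2, 3})
	fmt.Println(total, err) // 11534336 <nil> -> size handed to PrepareFile before concatenation
}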


@@ -90,13 +90,18 @@ func TestPutObjectPartFaultyDisk(t *testing.T) {
for i := 1; i <= 7; i++ {
// Faulty disk generates errFaultyDisk at 'i' storage api call number
fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
-if _, err := fs.PutObjectPart(bucketName, objectName, uploadID, 1, dataLen, bytes.NewReader(data), md5Hex, sha256sum); errorCause(err) != errFaultyDisk {
+md5sum, err := fs.PutObjectPart(bucketName, objectName, uploadID, 1, dataLen, bytes.NewReader(data), md5Hex, sha256sum)
+if errorCause(err) != errFaultyDisk {
+if errorCause(err) == nil {
+t.Fatalf("Test %d shouldn't succeed, md5sum = %s\n", i, md5sum)
+}
switch i {
case 1:
if !isSameType(errorCause(err), BucketNotFound{}) {
t.Fatal("Unexpected error ", err)
}
-case 2, 4:
+case 3:
+case 2, 4, 5:
if !isSameType(errorCause(err), InvalidUploadID{}) {
t.Fatal("Unexpected error ", err)
}


@@ -388,6 +388,15 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
} else {
// Prepare file to avoid disk fragmentation
if size > 0 {
err = fs.storage.PrepareFile(minioMetaBucket, tempObj, size)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
}
// Allocate a buffer to Read() from request body
bufSize := int64(readSizeV1)
if size > 0 && bufSize > size {
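The FS-backend pattern is the same in PutObject, PutObjectPart and CompleteMultipartUpload: when the final size is known up front, call PrepareFile once, then stream the data with AppendFile. A minimal sketch of that call pattern against a narrow stand-in interface (writeWithPreallocation and memDisk are illustrative names, not minio code):

package main

import "fmt"

// preparer is a narrow stand-in for the parts of StorageAPI used here.
type preparer interface {
	PrepareFile(volume, path string, size int64) error
	AppendFile(volume, path string, buf []byte) error
}

// writeWithPreallocation hints the final size first, then appends in chunks.
func writeWithPreallocation(disk preparer, volume, path string, payload []byte) error {
	if size := int64(len(payload)); size > 0 {
		// Prepare file to avoid disk fragmentation.
		if err := disk.PrepareFile(volume, path, size); err != nil {
			return err
		}
	}
	const chunk = 128 * 1024
	for off := 0; off < len(payload); off += chunk {
		end := off + chunk
		if end > len(payload) {
			end = len(payload)
		}
		if err := disk.AppendFile(volume, path, payload[off:end]); err != nil {
			return err
		}
	}
	return nil
}

// memDisk is a toy in-memory implementation; its PrepareFile is a no-op,
// mirroring the non-Linux Fallocate stub.
type memDisk struct{ files map[string][]byte }

func (m *memDisk) PrepareFile(volume, path string, size int64) error { return nil }
func (m *memDisk) AppendFile(volume, path string, buf []byte) error {
	key := volume + "/" + path
	m.files[key] = append(m.files[key], buf...)
	return nil
}

func main() {
	disk := &memDisk{files: map[string][]byte{}}
	if err := writeWithPreallocation(disk, "bucket", "object", make([]byte, 300*1024)); err != nil {
		panic(err)
	}
	fmt.Println(len(disk.files["bucket/object"])) // 307200
}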


@@ -108,6 +108,13 @@ func (d *naughtyDisk) ReadFile(volume string, path string, offset int64, buf []b
return d.disk.ReadFile(volume, path, offset, buf)
}
func (d *naughtyDisk) PrepareFile(volume, path string, length int64) error {
if err := d.calcError(); err != nil {
return err
}
return d.disk.PrepareFile(volume, path, length)
}
func (d *naughtyDisk) AppendFile(volume, path string, buf []byte) error {
if err := d.calcError(); err != nil {
return err


@@ -22,6 +22,26 @@ import (
"syscall"
)
// Function not implemented error
func isSysErrNoSys(err error) bool {
return err != nil && err == syscall.ENOSYS
}
// Not supported error
func isSysErrOpNotSupported(err error) bool {
return err != nil && err == syscall.EOPNOTSUPP
}
// No space left on device error
func isSysErrNoSpace(err error) bool {
return err != nil && err == syscall.ENOSPC
}
// Input/output error
func isSysErrIO(err error) bool {
return err != nil && err == syscall.EIO
}
// Check if the given error corresponds to ENOTDIR (is not a directory)
func isSysErrNotDir(err error) bool {
if pathErr, ok := err.(*os.PathError); ok {


@@ -546,6 +546,121 @@ func (s *posix) ReadFile(volume string, path string, offset int64, buf []byte) (
return int64(m), err
}
func (s *posix) createFile(volume, path string) (f *os.File, err error) {
defer func() {
if err == syscall.EIO {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if s.ioErrCount > maxAllowedIOError {
return nil, errFaultyDisk
}
// Validate if disk is free.
if err = s.checkDiskFree(); err != nil {
return nil, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(preparePath(volumeDir))
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
}
return nil, err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return nil, err
}
// Verify if the file already exists and is not of regular type.
var st os.FileInfo
if st, err = os.Stat(preparePath(filePath)); err == nil {
if !st.Mode().IsRegular() {
return nil, errIsNotRegular
}
} else {
// Create top level directories if they don't exist.
// with mode 0777 mkdir honors system umask.
if err = mkdirAll(preparePath(slashpath.Dir(filePath)), 0777); err != nil {
// File path cannot be verified since one of the parents is a file.
if isSysErrNotDir(err) {
return nil, errFileAccessDenied
} else if isSysErrPathNotFound(err) {
// Add specific case for windows.
return nil, errFileAccessDenied
}
return nil, err
}
}
w, err := os.OpenFile(preparePath(filePath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
// File path cannot be verified since one of the parents is a file.
if isSysErrNotDir(err) {
return nil, errFileAccessDenied
}
return nil, err
}
return w, nil
}
// PrepareFile - run prior actions before creating a new file for optimization purposes.
// Currently we use fallocate when available to avoid disk fragmentation as much as possible.
func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
// It doesn't make sense to prepare a zero- or negative-sized file
if fileSize <= 0 {
return errInvalidArgument
}
defer func() {
if err == syscall.EIO {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if s.ioErrCount > maxAllowedIOError {
return errFaultyDisk
}
// Create file if not found
w, err := s.createFile(volume, path)
if err != nil {
return err
}
// Close upon return.
defer w.Close()
// Allocate needed disk space to append data
e := Fallocate(int(w.Fd()), 0, fileSize)
// Ignore errors when Fallocate is not supported in the current system
if e != nil && !isSysErrNoSys(e) && !isSysErrOpNotSupported(e) {
switch {
case isSysErrNoSpace(e):
err = errDiskFull
case isSysErrIO(e):
err = e
default:
// For errors: EBADF, EINTR, EINVAL, ENODEV, EPERM, ESPIPE and ETXTBSY
// the allocation failed anyway, so return an unexpected error
err = errUnexpected
}
return err
}
return nil
}
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
@@ -559,57 +674,11 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
return errFaultyDisk
}
-// Validate if disk is free.
-if err = s.checkDiskFree(); err != nil {
-return err
-}
-volumeDir, err := s.getVolDir(volume)
+// Create file if not found
+w, err := s.createFile(volume, path)
if err != nil {
return err
}
-// Stat a volume entry.
-_, err = os.Stat(preparePath(volumeDir))
-if err != nil {
-if os.IsNotExist(err) {
-return errVolumeNotFound
-}
-return err
-}
-filePath := pathJoin(volumeDir, path)
-if err = checkPathLength(filePath); err != nil {
-return err
-}
-// Verify if the file already exists and is not of regular type.
-var st os.FileInfo
-if st, err = os.Stat(preparePath(filePath)); err == nil {
-if !st.Mode().IsRegular() {
-return errIsNotRegular
-}
-}
-// Create top level directories if they don't exist.
-// with mode 0777 mkdir honors system umask.
-if err = mkdirAll(preparePath(slashpath.Dir(filePath)), 0777); err != nil {
-// File path cannot be verified since one of the parents is a file.
-if isSysErrNotDir(err) {
-return errFileAccessDenied
-} else if isSysErrPathNotFound(err) {
-// Add specific case for windows.
-return errFileAccessDenied
-}
-return err
-}
-// Creates the named file with mode 0666 (before umask), or starts appending
-// to an existig file.
-w, err := os.OpenFile(preparePath(filePath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
-if err != nil {
-// File path cannot be verified since one of the parents is a file.
-if isSysErrNotDir(err) {
-return errFileAccessDenied
-}
-return err
-}
// Close upon return.
defer w.Close()


@@ -1126,6 +1126,100 @@ func TestAppendFile(t *testing.T) {
}
}
// Test posix.PrepareFile()
func TestPrepareFile(t *testing.T) {
// create posix test setup
posixStorage, path, err := newPosixTestSetup()
if err != nil {
t.Fatalf("Unable to create posix test setup, %s", err)
}
defer removeAll(path)
// Setup test environment.
if err = posixStorage.MakeVol("success-vol"); err != nil {
t.Fatalf("Unable to create volume, %s", err)
}
if err = os.Mkdir(slashpath.Join(path, "success-vol", "object-as-dir"), 0777); err != nil {
t.Fatalf("Unable to create directory, %s", err)
}
testCases := []struct {
fileName string
expectedErr error
}{
{"myobject", nil},
{"path/to/my/object", nil},
// Test to append to previously created file.
{"myobject", nil},
// Test to use same path of previously created file.
{"path/to/my/testobject", nil},
{"object-as-dir", errIsNotRegular},
// path segment uses previously uploaded object.
{"myobject/testobject", errFileAccessDenied},
// One path segment length is > 255 chars long.
{"path/to/my/object0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001", errFileNameTooLong},
}
// Add path length > 1024 test specially as OS X system does not support 1024 long path.
err = errFileNameTooLong
if runtime.GOOS != "darwin" {
err = nil
}
// path length is 1024 chars long.
testCases = append(testCases, struct {
fileName string
expectedErr error
}{"level0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001/level0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002/level0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003/object000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001", err})
for _, testCase := range testCases {
if err = posixStorage.PrepareFile("success-vol", testCase.fileName, 16); err != testCase.expectedErr {
t.Errorf("Case: %s, expected: %s, got: %s", testCase, testCase.expectedErr, err)
}
}
// Test for permission denied.
if runtime.GOOS != "windows" {
// Initialize posix storage layer for permission denied error.
posixStorage, err = newPosix("/usr")
if err != nil {
t.Fatalf("Unable to initialize posix, %s", err)
}
if err = posixStorage.PrepareFile("bin", "yes", 16); !os.IsPermission(err) {
t.Errorf("expected: Permission error, got: %s", err)
}
}
// Test case with invalid file size which should be strictly positive
err = posixStorage.PrepareFile("bn", "yes", -3)
if err != errInvalidArgument {
t.Fatalf("should fail: %v", err)
}
// Test case with invalid volume name.
// A valid volume name should be at least 3 characters long.
err = posixStorage.PrepareFile("bn", "yes", 16)
if err != errInvalidArgument {
t.Fatalf("expected: \"Invalid argument error\", got: \"%s\"", err)
}
// Test case with IO error count > max limit.
// setting ioErrCnt to 6.
// should fail with errFaultyDisk.
if posixType, ok := posixStorage.(*posix); ok {
// setting the io error count as specified in the test case.
posixType.ioErrCount = int32(6)
err = posixType.PrepareFile("abc", "yes", 16)
if err != errFaultyDisk {
t.Fatalf("Expected \"Faulty Disk\", got: \"%s\"", err)
}
} else {
t.Fatalf("Expected the StorageAPI to be of type *posix")
}
}
// Test posix.RenameFile()
func TestRenameFile(t *testing.T) {
// create posix test setup


@@ -35,6 +35,7 @@ type StorageAPI interface {
// File operations.
ListDir(volume, dirPath string) ([]string, error)
ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error)
PrepareFile(volume string, path string, len int64) (err error)
AppendFile(volume string, path string, buf []byte) (err error)
RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
StatFile(volume string, path string) (file FileInfo, err error)


@@ -186,6 +186,18 @@ func (n networkStorage) DeleteVol(volume string) error {
// File operations.
func (n networkStorage) PrepareFile(volume, path string, length int64) (err error) {
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{
Vol: volume,
Path: path,
Size: length,
}, &reply); err != nil {
return toStorageErr(err)
}
return nil
}
// CreateFile - create file.
func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) {
reply := GenericReply{}


@@ -243,6 +243,10 @@ func (s *TestRPCStorageSuite) testRPCStorageFileOps(t *testing.T) {
if err != nil {
t.Error("Unable to initiate MakeVol", err)
}
err = storageDisk.PrepareFile("myvol", "file1", int64(len([]byte("Hello, world"))))
if err != nil {
t.Error("Unable to initiate AppendFile", err)
}
err = storageDisk.AppendFile("myvol", "file1", []byte("Hello, world"))
if err != nil {
t.Error("Unable to initiate AppendFile", err)


@@ -61,6 +61,21 @@ type ReadFileArgs struct {
Size int
}
// PrepareFileArgs represents prepare file RPC arguments.
type PrepareFileArgs struct {
// Authentication token generated by Login.
GenericArgs
// Name of the volume.
Vol string
// Name of the path.
Path string
// Size of the file to be prepared
Size int64
}
// AppendFileArgs represents append file RPC arguments.
type AppendFileArgs struct {
// Authentication token generated by Login.


@@ -180,6 +180,14 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err
return err
}
// PrepareFileHandler - prepare file handler is rpc wrapper to prepare file.
func (s *storageServer) PrepareFileHandler(args *PrepareFileArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
return s.storage.PrepareFile(args.Vol, args.Path, args.Size)
}
// AppendFileHandler - append file handler is rpc wrapper to append file.
func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) {
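The new handler follows the stdlib net/rpc convention used throughout the storage RPC layer: an exported method taking (args, reply) pointers and returning error, registered under the "Storage" service name and invoked by the client via Call("Storage.PrepareFileHandler", ...). A self-contained sketch of that round trip (DemoServer, PrepareArgs and the in-process listener are illustrative, not minio types; token validation is elided):

package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type PrepareArgs struct {
	Vol, Path string
	Size      int64
}

type Reply struct{}

type DemoServer struct{}

// PrepareFileHandler mirrors the shape of the real handler: validate the
// request, then delegate to the storage layer (delegation elided here).
func (s *DemoServer) PrepareFileHandler(args *PrepareArgs, reply *Reply) error {
	if args.Size <= 0 {
		return fmt.Errorf("invalid size %d", args.Size)
	}
	return nil
}

func main() {
	srv := rpc.NewServer()
	if err := srv.RegisterName("Storage", &DemoServer{}); err != nil {
		panic(err)
	}
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	go srv.Accept(ln)

	client, err := rpc.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}
	defer client.Close()

	var reply Reply
	err = client.Call("Storage.PrepareFileHandler", &PrepareArgs{Vol: "myvol", Path: "file1", Size: 12}, &reply)
	fmt.Println("call error:", err) // <nil>
}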


@@ -69,3 +69,16 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
} // Exhausted all disks - return false.
return false
}
// Calculate the space occupied by an object on a single disk
func (xl xlObjects) sizeOnDisk(fileSize int64, blockSize int64, dataBlocks int) int64 {
numBlocks := fileSize / blockSize
chunkSize := getChunkSize(blockSize, dataBlocks)
sizeInDisk := numBlocks * chunkSize
remaining := fileSize % blockSize
if remaining > 0 {
sizeInDisk += getChunkSize(remaining, dataBlocks)
}
return sizeInDisk
}
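sizeOnDisk converts the logical object size into the bytes a single disk will hold: each full erasure block contributes one chunk, plus one smaller chunk for any remainder. A worked sketch, assuming getChunkSize is a ceiling division of the block size across the data blocks (its definition is not part of this diff, so treat the helper below as an illustration):

package main

import "fmt"

func chunkSize(blockSize int64, dataBlocks int) int64 {
	return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
}

func sizeOnDisk(fileSize, blockSize int64, dataBlocks int) int64 {
	size := (fileSize / blockSize) * chunkSize(blockSize, dataBlocks)
	if r := fileSize % blockSize; r > 0 {
		size += chunkSize(r, dataBlocks)
	}
	return size
}

func main() {
	// e.g. a 25 MiB object with a 10 MiB erasure block size and 8 data blocks:
	// two full blocks -> 2 * ceil(10MiB/8), plus a 5 MiB remainder -> ceil(5MiB/8).
	fmt.Println(sizeOnDisk(25<<20, 10<<20, 8)) // 3276800
}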


@@ -424,6 +424,15 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
defer xl.deleteObject(minioMetaBucket, tmpPartPath)
if size > 0 {
for _, disk := range onlineDisks {
if disk != nil {
actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
disk.PrepareFile(minioMetaBucket, tmpPartPath, actualSize)
}
}
}
// Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil {


@@ -448,6 +448,15 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// delete.
defer xl.deleteObject(minioMetaTmpBucket, tempObj)
if size > 0 {
for _, disk := range onlineDisks {
if disk != nil {
actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
disk.PrepareFile(minioMetaBucket, tempErasureObj, actualSize)
}
}
}
// Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil {