XL: Change AppendFile() to return only error (#1932)

AppendFile ensures that it appends the entire buffer. Returns
an error otherwise, this patch removes the necessity for the
caller to look for 'n' return on short writes.

Ref #1893
This commit is contained in:
Harshavardhana 2016-06-19 15:31:13 -07:00 committed by Anand Babu (AB) Periasamy
parent e1aad066c6
commit 50d25ca94a
11 changed files with 35 additions and 87 deletions

View File

@ -137,15 +137,11 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
defer wg.Done()
// Pick the block from the distribution.
blockIndex := distribution[index] - 1
n, wErr := disk.AppendFile(volume, path, enBlocks[blockIndex])
wErr := disk.AppendFile(volume, path, enBlocks[blockIndex])
if wErr != nil {
wErrs[index] = wErr
return
}
if n != int64(len(enBlocks[blockIndex])) {
wErrs[index] = errUnexpected
return
}
// Calculate hash for each blocks.
hashWriters[blockIndex].Write(enBlocks[blockIndex])

View File

@ -483,7 +483,7 @@ func healFormatXL(bootstrapDisks []StorageAPI) error {
return err
}
// Fresh disk without format.json
_, _ = bootstrapDisks[index].AppendFile(minioMetaBucket, formatConfigFile, formatBytes)
_ = bootstrapDisks[index].AppendFile(minioMetaBucket, formatConfigFile, formatBytes)
// Ignore any error from AppendFile() as
// quorum might still be there to be operational.
}
@ -605,13 +605,9 @@ func initFormatXL(storageDisks []StorageAPI) (err error) {
if err != nil {
return err
}
n, err := disk.AppendFile(minioMetaBucket, formatConfigFile, formatBytes)
if err != nil {
if err = disk.AppendFile(minioMetaBucket, formatConfigFile, formatBytes); err != nil {
return err
}
if n != int64(len(formatBytes)) {
return errUnexpected
}
}
return nil
}

View File

@ -97,13 +97,9 @@ func writeFSFormatData(storage StorageAPI, fsFormat formatConfigV1) error {
return err
}
// fsFormatJSONFile - format.json file stored in minioMetaBucket(.minio) directory.
n, err := storage.AppendFile(minioMetaBucket, fsFormatJSONFile, metadataBytes)
if err != nil {
if err = storage.AppendFile(minioMetaBucket, fsFormatJSONFile, metadataBytes); err != nil {
return err
}
if n != int64(len(metadataBytes)) {
return errUnexpected
}
return nil
}
@ -113,12 +109,8 @@ func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) erro
if err != nil {
return err
}
n, err := fs.storage.AppendFile(bucket, path.Join(prefix, fsMetaJSONFile), metadataBytes)
if err != nil {
if err = fs.storage.AppendFile(bucket, path.Join(prefix, fsMetaJSONFile), metadataBytes); err != nil {
return err
}
if n != int64(len(metadataBytes)) {
return errUnexpected
}
return nil
}

View File

@ -310,13 +310,9 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
// Update md5 writer.
md5Writer.Write(buf[:n])
m, err := fs.storage.AppendFile(minioMetaBucket, tmpPartPath, buf[:n])
if err != nil {
if err = fs.storage.AppendFile(minioMetaBucket, tmpPartPath, buf[:n]); err != nil {
return "", toObjectErr(err, bucket, object)
}
if m != int64(len(buf[:n])) {
return "", toObjectErr(errUnexpected, bucket, object)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
@ -520,8 +516,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
}
n, err = fs.storage.AppendFile(minioMetaBucket, tempObj, buffer[:n])
if err != nil {
if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buffer[:n]); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempObj)
}
offset += n

View File

@ -270,7 +270,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
if size == 0 {
// For size 0 we write a 0byte file.
_, err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
if err != nil {
return "", toObjectErr(err, bucket, object)
}
@ -287,13 +287,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
if n > 0 {
// Update md5 writer.
md5Writer.Write(buf[:n])
m, wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n])
wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n])
if wErr != nil {
return "", toObjectErr(wErr, bucket, object)
}
if m != int64(n) {
return "", toObjectErr(errUnexpected, bucket, object)
}
}
if rErr == io.EOF {
break

View File

@ -430,7 +430,7 @@ func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) {
func (s posix) AppendFile(volume, path string, buf []byte) (err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
@ -438,54 +438,55 @@ func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error)
}()
if s.ioErrCount > maxAllowedIOError {
return 0, errFaultyDisk
return errFaultyDisk
}
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return 0, err
return err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return 0, err
return err
}
// Stat a volume entry.
_, err = os.Stat(preparePath(volumeDir))
if err != nil {
if os.IsNotExist(err) {
return 0, errVolumeNotFound
return errVolumeNotFound
}
return 0, err
return err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return 0, err
return err
}
// Verify if the file already exists and is not of regular type.
var st os.FileInfo
if st, err = os.Stat(preparePath(filePath)); err == nil {
if st.IsDir() {
return 0, errIsNotRegular
return errIsNotRegular
}
}
// Create top level directories if they don't exist.
if err = mkdirAll(filepath.Dir(filePath), 0700); err != nil {
return 0, err
return err
}
w, err := os.OpenFile(preparePath(filePath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return 0, errFileAccessDenied
return errFileAccessDenied
}
return 0, err
return err
}
// Close upon return.
defer w.Close()
// Return io.Copy
return io.Copy(w, bytes.NewReader(buf))
_, err = io.Copy(w, bytes.NewReader(buf))
return err
}
// StatFile - get file info.

View File

@ -145,15 +145,16 @@ func (n networkStorage) DeleteVol(volume string) error {
// File operations.
// CreateFile - create file.
func (n networkStorage) AppendFile(volume, path string, buffer []byte) (m int64, err error) {
func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) {
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{
Vol: volume,
Path: path,
Buffer: buffer,
}, &m); err != nil {
return 0, toStorageErr(err)
}, &reply); err != nil {
return toStorageErr(err)
}
return m, nil
return nil
}
// StatFile - get latest Stat information for a file at path.

View File

@ -86,31 +86,18 @@ func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *int64) error {
}
// AppendFileHandler - append file handler is rpc wrapper to append file.
func (s *storageServer) AppendFileHandler(arg *AppendFileArgs, reply *int64) error {
n, err := s.storage.AppendFile(arg.Vol, arg.Path, arg.Buffer)
if err != nil {
return err
}
reply = &n
return nil
func (s *storageServer) AppendFileHandler(arg *AppendFileArgs, reply *GenericReply) error {
return s.storage.AppendFile(arg.Vol, arg.Path, arg.Buffer)
}
// DeleteFileHandler - delete file handler is rpc wrapper to delete file.
func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error {
err := s.storage.DeleteFile(arg.Vol, arg.Path)
if err != nil {
return err
}
return nil
return s.storage.DeleteFile(arg.Vol, arg.Path)
}
// RenameFileHandler - rename file handler is rpc wrapper to rename file.
func (s *storageServer) RenameFileHandler(arg *RenameFileArgs, reply *GenericReply) error {
err := s.storage.RenameFile(arg.SrcVol, arg.SrcPath, arg.DstVol, arg.DstPath)
if err != nil {
return err
}
return nil
return s.storage.RenameFile(arg.SrcVol, arg.SrcPath, arg.DstVol, arg.DstPath)
}
// Initialize new storage rpc.

View File

@ -27,7 +27,7 @@ type StorageAPI interface {
// File operations.
ListDir(volume, dirPath string) ([]string, error)
ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error)
AppendFile(volume string, path string, buf []byte) (n int64, err error)
AppendFile(volume string, path string, buf []byte) (err error)
RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error)

View File

@ -259,14 +259,7 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er
return err
}
// Persist marshalled data.
n, err := disk.AppendFile(bucket, jsonFile, metadataBytes)
if err != nil {
return err
}
if n != int64(len(metadataBytes)) {
return errUnexpected
}
return nil
return disk.AppendFile(bucket, jsonFile, metadataBytes)
}
// deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs.

View File

@ -105,15 +105,10 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk
errs[index] = wErr
return
}
n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes)
if wErr != nil {
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil {
errs[index] = wErr
return
}
if n != int64(len(uploadsBytes)) {
errs[index] = errUnexpected
return
}
if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
errs[index] = wErr
return
@ -219,15 +214,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
return
}
// Write `uploads.json` to disk.
n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes)
if wErr != nil {
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
errs[index] = wErr
return
}
if n != int64(len(uploadsJSONBytes)) {
errs[index] = errUnexpected
return
}
wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
if wErr != nil {
if dErr := disk.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {