atomic/fs: use safe package for atomic writes, even in multipart.

This commit is contained in:
Harshavardhana 2016-04-06 16:05:30 -07:00
parent dfba4ff397
commit ff4e04d942
6 changed files with 100 additions and 65 deletions

View File

@ -68,7 +68,6 @@ checkdocker:
getdeps: checks getdeps: checks
@go get -u github.com/golang/lint/golint && echo "Installed golint:" @go get -u github.com/golang/lint/golint && echo "Installed golint:"
@go get -u golang.org/x/tools/cmd/vet && echo "Installed vet:"
@go get -u github.com/fzipp/gocyclo && echo "Installed gocyclo:" @go get -u github.com/fzipp/gocyclo && echo "Installed gocyclo:"
@go get -u github.com/remyoudompheng/go-misc/deadcode && echo "Installed deadcode:" @go get -u github.com/remyoudompheng/go-misc/deadcode && echo "Installed deadcode:"
@go get -u github.com/client9/misspell/cmd/misspell && echo "Installed misspell:" @go get -u github.com/client9/misspell/cmd/misspell && echo "Installed misspell:"

View File

@ -31,6 +31,7 @@ import (
"github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
"github.com/skyrings/skyring-common/tools/uuid" "github.com/skyrings/skyring-common/tools/uuid"
) )
@ -57,38 +58,41 @@ func removeFileTree(fileName string, level string) error {
return nil return nil
} }
func safeRemoveFile(file *os.File) error { // Takes an input stream and safely writes to disk, additionally
if e := file.Close(); e != nil { // verifies checksum.
return e
}
return os.Remove(file.Name())
}
func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error { func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error {
tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") safeFile, e := safe.CreateFileWithSuffix(fileName, "-")
if e != nil { if e != nil {
return e return e
} }
md5Hasher := md5.New() md5Hasher := md5.New()
multiWriter := io.MultiWriter(md5Hasher, tempFile) multiWriter := io.MultiWriter(md5Hasher, safeFile)
if _, e := io.CopyN(multiWriter, data, size); e != nil { if size > 0 {
safeRemoveFile(tempFile) if _, e = io.CopyN(multiWriter, data, size); e != nil {
return e // Closes the file safely and removes it in a single atomic operation.
safeFile.CloseAndRemove()
return e
}
} else {
if _, e = io.Copy(multiWriter, data); e != nil {
// Closes the file safely and removes it in a single atomic operation.
safeFile.CloseAndRemove()
return e
}
} }
tempFile.Close()
dataMd5sum := hex.EncodeToString(md5Hasher.Sum(nil)) dataMd5sum := hex.EncodeToString(md5Hasher.Sum(nil))
if md5sum != "" && !isMD5SumEqual(md5sum, dataMd5sum) { if md5sum != "" && !isMD5SumEqual(md5sum, dataMd5sum) {
os.Remove(tempFile.Name()) // Closes the file safely and removes it in a single atomic operation.
safeFile.CloseAndRemove()
return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum} return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum}
} }
if e := os.Rename(tempFile.Name(), fileName); e != nil { // Safely close the file and atomically renames it the actual filePath.
os.Remove(tempFile.Name()) safeFile.Close()
return e
}
// Safely wrote the file.
return nil return nil
} }
@ -290,11 +294,11 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i
return "", probe.NewError(e) return "", probe.NewError(e)
} }
partFile := filepath.Join(fs.path, configDir, bucket, object, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex)) partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex)
if e := safeWriteFile(partFile, data, size, md5Hex); e != nil { partFilePath := filepath.Join(fs.path, configDir, bucket, object, partSuffix)
if e := safeWriteFile(partFilePath, data, size, md5Hex); e != nil {
return "", probe.NewError(e) return "", probe.NewError(e)
} }
return md5Hex, nil return md5Hex, nil
} }
@ -360,7 +364,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
return ObjectInfo{}, err.Trace(md5Sums...) return ObjectInfo{}, err.Trace(md5Sums...)
} }
tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.") completeObjectFile := filepath.Join(metaObjectDir, uploadID+".complete.")
safeFile, e := safe.CreateFileWithSuffix(completeObjectFile, "-")
if e != nil { if e != nil {
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
@ -373,18 +378,21 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
var partFile *os.File var partFile *os.File
partFile, e = os.Open(partFileStr) partFile, e = os.Open(partFileStr)
if e != nil { if e != nil {
safeRemoveFile(tempFile) // Remove the complete file safely.
safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} else if _, e = io.Copy(tempFile, partFile); e != nil { } else if _, e = io.Copy(safeFile, partFile); e != nil {
safeRemoveFile(tempFile) // Remove the complete file safely.
safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
partFile.Close() // Close part file after successful copy. partFile.Close() // Close part file after successful copy.
} }
tempFile.Close() // All parts concatenated, safely close the temp file.
safeFile.Close()
// Stat to gather fresh stat info. // Stat to gather fresh stat info.
objSt, e := os.Stat(tempFile.Name()) objSt, e := os.Stat(completeObjectFile)
if e != nil { if e != nil {
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
@ -392,11 +400,11 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
bucketPath := filepath.Join(fs.path, bucket) bucketPath := filepath.Join(fs.path, bucket)
objectPath := filepath.Join(bucketPath, object) objectPath := filepath.Join(bucketPath, object)
if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil { if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil {
os.Remove(tempFile.Name()) os.Remove(completeObjectFile)
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
if e = os.Rename(tempFile.Name(), objectPath); e != nil { if e = os.Rename(completeObjectFile, objectPath); e != nil {
os.Remove(tempFile.Name()) os.Remove(completeObjectFile)
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }

View File

@ -27,10 +27,10 @@ import (
"encoding/hex" "encoding/hex"
"runtime" "runtime"
"github.com/minio/minio/pkg/atomic"
"github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
) )
// isDirEmpty - returns whether given directory is empty or not. // isDirEmpty - returns whether given directory is empty or not.
@ -239,7 +239,7 @@ func (fs Filesystem) PutObject(bucket string, object string, size int64, data io
} }
// Write object. // Write object.
file, e := atomic.FileCreateWithPrefix(objectPath, md5Hex+"$tmpobject") safeFile, e := safe.CreateFileWithPrefix(objectPath, md5Hex+"$tmpobject")
if e != nil { if e != nil {
switch e := e.(type) { switch e := e.(type) {
case *os.PathError: case *os.PathError:
@ -253,23 +253,22 @@ func (fs Filesystem) PutObject(bucket string, object string, size int64, data io
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
} }
defer file.Close()
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
// Instantiate a new multi writer. // Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, file) multiWriter := io.MultiWriter(md5Writer, safeFile)
// Instantiate checksum hashers and create a multiwriter. // Instantiate checksum hashers and create a multiwriter.
if size > 0 { if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil { if _, e = io.CopyN(multiWriter, data, size); e != nil {
file.CloseAndPurge() safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
} else { } else {
if _, e = io.Copy(multiWriter, data); e != nil { if _, e = io.Copy(multiWriter, data); e != nil {
file.CloseAndPurge() safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
} }
@ -282,7 +281,7 @@ func (fs Filesystem) PutObject(bucket string, object string, size int64, data io
} }
// Set stat again to get the latest metadata. // Set stat again to get the latest metadata.
st, e := os.Stat(file.Name()) st, e := os.Stat(safeFile.Name())
if e != nil { if e != nil {
return ObjectInfo{}, probe.NewError(e) return ObjectInfo{}, probe.NewError(e)
} }
@ -302,6 +301,10 @@ func (fs Filesystem) PutObject(bucket string, object string, size int64, data io
MD5Sum: newMD5Hex, MD5Sum: newMD5Hex,
ContentType: contentType, ContentType: contentType,
} }
// Safely close and atomically rename the file.
safeFile.Close()
return newObject, nil return newObject, nil
} }

View File

@ -30,8 +30,8 @@ import (
"sync" "sync"
"github.com/fatih/structs" "github.com/fatih/structs"
"github.com/minio/minio/pkg/atomic"
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
) )
// Config - generic config interface functions // Config - generic config interface functions
@ -178,15 +178,15 @@ func (d config) Version() string {
// If the file does not exist, writeFile creates it; // If the file does not exist, writeFile creates it;
// otherwise writeFile truncates it before writing. // otherwise writeFile truncates it before writing.
func writeFile(filename string, data []byte) *probe.Error { func writeFile(filename string, data []byte) *probe.Error {
atomicFile, e := atomic.FileCreate(filename) safeFile, e := safe.CreateFile(filename)
if e != nil { if e != nil {
return probe.NewError(e) return probe.NewError(e)
} }
_, e = atomicFile.Write(data) _, e = safeFile.Write(data)
if e != nil { if e != nil {
return probe.NewError(e) return probe.NewError(e)
} }
e = atomicFile.Close() e = safeFile.Close()
if e != nil { if e != nil {
return probe.NewError(e) return probe.NewError(e)
} }

View File

@ -14,10 +14,10 @@
* limitations under the License. * limitations under the License.
*/ */
// NOTE - Rename() not guaranteed to be atomic on all filesystems which are not fully POSIX compatible // NOTE - Rename() not guaranteed to be safe on all filesystems which are not fully POSIX compatible
// Package atomic provides atomic file write semantics by leveraging Rename's() atomicity. // Package safe provides safe file write semantics by leveraging Rename's() safeity.
package atomic package safe
import ( import (
"io/ioutil" "io/ioutil"
@ -25,14 +25,14 @@ import (
"path/filepath" "path/filepath"
) )
// File container provided for atomic file writes // File provides for safe file writes.
type File struct { type File struct {
*os.File *os.File
file string file string
} }
// CloseAndSync sync file to disk and close, returns an error if any // SyncClose sync file to disk and close, returns an error if any
func (f *File) CloseAndSync() error { func (f *File) SyncClose() error {
// sync to the disk // sync to the disk
if err := f.File.Sync(); err != nil { if err := f.File.Sync(); err != nil {
return err return err
@ -49,15 +49,16 @@ func (f *File) Close() error {
if err := f.File.Close(); err != nil { if err := f.File.Close(); err != nil {
return err return err
} }
// atomic rename to final destination // safe rename to final destination
if err := os.Rename(f.Name(), f.file); err != nil { if err := os.Rename(f.Name(), f.file); err != nil {
return err return err
} }
return nil return nil
} }
// CloseAndPurge removes the temp file, closes the transaction and returns an error if any // CloseAndRemove closes the temp file, and safely removes it. Returns
func (f *File) CloseAndPurge() error { // error if any.
func (f *File) CloseAndRemove() error {
// close the embedded fd // close the embedded fd
if err := f.File.Close(); err != nil { if err := f.File.Close(); err != nil {
return err return err
@ -68,15 +69,39 @@ func (f *File) CloseAndPurge() error {
return nil return nil
} }
// FileCreate creates a new file at filePath for atomic writes, it also creates parent directories if they don't exist // CreateFile creates a new file at filePath for safe writes, it also
func FileCreate(filePath string) (*File, error) { // creates parent directories if they don't exist.
return FileCreateWithPrefix(filePath, "$deleteme.") func CreateFile(filePath string) (*File, error) {
return CreateFileWithPrefix(filePath, "$deleteme.")
} }
// FileCreateWithPrefix creates a new file at filePath for atomic writes, it also creates parent directories if they don't exist // CreateFileWithSuffix is similar to CreateFileWithPrefix, but the
// prefix specifies the prefix of the temporary files so that cleaning stale temp files is easy // second argument is treated as suffix for the temporary files.
func FileCreateWithPrefix(filePath string, prefix string) (*File, error) { func CreateFileWithSuffix(filePath string, suffix string) (*File, error) {
// if parent directories do not exist, ioutil.TempFile doesn't create them // If parent directories do not exist, ioutil.TempFile doesn't create them
// handle such a case with os.MkdirAll()
if err := os.MkdirAll(filepath.Dir(filePath), 0700); err != nil {
return nil, err
}
f, err := ioutil.TempFile(filepath.Dir(filePath), filepath.Base(filePath)+suffix)
if err != nil {
return nil, err
}
if err = os.Chmod(f.Name(), 0600); err != nil {
if err = os.Remove(f.Name()); err != nil {
return nil, err
}
return nil, err
}
return &File{File: f, file: filePath}, nil
}
// CreateFileWithPrefix creates a new file at filePath for safe
// writes, it also creates parent directories if they don't exist.
// prefix specifies the prefix of the temporary files so that cleaning
// stale temp files is easy.
func CreateFileWithPrefix(filePath string, prefix string) (*File, error) {
// If parent directories do not exist, ioutil.TempFile doesn't create them
// handle such a case with os.MkdirAll() // handle such a case with os.MkdirAll()
if err := os.MkdirAll(filepath.Dir(filePath), 0700); err != nil { if err := os.MkdirAll(filepath.Dir(filePath), 0700); err != nil {
return nil, err return nil, err

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package atomic package safe
import ( import (
"io/ioutil" "io/ioutil"
@ -34,7 +34,7 @@ type MySuite struct {
var _ = Suite(&MySuite{}) var _ = Suite(&MySuite{})
func (s *MySuite) SetUpSuite(c *C) { func (s *MySuite) SetUpSuite(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "atomic-") root, err := ioutil.TempDir(os.TempDir(), "safe-")
c.Assert(err, IsNil) c.Assert(err, IsNil)
s.root = root s.root = root
} }
@ -43,8 +43,8 @@ func (s *MySuite) TearDownSuite(c *C) {
os.RemoveAll(s.root) os.RemoveAll(s.root)
} }
func (s *MySuite) TestAtomic(c *C) { func (s *MySuite) TestSafe(c *C) {
f, err := FileCreate(filepath.Join(s.root, "testfile")) f, err := CreateFile(filepath.Join(s.root, "testfile"))
c.Assert(err, IsNil) c.Assert(err, IsNil)
_, err = os.Stat(filepath.Join(s.root, "testfile")) _, err = os.Stat(filepath.Join(s.root, "testfile"))
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
@ -54,12 +54,12 @@ func (s *MySuite) TestAtomic(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
} }
func (s *MySuite) TestAtomicPurge(c *C) { func (s *MySuite) TestSafePurge(c *C) {
f, err := FileCreate(filepath.Join(s.root, "purgefile")) f, err := CreateFile(filepath.Join(s.root, "purgefile"))
c.Assert(err, IsNil) c.Assert(err, IsNil)
_, err = os.Stat(filepath.Join(s.root, "purgefile")) _, err = os.Stat(filepath.Join(s.root, "purgefile"))
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
err = f.CloseAndPurge() err = f.CloseAndRemove()
c.Assert(err, IsNil) c.Assert(err, IsNil)
err = f.Close() err = f.Close()
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))