fs: Migrate object metadata to objects directory. (#4195)

Fixes #3352
This commit is contained in:
Harshavardhana
2017-05-05 08:49:09 -07:00
committed by GitHub
parent 99ddd35343
commit 76f4f20609
13 changed files with 775 additions and 93 deletions

View File

@@ -24,6 +24,7 @@ import (
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"sort"
"syscall"
@@ -72,15 +73,117 @@ func initMetaVolumeFS(fsPath, fsUUID string) error {
}
// Migrate FS object is a place holder code for all
// FS format migrations.
func migrateFSObject(fsPath, fsUUID string) (err error) {
// Writing message here is important for servers being upgraded.
log.Println("Please do not stop the server.")
ch := make(chan os.Signal)
defer signal.Stop(ch)
defer close(ch)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
for {
_, ok := <-ch
if !ok {
break
}
log.Println("Please wait server is being upgraded..")
}
}()
return migrateFSFormatV1ToV2(fsPath, fsUUID)
}
// List all buckets at meta bucket prefix in `.minio.sys/buckets/` path.
// This is implemented to avoid a bug on windows with using readDir().
func fsReaddirMetaBuckets(fsPath string) ([]string, error) {
f, err := os.Open(preparePath(pathJoin(fsPath, minioMetaBucket, bucketConfigPrefix)))
if err != nil {
if os.IsNotExist(err) {
return nil, errFileNotFound
} else if os.IsPermission(err) {
return nil, errFileAccessDenied
}
return nil, err
}
return f.Readdirnames(-1)
}
var bucketMetadataConfigs = []string{
bucketNotificationConfig,
bucketListenerConfig,
bucketPolicyConfig,
}
// Attempts to migrate old object metadata files to newer format
//
// i.e
// -------------------------------------------------------
// .minio.sys/buckets/<bucket_name>/<object_path>/fs.json - V1
// -------------------------------------------------------
// .minio.sys/buckets/<bucket_name>/objects/<object_path>/fs.json - V2
// -------------------------------------------------------
//
func migrateFSFormatV1ToV2(fsPath, fsUUID string) (err error) {
metaBucket := pathJoin(fsPath, minioMetaBucket, bucketConfigPrefix)
var buckets []string
buckets, err = fsReaddirMetaBuckets(fsPath)
if err != nil && err != errFileNotFound {
return err
}
// Migrate all buckets present.
for _, bucket := range buckets {
// Temporary bucket of form .UUID-bucket.
tmpBucket := fmt.Sprintf(".%s-%s", fsUUID, bucket)
// Rename existing bucket as `.UUID-bucket`.
if err = fsRenameFile(pathJoin(metaBucket, bucket), pathJoin(metaBucket, tmpBucket)); err != nil {
return err
}
// Create a new bucket name with name as `bucket`.
if err = fsMkdir(pathJoin(metaBucket, bucket)); err != nil {
return err
}
/// Rename all bucket metadata files to newly created `bucket`.
for _, bucketMetaFile := range bucketMetadataConfigs {
if err = fsRenameFile(pathJoin(metaBucket, tmpBucket, bucketMetaFile),
pathJoin(metaBucket, bucket, bucketMetaFile)); err != nil {
if errorCause(err) != errFileNotFound {
return err
}
}
}
// Finally rename the temporary bucket to `bucket/objects` directory.
if err = fsRenameFile(pathJoin(metaBucket, tmpBucket),
pathJoin(metaBucket, bucket, objectMetaPrefix)); err != nil {
if errorCause(err) != errFileNotFound {
return err
}
}
}
log.Printf("Migrating bucket metadata format from \"%s\" to newer format \"%s\"... completed successfully.", fsFormatV1, fsFormatV2)
// If all goes well we return success.
return nil
}
// newFSObjectLayer - initialize new fs object layer.
func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
if fsPath == "" {
return nil, errInvalidArgument
}
var err error
// Disallow relative paths, figure out absolute paths.
fsPath, err = filepath.Abs(fsPath)
fsPath, err := filepath.Abs(fsPath)
if err != nil {
return nil, err
}
@@ -108,26 +211,6 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
// Load `format.json`.
format, err := loadFormatFS(fsPath)
if err != nil && err != errUnformattedDisk {
return nil, fmt.Errorf("Unable to load 'format.json', %s", err)
}
// If the `format.json` doesn't exist create one.
if err == errUnformattedDisk {
fsFormatPath := pathJoin(fsPath, minioMetaBucket, fsFormatJSONFile)
// Initialize format.json, if already exists overwrite it.
if serr := saveFormatFS(fsFormatPath, newFSFormatV1()); serr != nil {
return nil, fmt.Errorf("Unable to initialize 'format.json', %s", serr)
}
}
// Validate if we have the same format.
if err == nil && format.Format != "fs" {
return nil, fmt.Errorf("Unable to recognize backend format, Disk is not in FS format. %s", format.Format)
}
// Initialize fs objects.
fs := &fsObjects{
fsPath: fsPath,
@@ -141,6 +224,17 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
},
}
// Initialize `format.json`.
if err = initFormatFS(fsPath, fsUUID); err != nil {
return nil, err
}
// Once initialized hold read lock for the entire operation
// of filesystem backend.
if _, err = fs.rwPool.Open(pathJoin(fsPath, minioMetaBucket, fsFormatJSONFile)); err != nil {
return nil, err
}
// Initialize and load bucket policies.
err = initBucketPolicies(fs)
if err != nil {
@@ -159,6 +253,9 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
// Should be called when process shuts down.
func (fs fsObjects) Shutdown() error {
// Close the format.json read lock.
fs.rwPool.Close(pathJoin(fs.fsPath, minioMetaBucket, fsFormatJSONFile))
// Cleanup and delete tmp uuid.
return fsRemoveAll(pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID))
}
@@ -238,7 +335,7 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) {
return nil, traceError(err)
}
var bucketInfos []BucketInfo
entries, err := readDir(preparePath(fs.fsPath))
entries, err := readDir(fs.fsPath)
if err != nil {
return nil, toObjectErr(traceError(errDiskNotFound))
}
@@ -322,7 +419,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
// Check if this request is only metadata update.
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if cpMetadataOnly {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, srcBucket, srcObject, fsMetaJSONFile)
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, srcBucket, objectMetaPrefix, srcObject, fsMetaJSONFile)
var wlk *lock.LockedFile
wlk, err = fs.rwPool.Write(fsMetaPath)
if err != nil {
@@ -395,7 +492,7 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
}
if bucket != minioMetaBucket {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, objectMetaPrefix, object, fsMetaJSONFile)
_, err = fs.rwPool.Open(fsMetaPath)
if err != nil && err != errFileNotFound {
return toObjectErr(traceError(err), bucket, object)
@@ -437,7 +534,7 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (fs fsObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) {
fsMeta := fsMetaV1{}
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, objectMetaPrefix, object, fsMetaJSONFile)
// Read `fs.json` to perhaps contend with
// parallel Put() operations.
@@ -520,7 +617,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
var wlk *lock.LockedFile
if bucket != minioMetaBucket {
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fsMetaJSONFile)
fsMetaPath := pathJoin(bucketMetaDir, bucket, objectMetaPrefix, object, fsMetaJSONFile)
wlk, err = fs.rwPool.Create(fsMetaPath)
if err != nil {
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
@@ -647,7 +744,7 @@ func (fs fsObjects) DeleteObject(bucket, object string) error {
}
minioMetaBucketDir := pathJoin(fs.fsPath, minioMetaBucket)
fsMetaPath := pathJoin(minioMetaBucketDir, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
fsMetaPath := pathJoin(minioMetaBucketDir, bucketMetaPrefix, bucket, objectMetaPrefix, object, fsMetaJSONFile)
if bucket != minioMetaBucket {
rwlk, lerr := fs.rwPool.Write(fsMetaPath)
if lerr == nil {
@@ -701,7 +798,7 @@ func (fs fsObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
// getObjectETag is a helper function, which returns only the md5sum
// of the file on the disk.
func (fs fsObjects) getObjectETag(bucket, entry string) (string, error) {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, entry, fsMetaJSONFile)
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, objectMetaPrefix, entry, fsMetaJSONFile)
// Read `fs.json` to perhaps contend with
// parallel Put() operations.