fs: Migration should handle bucketConfigs as regular objects. (#4482)

Current code failed to anticipate the existence of files
which could have been created to corrupt the namespace such
as `policy.json` file created at the bucket top level.

In the current release creating such as file conflicts
with the namespace for future bucket policy operations.
We implemented migration of backend format to avoid situations
such as these.

This PR handles this situation, makes sure that the
erroneous files should have been moved properly.

Fixes #4478
This commit is contained in:
Harshavardhana 2017-06-06 12:15:35 -07:00 committed by GitHub
parent f99987e47c
commit 976870a391
3 changed files with 169 additions and 33 deletions

View File

@ -123,24 +123,31 @@ func fsMkdir(dirPath string) (err error) {
return nil
}
func fsStat(statLoc string) (os.FileInfo, error) {
if statLoc == "" {
return nil, traceError(errInvalidArgument)
}
if err := checkPathLength(statLoc); err != nil {
return nil, traceError(err)
}
fi, err := osStat(preparePath(statLoc))
if err != nil {
return nil, traceError(err)
}
return fi, nil
}
// Lookup if directory exists, returns directory
// attributes upon success.
func fsStatDir(statDir string) (os.FileInfo, error) {
if statDir == "" {
return nil, traceError(errInvalidArgument)
}
if err := checkPathLength(statDir); err != nil {
return nil, traceError(err)
}
fi, err := osStat(preparePath(statDir))
fi, err := fsStat(statDir)
if err != nil {
if os.IsNotExist(err) {
if os.IsNotExist(errorCause(err)) {
return nil, traceError(errVolumeNotFound)
} else if os.IsPermission(err) {
} else if os.IsPermission(errorCause(err)) {
return nil, traceError(errVolumeAccessDenied)
}
return nil, traceError(err)
return nil, err
}
if !fi.IsDir() {
@ -152,26 +159,18 @@ func fsStatDir(statDir string) (os.FileInfo, error) {
// Lookup if file exists, returns file attributes upon success
func fsStatFile(statFile string) (os.FileInfo, error) {
if statFile == "" {
return nil, traceError(errInvalidArgument)
}
if err := checkPathLength(statFile); err != nil {
return nil, traceError(err)
}
fi, err := osStat(preparePath(statFile))
fi, err := fsStat(statFile)
if err != nil {
if os.IsNotExist(err) {
if os.IsNotExist(errorCause(err)) {
return nil, traceError(errFileNotFound)
} else if os.IsPermission(err) {
} else if os.IsPermission(errorCause(err)) {
return nil, traceError(errFileAccessDenied)
} else if isSysErrNotDir(err) {
} else if isSysErrNotDir(errorCause(err)) {
return nil, traceError(errFileAccessDenied)
} else if isSysErrPathNotFound(err) {
} else if isSysErrPathNotFound(errorCause(err)) {
return nil, traceError(errFileNotFound)
}
return nil, traceError(err)
return nil, err
}
if fi.IsDir() {
return nil, traceError(errFileAccessDenied)

View File

@ -113,12 +113,43 @@ func fsReaddirMetaBuckets(fsPath string) ([]string, error) {
return f.Readdirnames(-1)
}
// List of all bucket metadata configs.
var bucketMetadataConfigs = []string{
bucketNotificationConfig,
bucketListenerConfig,
bucketPolicyConfig,
}
// Migrates bucket metadata configs, ignores all other files.
func migrateBucketMetadataConfigs(metaBucket, bucket, tmpBucket string) error {
for _, bucketMetaFile := range bucketMetadataConfigs {
fi, err := fsStat(pathJoin(metaBucket, tmpBucket, bucketMetaFile))
if err != nil {
// There are no such files or directories found,
// proceed to next bucket metadata config.
if os.IsNotExist(errorCause(err)) {
continue
}
return err
}
// Bucket metadata is a file, move it as an actual bucket config.
if fi.Mode().IsRegular() {
if err = fsRenameFile(pathJoin(metaBucket, tmpBucket, bucketMetaFile),
pathJoin(metaBucket, bucket, bucketMetaFile)); err != nil {
if errorCause(err) != errFileNotFound {
return err
}
}
}
// All other file types are ignored.
}
// Success.
return nil
}
// Attempts to migrate old object metadata files to newer format
//
// i.e
@ -152,14 +183,9 @@ func migrateFSFormatV1ToV2(fsPath, fsUUID string) (err error) {
return err
}
/// Rename all bucket metadata files to newly created `bucket`.
for _, bucketMetaFile := range bucketMetadataConfigs {
if err = fsRenameFile(pathJoin(metaBucket, tmpBucket, bucketMetaFile),
pathJoin(metaBucket, bucket, bucketMetaFile)); err != nil {
if errorCause(err) != errFileNotFound {
return err
}
}
// Migrate all the bucket metadata configs.
if err = migrateBucketMetadataConfigs(metaBucket, bucket, tmpBucket); err != nil {
return err
}
// Finally rename the temporary bucket to `bucket/objects` directory.
@ -169,6 +195,7 @@ func migrateFSFormatV1ToV2(fsPath, fsUUID string) (err error) {
return err
}
}
}
log.Printf("Migrating bucket metadata format from \"%s\" to newer format \"%s\"... completed successfully.", fsFormatV1, fsFormatV2)

View File

@ -207,6 +207,116 @@ func TestFSMigrateObjectWithErr(t *testing.T) {
}
// Tests migrating FS format with .minio.sys/buckets filled with
// objects such as policy.json/fs.json, notification.xml/fs.json
// listener.json/fs.json.
func TestFSMigrateObjectWithBucketConfigObjects(t *testing.T) {
// Prepare for testing
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer removeAll(disk)
// Assign a new UUID.
uuid := mustGetUUID()
// Initialize meta volume, if volume already exists ignores it.
if err := initMetaVolumeFS(disk, uuid); err != nil {
t.Fatal(err)
}
fsFormatPath := pathJoin(disk, minioMetaBucket, fsFormatJSONFile)
formatCfg := &formatConfigV1{
Version: "1",
Format: "fs",
FS: &fsFormat{
Version: "1",
},
}
lk, err := lock.LockedOpenFile(preparePath(fsFormatPath), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
t.Fatal(err)
}
_, err = formatCfg.WriteTo(lk)
lk.Close()
if err != nil {
t.Fatal("Should not fail here", err)
}
// Construct the full path of fs.json
fsPath1 := pathJoin(bucketMetaPrefix, "testvolume1", bucketPolicyConfig, fsMetaJSONFile)
fsPath1 = pathJoin(disk, minioMetaBucket, fsPath1)
fsMetaJSON := `{"version":"1.0.0","format":"fs","minio":{"release":"DEVELOPMENT.2017-03-27T02-26-33Z"},"meta":{"etag":"467886be95c8ecfd71a2900e3f461b4f"}`
if _, err = fsCreateFile(fsPath1, bytes.NewReader([]byte(fsMetaJSON)), nil, 0); err != nil {
t.Fatal(err)
}
// Construct the full path of fs.json
fsPath2 := pathJoin(bucketMetaPrefix, "testvolume2", bucketNotificationConfig, fsMetaJSONFile)
fsPath2 = pathJoin(disk, minioMetaBucket, fsPath2)
fsMetaJSON = `{"version":"1.0.0","format":"fs","minio":{"release":"DEVELOPMENT.2017-03-27T02-26-33Z"},"meta":{"etag":"467886be95c8ecfd71a2900eff461b4d"}`
if _, err = fsCreateFile(fsPath2, bytes.NewReader([]byte(fsMetaJSON)), nil, 0); err != nil {
t.Fatal(err)
}
// Construct the full path of fs.json
fsPath3 := pathJoin(bucketMetaPrefix, "testvolume3", bucketListenerConfig, fsMetaJSONFile)
fsPath3 = pathJoin(disk, minioMetaBucket, fsPath3)
fsMetaJSON = `{"version":"1.0.0","format":"fs","minio":{"release":"DEVELOPMENT.2017-03-27T02-26-33Z"},"meta":{"etag":"467886be95c8ecfd71a2900eff461b4d"}`
if _, err = fsCreateFile(fsPath3, bytes.NewReader([]byte(fsMetaJSON)), nil, 0); err != nil {
t.Fatal(err)
}
if err = initFormatFS(disk, mustGetUUID()); err != nil {
t.Fatal("Should not fail here", err)
}
fsPath1 = pathJoin(bucketMetaPrefix, "testvolume1", objectMetaPrefix, bucketPolicyConfig, fsMetaJSONFile)
fsPath1 = pathJoin(disk, minioMetaBucket, fsPath1)
fi, err := fsStatFile(fsPath1)
if err != nil {
t.Fatal("Path should exist and accessible after migration", err)
}
if fi.IsDir() {
t.Fatalf("Unexpected path %s should be a file", fsPath1)
}
fsPath2 = pathJoin(bucketMetaPrefix, "testvolume2", objectMetaPrefix, bucketNotificationConfig, fsMetaJSONFile)
fsPath2 = pathJoin(disk, minioMetaBucket, fsPath2)
fi, err = fsStatFile(fsPath2)
if err != nil {
t.Fatal("Path should exist and accessible after migration", err)
}
if fi.IsDir() {
t.Fatalf("Unexpected path %s should be a file", fsPath2)
}
fsPath3 = pathJoin(bucketMetaPrefix, "testvolume3", objectMetaPrefix, bucketListenerConfig, fsMetaJSONFile)
fsPath3 = pathJoin(disk, minioMetaBucket, fsPath3)
fi, err = fsStatFile(fsPath3)
if err != nil {
t.Fatal("Path should exist and accessible after migration", err)
}
if fi.IsDir() {
t.Fatalf("Unexpected path %s should be a file", fsPath3)
}
formatCfg = &formatConfigV1{}
lk, err = lock.LockedOpenFile(preparePath(fsFormatPath), os.O_RDONLY, 0600)
if err != nil {
t.Fatal(err)
}
_, err = formatCfg.ReadFrom(lk)
lk.Close()
if err != nil {
t.Fatal("Should not fail here", err)
}
if formatCfg.FS.Version != fsFormatV2 {
t.Fatalf("Unexpected version detected expected \"%s\", got %s", fsFormatV2, formatCfg.FS.Version)
}
}
// Tests migrating FS format with .minio.sys/buckets filled with
// object metadata.
func TestFSMigrateObjectWithObjects(t *testing.T) {