Merge pull request #952 from harshavardhana/server-pkg

fs/bucket: Move bucket metadata into buckets.json
This commit is contained in:
Harshavardhana 2015-11-01 21:27:52 -08:00
commit ef04507bc4
7 changed files with 165 additions and 74 deletions

View File

@ -16,46 +16,35 @@
package fs package fs
import (
"os"
"path/filepath"
)
// IsPrivateBucket - is private bucket // IsPrivateBucket - is private bucket
func (fs Filesystem) IsPrivateBucket(bucket string) bool { func (fs Filesystem) IsPrivateBucket(bucket string) bool {
fs.lock.Lock() fs.lock.Lock()
defer fs.lock.Unlock() defer fs.lock.Unlock()
// get bucket path bucketMetadata, ok := fs.buckets.Metadata[bucket]
bucketDir := filepath.Join(fs.path, bucket) if !ok {
fi, err := os.Stat(bucketDir)
if err != nil {
return true return true
} }
return permToACL(fi.Mode()).IsPrivate() return bucketMetadata.ACL.IsPrivate()
} }
// IsPublicBucket - is public bucket // IsPublicBucket - is public bucket
func (fs Filesystem) IsPublicBucket(bucket string) bool { func (fs Filesystem) IsPublicBucket(bucket string) bool {
fs.lock.Lock() fs.lock.Lock()
defer fs.lock.Unlock() defer fs.lock.Unlock()
// get bucket path bucketMetadata, ok := fs.buckets.Metadata[bucket]
bucketDir := filepath.Join(fs.path, bucket) if !ok {
fi, err := os.Stat(bucketDir)
if err != nil {
return true return true
} }
return permToACL(fi.Mode()).IsPublicReadWrite() return bucketMetadata.ACL.IsPublicReadWrite()
} }
// IsReadOnlyBucket - is read only bucket // IsReadOnlyBucket - is read only bucket
func (fs Filesystem) IsReadOnlyBucket(bucket string) bool { func (fs Filesystem) IsReadOnlyBucket(bucket string) bool {
fs.lock.Lock() fs.lock.Lock()
defer fs.lock.Unlock() defer fs.lock.Unlock()
// get bucket path bucketMetadata, ok := fs.buckets.Metadata[bucket]
bucketDir := filepath.Join(fs.path, bucket) if !ok {
fi, err := os.Stat(bucketDir)
if err != nil {
return true return true
} }
return permToACL(fi.Mode()).IsPublicRead() return bucketMetadata.ACL.IsPublicRead()
} }

View File

@ -333,8 +333,7 @@ func testBucketMetadata(c *check.C, create func() Filesystem) {
metadata, err := fs.GetBucketMetadata("string") metadata, err := fs.GetBucketMetadata("string")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
// On windows everything directory is always in public-read-write mode -- TODO need to handle this c.Assert(metadata.ACL, check.Equals, BucketACL("private"))
c.Assert(metadata.ACL, check.Equals, BucketACL("public-read-write"))
} }
func testBucketRecreateFails(c *check.C, create func() Filesystem) { func testBucketRecreateFails(c *check.C, create func() Filesystem) {

View File

@ -24,7 +24,19 @@ import (
"github.com/minio/minio-xl/pkg/quick" "github.com/minio/minio-xl/pkg/quick"
) )
func getFSMultipartConfigPath() (string, *probe.Error) { func getFSBucketsConfigPath() (string, *probe.Error) {
if customBucketsConfigPath != "" {
return customBucketsConfigPath, nil
}
u, err := user.Current()
if err != nil {
return "", probe.NewError(err)
}
fsBucketsConfigPath := filepath.Join(u.HomeDir, ".minio", "buckets.json")
return fsBucketsConfigPath, nil
}
func getFSMultipartsSessionConfigPath() (string, *probe.Error) {
if customMultipartsConfigPath != "" { if customMultipartsConfigPath != "" {
return customMultipartsConfigPath, nil return customMultipartsConfigPath, nil
} }
@ -32,16 +44,16 @@ func getFSMultipartConfigPath() (string, *probe.Error) {
if err != nil { if err != nil {
return "", probe.NewError(err) return "", probe.NewError(err)
} }
fsMultipartsConfigPath := filepath.Join(u.HomeDir, ".minio", "multiparts.json") fsMultipartsConfigPath := filepath.Join(u.HomeDir, ".minio", "multiparts-session.json")
return fsMultipartsConfigPath, nil return fsMultipartsConfigPath, nil
} }
// internal variable only accessed via get/set methods // internal variable only accessed via get/set methods
var customConfigPath, customMultipartsConfigPath string var customMultipartsConfigPath, customBucketsConfigPath string
// SetFSConfigPath - set custom fs config path // SetFSBucketsConfigPath - set custom fs buckets config path
func SetFSConfigPath(configPath string) { func SetFSBucketsConfigPath(configPath string) {
customConfigPath = configPath customBucketsConfigPath = configPath
} }
// SetFSMultipartsConfigPath - set custom multiparts session config path // SetFSMultipartsConfigPath - set custom multiparts session config path
@ -51,7 +63,7 @@ func SetFSMultipartsConfigPath(configPath string) {
// SaveMultipartsSession - save multiparts // SaveMultipartsSession - save multiparts
func SaveMultipartsSession(multiparts *Multiparts) *probe.Error { func SaveMultipartsSession(multiparts *Multiparts) *probe.Error {
fsMultipartsConfigPath, err := getFSMultipartConfigPath() fsMultipartsConfigPath, err := getFSMultipartsSessionConfigPath()
if err != nil { if err != nil {
return err.Trace() return err.Trace()
} }
@ -65,9 +77,25 @@ func SaveMultipartsSession(multiparts *Multiparts) *probe.Error {
return nil return nil
} }
// SaveBucketsMetadata - save metadata of all buckets
func SaveBucketsMetadata(buckets *Buckets) *probe.Error {
fsBucketsConfigPath, err := getFSBucketsConfigPath()
if err != nil {
return err.Trace()
}
qc, err := quick.New(buckets)
if err != nil {
return err.Trace()
}
if err := qc.Save(fsBucketsConfigPath); err != nil {
return err.Trace()
}
return nil
}
// loadMultipartsSession load multipart session file // loadMultipartsSession load multipart session file
func loadMultipartsSession() (*Multiparts, *probe.Error) { func loadMultipartsSession() (*Multiparts, *probe.Error) {
fsMultipartsConfigPath, err := getFSMultipartConfigPath() fsMultipartsConfigPath, err := getFSMultipartsSessionConfigPath()
if err != nil { if err != nil {
return nil, err.Trace() return nil, err.Trace()
} }
@ -83,3 +111,22 @@ func loadMultipartsSession() (*Multiparts, *probe.Error) {
} }
return qc.Data().(*Multiparts), nil return qc.Data().(*Multiparts), nil
} }
// loadBucketsMetadata load buckets metadata file
func loadBucketsMetadata() (*Buckets, *probe.Error) {
fsBucketsConfigPath, err := getFSBucketsConfigPath()
if err != nil {
return nil, err.Trace()
}
buckets := &Buckets{}
buckets.Version = "1"
buckets.Metadata = make(map[string]*BucketMetadata)
qc, err := quick.New(buckets)
if err != nil {
return nil, err.Trace()
}
if err := qc.Load(fsBucketsConfigPath); err != nil {
return nil, err.Trace()
}
return qc.Data().(*Buckets), nil
}

View File

@ -38,7 +38,13 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error {
} }
bucketDir := filepath.Join(fs.path, bucket) bucketDir := filepath.Join(fs.path, bucket)
// check bucket exists // check bucket exists
if _, err := os.Stat(bucketDir); os.IsNotExist(err) { if _, err := os.Stat(bucketDir); err != nil {
if os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
return probe.NewError(err)
}
if _, ok := fs.buckets.Metadata[bucket]; !ok {
return probe.NewError(BucketNotFound{Bucket: bucket}) return probe.NewError(BucketNotFound{Bucket: bucket})
} }
if err := os.Remove(bucketDir); err != nil { if err := os.Remove(bucketDir); err != nil {
@ -47,6 +53,10 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error {
} }
return probe.NewError(err) return probe.NewError(err)
} }
delete(fs.buckets.Metadata, bucket)
if err := SaveBucketsMetadata(fs.buckets); err != nil {
return err.Trace(bucket)
}
return nil return nil
} }
@ -102,9 +112,12 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error {
return probe.NewError(BucketNameInvalid{Bucket: bucket}) return probe.NewError(BucketNameInvalid{Bucket: bucket})
} }
if !IsValidBucketACL(acl) {
return probe.NewError(InvalidACL{ACL: acl})
}
// get bucket path // get bucket path
bucketDir := filepath.Join(fs.path, bucket) bucketDir := filepath.Join(fs.path, bucket)
// check if bucket exists // check if bucket exists
if _, err = os.Stat(bucketDir); err == nil { if _, err = os.Stat(bucketDir); err == nil {
return probe.NewError(BucketExists{ return probe.NewError(BucketExists{
@ -113,10 +126,30 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error {
} }
// make bucket // make bucket
err = os.Mkdir(bucketDir, aclToPerm(acl)) err = os.Mkdir(bucketDir, 0700)
if err != nil { if err != nil {
return probe.NewError(err) return probe.NewError(err)
} }
bucketMetadata := &BucketMetadata{}
fi, err := os.Stat(bucketDir)
// check if bucket exists
if err != nil {
if os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
return probe.NewError(err)
}
if strings.TrimSpace(acl) == "" {
acl = "private"
}
bucketMetadata.Name = fi.Name()
bucketMetadata.Created = fi.ModTime()
bucketMetadata.ACL = BucketACL(acl)
fs.buckets.Metadata[bucket] = bucketMetadata
if err := SaveBucketsMetadata(fs.buckets); err != nil {
return err.Trace(bucket)
}
return nil return nil
} }
@ -129,48 +162,22 @@ func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Er
} }
// get bucket path // get bucket path
bucketDir := filepath.Join(fs.path, bucket) bucketDir := filepath.Join(fs.path, bucket)
bucketMetadata := BucketMetadata{}
fi, err := os.Stat(bucketDir) fi, err := os.Stat(bucketDir)
// check if bucket exists
if os.IsNotExist(err) {
return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil { if err != nil {
// check if bucket exists
if os.IsNotExist(err) {
return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return BucketMetadata{}, probe.NewError(err) return BucketMetadata{}, probe.NewError(err)
} }
bucketMetadata, ok := fs.buckets.Metadata[bucket]
bucketMetadata.Name = fi.Name() if !ok {
bucketMetadata.Created = fi.ModTime() bucketMetadata = &BucketMetadata{}
bucketMetadata.ACL = permToACL(fi.Mode()) bucketMetadata.Name = fi.Name()
return bucketMetadata, nil bucketMetadata.Created = fi.ModTime()
} bucketMetadata.ACL = BucketACL("private")
// permToACL - convert perm to meaningful ACL
func permToACL(mode os.FileMode) BucketACL {
switch mode.Perm() {
case os.FileMode(0700):
return BucketACL("private")
case os.FileMode(0500):
return BucketACL("public-read")
case os.FileMode(0777):
return BucketACL("public-read-write")
default:
return BucketACL("private")
}
}
// aclToPerm - convert acl to filesystem mode
func aclToPerm(acl string) os.FileMode {
switch acl {
case "private":
return os.FileMode(0700)
case "public-read":
return os.FileMode(0500)
case "public-read-write":
return os.FileMode(0777)
default:
return os.FileMode(0700)
} }
return *bucketMetadata, nil
} }
// SetBucketMetadata - set bucket metadata // SetBucketMetadata - set bucket metadata
@ -184,11 +191,28 @@ func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string
if !IsValidBucketACL(acl) { if !IsValidBucketACL(acl) {
return probe.NewError(InvalidACL{ACL: acl}) return probe.NewError(InvalidACL{ACL: acl})
} }
// get bucket path if strings.TrimSpace(acl) == "" {
acl = "private"
}
bucketDir := filepath.Join(fs.path, bucket) bucketDir := filepath.Join(fs.path, bucket)
err := os.Chmod(bucketDir, aclToPerm(acl)) fi, err := os.Stat(bucketDir)
if err != nil { if err != nil {
// check if bucket exists
if os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
return probe.NewError(err) return probe.NewError(err)
} }
bucketMetadata, ok := fs.buckets.Metadata[bucket]
if !ok {
bucketMetadata = &BucketMetadata{}
bucketMetadata.Name = fi.Name()
bucketMetadata.Created = fi.ModTime()
}
bucketMetadata.ACL = BucketACL(acl)
fs.buckets.Metadata[bucket] = bucketMetadata
if err := SaveBucketsMetadata(fs.buckets); err != nil {
return err.Trace(bucket)
}
return nil return nil
} }

View File

@ -30,6 +30,13 @@ type Filesystem struct {
minFreeDisk int64 minFreeDisk int64
lock *sync.Mutex lock *sync.Mutex
multiparts *Multiparts multiparts *Multiparts
buckets *Buckets
}
// Buckets holds acl information
type Buckets struct {
Version string `json:"version"`
Metadata map[string]*BucketMetadata
} }
// MultipartSession holds active session information // MultipartSession holds active session information
@ -65,8 +72,24 @@ func New() (Filesystem, *probe.Error) {
return Filesystem{}, err.Trace() return Filesystem{}, err.Trace()
} }
} }
var buckets *Buckets
buckets, err = loadBucketsMetadata()
if err != nil {
if os.IsNotExist(err.ToGoError()) {
buckets = &Buckets{
Version: "1",
Metadata: make(map[string]*BucketMetadata),
}
if err := SaveBucketsMetadata(buckets); err != nil {
return Filesystem{}, err.Trace()
}
} else {
return Filesystem{}, err.Trace()
}
}
a := Filesystem{lock: new(sync.Mutex)} a := Filesystem{lock: new(sync.Mutex)}
a.multiparts = multiparts a.multiparts = multiparts
a.buckets = buckets
return a, nil return a, nil
} }

View File

@ -38,7 +38,8 @@ func (s *MySuite) TestAPISuite(c *C) {
c.Check(err, IsNil) c.Check(err, IsNil)
path, err := ioutil.TempDir(os.TempDir(), "minio-") path, err := ioutil.TempDir(os.TempDir(), "minio-")
c.Check(err, IsNil) c.Check(err, IsNil)
SetFSMultipartsConfigPath(filepath.Join(configPath, "multiparts.json")) SetFSMultipartsConfigPath(filepath.Join(configPath, "multiparts-session.json"))
SetFSBucketsConfigPath(filepath.Join(configPath, "buckets.json"))
storageList = append(storageList, path) storageList = append(storageList, path)
store, perr := New() store, perr := New()
store.SetRootPath(path) store.SetRootPath(path)

View File

@ -59,12 +59,20 @@ func (s *MyAPIFSCacheSuite) SetUpSuite(c *C) {
fsroot, err := ioutil.TempDir(os.TempDir(), "api-") fsroot, err := ioutil.TempDir(os.TempDir(), "api-")
c.Assert(err, IsNil) c.Assert(err, IsNil)
fs.SetFSMultipartsConfigPath(filepath.Join(root, "multiparts.json")) fs.SetFSMultipartsConfigPath(filepath.Join(root, "multiparts-session.json"))
fs.SetFSBucketsConfigPath(filepath.Join(root, "buckets.json"))
multiparts := &fs.Multiparts{} multiparts := &fs.Multiparts{}
multiparts.Version = "1"
multiparts.ActiveSession = make(map[string]*fs.MultipartSession) multiparts.ActiveSession = make(map[string]*fs.MultipartSession)
perr := fs.SaveMultipartsSession(multiparts) perr := fs.SaveMultipartsSession(multiparts)
c.Assert(perr, IsNil) c.Assert(perr, IsNil)
buckets := &fs.Buckets{}
buckets.Version = "1"
buckets.Metadata = make(map[string]*fs.BucketMetadata)
perr = fs.SaveBucketsMetadata(buckets)
c.Assert(perr, IsNil)
accessKeyID, perr := generateAccessKeyID() accessKeyID, perr := generateAccessKeyID()
c.Assert(perr, IsNil) c.Assert(perr, IsNil)
secretAccessKey, perr := generateSecretAccessKey() secretAccessKey, perr := generateSecretAccessKey()