mirror of
https://github.com/minio/minio.git
synced 2025-11-20 18:06:10 -05:00
Make channels more aware of errors and shutdown on error - some cleanup
This commit is contained in:
@@ -31,33 +31,31 @@ import (
|
||||
)
|
||||
|
||||
type storage struct {
|
||||
root string
|
||||
writeLock sync.Mutex
|
||||
root string
|
||||
lock *sync.Mutex
|
||||
}
|
||||
|
||||
type SerializedMetadata struct {
|
||||
ContentType string
|
||||
}
|
||||
|
||||
type MkdirFailedError struct{}
|
||||
|
||||
func (self MkdirFailedError) Error() string {
|
||||
return "Mkdir Failed"
|
||||
}
|
||||
|
||||
func Start(root string) (chan<- string, <-chan error, *storage) {
|
||||
ctrlChannel := make(chan string)
|
||||
errorChannel := make(chan error)
|
||||
go start(ctrlChannel, errorChannel)
|
||||
return ctrlChannel, errorChannel, &storage{root: root}
|
||||
s := storage{}
|
||||
s.root = root
|
||||
s.lock = new(sync.Mutex)
|
||||
go start(ctrlChannel, errorChannel, &s)
|
||||
return ctrlChannel, errorChannel, &s
|
||||
}
|
||||
|
||||
func start(ctrlChannel <-chan string, errorChannel chan<- error) {
|
||||
func start(ctrlChannel <-chan string, errorChannel chan<- error, s *storage) {
|
||||
err := os.MkdirAll(s.root, 0700)
|
||||
errorChannel <- err
|
||||
close(errorChannel)
|
||||
}
|
||||
|
||||
// Bucket Operations
|
||||
|
||||
func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) {
|
||||
files, err := ioutil.ReadDir(storage.root)
|
||||
if err != nil {
|
||||
@@ -83,8 +81,8 @@ func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) {
|
||||
}
|
||||
|
||||
func (storage *storage) StoreBucket(bucket string) error {
|
||||
storage.writeLock.Lock()
|
||||
defer storage.writeLock.Unlock()
|
||||
storage.lock.Lock()
|
||||
defer storage.lock.Unlock()
|
||||
|
||||
// verify bucket path legal
|
||||
if mstorage.IsValidBucket(bucket) == false {
|
||||
@@ -110,8 +108,8 @@ func (storage *storage) StoreBucket(bucket string) error {
|
||||
}
|
||||
|
||||
func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) {
|
||||
storage.writeLock.Lock()
|
||||
defer storage.writeLock.Unlock()
|
||||
storage.lock.Lock()
|
||||
defer storage.lock.Unlock()
|
||||
|
||||
var p policy.BucketPolicy
|
||||
// verify bucket path legal
|
||||
@@ -153,8 +151,8 @@ func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) {
|
||||
}
|
||||
|
||||
func (storage *storage) StoreBucketPolicy(bucket string, policy interface{}) error {
|
||||
storage.writeLock.Lock()
|
||||
defer storage.writeLock.Unlock()
|
||||
storage.lock.Lock()
|
||||
defer storage.lock.Unlock()
|
||||
|
||||
// verify bucket path legal
|
||||
if mstorage.IsValidBucket(bucket) == false {
|
||||
@@ -332,8 +330,8 @@ func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorag
|
||||
|
||||
func (storage *storage) StoreObject(bucket, key, contentType string, data io.Reader) error {
|
||||
// TODO Commits should stage then move instead of writing directly
|
||||
storage.writeLock.Lock()
|
||||
defer storage.writeLock.Unlock()
|
||||
storage.lock.Lock()
|
||||
defer storage.lock.Unlock()
|
||||
|
||||
// check bucket name valid
|
||||
if mstorage.IsValidBucket(bucket) == false {
|
||||
|
||||
Reference in New Issue
Block a user