Unify gateway and object layer. (#5487)

* Unify gateway and object layer. Bring bucket policies into
object layer.
This commit is contained in:
poornas
2018-02-09 15:19:30 -08:00
committed by kannappanr
parent a7f6e14370
commit 4f73fd9487
43 changed files with 517 additions and 2458 deletions

View File

@@ -24,11 +24,13 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"sort"
"sync"
"syscall"
"time"
"github.com/minio/minio-go/pkg/policy"
"github.com/minio/minio/pkg/errors"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/lock"
@@ -58,6 +60,9 @@ type fsObjects struct {
// To manage the appendRoutine go-routines
nsMutex *nsLockMap
// Variable represents bucket policies in memory.
bucketPolicies *bucketPolicies
}
// Represents the background append file.
@@ -254,6 +259,11 @@ func (fs *fsObjects) MakeBucketWithLocation(bucket, location string) error {
// GetBucketInfo - fetch bucket metadata info.
func (fs *fsObjects) GetBucketInfo(bucket string) (bi BucketInfo, e error) {
bucketLock := fs.nsMutex.NewNSLock(bucket, "")
if e := bucketLock.GetRLock(globalObjectTimeout); e != nil {
return bi, e
}
defer bucketLock.RUnlock()
st, err := fs.statBucketDir(bucket)
if err != nil {
return bi, toObjectErr(err, bucket)
@@ -310,6 +320,11 @@ func (fs *fsObjects) ListBuckets() ([]BucketInfo, error) {
// DeleteBucket - delete a bucket and all the metadata associated
// with the bucket including pending multipart, object metadata.
func (fs *fsObjects) DeleteBucket(bucket string) error {
bucketLock := fs.nsMutex.NewNSLock(bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
bucketDir, err := fs.getBucketDir(bucket)
if err != nil {
return toObjectErr(err, bucket)
@@ -331,7 +346,22 @@ func (fs *fsObjects) DeleteBucket(bucket string) error {
if err = fsRemoveAll(minioMetadataBucketDir); err != nil {
return toObjectErr(err, bucket)
}
// Delete bucket access policy, if present - ignore any errors.
_ = removeBucketPolicy(bucket, fs)
// Notify all peers (including self) to update in-memory state
S3PeersUpdateBucketPolicy(bucket)
// Delete notification config, if present - ignore any errors.
_ = removeNotificationConfig(bucket, fs)
// Notify all peers (including self) to update in-memory state
S3PeersUpdateBucketNotification(bucket, nil)
// Delete listener config, if present - ignore any errors.
_ = removeListenerConfig(bucket, fs)
// Notify all peers (including self) to update in-memory state
S3PeersUpdateBucketListener(bucket, []listenerConfig{})
return nil
}
@@ -621,6 +651,9 @@ func (fs *fsObjects) parentDirIsObject(bucket, parent string) bool {
// Additionally writes `fs.json` which carries the necessary metadata
// for future object operations.
func (fs *fsObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
if err := checkPutObjectArgs(bucket, object, fs, data.Size()); err != nil {
return ObjectInfo{}, err
}
// Lock the object.
objectLock := fs.nsMutex.NewNSLock(bucket, object)
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
@@ -664,7 +697,7 @@ func (fs *fsObjects) putObject(bucket string, object string, data *hash.Reader,
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
if err = checkPutObjectArgs(bucket, object, fs); err != nil {
if err = checkPutObjectArgs(bucket, object, fs, data.Size()); err != nil {
return ObjectInfo{}, err
}
@@ -872,7 +905,13 @@ func (fs *fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKe
if err := checkListObjsArgs(bucket, prefix, marker, delimiter, fs); err != nil {
return loi, err
}
// Marker is set validate pre-condition.
if marker != "" {
// Marker not common with prefix is not implemented.Send an empty response
if !hasPrefix(marker, prefix) {
return ListObjectsInfo{}, e
}
}
if _, err := fs.statBucketDir(bucket); err != nil {
return loi, err
}
@@ -1042,3 +1081,62 @@ func (fs *fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, m
func (fs *fsObjects) ListBucketsHeal() ([]BucketInfo, error) {
return []BucketInfo{}, errors.Trace(NotImplemented{})
}
// SetBucketPolicy sets policy on bucket
func (fs *fsObjects) SetBucketPolicy(bucket string, policy policy.BucketAccessPolicy) error {
return persistAndNotifyBucketPolicyChange(bucket, false, policy, fs)
}
// GetBucketPolicy will get policy on bucket
func (fs *fsObjects) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, error) {
policy := fs.bucketPolicies.GetBucketPolicy(bucket)
if reflect.DeepEqual(policy, emptyBucketPolicy) {
return readBucketPolicy(bucket, fs)
}
return policy, nil
}
// DeleteBucketPolicy deletes all policies on bucket
func (fs *fsObjects) DeleteBucketPolicy(bucket string) error {
return persistAndNotifyBucketPolicyChange(bucket, true, emptyBucketPolicy, fs)
}
// ListObjectsV2 lists all blobs in bucket filtered by prefix
func (fs *fsObjects) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
loi, err := fs.ListObjects(bucket, prefix, continuationToken, delimiter, maxKeys)
if err != nil {
return result, err
}
listObjectsV2Info := ListObjectsV2Info{
IsTruncated: loi.IsTruncated,
ContinuationToken: continuationToken,
NextContinuationToken: loi.NextMarker,
Objects: loi.Objects,
Prefixes: loi.Prefixes,
}
return listObjectsV2Info, err
}
// RefreshBucketPolicy refreshes cache policy with what's on disk.
func (fs *fsObjects) RefreshBucketPolicy(bucket string) error {
policy, err := readBucketPolicy(bucket, fs)
if err != nil {
if reflect.DeepEqual(policy, emptyBucketPolicy) {
return fs.bucketPolicies.DeleteBucketPolicy(bucket)
}
return err
}
return fs.bucketPolicies.SetBucketPolicy(bucket, policy)
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (fs *fsObjects) IsNotificationSupported() bool {
return true
}
// IsEncryptionSupported returns whether server side encryption is applicable for this layer.
func (fs *fsObjects) IsEncryptionSupported() bool {
return true
}