Support creating empty directories. (#5049)

Every so often we get requirements for creating
directories/prefixes and we end up rejecting
such requirements. This PR implements this and
allows empty directories without any new file
addition to backend.

Existing lower APIs themselves are leveraged to provide
this behavior. Only FS backend supports this for
the time being as desired.
This commit is contained in:
Harshavardhana
2017-10-16 17:20:54 -07:00
committed by Dee Koder
parent 0c0d1e4150
commit b2cbade477
6 changed files with 186 additions and 80 deletions

View File

@@ -201,7 +201,7 @@ func (fs fsObjects) statBucketDir(bucket string) (os.FileInfo, error) {
if err != nil {
return nil, err
}
st, err := fsStatDir(bucketDir)
st, err := fsStatVolume(bucketDir)
if err != nil {
return nil, err
}
@@ -255,7 +255,7 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) {
continue
}
var fi os.FileInfo
fi, err = fsStatDir(pathJoin(fs.fsPath, entry))
fi, err = fsStatVolume(pathJoin(fs.fsPath, entry))
// There seems like no practical reason to check for errors
// at this point, if there are indeed errors we can simply
// just ignore such buckets and list only those which
@@ -379,7 +379,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
if err = checkGetObjArgs(bucket, object); err != nil {
if err = checkBucketAndObjectNamesFS(bucket, object); err != nil {
return err
}
@@ -397,6 +397,12 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
return toObjectErr(traceError(errUnexpected), bucket, object)
}
// If its a directory request, we return an empty body.
if hasSuffix(object, slashSeparator) {
_, err = writer.Write([]byte(""))
return toObjectErr(traceError(err), bucket, object)
}
if bucket != minioMetaBucket {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
_, err = fs.rwPool.Open(fsMetaPath)
@@ -440,6 +446,15 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
fsMeta := fsMetaV1{}
if hasSuffix(object, slashSeparator) {
// Directory call needs to arrive with object ending with "/".
fi, err := fsStatDir(pathJoin(fs.fsPath, bucket, object))
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
// Read `fs.json` to perhaps contend with
@@ -472,9 +487,25 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
// Checks bucket and object name validity, returns nil if both are valid.
func checkBucketAndObjectNamesFS(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if len(object) == 0 {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !IsValidObjectPrefix(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return nil
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
if err := checkGetObjArgs(bucket, object); err != nil {
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
return oi, err
}
@@ -510,6 +541,11 @@ func (fs fsObjects) parentDirIsObject(bucket, parent string) bool {
// Additionally writes `fs.json` which carries the necessary metadata
// for future object operations.
func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
// No metadata is set, allocate a new one.
if metadata == nil {
metadata = make(map[string]string)
}
var err error
// Validate if bucket name is valid and exists.
@@ -517,6 +553,9 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me
return ObjectInfo{}, toObjectErr(err, bucket)
}
fsMeta := newFSMetaV1()
fsMeta.Meta = metadata
// This is a special case with size as '0' and object ends
// with a slash separator, we treat it like a valid operation
// and return success.
@@ -525,7 +564,14 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me
if fs.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
return dirObjectInfo(bucket, object, data.Size(), metadata), nil
if err = fsMkdirAll(pathJoin(fs.fsPath, bucket, object)); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
var fi os.FileInfo
if fi, err = fsStatDir(pathJoin(fs.fsPath, bucket, object)); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
if err = checkPutObjectArgs(bucket, object, fs); err != nil {
@@ -542,14 +588,6 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me
return ObjectInfo{}, traceError(errInvalidArgument)
}
// No metadata is set, allocate a new one.
if metadata == nil {
metadata = make(map[string]string)
}
fsMeta := newFSMetaV1()
fsMeta.Meta = metadata
var wlk *lock.LockedFile
if bucket != minioMetaBucket {
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
@@ -632,7 +670,7 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me
// DeleteObject - deletes an object from a bucket, this operation is destructive
// and there are no rollbacks supported.
func (fs fsObjects) DeleteObject(bucket, object string) error {
if err := checkDelObjArgs(bucket, object); err != nil {
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
return err
}
@@ -779,18 +817,30 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// Convert entry to ObjectInfo
entryToObjectInfo := func(entry string) (objInfo ObjectInfo, err error) {
if hasSuffix(entry, slashSeparator) {
// Object name needs to be full path.
objInfo.Name = entry
objInfo.IsDir = true
return
}
// Protect reading `fs.json`.
// Protect the entry from concurrent deletes, or renames.
objectLock := globalNSMutex.NewNSLock(bucket, entry)
if err = objectLock.GetRLock(globalListingTimeout); err != nil {
return ObjectInfo{}, err
}
if hasSuffix(entry, slashSeparator) {
var fi os.FileInfo
fi, err = fsStatDir(pathJoin(fs.fsPath, bucket, entry))
objectLock.RUnlock()
if err != nil {
return objInfo, err
}
// Success.
return ObjectInfo{
// Object name needs to be full path.
Name: entry,
Bucket: bucket,
Size: fi.Size(),
ModTime: fi.ModTime(),
IsDir: fi.IsDir(),
}, nil
}
var etag string
etag, err = fs.getObjectETag(bucket, entry)
objectLock.RUnlock()