fs: Save metadata for objects in minioMetaBucket directory. (#2251)

This commit is contained in:
Harshavardhana 2016-07-21 17:31:14 -07:00 committed by Anand Babu (AB) Periasamy
parent 303f216150
commit 0add96f655
6 changed files with 175 additions and 20 deletions

View File

@ -1,9 +1,26 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/json"
"path"
"sort"
"strings"
)
const (
@ -18,6 +35,8 @@ type fsMetaV1 struct {
Minio struct {
Release string `json:"release"`
} `json:"minio"`
// Metadata map for current object `fs.json`.
Meta map[string]string `json:"meta,omitempty"`
Parts []objectPartInfo `json:"parts,omitempty"`
}
@ -111,14 +130,41 @@ func writeFSFormatData(storage StorageAPI, fsFormat formatConfigV1) error {
return nil
}
// writeFSMetadata - writes `fs.json` metadata.
func (fs fsObjects) writeTempFSMetadata(bucket, path string, fsMeta fsMetaV1) error {
// writeFSMetadata - writes `fs.json` metadata, marshals fsMeta object into json
// and saves it to disk.
func writeFSMetadata(storage StorageAPI, bucket, path string, fsMeta fsMetaV1) error {
metadataBytes, err := json.Marshal(fsMeta)
if err != nil {
return err
}
if err = fs.storage.AppendFile(bucket, path, metadataBytes); err != nil {
if err = storage.AppendFile(bucket, path, metadataBytes); err != nil {
return err
}
return nil
}
var extendedHeaders = []string{
"X-Amz-Meta-",
"X-Minio-Meta-",
// Add new extended headers.
}
// isExtendedHeader validates if input string matches extended headers.
func isExtendedHeader(header string) bool {
for _, extendedHeader := range extendedHeaders {
if strings.HasPrefix(header, extendedHeader) {
return true
}
}
return false
}
// Return true if extended HTTP headers are set, false otherwise.
func hasExtendedHeader(metadata map[string]string) bool {
for k := range metadata {
if isExtendedHeader(k) {
return true
}
}
return false
}

63
fs-v1-metadata_test.go Normal file
View File

@ -0,0 +1,63 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "testing"
// Tests scenarios which can occur for hasExtendedHeader function.
func TestHasExtendedHeader(t *testing.T) {
// All test cases concerning hasExtendedHeader function.
testCases := []struct {
metadata map[string]string
has bool
}{
// Verifies if X-Amz-Meta is present.
{
metadata: map[string]string{
"X-Amz-Meta-1": "value",
},
has: true,
},
// Verifies if X-Minio-Meta is present.
{
metadata: map[string]string{
"X-Minio-Meta-1": "value",
},
has: true,
},
// Verifies if extended header is not present.
{
metadata: map[string]string{
"md5Sum": "value",
},
has: false,
},
// Verifieis if extended header is not present, but with an empty input.
{
metadata: nil,
has: false,
},
}
// Validate all test cases.
for i, testCase := range testCases {
has := hasExtendedHeader(testCase.metadata)
if has != testCase.has {
t.Fatalf("Test case %d: Expected \"%#v\", but got \"%#v\"", i+1, testCase.has, has)
}
}
}

View File

@ -218,6 +218,10 @@ func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// Initialize `fs.json` values.
fsMeta := newFSMetaV1()
// Save additional metadata only if extended headers such as "X-Amz-Meta-" are set.
if hasExtendedHeader(meta) {
fsMeta.Meta = meta
}
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
@ -231,7 +235,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempFSMetadataPath := path.Join(tmpMetaPrefix, getUUID()+"-"+fsMetaJSONFile)
if err = fs.writeTempFSMetadata(minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil {
if err = writeFSMetadata(fs.storage, minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempFSMetadataPath)
}
err = fs.storage.RenameFile(minioMetaBucket, tempFSMetadataPath, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
@ -377,7 +381,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempFSMetadataPath := path.Join(tmpMetaPrefix, getUUID()+"-"+fsMetaJSONFile)
if err = fs.writeTempFSMetadata(minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil {
if err = writeFSMetadata(fs.storage, minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempFSMetadataPath)
}
err = fs.storage.RenameFile(minioMetaBucket, tempFSMetadataPath, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
@ -578,9 +582,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(err, bucket, object)
}
// No need to save part info, since we have concatenated all parts.
fsMeta.Parts = nil
// Save additional metadata only if extended headers such as "X-Amz-Meta-" are set.
if hasExtendedHeader(fsMeta.Meta) {
fsMeta.Meta["md5Sum"] = s3MD5
fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
}
// Cleanup all the parts if everything else has been safely committed.
if err = cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
return "", err
return "", toObjectErr(err, bucket, object)
}
// Hold the lock so that two parallel complete-multipart-uploads do not

View File

@ -309,23 +309,32 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object))
if err != nil && err != errFileNotFound {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Guess content-type from the extension if possible.
contentType := ""
contentType := fsMeta.Meta["content-type"]
if contentType == "" {
if objectExt := filepath.Ext(object); objectExt != "" {
if content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]; ok {
contentType = content.ContentType
}
}
}
// Guess content-type from the extension if possible.
return ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: fi.ModTime,
Size: fi.Size,
IsDir: fi.Mode.IsDir(),
MD5Sum: fsMeta.Meta["md5Sum"],
ContentType: contentType,
MD5Sum: "", // Read from metadata.
ContentEncoding: fsMeta.Meta["content-encoding"],
UserDefined: fsMeta.Meta,
}, nil
}
@ -422,10 +431,24 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
return "", toObjectErr(err, bucket, object)
}
// Save additional metadata only if extended headers such as "X-Amz-Meta-" are set.
if hasExtendedHeader(metadata) {
// Initialize `fs.json` values.
fsMeta := newFSMetaV1()
fsMeta.Meta = metadata
fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
}
// DeleteObject - deletes an object from a bucket, this operation is destructive
// and there are no rollbacks supported.
func (fs fsObjects) DeleteObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -434,7 +457,11 @@ func (fs fsObjects) DeleteObject(bucket, object string) error {
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
if err := fs.storage.DeleteFile(bucket, object); err != nil {
err := fs.storage.DeleteFile(minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object))
if err != nil && err != errFileNotFound {
return toObjectErr(err, bucket, object)
}
if err = fs.storage.DeleteFile(bucket, object); err != nil {
return toObjectErr(err, bucket, object)
}
return nil

View File

@ -30,6 +30,9 @@ const (
// Staging buffer read size for all internal operations version 1.
readSizeV1 = 128 * 1024 // 128KiB.
// Buckets meta prefix.
bucketMetaPrefix = "buckets"
)
// Register callback functions that needs to be called when process shutsdown.

View File

@ -83,7 +83,7 @@ type xlMetaV1 struct {
Release string `json:"release"`
} `json:"minio"`
// Metadata map for current object `xl.json`.
Meta map[string]string `json:"meta"`
Meta map[string]string `json:"meta,omitempty"`
// Captures all the individual object `xl.json`.
Parts []objectPartInfo `json:"parts,omitempty"`
}