objects: Save all the incoming metadata properly. (#1688)

For both multipart and single put operation
This commit is contained in:
Harshavardhana 2016-05-18 19:54:25 -07:00 committed by Anand Babu (AB) Periasamy
parent af85acf388
commit 7d6ed50fc2
8 changed files with 99 additions and 35 deletions

View File

@ -28,8 +28,9 @@ func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
} }
// NewMultipartUpload - initialize a new multipart upload, returns a unique id. // NewMultipartUpload - initialize a new multipart upload, returns a unique id.
func (fs fsObjects) NewMultipartUpload(bucket, object string) (string, error) { func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
return newMultipartUploadCommon(fs.storage, bucket, object) meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs.
return newMultipartUploadCommon(fs.storage, bucket, object, meta)
} }
// PutObjectPart - writes the multipart upload chunks. // PutObjectPart - writes the multipart upload chunks.

View File

@ -35,7 +35,7 @@ func testObjectNewMultipartUpload(obj ObjectLayer, instanceType string, t *testi
errMsg := "Bucket not found: minio-bucket" errMsg := "Bucket not found: minio-bucket"
// opearation expected to fail since the bucket on which NewMultipartUpload is being initiated doesn't exist. // opearation expected to fail since the bucket on which NewMultipartUpload is being initiated doesn't exist.
uploadID, err := obj.NewMultipartUpload(bucket, object) uploadID, err := obj.NewMultipartUpload(bucket, object, nil)
if err == nil { if err == nil {
t.Fatalf("%s: Expected to fail since the NewMultipartUpload is intialized on a non-existant bucket.", instanceType) t.Fatalf("%s: Expected to fail since the NewMultipartUpload is intialized on a non-existant bucket.", instanceType)
} }
@ -50,7 +50,7 @@ func testObjectNewMultipartUpload(obj ObjectLayer, instanceType string, t *testi
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
uploadID, err = obj.NewMultipartUpload(bucket, object) uploadID, err = obj.NewMultipartUpload(bucket, object, nil)
if err != nil { if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
@ -83,7 +83,7 @@ func testObjectAPIIsUploadIDExists(obj ObjectLayer, instanceType string, t *test
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
_, err = obj.NewMultipartUpload(bucket, object) _, err = obj.NewMultipartUpload(bucket, object, nil)
if err != nil { if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
@ -114,7 +114,7 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t *testing
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
// Initiate Multipart Upload on the above created bucket. // Initiate Multipart Upload on the above created bucket.
uploadID, err := obj.NewMultipartUpload(bucket, object) uploadID, err := obj.NewMultipartUpload(bucket, object, nil)
if err != nil { if err != nil {
// Failed to create NewMultipartUpload, abort. // Failed to create NewMultipartUpload, abort.
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())

View File

@ -19,6 +19,7 @@ package main
import ( import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -71,7 +72,7 @@ func createUploadsJSON(storage StorageAPI, bucket, object, uploadID string) erro
// newMultipartUploadCommon - initialize a new multipart, is a common // newMultipartUploadCommon - initialize a new multipart, is a common
// function for both object layers. // function for both object layers.
func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) (uploadID string, err error) { func newMultipartUploadCommon(storage StorageAPI, bucket string, object string, meta map[string]string) (uploadID string, err error) {
// Verify if bucket name is valid. // Verify if bucket name is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket} return "", BucketNameInvalid{Bucket: bucket}
@ -111,6 +112,17 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string)
if w, err = storage.CreateFile(minioMetaBucket, tempUploadIDPath); err != nil { if w, err = storage.CreateFile(minioMetaBucket, tempUploadIDPath); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
// Encode the uploaded metadata into incomplete file.
encoder := json.NewEncoder(w)
err = encoder.Encode(&meta)
if err != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
return "", toObjectErr(clErr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
// Close the writer. // Close the writer.
if err = w.Close(); err != nil { if err = w.Close(); err != nil {
if clErr := safeCloseAndRemove(w); clErr != nil { if clErr := safeCloseAndRemove(w); clErr != nil {
@ -118,6 +130,8 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string)
} }
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
// Rename the file to the actual location from temporary path.
err = storage.RenameFile(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) err = storage.RenameFile(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
if err != nil { if err != nil {
if derr := storage.DeleteFile(minioMetaBucket, tempUploadIDPath); derr != nil { if derr := storage.DeleteFile(minioMetaBucket, tempUploadIDPath); derr != nil {

View File

@ -554,6 +554,22 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return return
} }
// Save metadata.
metadata := make(map[string]string)
// Make sure we hex encode md5sum here.
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
// Save other metadata if available.
metadata["content-type"] = r.Header.Get("Content-Type")
metadata["content-encoding"] = r.Header.Get("Content-Encoding")
for key := range r.Header {
cKey := http.CanonicalHeaderKey(key)
if strings.HasPrefix(cKey, "x-amz-meta-") {
metadata[cKey] = r.Header.Get(cKey)
} else if strings.HasPrefix(key, "x-minio-meta-") {
metadata[cKey] = r.Header.Get(cKey)
}
}
var md5Sum string var md5Sum string
switch getRequestAuthType(r) { switch getRequestAuthType(r) {
default: default:
@ -567,7 +583,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return return
} }
// Create anonymous object. // Create anonymous object.
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil) md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, metadata)
case authTypePresigned, authTypeSigned: case authTypePresigned, authTypeSigned:
// Initialize a pipe for data pipe line. // Initialize a pipe for data pipe line.
reader, writer := io.Pipe() reader, writer := io.Pipe()
@ -608,10 +624,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writer.Close() writer.Close()
}() }()
// Save metadata.
metadata := make(map[string]string)
// Make sure we hex encode here.
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
// Create object. // Create object.
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata) md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
// Close the pipe. // Close the pipe.
@ -657,7 +669,21 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
} }
} }
uploadID, err := api.ObjectAPI.NewMultipartUpload(bucket, object) // Save metadata.
metadata := make(map[string]string)
// Save other metadata if available.
metadata["content-type"] = r.Header.Get("Content-Type")
metadata["content-encoding"] = r.Header.Get("Content-Encoding")
for key := range r.Header {
cKey := http.CanonicalHeaderKey(key)
if strings.HasPrefix(cKey, "x-amz-meta-") {
metadata[cKey] = r.Header.Get(cKey)
} else if strings.HasPrefix(key, "x-minio-meta-") {
metadata[cKey] = r.Header.Get(cKey)
}
}
uploadID, err := api.ObjectAPI.NewMultipartUpload(bucket, object, metadata)
if err != nil { if err != nil {
errorIf(err, "Unable to initiate new multipart upload id.") errorIf(err, "Unable to initiate new multipart upload id.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)

View File

@ -35,7 +35,7 @@ type ObjectLayer interface {
// Multipart operations. // Multipart operations.
ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error)
NewMultipartUpload(bucket, object string) (uploadID string, err error) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error)
PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (md5 string, err error) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (md5 string, err error)
ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error)
AbortMultipartUpload(bucket, object, uploadID string) error AbortMultipartUpload(bucket, object, uploadID string) error

View File

@ -61,7 +61,7 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectLayer) {
obj := create() obj := create()
err := obj.MakeBucket("bucket") err := obj.MakeBucket("bucket")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
uploadID, err := obj.NewMultipartUpload("bucket", "key") uploadID, err := obj.NewMultipartUpload("bucket", "key", nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
// Create a byte array of 5MB. // Create a byte array of 5MB.
data := bytes.Repeat([]byte("0123456789abcdef"), 5*1024*1024/16) data := bytes.Repeat([]byte("0123456789abcdef"), 5*1024*1024/16)
@ -87,7 +87,7 @@ func testMultipartObjectAbort(c *check.C, create func() ObjectLayer) {
obj := create() obj := create()
err := obj.MakeBucket("bucket") err := obj.MakeBucket("bucket")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
uploadID, err := obj.NewMultipartUpload("bucket", "key") uploadID, err := obj.NewMultipartUpload("bucket", "key", nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
parts := make(map[int]string) parts := make(map[int]string)

View File

@ -41,6 +41,8 @@ type MultipartObjectInfo struct {
ModTime time.Time ModTime time.Time
Size int64 Size int64
MD5Sum string MD5Sum string
ContentType string
// Add more fields here.
} }
type byMultipartFiles []string type byMultipartFiles []string
@ -68,6 +70,25 @@ func (m MultipartObjectInfo) GetPartNumberOffset(offset int64) (partIndex int, p
return return
} }
// getMultipartObjectMeta - incomplete meta file and extract meta
// information if any.
func getMultipartObjectMeta(storage StorageAPI, metaFile string) (meta map[string]string, err error) {
meta = make(map[string]string)
offset := int64(0)
objMetaReader, err := storage.ReadFile(minioMetaBucket, metaFile, offset)
if err != nil {
return nil, err
}
decoder := json.NewDecoder(objMetaReader)
err = decoder.Decode(&meta)
if err != nil {
return nil, err
}
// Close the metadata reader.
objMetaReader.Close()
return meta, nil
}
func partNumToPartFileName(partNum int) string { func partNumToPartFileName(partNum int) string {
return fmt.Sprintf("%.5d%s", partNum, multipartSuffix) return fmt.Sprintf("%.5d%s", partNum, multipartSuffix)
} }
@ -78,8 +99,8 @@ func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
} }
// NewMultipartUpload - initialize a new multipart upload, returns a unique id. // NewMultipartUpload - initialize a new multipart upload, returns a unique id.
func (xl xlObjects) NewMultipartUpload(bucket, object string) (string, error) { func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
return newMultipartUploadCommon(xl.storage, bucket, object) return newMultipartUploadCommon(xl.storage, bucket, object, meta)
} }
// PutObjectPart - writes the multipart upload chunks. // PutObjectPart - writes the multipart upload chunks.
@ -148,6 +169,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
var metadata = MultipartObjectInfo{} var metadata = MultipartObjectInfo{}
var errs = make([]error, len(parts)) var errs = make([]error, len(parts))
uploadIDIncompletePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
objMeta, err := getMultipartObjectMeta(xl.storage, uploadIDIncompletePath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDIncompletePath)
}
// Waitgroup to wait for go-routines. // Waitgroup to wait for go-routines.
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
@ -184,6 +211,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Save successfully calculated md5sum. // Save successfully calculated md5sum.
metadata.MD5Sum = s3MD5 metadata.MD5Sum = s3MD5
metadata.ContentType = objMeta["content-type"]
// Save modTime as well as the current time. // Save modTime as well as the current time.
metadata.ModTime = time.Now().UTC() metadata.ModTime = time.Now().UTC()
@ -244,7 +273,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Delete the incomplete file place holder. // Delete the incomplete file place holder.
uploadIDIncompletePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
err = xl.storage.DeleteFile(minioMetaBucket, uploadIDIncompletePath) err = xl.storage.DeleteFile(minioMetaBucket, uploadIDIncompletePath)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDIncompletePath) return "", toObjectErr(err, minioMetaBucket, uploadIDIncompletePath)

View File

@ -17,7 +17,6 @@
package main package main
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -377,26 +376,22 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
metadataBytes, err := json.Marshal(metadata)
if err != nil {
return newMD5Hex, nil
}
tempMetaJSONFile := path.Join(tmpMetaPrefix, bucket, object, "meta.json") tempMetaJSONFile := path.Join(tmpMetaPrefix, bucket, object, "meta.json")
fileWriter, err = xl.storage.CreateFile(minioMetaBucket, tempMetaJSONFile) metaWriter, err := xl.storage.CreateFile(minioMetaBucket, tempMetaJSONFile)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
if _, err = io.Copy(fileWriter, bytes.NewReader(metadataBytes)); err != nil { encoder := json.NewEncoder(metaWriter)
if clErr := safeCloseAndRemove(fileWriter); clErr != nil { err = encoder.Encode(&metadata)
if err != nil {
if clErr := safeCloseAndRemove(metaWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object) return "", toObjectErr(clErr, bucket, object)
} }
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
if err = metaWriter.Close(); err != nil {
if err = fileWriter.Close(); err != nil { if err = safeCloseAndRemove(metaWriter); err != nil {
if err = safeCloseAndRemove(fileWriter); err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)