xl/getObjectInfo: Returns back proper size, modTime and md5Sum. (#1479)

Fixes #1469
This commit is contained in:
Harshavardhana 2016-05-04 15:28:58 -07:00 committed by Anand Babu (AB) Periasamy
parent 321aefa026
commit 6988ed9257
2 changed files with 42 additions and 24 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"io" "io"
"path" "path"
"time"
) )
// MultipartPartInfo Info of each part kept in the multipart metadata file after // MultipartPartInfo Info of each part kept in the multipart metadata file after
@ -33,20 +34,17 @@ type MultipartPartInfo struct {
// MultipartObjectInfo - contents of the multipart metadata file after // MultipartObjectInfo - contents of the multipart metadata file after
// CompleteMultipartUpload() is called. // CompleteMultipartUpload() is called.
type MultipartObjectInfo []MultipartPartInfo type MultipartObjectInfo struct {
Parts []MultipartPartInfo
// GetSize - Return the size of the object. ModTime time.Time
func (m MultipartObjectInfo) GetSize() (size int64) { Size int64
for _, part := range m { MD5Sum string
size += part.Size
}
return
} }
// GetPartNumberOffset - given an offset for the whole object, return the part and offset in that part. // GetPartNumberOffset - given an offset for the whole object, return the part and offset in that part.
func (m MultipartObjectInfo) GetPartNumberOffset(offset int64) (partIndex int, partOffset int64, err error) { func (m MultipartObjectInfo) GetPartNumberOffset(offset int64) (partIndex int, partOffset int64, err error) {
partOffset = offset partOffset = offset
for i, part := range m { for i, part := range m.Parts {
partIndex = i partIndex = i
if partOffset < part.Size { if partOffset < part.Size {
return return
@ -98,7 +96,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} else if !status { } else if !status {
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
var metadata MultipartObjectInfo var metadata = MultipartObjectInfo{}
var md5Sums []string var md5Sums []string
for _, part := range parts { for _, part := range parts {
// Construct part suffix. // Construct part suffix.
@ -111,16 +109,35 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
return "", err return "", err
} }
metadata = append(metadata, MultipartPartInfo{part.PartNumber, part.ETag, fi.Size}) // Update metadata parts.
metadata.Parts = append(metadata.Parts, MultipartPartInfo{
PartNumber: part.PartNumber,
ETag: part.ETag,
Size: fi.Size,
})
metadata.Size += fi.Size
multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber)) multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber))
err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix) err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix)
// We need a way to roll back if of the renames failed. // TODO: We need a way to roll back if of the renames failed.
if err != nil { if err != nil {
return "", err return "", err
} }
// Save md5sum for future response.
md5Sums = append(md5Sums, part.ETag) md5Sums = append(md5Sums, part.ETag)
} }
// Calculate and save s3 compatible md5sum.
s3MD5, err := makeS3MD5(md5Sums...)
if err != nil {
return "", err
}
metadata.MD5Sum = s3MD5
// Save modTime as well as the current time.
metadata.ModTime = time.Now().UTC()
// Create temporary multipart meta file to write and then rename.
tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, multipartMetaFile) tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, multipartMetaFile)
w, err := xl.storage.CreateFile(minioMetaBucket, tempMultipartMetaFile) w, err := xl.storage.CreateFile(minioMetaBucket, tempMultipartMetaFile)
if err != nil { if err != nil {
@ -138,11 +155,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if err = w.Close(); err != nil { if err = w.Close(); err != nil {
return "", err return "", err
} }
// Save the s3 md5.
s3MD5, err := makeS3MD5(md5Sums...)
if err != nil {
return "", err
}
multipartObjFile := path.Join(object, multipartMetaFile) multipartObjFile := path.Join(object, multipartMetaFile)
err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile) err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile)
if err != nil { if err != nil {

View File

@ -102,8 +102,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
go func() { go func() {
for ; partIndex < len(info); partIndex++ { for ; partIndex < len(info.Parts); partIndex++ {
part := info[partIndex] part := info.Parts[partIndex]
r, err := xl.storage.ReadFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)), offset) r, err := xl.storage.ReadFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)), offset)
if err != nil { if err != nil {
fileWriter.CloseWithError(err) fileWriter.CloseWithError(err)
@ -147,7 +147,9 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
fi.Size = info.GetSize() fi.Size = info.Size
fi.ModTime = info.ModTime
fi.MD5Sum = info.MD5Sum
} }
contentType := "application/octet-stream" contentType := "application/octet-stream"
if objectExt := filepath.Ext(object); objectExt != "" { if objectExt := filepath.Ext(object); objectExt != "" {
@ -163,7 +165,7 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
Size: fi.Size, Size: fi.Size,
IsDir: fi.Mode.IsDir(), IsDir: fi.Mode.IsDir(),
ContentType: contentType, ContentType: contentType,
MD5Sum: "", // Read from metadata. MD5Sum: fi.MD5Sum,
}, nil }, nil
} }
@ -206,7 +208,7 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
// Range through all files and delete it. // Range through all files and delete it.
for _, part := range info { for _, part := range info.Parts {
err = xl.storage.DeleteFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber))) err = xl.storage.DeleteFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)))
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
@ -273,7 +275,9 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
fileInfo.Mode = 0 fileInfo.Mode = 0
fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator) fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator)
fileInfo.Size = info.GetSize() fileInfo.Size = info.Size
fileInfo.ModTime = info.ModTime
fileInfo.MD5Sum = info.MD5Sum
} else if err != errFileNotFound { } else if err != errFileNotFound {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
} }
@ -287,7 +291,9 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
if err != nil { if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
} }
fileInfo.Size = info.GetSize() fileInfo.Size = info.Size
fileInfo.ModTime = info.ModTime
fileInfo.MD5Sum = info.MD5Sum
allFileInfos = append(allFileInfos, fileInfo) allFileInfos = append(allFileInfos, fileInfo)
maxKeys-- maxKeys--
continue continue