mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
XL: Add additional PartNumber variable as part of xl.json
(#1750)
This is needed for verification of incoming parts and to support variadic part uploads. Which should be sorted properly. Fixes #1740
This commit is contained in:
parent
a97230dd56
commit
ee6645f421
@ -6,7 +6,8 @@
|
||||
},
|
||||
"parts": [
|
||||
{
|
||||
"name": "object00001",
|
||||
"number": 1,
|
||||
"name": "object1",
|
||||
"size": 29,
|
||||
"eTag": "",
|
||||
},
|
||||
|
@ -1,19 +1,22 @@
|
||||
{
|
||||
"parts": [
|
||||
{
|
||||
"number": 1,
|
||||
"size": 5242880,
|
||||
"etag": "3565c6e741e69a007a5ac7db893a62b5",
|
||||
"name": "object00001"
|
||||
"name": "object1"
|
||||
},
|
||||
{
|
||||
"number": 2,
|
||||
"size": 5242880,
|
||||
"etag": "d416712335c280ab1e39498552937764",
|
||||
"name": "object00002"
|
||||
"name": "object2"
|
||||
},
|
||||
{
|
||||
"number": 3,
|
||||
"size": 4338324,
|
||||
"etag": "8a98c5c54d81c6c95ed9bdcaeb941aaf",
|
||||
"name": "object00003"
|
||||
"name": "object3"
|
||||
}
|
||||
],
|
||||
"meta": {
|
||||
@ -33,12 +36,12 @@
|
||||
"data": 5,
|
||||
"checksum": [
|
||||
{
|
||||
"name": "object.00001",
|
||||
"name": "object1",
|
||||
"algorithm": "sha512",
|
||||
"hash": "d9910e1492446389cfae6fe979db0245f96ca97ca2c7a25cab45805882004479320d866a47ea1f7be6a62625dd4de6caf7816009ef9d62779346d01a221b335c",
|
||||
},
|
||||
{
|
||||
"name": "object.00002",
|
||||
"name": "object2",
|
||||
"algorithm": "sha512",
|
||||
"hash": "d9910e1492446389cfae6fe979db0245f96ca97ca2c7a25cab45805882004479320d866a47ea1f7be6a62625dd4de6caf7816009ef9d62779346d01a221b335c",
|
||||
},
|
||||
|
@ -8,6 +8,10 @@ import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
const (
|
||||
fsMetaJSONFile = "fs.json"
|
||||
)
|
||||
|
||||
// A fsMetaV1 represents a metadata header mapping keys to sets of values.
|
||||
type fsMetaV1 struct {
|
||||
Version string `json:"version"`
|
||||
@ -15,9 +19,6 @@ type fsMetaV1 struct {
|
||||
Minio struct {
|
||||
Release string `json:"release"`
|
||||
} `json:"minio"`
|
||||
Checksum struct {
|
||||
Enable bool `json:"enable"`
|
||||
} `json:"checksum"`
|
||||
Parts []objectPartInfo `json:"parts,omitempty"`
|
||||
}
|
||||
|
||||
@ -44,9 +45,9 @@ func (m fsMetaV1) WriteTo(writer io.Writer) (n int64, err error) {
|
||||
}
|
||||
|
||||
// SearchObjectPart - search object part name and etag.
|
||||
func (m fsMetaV1) SearchObjectPart(name string, etag string) int {
|
||||
func (m fsMetaV1) SearchObjectPart(number int) int {
|
||||
for i, part := range m.Parts {
|
||||
if name == part.Name && etag == part.ETag {
|
||||
if number == part.Number {
|
||||
return i
|
||||
}
|
||||
}
|
||||
@ -54,19 +55,16 @@ func (m fsMetaV1) SearchObjectPart(name string, etag string) int {
|
||||
}
|
||||
|
||||
// AddObjectPart - add a new object part in order.
|
||||
func (m *fsMetaV1) AddObjectPart(name string, etag string, size int64) {
|
||||
func (m *fsMetaV1) AddObjectPart(number int, name string, etag string, size int64) {
|
||||
m.Parts = append(m.Parts, objectPartInfo{
|
||||
Name: name,
|
||||
ETag: etag,
|
||||
Size: size,
|
||||
Number: number,
|
||||
Name: name,
|
||||
ETag: etag,
|
||||
Size: size,
|
||||
})
|
||||
sort.Sort(byPartName(m.Parts))
|
||||
sort.Sort(byPartNumber(m.Parts))
|
||||
}
|
||||
|
||||
const (
|
||||
fsMetaJSONFile = "fs.json"
|
||||
)
|
||||
|
||||
// readFSMetadata - read `fs.json`.
|
||||
func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
|
||||
r, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0))
|
||||
|
@ -407,7 +407,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
fsMeta.AddObjectPart(partSuffix, newMD5Hex, size)
|
||||
fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
|
||||
|
||||
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
|
||||
err = fs.storage.RenameFile(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
|
||||
@ -454,18 +454,21 @@ func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
|
||||
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
// Only parts with higher part numbers will be listed.
|
||||
parts := fsMeta.Parts[partNumberMarker:]
|
||||
partIdx := fsMeta.SearchObjectPart(partNumberMarker)
|
||||
parts := fsMeta.Parts
|
||||
if partIdx != -1 {
|
||||
parts = fsMeta.Parts[partIdx+1:]
|
||||
}
|
||||
count := maxParts
|
||||
for i, part := range parts {
|
||||
for _, part := range parts {
|
||||
var fi FileInfo
|
||||
partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name)
|
||||
fi, err = fs.storage.StatFile(minioMetaBucket, partNamePath)
|
||||
if err != nil {
|
||||
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, partNamePath)
|
||||
}
|
||||
partNum := i + partNumberMarker + 1
|
||||
result.Parts = append(result.Parts, partInfo{
|
||||
PartNumber: partNum,
|
||||
PartNumber: part.Number,
|
||||
ETag: part.ETag,
|
||||
LastModified: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
|
@ -887,10 +887,6 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
|
||||
writeErrorResponse(w, r, ErrInvalidMaxParts, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if maxParts == 0 {
|
||||
maxParts = maxPartsList
|
||||
}
|
||||
|
||||
listPartsInfo, err := api.ObjectAPI.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to list uploaded parts.")
|
||||
|
@ -36,9 +36,10 @@ const (
|
||||
// objectPartInfo Info of each part kept in the multipart metadata
|
||||
// file after CompleteMultipartUpload() is called.
|
||||
type objectPartInfo struct {
|
||||
Name string `json:"name"`
|
||||
ETag string `json:"etag"`
|
||||
Size int64 `json:"size"`
|
||||
Number int `json:"number"`
|
||||
Name string `json:"name"`
|
||||
ETag string `json:"etag"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
// A xlMetaV1 represents a metadata header mapping keys to sets of values.
|
||||
@ -93,17 +94,17 @@ func (m xlMetaV1) WriteTo(writer io.Writer) (n int64, err error) {
|
||||
}
|
||||
|
||||
// byPartName is a collection satisfying sort.Interface.
|
||||
type byPartName []objectPartInfo
|
||||
type byPartNumber []objectPartInfo
|
||||
|
||||
func (t byPartName) Len() int { return len(t) }
|
||||
func (t byPartName) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
||||
func (t byPartName) Less(i, j int) bool { return t[i].Name < t[j].Name }
|
||||
func (t byPartNumber) Len() int { return len(t) }
|
||||
func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
||||
func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
|
||||
|
||||
// SearchObjectPart - searches for part name and etag, returns the
|
||||
// index if found.
|
||||
func (m xlMetaV1) SearchObjectPart(name string, etag string) int {
|
||||
func (m xlMetaV1) SearchObjectPart(number int) int {
|
||||
for i, part := range m.Parts {
|
||||
if name == part.Name && etag == part.ETag {
|
||||
if number == part.Number {
|
||||
return i
|
||||
}
|
||||
}
|
||||
@ -111,25 +112,33 @@ func (m xlMetaV1) SearchObjectPart(name string, etag string) int {
|
||||
}
|
||||
|
||||
// AddObjectPart - add a new object part in order.
|
||||
func (m *xlMetaV1) AddObjectPart(name string, etag string, size int64) {
|
||||
m.Parts = append(m.Parts, objectPartInfo{
|
||||
Name: name,
|
||||
ETag: etag,
|
||||
Size: size,
|
||||
})
|
||||
sort.Sort(byPartName(m.Parts))
|
||||
func (m *xlMetaV1) AddObjectPart(number int, name string, etag string, size int64) {
|
||||
partInfo := objectPartInfo{
|
||||
Number: number,
|
||||
Name: name,
|
||||
ETag: etag,
|
||||
Size: size,
|
||||
}
|
||||
for i, part := range m.Parts {
|
||||
if number == part.Number {
|
||||
m.Parts[i] = partInfo
|
||||
return
|
||||
}
|
||||
}
|
||||
m.Parts = append(m.Parts, partInfo)
|
||||
sort.Sort(byPartNumber(m.Parts))
|
||||
}
|
||||
|
||||
// getPartNumberOffset - given an offset for the whole object, return the part and offset in that part.
|
||||
func (m xlMetaV1) getPartNumberOffset(offset int64) (partNumber int, partOffset int64, err error) {
|
||||
// getPartIndexOffset - given an offset for the whole object, return the part and offset in that part.
|
||||
func (m xlMetaV1) getPartIndexOffset(offset int64) (partIndex int, partOffset int64, err error) {
|
||||
partOffset = offset
|
||||
for i, part := range m.Parts {
|
||||
partNumber = i
|
||||
partIndex = i
|
||||
if part.Size == 0 {
|
||||
return partNumber, partOffset, nil
|
||||
return partIndex, partOffset, nil
|
||||
}
|
||||
if partOffset < part.Size {
|
||||
return partNumber, partOffset, nil
|
||||
return partIndex, partOffset, nil
|
||||
}
|
||||
partOffset -= part.Size
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
|
||||
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
|
||||
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
|
||||
|
||||
partSuffix := fmt.Sprintf("object.%.5d", partID)
|
||||
partSuffix := fmt.Sprintf("object%d", partID)
|
||||
tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix)
|
||||
fileWriter, err := xl.erasureDisk.CreateFile(minioMetaBucket, tmpPartPath)
|
||||
if err != nil {
|
||||
@ -188,7 +188,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
xlMeta.AddObjectPart(partSuffix, newMD5Hex, size)
|
||||
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
|
||||
|
||||
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
|
||||
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
|
||||
@ -236,19 +236,39 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
|
||||
if err != nil {
|
||||
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
// Populate the result stub.
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
result.MaxParts = maxParts
|
||||
|
||||
// For empty number of parts or maxParts as zero, return right here.
|
||||
if len(xlMeta.Parts) == 0 || maxParts == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Limit output to maxPartsList.
|
||||
if maxParts > maxPartsList {
|
||||
maxParts = maxPartsList
|
||||
}
|
||||
|
||||
// Only parts with higher part numbers will be listed.
|
||||
parts := xlMeta.Parts[partNumberMarker:]
|
||||
partIdx := xlMeta.SearchObjectPart(partNumberMarker)
|
||||
parts := xlMeta.Parts
|
||||
if partIdx != -1 {
|
||||
parts = xlMeta.Parts[partIdx+1:]
|
||||
}
|
||||
count := maxParts
|
||||
for i, part := range parts {
|
||||
for _, part := range parts {
|
||||
var fi FileInfo
|
||||
partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name)
|
||||
fi, err = disk.StatFile(minioMetaBucket, partNamePath)
|
||||
if err != nil {
|
||||
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, partNamePath)
|
||||
}
|
||||
partNum := i + partNumberMarker + 1
|
||||
result.Parts = append(result.Parts, partInfo{
|
||||
PartNumber: partNum,
|
||||
PartNumber: part.Number,
|
||||
ETag: part.ETag,
|
||||
LastModified: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
@ -266,10 +286,6 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
|
||||
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
|
||||
result.NextPartNumberMarker = nextPartNumberMarker
|
||||
}
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
result.MaxParts = maxParts
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -309,24 +325,45 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
}
|
||||
|
||||
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
|
||||
|
||||
// Read the current `xl.json`.
|
||||
xlMeta, err := readXLMetadata(xl.getRandomDisk(), minioMetaBucket, uploadIDPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
var objectSize int64
|
||||
|
||||
// Save current xl meta for validation.
|
||||
var currentXLMeta = xlMeta
|
||||
|
||||
// Allocate parts similar to incoming slice.
|
||||
xlMeta.Parts = make([]objectPartInfo, len(parts))
|
||||
|
||||
// Loop through all parts, validate them and then commit to disk.
|
||||
for i, part := range parts {
|
||||
// Construct part suffix.
|
||||
partSuffix := fmt.Sprintf("object.%.5d", part.PartNumber)
|
||||
if xlMeta.SearchObjectPart(partSuffix, part.ETag) == -1 {
|
||||
partIdx := currentXLMeta.SearchObjectPart(part.PartNumber)
|
||||
if partIdx == -1 {
|
||||
return "", InvalidPart{}
|
||||
}
|
||||
if currentXLMeta.Parts[partIdx].ETag != part.ETag {
|
||||
return "", BadDigest{}
|
||||
}
|
||||
// All parts except the last part has to be atleast 5MB.
|
||||
if (i < len(parts)-1) && !isMinAllowedPartSize(xlMeta.Parts[i].Size) {
|
||||
if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) {
|
||||
return "", PartTooSmall{}
|
||||
}
|
||||
objectSize += xlMeta.Parts[i].Size
|
||||
|
||||
// Save for total object size.
|
||||
objectSize += currentXLMeta.Parts[partIdx].Size
|
||||
|
||||
// Add incoming parts.
|
||||
xlMeta.Parts[i] = objectPartInfo{
|
||||
Number: part.PartNumber,
|
||||
ETag: part.ETag,
|
||||
Size: currentXLMeta.Parts[partIdx].Size,
|
||||
Name: fmt.Sprintf("object%d", part.PartNumber),
|
||||
}
|
||||
}
|
||||
|
||||
// Check if an object is present as one of the parent dir.
|
||||
|
@ -32,7 +32,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
|
||||
if err != nil {
|
||||
return nil, toObjectErr(err, bucket, object)
|
||||
}
|
||||
partIndex, offset, err := xlMeta.getPartNumberOffset(startOffset)
|
||||
partIndex, offset, err := xlMeta.getPartIndexOffset(startOffset)
|
||||
if err != nil {
|
||||
return nil, toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -190,7 +190,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
nsMutex.Lock(bucket, object)
|
||||
defer nsMutex.Unlock(bucket, object)
|
||||
|
||||
tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object00001")
|
||||
tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1")
|
||||
tempObj := path.Join(tmpMetaPrefix, bucket, object)
|
||||
fileWriter, err := xl.erasureDisk.CreateFile(minioMetaBucket, tempErasureObj)
|
||||
if err != nil {
|
||||
@ -282,7 +282,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
xlMeta.Meta = metadata
|
||||
xlMeta.Stat.Size = size
|
||||
xlMeta.Stat.ModTime = modTime
|
||||
xlMeta.AddObjectPart("object00001", newMD5Hex, xlMeta.Stat.Size)
|
||||
xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size)
|
||||
if err = xl.writeXLMetadata(bucket, object, xlMeta); err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user