XL: Truly use unique id's in temp directory. (#1790)

This also helps in avoiding cleaning up directories after.

Additionally this patch also fixes the problem of Range offsets.
This commit is contained in:
Harshavardhana 2016-05-29 00:42:09 -07:00
parent feb337098d
commit 5e8de786b3
6 changed files with 66 additions and 52 deletions

View File

@ -76,7 +76,7 @@ func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta
return "", err return "", err
} }
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
@ -300,7 +300,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
partSuffix := fmt.Sprintf("object%d", partID) partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix)
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
@ -348,7 +348,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
return "", toObjectErr(err, minioMetaBucket, partPath) return "", toObjectErr(err, minioMetaBucket, partPath)
} }
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
@ -475,7 +475,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", err return "", err
} }
tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, "object1") tempObj := path.Join(tmpMetaPrefix, uploadID, "object1")
var buffer = make([]byte, blockSize) var buffer = make([]byte, blockSize)
// Loop through all parts, validate them and then commit to disk. // Loop through all parts, validate them and then commit to disk.

View File

@ -229,8 +229,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
} }
} }
uniqueID := getUUID()
// Temporary object. // Temporary object.
tempObj := path.Join(tmpMetaPrefix, bucket, object) tempObj := path.Join(tmpMetaPrefix, uniqueID)
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()

View File

@ -140,8 +140,8 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
// Count for errors encountered. // Count for errors encountered.
var xlJSONErrCount = 0 var xlJSONErrCount = 0
// Allocate 4MB buffer. // Allocate 4MiB buffer.
var buffer = make([]byte, blockSize) buffer := make([]byte, blockSize)
// Return the first successful lookup from a random list of disks. // Return the first successful lookup from a random list of disks.
for xlJSONErrCount < len(xl.storageDisks) { for xlJSONErrCount < len(xl.storageDisks) {
@ -173,6 +173,7 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
return xlMeta return xlMeta
} }
// renameXLMetadata - renames `xl.json` from source prefix to destination prefix.
func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error { func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks)) var mErrs = make([]error, len(xl.storageDisks))
@ -185,11 +186,18 @@ func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix
// Rename `xl.json` in a routine. // Rename `xl.json` in a routine.
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
// Renames `xl.json` from source prefix to destination prefix.
rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
if rErr != nil { if rErr != nil {
mErrs[index] = rErr mErrs[index] = rErr
return return
} }
// Delete any dangling directories.
dErr := disk.DeleteFile(srcBucket, srcPrefix)
if dErr != nil {
mErrs[index] = dErr
return
}
mErrs[index] = nil mErrs[index] = nil
}(index, disk) }(index, disk)
} }

View File

@ -114,7 +114,8 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI
// uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. // uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error { func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error {
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID()
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
var errs = make([]error, len(storageDisks)) var errs = make([]error, len(storageDisks))
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
@ -169,7 +170,8 @@ func newUploadsV1(format string) uploadsV1 {
// writeUploadJSON - create `uploads.json` or update it with new uploadID. // writeUploadJSON - create `uploads.json` or update it with new uploadID.
func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) (err error) { func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) (err error) {
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID()
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
var errs = make([]error, len(storageDisks)) var errs = make([]error, len(storageDisks))
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}

View File

@ -82,7 +82,7 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta
return "", err return "", err
} }
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
@ -141,7 +141,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
partSuffix := fmt.Sprintf("object%d", partID) partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix)
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
@ -173,6 +173,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
return "", BadDigest{md5Hex, newMD5Hex} return "", BadDigest{md5Hex, newMD5Hex}
} }
} }
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil { if err != nil {
@ -184,7 +185,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
@ -372,7 +373,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Save successfully calculated md5sum. // Save successfully calculated md5sum.
xlMeta.Meta["md5Sum"] = s3MD5 xlMeta.Meta["md5Sum"] = s3MD5
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
@ -380,16 +381,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if rErr != nil { if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
} }
// Hold write lock on the destination before rename // Hold write lock on the destination before rename
nsMutex.Lock(bucket, object) nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object) defer nsMutex.Unlock(bucket, object)
// Delete if an object already exists. // Rename if an object already exists to temporary location.
// FIXME: rename it to tmp file and delete only after uniqueID := getUUID()
// the newly uploaded file is renamed from tmp location to err = xl.renameObject(bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID))
// the original location. Verify if the object is a multipart object.
err = xl.deleteObject(bucket, object)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
@ -407,10 +405,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
} }
// Rename the multipart object to final location.
if err = xl.renameObject(minioMetaBucket, uploadIDPath, bucket, object); err != nil { if err = xl.renameObject(minioMetaBucket, uploadIDPath, bucket, object); err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
// Delete the previously successfully renamed object.
xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID))
// Hold the lock so that two parallel complete-multipart-uploads do no // Hold the lock so that two parallel complete-multipart-uploads do no
// leave a stale uploads.json behind. // leave a stale uploads.json behind.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))

View File

@ -54,27 +54,22 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
totalLeft := length totalLeft := length
for ; partIndex < len(xlMeta.Parts); partIndex++ { for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex] part := xlMeta.Parts[partIndex]
totalPartSize := part.Size var buffer io.Reader
for totalPartSize > 0 { buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size)
var buffer io.Reader if err != nil {
buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) return err
if err != nil {
return err
}
if int64(buffer.(*bytes.Buffer).Len()) > totalLeft {
if _, err := io.CopyN(writer, buffer, totalLeft); err != nil {
return err
}
return nil
}
n, err := io.Copy(writer, buffer)
if err != nil {
return err
}
totalLeft -= n
totalPartSize -= n
partOffset += n
} }
if int64(buffer.(*bytes.Buffer).Len()) > totalLeft {
if _, err := io.CopyN(writer, buffer, totalLeft); err != nil {
return err
}
return nil
}
n, err := io.Copy(writer, buffer)
if err != nil {
return err
}
totalLeft -= n
// Reset part offset to 0 to read rest of the parts from the beginning. // Reset part offset to 0 to read rest of the parts from the beginning.
partOffset = 0 partOffset = 0
} }
@ -203,8 +198,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
nsMutex.Lock(bucket, object) nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object) defer nsMutex.Unlock(bucket, object)
tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1") uniqueID := getUUID()
tempObj := path.Join(tmpMetaPrefix, bucket, object) tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "object1")
tempObj := path.Join(tmpMetaPrefix, uniqueID)
// Initialize xl meta. // Initialize xl meta.
xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks)
@ -281,13 +277,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
return "", toObjectErr(errFileAccessDenied, bucket, object) return "", toObjectErr(errFileAccessDenied, bucket, object)
} }
// Delete if an object already exists. // Rename if an object already exists to temporary location.
err = xl.deleteObject(bucket, object) newUniqueID := getUUID()
if err != nil { err = xl.renameObject(bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, newUniqueID))
return "", toObjectErr(err, bucket, object)
}
err = xl.renameObject(minioMetaBucket, tempObj, bucket, object)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
@ -298,13 +290,21 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
xlMeta.Stat.ModTime = modTime xlMeta.Stat.ModTime = modTime
xlMeta.Stat.Version = higherVersion xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size) xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size)
if err = xl.writeXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), xlMeta); err != nil {
// Write `xl.json` metadata.
if err = xl.writeXLMetadata(minioMetaBucket, tempObj, xlMeta); err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
rErr := xl.renameXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), bucket, object)
if rErr != nil { // Rename the successfully written tempoary object to final location.
return "", toObjectErr(rErr, bucket, object) err = xl.renameObject(minioMetaBucket, tempObj, bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
} }
// Delete the temporary object.
xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, newUniqueID))
// Return md5sum, successfully wrote object. // Return md5sum, successfully wrote object.
return newMD5Hex, nil return newMD5Hex, nil
} }