Tweak multipart uploads (#19756)

* Store ModTime in the upload ID; return it when listing instead of the current time.
* Use this ModTime to expire and skip reading the file info.
* Consistent upload sorting in listing (since it now has the ModTime).
* Exclude healing disks to avoid returning an empty list.
This commit is contained in:
Klaus Post 2024-05-17 09:40:09 -07:00 committed by GitHub
parent e432e79324
commit 3b7747b42b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -199,38 +199,57 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
if err != nil {
var modTime time.Time
// Upload IDs are of the form base64_url(<UUID>x<UnixNano>), we can extract the time from the UUID.
if b64, err := base64.RawURLEncoding.DecodeString(uploadIDDir); err == nil {
if split := strings.Split(string(b64), "x"); len(split) == 2 {
t, err := strconv.ParseInt(split[1], 10, 64)
if err == nil {
modTime = time.Unix(0, t)
}
}
}
// Fallback for older uploads without time in the ID.
if modTime.IsZero() {
wait := deleteMultipartCleanupSleeper.Timer(ctx)
fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
if err != nil {
return nil
}
modTime = fi.ModTime
wait()
}
if time.Since(modTime) < expiry {
return nil
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
wait := deleteMultipartCleanupSleeper.Timer(ctx)
if time.Since(fi.ModTime) > expiry {
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket))
}
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket))
wait()
return nil
})
})
// Get the modtime of the shaDir.
vi, err := disk.StatVol(ctx, pathJoin(minioMetaMultipartBucket, shaDir))
if err != nil {
return nil
}
// Modtime is returned in the Created field. See (*xlStorage).StatVol
if time.Since(vi.Created) < expiry {
return nil
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
wait := deleteMultipartCleanupSleeper.Timer(ctx)
if time.Since(vi.Created) > expiry {
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
// We are not deleting shaDir recursively here, if shaDir is empty
// and its older then we can happily delete it.
Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath)
}
// We are not deleting shaDir recursively here, if shaDir is empty
// and its older then we can happily delete it.
Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath)
wait()
return nil
})
@ -279,10 +298,13 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
var disk StorageAPI
disks := er.getOnlineLocalDisks()
if len(disks) == 0 {
// using er.getOnlineLocalDisks() has one side-affect where
// on a pooled setup all disks are remote, add a fallback
disks = er.getOnlineDisks()
// If no local, get non-healing disks.
var ok bool
if disks, ok = er.getOnlineDisksWithHealing(false); !ok {
disks = er.getOnlineDisks()
}
}
for _, disk = range disks {
if disk == nil {
continue
@ -494,7 +516,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
partsMetadata[index].ModTime = modTime
partsMetadata[index].Metadata = userDefined
}
uploadUUID := mustGetUUID()
uploadUUID := fmt.Sprintf("%sx%d", mustGetUUID(), modTime.UnixNano())
uploadID := base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadUUID)))
uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)