diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 9d9b0d453..12e4bc48e 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -199,38 +199,57 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error { readDirFn(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{}) - if err != nil { + var modTime time.Time + // Upload IDs are of the form base64_url(x), we can extract the time from the UUID. + if b64, err := base64.RawURLEncoding.DecodeString(uploadIDDir); err == nil { + if split := strings.Split(string(b64), "x"); len(split) == 2 { + t, err := strconv.ParseInt(split[1], 10, 64) + if err == nil { + modTime = time.Unix(0, t) + } + } + } + // Fallback for older uploads without time in the ID. + if modTime.IsZero() { + wait := deleteMultipartCleanupSleeper.Timer(ctx) + fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{}) + if err != nil { + return nil + } + modTime = fi.ModTime + wait() + } + if time.Since(modTime) < expiry { return nil } w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()) return w.Run(func() error { wait := deleteMultipartCleanupSleeper.Timer(ctx) - if time.Since(fi.ModTime) > expiry { - pathUUID := mustGetUUID() - targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID) - - renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket)) - } + pathUUID := mustGetUUID() + targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID) + renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket)) wait() return nil }) }) + // Get the modtime of the shaDir. vi, err := disk.StatVol(ctx, pathJoin(minioMetaMultipartBucket, shaDir)) if err != nil { return nil } + // Modtime is returned in the Created field. See (*xlStorage).StatVol + if time.Since(vi.Created) < expiry { + return nil + } w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()) return w.Run(func() error { wait := deleteMultipartCleanupSleeper.Timer(ctx) - if time.Since(vi.Created) > expiry { - pathUUID := mustGetUUID() - targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID) + pathUUID := mustGetUUID() + targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID) - // We are not deleting shaDir recursively here, if shaDir is empty - // and its older then we can happily delete it. - Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath) - } + // We are not deleting shaDir recursively here, if shaDir is empty + // and its older then we can happily delete it. + Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath) wait() return nil }) @@ -279,10 +298,13 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec var disk StorageAPI disks := er.getOnlineLocalDisks() if len(disks) == 0 { - // using er.getOnlineLocalDisks() has one side-affect where - // on a pooled setup all disks are remote, add a fallback - disks = er.getOnlineDisks() + // If no local, get non-healing disks. + var ok bool + if disks, ok = er.getOnlineDisksWithHealing(false); !ok { + disks = er.getOnlineDisks() + } } + for _, disk = range disks { if disk == nil { continue @@ -494,7 +516,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, partsMetadata[index].ModTime = modTime partsMetadata[index].Metadata = userDefined } - uploadUUID := mustGetUUID() + uploadUUID := fmt.Sprintf("%sx%d", mustGetUUID(), modTime.UnixNano()) uploadID := base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadUUID))) uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)