add DeleteBulk support, add sufficient deadlines per rename() (#20185)
Deadlines per moveToTrash() allow a more granular timeout approach for syscalls, instead of one aggregate timeout. This PR also makes multipart state cleanup optimal by collapsing hundreds of multipart network rename() calls into a single network call.
This commit is contained in:
parent
673df6d517
commit
80ff907d08
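
A minimal sketch of the per-syscall deadline pattern this commit applies: each filesystem operation gets its own timeout budget instead of sharing one aggregate timer. The real code wraps moveToTrash() with xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()); the runWithDeadline helper below is a simplified stand-in, not code from this commit.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errDiskStale = errors.New("drive I/O deadline exceeded")

// runWithDeadline runs op in a goroutine and gives it its own deadline.
// Note: the syscall itself is not interrupted; the caller just stops
// waiting for it, the same trade-off a deadline worker makes.
func runWithDeadline(d time.Duration, op func() error) error {
	errCh := make(chan error, 1)
	go func() { errCh <- op() }()
	select {
	case err := <-errCh:
		return err
	case <-time.After(d):
		return errDiskStale
	}
}

func main() {
	// Each rename gets a fresh 2s budget, rather than sharing one timer.
	for _, path := range []string{"part.1", "part.2", "part.3"} {
		path := path
		err := runWithDeadline(2*time.Second, func() error {
			// os.Rename(path, trashPath) would go here.
			fmt.Println("renamed", path)
			return nil
		})
		if err != nil {
			fmt.Println("rename", path, "failed:", err)
		}
	}
}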
@@ -103,10 +103,9 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
 	return fi, partsMetadata, err
 }
 
-// Removes part.meta given by partName belonging to a multipart upload from minioMetaBucket
-func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string, partNumber int) {
-	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-	curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
+// cleanMultipartPath removes all extraneous files and parts from the multipart folder, this is used per CompleteMultipart.
+// do not use this function outside of completeMultipartUpload()
+func (er erasureObjects) cleanupMultipartPath(ctx context.Context, paths ...string) {
 	storageDisks := er.getDisks()
 
 	g := errgroup.WithNErrs(len(storageDisks))
@@ -116,42 +115,7 @@ func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string
 		}
 		index := index
 		g.Go(func() error {
-			_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
-				Recursive: false,
-				Immediate: false,
-			})
-
-			return nil
-		}, index)
-	}
-	g.Wait()
-}
-
-// Removes part given by partName belonging to a multipart upload from minioMetaBucket
-func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
-	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-	curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
-	storageDisks := er.getDisks()
-
-	g := errgroup.WithNErrs(len(storageDisks))
-	for index, disk := range storageDisks {
-		if disk == nil {
-			continue
-		}
-		index := index
-		g.Go(func() error {
-			// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
-			// requests. xl.meta is the authoritative source of truth on which parts constitute
-			// the object. The presence of parts that don't belong in the object doesn't affect correctness.
-			_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath, DeleteOptions{
-				Recursive: false,
-				Immediate: false,
-			})
-			_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
-				Recursive: false,
-				Immediate: false,
-			})
-
+			_ = storageDisks[index].DeleteBulk(ctx, minioMetaMultipartBucket, paths...)
 			return nil
 		}, index)
 	}
@@ -1359,10 +1323,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 		}
 	}
 
+	paths := make([]string, 0, len(currentFI.Parts))
 	// Remove parts that weren't present in CompleteMultipartUpload request.
 	for _, curpart := range currentFI.Parts {
-		// Remove part.meta which is not needed anymore.
-		er.removePartMeta(bucket, object, uploadID, currentFI.DataDir, curpart.Number)
+		paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d.meta", curpart.Number)))
 
 		if objectPartIndex(fi.Parts, curpart.Number) == -1 {
 			// Delete the missing part files. e.g,
@@ -1371,10 +1335,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 			// Request 3: PutObjectPart 2
 			// Request 4: CompleteMultipartUpload --part 2
 			// N.B. 1st part is not present. This part should be removed from the storage.
-			er.removeObjectPart(bucket, object, uploadID, currentFI.DataDir, curpart.Number)
+			paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d", curpart.Number)))
 		}
 	}
 
+	er.cleanupMultipartPath(ctx, paths...) // cleanup all part.N.meta, and skipped part.N's before final rename().
+
 	defer func() {
 		if err == nil {
 			er.deleteAll(context.Background(), minioMetaMultipartBucket, uploadIDPath)
@@ -222,6 +222,13 @@ func (d *naughtyDisk) CheckParts(ctx context.Context, volume string, path string
 	return d.disk.CheckParts(ctx, volume, path, fi)
 }
 
+func (d *naughtyDisk) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+	if err := d.calcError(); err != nil {
+		return err
+	}
+	return d.disk.DeleteBulk(ctx, volume, paths...)
+}
+
 func (d *naughtyDisk) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
 	if err := d.calcError(); err != nil {
 		return err
@@ -392,24 +392,24 @@ func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) {
 
 // ReadMultipleReq contains information of multiple files to read from disk.
 type ReadMultipleReq struct {
-	Bucket       string   // Bucket. Can be empty if multiple buckets.
-	Prefix       string   // Shared prefix of all files. Can be empty. Will be joined to filename without modification.
-	Files        []string // Individual files to read.
-	MaxSize      int64    // Return error if size is exceed.
-	MetadataOnly bool     // Read as XL meta and truncate data.
-	AbortOn404   bool     // Stop reading after first file not found.
-	MaxResults   int      // Stop after this many successful results. <= 0 means all.
+	Bucket       string   `msg:"bk"`           // Bucket. Can be empty if multiple buckets.
+	Prefix       string   `msg:"pr,omitempty"` // Shared prefix of all files. Can be empty. Will be joined to filename without modification.
+	Files        []string `msg:"fl"`           // Individual files to read.
+	MaxSize      int64    `msg:"ms"`           // Return error if size is exceed.
+	MetadataOnly bool     `msg:"mo"`           // Read as XL meta and truncate data.
+	AbortOn404   bool     `msg:"ab"`           // Stop reading after first file not found.
+	MaxResults   int      `msg:"mr"`           // Stop after this many successful results. <= 0 means all.
 }
 
 // ReadMultipleResp contains a single response from a ReadMultipleReq.
 type ReadMultipleResp struct {
-	Bucket  string    // Bucket as given by request.
-	Prefix  string    // Prefix as given by request.
-	File    string    // File name as given in request.
-	Exists  bool      // Returns whether the file existed on disk.
-	Error   string    // Returns any error when reading.
-	Data    []byte    // Contains all data of file.
-	Modtime time.Time // Modtime of file on disk.
+	Bucket  string    `msg:"bk"`           // Bucket as given by request.
+	Prefix  string    `msg:"pr,omitempty"` // Prefix as given by request.
+	File    string    `msg:"fl"`           // File name as given in request.
+	Exists  bool      `msg:"ex"`           // Returns whether the file existed on disk.
+	Error   string    `msg:"er,omitempty"` // Returns any error when reading.
+	Data    []byte    `msg:"d"`            // Contains all data of file.
+	Modtime time.Time `msg:"m"`            // Modtime of file on disk.
 }
 
 // DeleteVersionHandlerParams are parameters for DeleteVersionHandler
@@ -516,8 +516,8 @@ type WriteAllHandlerParams struct {
 // only after as a 2-phase call, allowing the older dataDir to
 // hang-around in-case we need some form of recovery.
 type RenameDataResp struct {
-	Sign       []byte
-	OldDataDir string // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
+	Sign       []byte `msg:"s"`
+	OldDataDir string `msg:"od"` // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
 }
 
 const (
@@ -534,15 +534,26 @@ const (
 
 // CheckPartsResp is a response of the storage CheckParts and VerifyFile APIs
 type CheckPartsResp struct {
-	Results []int
+	Results []int `msg:"r"`
 }
 
 // LocalDiskIDs - GetLocalIDs response.
 type LocalDiskIDs struct {
-	IDs []string
+	IDs []string `msg:"i"`
 }
 
 // ListDirResult - ListDir()'s response.
 type ListDirResult struct {
 	Entries []string `msg:"e"`
 }
 
+// DeleteBulkReq - send multiple paths in same delete request.
+type DeleteBulkReq struct {
+	Paths []string `msg:"p"`
+}
+
+// DeleteVersionsErrsResp - collection of delete errors
+// for bulk version deletes
+type DeleteVersionsErrsResp struct {
+	Errs []string `msg:"e"`
+}
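
For context on the struct-tag changes above: the `msg:"..."` tags drive the msgp code generator, which encodes each field under its short tag key ("bk", "pr", "p", ...) instead of the full Go field name, shrinking every internode message. Below is a hand-rolled sketch of the wire shape the generated DeleteBulkReq.MarshalMsg produces, built with github.com/tinylib/msgp primitives; it is illustrative only, the real method is generated code.

package main

import (
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

// encodeDeleteBulkReq hand-builds the byte layout the generated
// DeleteBulkReq.MarshalMsg emits: a one-entry msgpack map {"p": [paths...]}.
func encodeDeleteBulkReq(paths []string) []byte {
	b := msgp.AppendMapHeader(nil, 1)                 // map with a single field
	b = msgp.AppendString(b, "p")                     // short key from `msg:"p"`
	b = msgp.AppendArrayHeader(b, uint32(len(paths))) // the Paths slice
	for _, p := range paths {
		b = msgp.AppendString(b, p)
	}
	return b
}

func main() {
	buf := encodeDeleteBulkReq([]string{"obj/uploadID/dataDir/part.1.meta"})
	fmt.Printf("%d bytes on the wire: % x\n", len(buf), buf)
}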
File diff suppressed because it is too large
@@ -348,6 +348,119 @@ func BenchmarkDecodeCheckPartsResp(b *testing.B) {
 	}
 }
 
+func TestMarshalUnmarshalDeleteBulkReq(t *testing.T) {
+	v := DeleteBulkReq{}
+	bts, err := v.MarshalMsg(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	left, err := v.UnmarshalMsg(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+	}
+
+	left, err = msgp.Skip(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+	}
+}
+
+func BenchmarkMarshalMsgDeleteBulkReq(b *testing.B) {
+	v := DeleteBulkReq{}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.MarshalMsg(nil)
+	}
+}
+
+func BenchmarkAppendMsgDeleteBulkReq(b *testing.B) {
+	v := DeleteBulkReq{}
+	bts := make([]byte, 0, v.Msgsize())
+	bts, _ = v.MarshalMsg(bts[0:0])
+	b.SetBytes(int64(len(bts)))
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		bts, _ = v.MarshalMsg(bts[0:0])
+	}
+}
+
+func BenchmarkUnmarshalDeleteBulkReq(b *testing.B) {
+	v := DeleteBulkReq{}
+	bts, _ := v.MarshalMsg(nil)
+	b.ReportAllocs()
+	b.SetBytes(int64(len(bts)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, err := v.UnmarshalMsg(bts)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func TestEncodeDecodeDeleteBulkReq(t *testing.T) {
+	v := DeleteBulkReq{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+
+	m := v.Msgsize()
+	if buf.Len() > m {
+		t.Log("WARNING: TestEncodeDecodeDeleteBulkReq Msgsize() is inaccurate")
+	}
+
+	vn := DeleteBulkReq{}
+	err := msgp.Decode(&buf, &vn)
+	if err != nil {
+		t.Error(err)
+	}
+
+	buf.Reset()
+	msgp.Encode(&buf, &v)
+	err = msgp.NewReader(&buf).Skip()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func BenchmarkEncodeDeleteBulkReq(b *testing.B) {
+	v := DeleteBulkReq{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	en := msgp.NewWriter(msgp.Nowhere)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.EncodeMsg(en)
+	}
+	en.Flush()
+}
+
+func BenchmarkDecodeDeleteBulkReq(b *testing.B) {
+	v := DeleteBulkReq{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	rd := msgp.NewEndlessReader(buf.Bytes(), b)
+	dc := msgp.NewReader(rd)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := v.DecodeMsg(dc)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
 func TestMarshalUnmarshalDeleteFileHandlerParams(t *testing.T) {
 	v := DeleteFileHandlerParams{}
 	bts, err := v.MarshalMsg(nil)
@@ -687,6 +800,119 @@ func BenchmarkDecodeDeleteVersionHandlerParams(b *testing.B) {
 	}
 }
 
+func TestMarshalUnmarshalDeleteVersionsErrsResp(t *testing.T) {
+	v := DeleteVersionsErrsResp{}
+	bts, err := v.MarshalMsg(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	left, err := v.UnmarshalMsg(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+	}
+
+	left, err = msgp.Skip(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+	}
+}
+
+func BenchmarkMarshalMsgDeleteVersionsErrsResp(b *testing.B) {
+	v := DeleteVersionsErrsResp{}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.MarshalMsg(nil)
+	}
+}
+
+func BenchmarkAppendMsgDeleteVersionsErrsResp(b *testing.B) {
+	v := DeleteVersionsErrsResp{}
+	bts := make([]byte, 0, v.Msgsize())
+	bts, _ = v.MarshalMsg(bts[0:0])
+	b.SetBytes(int64(len(bts)))
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		bts, _ = v.MarshalMsg(bts[0:0])
+	}
+}
+
+func BenchmarkUnmarshalDeleteVersionsErrsResp(b *testing.B) {
+	v := DeleteVersionsErrsResp{}
+	bts, _ := v.MarshalMsg(nil)
+	b.ReportAllocs()
+	b.SetBytes(int64(len(bts)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, err := v.UnmarshalMsg(bts)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func TestEncodeDecodeDeleteVersionsErrsResp(t *testing.T) {
+	v := DeleteVersionsErrsResp{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+
+	m := v.Msgsize()
+	if buf.Len() > m {
+		t.Log("WARNING: TestEncodeDecodeDeleteVersionsErrsResp Msgsize() is inaccurate")
+	}
+
+	vn := DeleteVersionsErrsResp{}
+	err := msgp.Decode(&buf, &vn)
+	if err != nil {
+		t.Error(err)
+	}
+
+	buf.Reset()
+	msgp.Encode(&buf, &v)
+	err = msgp.NewReader(&buf).Skip()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func BenchmarkEncodeDeleteVersionsErrsResp(b *testing.B) {
+	v := DeleteVersionsErrsResp{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	en := msgp.NewWriter(msgp.Nowhere)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.EncodeMsg(en)
+	}
+	en.Flush()
+}
+
+func BenchmarkDecodeDeleteVersionsErrsResp(b *testing.B) {
+	v := DeleteVersionsErrsResp{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	rd := msgp.NewEndlessReader(buf.Bytes(), b)
+	dc := msgp.NewReader(rd)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := v.DecodeMsg(dc)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
 func TestMarshalUnmarshalDiskInfo(t *testing.T) {
 	v := DiskInfo{}
 	bts, err := v.MarshalMsg(nil)
@@ -81,6 +81,7 @@ type StorageAPI interface {
 	// Metadata operations
 	DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) error
 	DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error
+	DeleteBulk(ctx context.Context, volume string, paths ...string) error
 	WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error
 	UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error
 	ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (FileInfo, error)
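
Every StorageAPI implementation must now also provide DeleteBulk; this diff accordingly updates xlStorage, storageRESTClient, naughtyDisk, and xlStorageDiskIDCheck. An illustrative compile-time assertion (not part of the commit) would surface a missed implementation as a build error rather than at runtime:

// Illustrative only: assert each storage implementation still satisfies
// the widened StorageAPI interface after DeleteBulk was added.
var (
	_ StorageAPI = (*xlStorage)(nil)
	_ StorageAPI = (*storageRESTClient)(nil)
	_ StorageAPI = (*xlStorageDiskIDCheck)(nil)
)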
@@ -737,7 +737,9 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
 	}
 
 	dErrResp := &DeleteVersionsErrsResp{}
-	if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil {
+	decoder := msgpNewReader(reader)
+	defer readMsgpReaderPoolPut(decoder)
+	if err = dErrResp.DecodeMsg(decoder); err != nil {
 		for i := range errs {
 			errs[i] = toStorageErr(err)
 		}
@@ -745,7 +747,11 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
 	}
 
 	for i, dErr := range dErrResp.Errs {
-		errs[i] = toStorageErr(dErr)
+		if dErr != "" {
+			errs[i] = toStorageErr(errors.New(dErr))
+		} else {
+			errs[i] = nil
+		}
 	}
 
 	return errs
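
The client change above pairs with the server-side switch of DeleteVersionsErrsResp.Errs from []error to []string: errors travel over the wire as plain strings, with the empty string standing in for nil. A self-contained sketch of that round-trip convention follows; the helper names are hypothetical, not from the commit.

package main

import (
	"errors"
	"fmt"
)

// errsToStrings is the server-side half: serialize each error as a string,
// leaving "" for nil (as DeleteVersionsHandler now does).
func errsToStrings(errs []error) []string {
	out := make([]string, len(errs))
	for i, err := range errs {
		if err != nil {
			out[i] = err.Error()
		}
	}
	return out
}

// stringsToErrs is the client-side half: rebuild error values, mapping ""
// back to nil. The real client additionally wraps with toStorageErr().
func stringsToErrs(msgs []string) []error {
	out := make([]error, len(msgs))
	for i, m := range msgs {
		if m != "" {
			out[i] = errors.New(m)
		}
	}
	return out
}

func main() {
	wire := errsToStrings([]error{nil, errors.New("file not found")})
	fmt.Printf("%q -> %v\n", wire, stringsToErrs(wire))
}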
@@ -795,6 +801,26 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
 	return verifyResp, nil
 }
 
+func (client *storageRESTClient) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+	values := make(url.Values)
+	values.Set(storageRESTVolume, volume)
+
+	req := &DeleteBulkReq{Paths: paths}
+	body, err := req.MarshalMsg(nil)
+	if err != nil {
+		return err
+	}
+
+	respBody, err := client.call(ctx, storageRESTMethodDeleteBulk, values, bytes.NewReader(body), int64(len(body)))
+	if err != nil {
+		return err
+	}
+	defer xhttp.DrainBody(respBody)
+
+	_, err = waitForHTTPResponse(respBody)
+	return toStorageErr(err)
+}
+
 func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
 	values := make(url.Values)
 	values.Set(storageRESTVolume, volume)
@@ -20,7 +20,7 @@ package cmd
 
 //go:generate msgp -file $GOFILE -unexported
 
 const (
-	storageRESTVersion       = "v61" // Move all Read* calls to http.MethodGet, compact handlers and query params fields
+	storageRESTVersion       = "v62" // Introduce DeleteBulk internode API.
 	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 	storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )
@@ -43,6 +43,7 @@ const (
 	storageRESTMethodStatInfoFile   = "/sfile"
 	storageRESTMethodReadMultiple   = "/rmpl"
 	storageRESTMethodCleanAbandoned = "/cln"
+	storageRESTMethodDeleteBulk     = "/dblk"
 )
 
 const (
@@ -21,7 +21,6 @@ import (
 	"bufio"
 	"context"
 	"encoding/binary"
-	"encoding/gob"
 	"encoding/hex"
 	"errors"
 	"fmt"
@@ -629,12 +628,6 @@ func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.
 	return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
 }
 
-// DeleteVersionsErrsResp - collection of delete errors
-// for bulk version deletes
-type DeleteVersionsErrsResp struct {
-	Errs []error
-}
-
 // DeleteVersionsHandler - delete a set of a versions.
 func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http.Request) {
 	if !s.IsValid(w, r) {
@@ -659,21 +652,20 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
 		}
 	}
 
-	dErrsResp := &DeleteVersionsErrsResp{Errs: make([]error, totalVersions)}
-
 	setEventStreamHeaders(w)
-	encoder := gob.NewEncoder(w)
 	done := keepHTTPResponseAlive(w)
 
 	opts := DeleteOptions{}
 	errs := s.getStorage().DeleteVersions(r.Context(), volume, versions, opts)
 	done(nil)
 
+	dErrsResp := &DeleteVersionsErrsResp{Errs: make([]string, totalVersions)}
 	for idx := range versions {
 		if errs[idx] != nil {
-			dErrsResp.Errs[idx] = StorageErr(errs[idx].Error())
+			dErrsResp.Errs[idx] = errs[idx].Error()
 		}
 	}
-	encoder.Encode(dErrsResp)
+
+	buf, _ := dErrsResp.MarshalMsg(nil)
+	w.Write(buf)
 }
 
 // RenameDataHandler - renames a meta object and data dir to destination.
@@ -1107,18 +1099,15 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
 		return
 	}
 
 	setEventStreamHeaders(w)
-	encoder := gob.NewEncoder(w)
 	done := keepHTTPResponseAlive(w)
 	resp, err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi)
-	done(nil)
-
+	done(err)
 	if err != nil {
-		s.writeErrorResponse(w, err)
 		return
 	}
-
-	encoder.Encode(resp)
+	buf, _ := resp.MarshalMsg(nil)
+	w.Write(buf)
 }
 
 func checkDiskFatalErrs(errs []error) error {
@@ -1243,6 +1232,24 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request)
 	}
 }
 
+func (s *storageRESTServer) DeleteBulkHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+
+	var req DeleteBulkReq
+	mr := msgpNewReader(r.Body)
+	defer readMsgpReaderPoolPut(mr)
+
+	if err := req.DecodeMsg(mr); err != nil {
+		s.writeErrorResponse(w, err)
+		return
+	}
+
+	volume := r.Form.Get(storageRESTVolume)
+	keepHTTPResponseAlive(w)(s.getStorage().DeleteBulk(r.Context(), volume, req.Paths...))
+}
+
 // ReadMultiple returns multiple files
 func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) {
 	if !s.IsValid(w, r) {
@@ -1325,6 +1332,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
 	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
 	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
+	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteBulk).HandlerFunc(h(server.DeleteBulkHandler))
 
 	subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
 	subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler))
@@ -36,12 +36,13 @@ func _() {
 	_ = x[storageMetricReadMultiple-25]
 	_ = x[storageMetricDeleteAbandonedParts-26]
 	_ = x[storageMetricDiskInfo-27]
-	_ = x[storageMetricLast-28]
+	_ = x[storageMetricDeleteBulk-28]
+	_ = x[storageMetricLast-29]
 }
 
-const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoLast"
+const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoDeleteBulkLast"
 
-var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 286}
+var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 292, 296}
 
 func (i storageMetric) String() string {
 	if i >= storageMetric(len(_storageMetric_index)-1) {
@@ -70,6 +70,7 @@ const (
 	storageMetricReadMultiple
 	storageMetricDeleteAbandonedParts
 	storageMetricDiskInfo
+	storageMetricDeleteBulk
 
 	// .... add more
 
@@ -499,6 +500,16 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
 	})
 }
 
+func (p *xlStorageDiskIDCheck) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteBulk, append([]string{volume}, paths...)...)
+	if err != nil {
+		return err
+	}
+	defer done(0, &err)
+
+	return p.storage.DeleteBulk(ctx, volume, paths...)
+}
+
 func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
 	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDelete, volume, path)
 	if err != nil {
@@ -1079,32 +1079,37 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
 		return err
 	}
 
-	s.RLock()
-	legacy := s.formatLegacy
-	s.RUnlock()
-
 	var legacyJSON bool
-	buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
-	if err != nil {
-		if !errors.Is(err, errFileNotFound) {
-			return err
-		}
-		if legacy {
-			buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
-			if err != nil {
-				return err
-			}
-			legacyJSON = true
-		}
-	}
-
-	if len(buf) == 0 {
-		if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
-			if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
-				return errVolumeNotFound
-			}
-		}
-		return errFileNotFound
-	}
+	buf, err := xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
+		buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
+		if err != nil && !errors.Is(err, errFileNotFound) {
+			return nil, err
+		}
+
+		s.RLock()
+		legacy := s.formatLegacy
+		s.RUnlock()
+		if errors.Is(err, errFileNotFound) && legacy {
+			buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
+			if err != nil {
+				return nil, err
+			}
+			legacyJSON = true
+		}
+
+		if len(buf) == 0 {
+			if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
+				if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
+					return nil, errVolumeNotFound
+				}
+				return nil, errFileNotFound
+			}
+		}
+		return buf, nil
+	})
+	if err != nil {
+		return err
+	}
 
 	if legacyJSON {
@@ -1178,10 +1183,7 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
 			errs[i] = ctx.Err()
 			continue
 		}
-		w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-		if err := w.Run(func() error { return s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...) }); err != nil {
-			errs[i] = err
-		}
+		errs[i] = s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...)
 		diskHealthCheckOK(ctx, errs[i])
 	}
 
@@ -1212,7 +1214,7 @@ func (s *xlStorage) diskAlmostFilled() bool {
 	return (float64(info.Free)/float64(info.Used)) < almostFilledPercent || (float64(info.FreeInodes)/float64(info.UsedInodes)) < almostFilledPercent
 }
 
-func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
+func (s *xlStorage) moveToTrashNoDeadline(filePath string, recursive, immediatePurge bool) (err error) {
 	pathUUID := mustGetUUID()
 	targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID)
 
@@ -1265,10 +1267,16 @@ func (s *xlStorage) moveToTrashNoDeadline(filePath string, recursive, immediateP
 		}
 	}
 
 	return nil
 }
 
+func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
+	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
+	return w.Run(func() (err error) {
+		return s.moveToTrashNoDeadline(filePath, recursive, immediatePurge)
+	})
+}
+
 // DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker
 // will force creating a new `xl.meta` to create a new delete marker
 func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) {
@@ -2417,7 +2425,41 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate
 	return nil
 }
 
-// DeleteFile - delete a file at path.
+// DeleteBulk - delete many files in bulk to trash.
+// this delete does not recursively delete empty
+// parents, if you need empty parent delete support
+// please use Delete() instead. This API is meant as
+// an optimization for Multipart operations.
+func (s *xlStorage) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+	volumeDir, err := s.getVolDir(volume)
+	if err != nil {
+		return err
+	}
+
+	if !skipAccessChecks(volume) {
+		// Stat a volume entry.
+		if err = Access(volumeDir); err != nil {
+			return convertAccessError(err, errVolumeAccessDenied)
+		}
+	}
+
+	for _, fp := range paths {
+		// Following code is needed so that we retain SlashSeparator suffix if any in
+		// path argument.
+		filePath := pathJoin(volumeDir, fp)
+		if err = checkPathLength(filePath); err != nil {
+			return err
+		}
+
+		if err = s.moveToTrash(filePath, false, false); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Delete - delete a file at path.
 func (s *xlStorage) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
 	volumeDir, err := s.getVolDir(volume)
 	if err != nil {
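
A toy comparison of the old per-part cleanup against the new bulk call, counting round trips with a mock disk. BulkDeleter and mockDisk are stand-ins invented for this sketch; the real Delete also takes DeleteOptions and both calls run over the storage REST layer.

package main

import (
	"context"
	"fmt"
)

// BulkDeleter is a stand-in for the StorageAPI subset exercised here.
type BulkDeleter interface {
	Delete(ctx context.Context, volume, path string) error
	DeleteBulk(ctx context.Context, volume string, paths ...string) error
}

// mockDisk counts network round trips instead of touching a disk.
type mockDisk struct{ calls int }

func (m *mockDisk) Delete(ctx context.Context, volume, path string) error {
	m.calls++ // one round trip per part file in the old scheme
	return nil
}

func (m *mockDisk) DeleteBulk(ctx context.Context, volume string, paths ...string) error {
	m.calls++ // a single round trip regardless of len(paths)
	return nil
}

func main() {
	parts := []string{"uploadID/dataDir/part.1", "uploadID/dataDir/part.2", "uploadID/dataDir/part.3"}
	oldDisk, bulkDisk := &mockDisk{}, &mockDisk{}

	var _ BulkDeleter = oldDisk // both schemes speak the same interface

	for _, p := range parts {
		oldDisk.Delete(context.Background(), ".minio.sys/multipart", p)
	}
	bulkDisk.DeleteBulk(context.Background(), ".minio.sys/multipart", parts...)

	fmt.Printf("old: %d round trips, bulk: %d round trip\n", oldDisk.calls, bulkDisk.calls)
}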