From 2e0fd2cba9fd96e75e98ce739ebc45674e89d543 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 12 Aug 2024 01:38:15 -0700
Subject: [PATCH] implement a safer completeMultipart implementation (#20227)

- optimize writing part.N.meta by writing both part.N and its meta
  in sequence without a network component.

- remove part.N.meta and part.N that were only partially successful,
  in quorum-loss situations during renamePart()

- allow a strict read quorum check arbitrated via ETag for the given
  part number; this makes the final commit doubly safe (see the
  arbitration sketch after the cmd/erasure-multipart.go diff below).

- return an appropriate error when read quorum is missing, instead of
  returning InvalidPart{}, which is a non-retryable error. This kind
  of situation can happen when many nodes go offline in rotation; an
  example of such restart() behavior is StatefulSet updates in k8s.

fixes #20091
---
 cmd/erasure-common.go             |  90 -----
 cmd/erasure-metadata.go           |  31 +-
 cmd/erasure-multipart.go          | 268 +++++++++------
 cmd/erasure.go                    |   5 +
 cmd/naughty-disk_test.go          |  14 +
 cmd/storage-datatypes.go          |  20 ++
 cmd/storage-datatypes_gen.go      | 554 ++++++++++++++++++++++++++++++
 cmd/storage-datatypes_gen_test.go | 339 ++++++++++++++++++
 cmd/storage-interface.go          |   2 +
 cmd/storage-rest-client.go        |  49 +++
 cmd/storage-rest-common.go        |   3 +-
 cmd/storage-rest-server.go        |  36 ++
 cmd/storagemetric_string.go       |   8 +-
 cmd/xl-storage-disk-id-check.go   |  24 ++
 cmd/xl-storage-format-v1.go       |  15 +-
 cmd/xl-storage-format-v1_gen.go   | 120 ++++---
 cmd/xl-storage.go                 | 171 ++++++++-
 internal/grid/handlers.go         |   2 +
 internal/grid/handlers_string.go  |  11 +-
 19 files changed, 1487 insertions(+), 275 deletions(-)

diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 869571a54..350a1aba7 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -19,13 +19,9 @@ package cmd
 
 import (
 	"context"
-	"fmt"
-	"io"
 	"math/rand"
 	"sync"
 	"time"
-
-	"github.com/minio/pkg/v3/sync/errgroup"
 )
 
 func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
@@ -87,89 +83,3 @@ func (er erasureObjects) getLocalDisks() (newDisks []StorageAPI) {
 	}
 	return newDisks
 }
-
-// readMultipleFiles Reads raw data from all specified files from all disks.
-func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultipleReq, readQuorum int) ([]ReadMultipleResp, error) {
-	resps := make([]chan ReadMultipleResp, len(disks))
-	for i := range resps {
-		resps[i] = make(chan ReadMultipleResp, len(req.Files))
-	}
-	g := errgroup.WithNErrs(len(disks))
-	// Read files in parallel across disks.
-	for index := range disks {
-		index := index
-		g.Go(func() (err error) {
-			if disks[index] == nil {
-				return errDiskNotFound
-			}
-			return disks[index].ReadMultiple(ctx, req, resps[index])
-		}, index)
-	}
-
-	dataArray := make([]ReadMultipleResp, 0, len(req.Files))
-	// Merge results. They should come in order from each.
-	for _, wantFile := range req.Files {
-		quorum := 0
-		toAdd := ReadMultipleResp{
-			Bucket: req.Bucket,
-			Prefix: req.Prefix,
-			File:   wantFile,
-		}
-		for i := range resps {
-			if disks[i] == nil {
-				continue
-			}
-			select {
-			case <-ctx.Done():
-			case gotFile, ok := <-resps[i]:
-				if !ok {
-					continue
-				}
-				if gotFile.Error != "" || !gotFile.Exists {
-					continue
-				}
-				if gotFile.File != wantFile || gotFile.Bucket != req.Bucket || gotFile.Prefix != req.Prefix {
-					continue
-				}
-				quorum++
-				if toAdd.Modtime.After(gotFile.Modtime) || len(gotFile.Data) < len(toAdd.Data) {
-					// Pick latest, or largest to avoid possible truncated entries.
- continue - } - toAdd = gotFile - } - } - if quorum < readQuorum { - toAdd.Exists = false - toAdd.Error = errErasureReadQuorum.Error() - toAdd.Data = nil - } - dataArray = append(dataArray, toAdd) - } - - ignoredErrs := []error{ - errFileNotFound, - errVolumeNotFound, - errFileVersionNotFound, - io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors - io.EOF, // some times we would read without locks, ignore these errors - context.DeadlineExceeded, - context.Canceled, - } - ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...) - - errs := g.Wait() - for index, err := range errs { - if err == nil { - continue - } - if !IsErr(err, ignoredErrs...) { - storageLogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", - disks[index], req.Bucket, req.Prefix, err), - disks[index].String()) - } - } - - // Return all the metadata. - return dataArray, nil -} diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 2dce8587e..7a43c8bef 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -390,8 +390,7 @@ func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Tim return findFileInfoInQuorum(ctx, metaArr, modTime, etag, quorum) } -// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently. -func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, origbucket, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) { +func writeAllMetadataWithRevert(ctx context.Context, disks []StorageAPI, origbucket, bucket, prefix string, files []FileInfo, quorum int, revert bool) ([]StorageAPI, error) { g := errgroup.WithNErrs(len(disks)) // Start writing `xl.meta` to all disks in parallel. @@ -415,9 +414,37 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, origbucket, bu mErrs := g.Wait() err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum) + if err != nil && revert { + ng := errgroup.WithNErrs(len(disks)) + for index := range disks { + if mErrs[index] != nil { + continue + } + index := index + ng.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + return disks[index].Delete(ctx, bucket, pathJoin(prefix, xlStorageFormatFile), DeleteOptions{ + Recursive: true, + }) + }, index) + } + ng.Wait() + } + return evalDisks(disks, mErrs), err } +func writeAllMetadata(ctx context.Context, disks []StorageAPI, origbucket, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) { + return writeAllMetadataWithRevert(ctx, disks, origbucket, bucket, prefix, files, quorum, true) +} + +// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently. 
+func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, origbucket, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) { + return writeAllMetadataWithRevert(ctx, disks, origbucket, bucket, prefix, files, quorum, false) +} + func commonParity(parities []int, defaultParityCount int) int { N := len(parities) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 293428c2d..bcfa34f78 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -80,6 +80,14 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object return fi, nil, err } + if readQuorum < 0 { + return fi, nil, errErasureReadQuorum + } + + if writeQuorum < 0 { + return fi, nil, errErasureWriteQuorum + } + quorum := readQuorum if write { quorum = writeQuorum @@ -88,14 +96,13 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object // List all online disks. _, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, quorum) - var reducedErr error if write { - reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) } else { - reducedErr = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) + err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) } - if reducedErr != nil { - return fi, nil, reducedErr + if err != nil { + return fi, nil, err } // Pick one from the first valid metadata. @@ -490,9 +497,10 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID) // Write updated `xl.meta` to all disks. - if _, err := writeUniqueFileInfo(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { + if _, err := writeAllMetadata(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { return nil, toObjectErr(err, bucket, object) } + return &NewMultipartUploadResult{ UploadID: uploadID, ChecksumAlgo: userDefined[hash.MinIOMultipartChecksum], @@ -513,7 +521,7 @@ func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object } // renamePart - renames multipart part to its relevant location under uploadID. -func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) { +func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, optsMeta []byte, writeQuorum int) ([]StorageAPI, error) { g := errgroup.WithNErrs(len(disks)) // Rename file on all underlying storage disks. @@ -523,60 +531,25 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds if disks[index] == nil { return errDiskNotFound } - return disks[index].RenameFile(ctx, srcBucket, srcEntry, dstBucket, dstEntry) + return disks[index].RenamePart(ctx, srcBucket, srcEntry, dstBucket, dstEntry, optsMeta) }, index) } // Wait for all renames to finish. errs := g.Wait() - // Do not need to undo partial successful operation since those will be cleaned up - // in 24hrs via multipart cleaner, never rename() back to `.minio.sys/tmp` as there - // is no way to clean them. - - // We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum - // otherwise return failure. Cleanup successful renames. 
- return evalDisks(disks, errs), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) -} - -// writeAllDisks - writes 'b' to all provided disks. -// If write cannot reach quorum, the files will be deleted from all disks. -func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry string, b []byte, writeQuorum int) ([]StorageAPI, error) { - g := errgroup.WithNErrs(len(disks)) - - // Write file to all underlying storage disks. - for index := range disks { - index := index - g.Go(func() error { - if disks[index] == nil { - return errDiskNotFound - } - return disks[index].WriteAll(ctx, dstBucket, dstEntry, b) - }, index) + paths := []string{ + dstEntry, + dstEntry + ".meta", } - // Wait for all renames to finish. - errs := g.Wait() - - // We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum - // otherwise return failure. Cleanup successful renames. err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) - if errors.Is(err, errErasureWriteQuorum) { - // Remove all written - g := errgroup.WithNErrs(len(disks)) - for index := range disks { - if disks[index] == nil || errs[index] != nil { - continue - } - index := index - g.Go(func() error { - return disks[index].Delete(ctx, dstBucket, dstEntry, DeleteOptions{Immediate: true}) - }, index) - } - // Ignore these errors. - g.WaitErr() + if err != nil { + er.cleanupMultipartPath(ctx, paths...) } + // We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum + // otherwise return failure. Cleanup successful renames. return evalDisks(disks, errs), err } @@ -732,19 +705,6 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Rename temporary part file to its final location. partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix) - onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) - if err != nil { - if errors.Is(err, errFileNotFound) { - // An in-quorum errFileNotFound means that client stream - // prematurely closed and we do not find any xl.meta or - // part.1's - in such a scenario we must return as if client - // disconnected. This means that erasure.Encode() CreateFile() - // did not do anything. - return pi, IncompleteBody{Bucket: bucket, Object: object} - } - - return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) - } md5hex := r.MD5CurrentHexString() if opts.PreserveETag != "" { @@ -766,15 +726,22 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo Checksums: r.ContentCRC(), } - fi.Parts = []ObjectPartInfo{partInfo} - partFI, err := fi.MarshalMsg(nil) + partFI, err := partInfo.MarshalMsg(nil) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - // Write part metadata to all disks. - onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partFI, writeQuorum) + onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum) if err != nil { + if errors.Is(err, errFileNotFound) { + // An in-quorum errFileNotFound means that client stream + // prematurely closed and we do not find any xl.meta or + // part.1's - in such a scenario we must return as if client + // disconnected. This means that erasure.Encode() CreateFile() + // did not do anything. 
+ return pi, IncompleteBody{Bucket: bucket, Object: object} + } + return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } @@ -917,7 +884,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up g := errgroup.WithNErrs(len(req.Files)).WithConcurrency(32) - partsInfo := make([]ObjectPartInfo, len(req.Files)) + partsInfo := make([]*ObjectPartInfo, len(req.Files)) for i, file := range req.Files { file := file partN := i + start @@ -929,21 +896,17 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return err } - var pfi FileInfo - _, err = pfi.UnmarshalMsg(buf) + pinfo := &ObjectPartInfo{} + _, err = pinfo.UnmarshalMsg(buf) if err != nil { return err } - if len(pfi.Parts) != 1 { - return errors.New("invalid number of parts expected 1, got 0") + if partN != pinfo.Number { + return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pinfo.Number) } - if partN != pfi.Parts[0].Number { - return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pfi.Parts[0].Number) - } - - partsInfo[i] = pfi.Parts[0] + partsInfo[i] = pinfo return nil }, i) } @@ -951,7 +914,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up g.Wait() for _, part := range partsInfo { - if part.Number != 0 && !part.ModTime.IsZero() { + if part != nil && part.Number != 0 && !part.ModTime.IsZero() { fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums) } } @@ -987,6 +950,106 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, nil } +func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaPaths []string, partNumbers []int, readQuorum int) ([]*ObjectPartInfo, error) { + g := errgroup.WithNErrs(len(disks)) + + objectPartInfos := make([][]*ObjectPartInfo, len(disks)) + // Rename file on all underlying storage disks. + for index := range disks { + index := index + g.Go(func() (err error) { + if disks[index] == nil { + return errDiskNotFound + } + objectPartInfos[index], err = disks[index].ReadParts(ctx, bucket, partMetaPaths...) 
+ return err + }, index) + } + + if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil { + return nil, err + } + + partInfosInQuorum := make([]*ObjectPartInfo, len(partMetaPaths)) + partMetaQuorumMap := make(map[string]int, len(partNumbers)) + for pidx := range partMetaPaths { + var pinfos []*ObjectPartInfo + for idx := range disks { + if len(objectPartInfos[idx]) == 0 { + partMetaQuorumMap[partMetaPaths[pidx]]++ + continue + } + + pinfo := objectPartInfos[idx][pidx] + if pinfo == nil { + partMetaQuorumMap[partMetaPaths[pidx]]++ + continue + } + + if pinfo.ETag == "" { + partMetaQuorumMap[partMetaPaths[pidx]]++ + } else { + pinfos = append(pinfos, pinfo) + partMetaQuorumMap[pinfo.ETag]++ + } + } + + var maxQuorum int + var maxETag string + var maxPartMeta string + for etag, quorum := range partMetaQuorumMap { + if maxQuorum < quorum { + maxQuorum = quorum + maxETag = etag + maxPartMeta = etag + } + } + + var pinfo *ObjectPartInfo + for _, pinfo = range pinfos { + if pinfo != nil && maxETag != "" && pinfo.ETag == maxETag { + break + } + if maxPartMeta != "" && path.Base(maxPartMeta) == fmt.Sprintf("part.%d.meta", pinfo.Number) { + break + } + } + + if pinfo != nil && pinfo.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum { + partInfosInQuorum[pidx] = pinfo + continue + } + + if partMetaQuorumMap[maxPartMeta] == len(disks) { + if pinfo != nil && pinfo.Error != "" { + partInfosInQuorum[pidx] = &ObjectPartInfo{Error: pinfo.Error} + } else { + partInfosInQuorum[pidx] = &ObjectPartInfo{ + Error: InvalidPart{ + PartNumber: partNumbers[pidx], + }.Error(), + } + } + } else { + partInfosInQuorum[pidx] = &ObjectPartInfo{Error: errErasureReadQuorum.Error()} + } + } + return partInfosInQuorum, nil +} + +func errStrToPartErr(errStr string) error { + if strings.Contains(errStr, "file not found") { + return InvalidPart{} + } + if strings.Contains(errStr, "Specified part could not be found") { + return InvalidPart{} + } + if strings.Contains(errStr, errErasureReadQuorum.Error()) { + return errErasureReadQuorum + } + return errors.New(errStr) +} + // CompleteMultipartUpload - completes an ongoing multipart // transaction after receiving all the parts indicated by the client. // Returns an md5sum calculated by concatenating all the individual @@ -1040,24 +1103,22 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) onlineDisks := er.getDisks() writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) + readQuorum := fi.ReadQuorum(er.defaultRQuorum()) // Read Part info for all parts partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" - req := ReadMultipleReq{ - Bucket: minioMetaMultipartBucket, - Prefix: partPath, - MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. 
- Files: make([]string, 0, len(parts)), - AbortOn404: true, - MetadataOnly: true, + partMetaPaths := make([]string, len(parts)) + partNumbers := make([]int, len(parts)) + for idx, part := range parts { + partMetaPaths[idx] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part.PartNumber)) + partNumbers[idx] = part.PartNumber } - for _, part := range parts { - req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber)) - } - partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum) + + partInfoFiles, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths, partNumbers, readQuorum) if err != nil { return oi, err } + if len(partInfoFiles) != len(parts) { // Should only happen through internal error err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts)) @@ -1119,35 +1180,22 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str opts.EncryptFn = metadataEncrypter(key) } - for i, part := range partInfoFiles { - partID := parts[i].PartNumber - if part.Error != "" || !part.Exists { - return oi, InvalidPart{ - PartNumber: partID, - } - } - - var pfi FileInfo - _, err := pfi.UnmarshalMsg(part.Data) - if err != nil { - // Maybe crash or similar. + for idx, part := range partInfoFiles { + if part.Error != "" { + err = errStrToPartErr(part.Error) bugLogIf(ctx, err) - return oi, InvalidPart{ - PartNumber: partID, - } + return oi, err } - partI := pfi.Parts[0] - partNumber := partI.Number - if partID != partNumber { - internalLogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partID, partID, partI.Number)) + if parts[idx].PartNumber != part.Number { + internalLogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", parts[idx].PartNumber, parts[idx].PartNumber, part.Number)) return oi, InvalidPart{ - PartNumber: partID, + PartNumber: part.Number, } } // Add the current part. - fi.AddObjectPart(partI.Number, partI.ETag, partI.Size, partI.ActualSize, partI.ModTime, partI.Index, partI.Checksums) + fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums) } // Calculate full object size. 
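
The heart of the change above is readParts(): every drive returns its copy of part.N.meta, the copies are arbitrated by ETag, and a part is only trusted once a single ETag reaches read quorum; anything short of that now yields errErasureReadQuorum (retryable) rather than InvalidPart{} (non-retryable). Below is a minimal, self-contained sketch of that arbitration; the names partMeta and pickPartInQuorum are invented for illustration and are not part of this patch.

package main

import (
	"errors"
	"fmt"
)

// partMeta is a trimmed stand-in for ObjectPartInfo; only the fields the
// arbitration looks at are kept.
type partMeta struct {
	Number int
	ETag   string
}

var errReadQuorum = errors.New("read quorum not met")

// pickPartInQuorum mirrors the idea behind readParts(): bucket the per-drive
// copies of part.N.meta by ETag and accept the part only when one ETag is
// reported by at least readQuorum drives. A nil entry stands for a drive
// that is offline or missing the file.
func pickPartInQuorum(copies []*partMeta, readQuorum int) (*partMeta, error) {
	votes := make(map[string]int, len(copies))
	for _, pm := range copies {
		if pm != nil && pm.ETag != "" {
			votes[pm.ETag]++
		}
	}
	for _, pm := range copies {
		if pm != nil && votes[pm.ETag] >= readQuorum {
			return pm, nil
		}
	}
	// Surfacing a retryable quorum error here, instead of the non-retryable
	// InvalidPart{}, is the behavioral fix made in CompleteMultipartUpload.
	return nil, errReadQuorum
}

func main() {
	// Four drives, read quorum 2: two agree on ETag "abc", one is offline,
	// one holds a stale copy.
	copies := []*partMeta{
		{Number: 1, ETag: "abc"},
		{Number: 1, ETag: "abc"},
		nil,
		{Number: 1, ETag: "old"},
	}
	pm, err := pickPartInQuorum(copies, 2)
	fmt.Println(pm, err) // &{1 abc} <nil>

	// A single responder cannot meet quorum; the caller should retry.
	_, err = pickPartInQuorum(copies[:1], 2)
	fmt.Println(err) // read quorum not met
}

With four drives and read quorum 2, two agreeing copies win even with one drive down and one stale copy, while a lone responder fails with the quorum error; that retryability is what lets clients survive rolling restarts such as a k8s StatefulSet update.
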
diff --git a/cmd/erasure.go b/cmd/erasure.go index cc851d625..e78ce6ef4 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -90,6 +90,11 @@ func (er erasureObjects) defaultWQuorum() int { return dataCount } +// defaultRQuorum read quorum based on setDriveCount and defaultParityCount +func (er erasureObjects) defaultRQuorum() int { + return er.setDriveCount - er.defaultParityCount +} + func diskErrToDriveState(err error) (state string) { switch { case errors.Is(err, errDiskNotFound) || errors.Is(err, context.DeadlineExceeded): diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 91173421e..ed809e94c 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -208,6 +208,20 @@ func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, return d.disk.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts) } +func (d *naughtyDisk) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) error { + if err := d.calcError(); err != nil { + return err + } + return d.disk.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta) +} + +func (d *naughtyDisk) ReadParts(ctx context.Context, bucket string, partMetaPaths ...string) ([]*ObjectPartInfo, error) { + if err := d.calcError(); err != nil { + return nil, err + } + return d.disk.ReadParts(ctx, bucket, partMetaPaths...) +} + func (d *naughtyDisk) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error { if err := d.calcError(); err != nil { return err diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index f1eec9d97..30f6576be 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -494,6 +494,16 @@ type RenameFileHandlerParams struct { DstFilePath string `msg:"dp"` } +// RenamePartHandlerParams are parameters for RenamePartHandler. +type RenamePartHandlerParams struct { + DiskID string `msg:"id"` + SrcVolume string `msg:"sv"` + SrcFilePath string `msg:"sp"` + DstVolume string `msg:"dv"` + DstFilePath string `msg:"dp"` + Meta []byte `msg:"m"` +} + // ReadAllHandlerParams are parameters for ReadAllHandler. type ReadAllHandlerParams struct { DiskID string `msg:"id"` @@ -547,6 +557,16 @@ type ListDirResult struct { Entries []string `msg:"e"` } +// ReadPartsReq - send multiple part paths to read from +type ReadPartsReq struct { + Paths []string `msg:"p"` +} + +// ReadPartsResp - is the response for ReadPartsReq +type ReadPartsResp struct { + Infos []*ObjectPartInfo `msg:"is"` +} + // DeleteBulkReq - send multiple paths in same delete request. 
type DeleteBulkReq struct { Paths []string `msg:"p"` diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index a6c755652..34db15bcc 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -4830,6 +4830,332 @@ func (z *ReadMultipleResp) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ReadPartsReq) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "p": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Paths") + return + } + if cap(z.Paths) >= int(zb0002) { + z.Paths = (z.Paths)[:zb0002] + } else { + z.Paths = make([]string, zb0002) + } + for za0001 := range z.Paths { + z.Paths[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Paths", za0001) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReadPartsReq) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "p" + err = en.Append(0x81, 0xa1, 0x70) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Paths))) + if err != nil { + err = msgp.WrapError(err, "Paths") + return + } + for za0001 := range z.Paths { + err = en.WriteString(z.Paths[za0001]) + if err != nil { + err = msgp.WrapError(err, "Paths", za0001) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReadPartsReq) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "p" + o = append(o, 0x81, 0xa1, 0x70) + o = msgp.AppendArrayHeader(o, uint32(len(z.Paths))) + for za0001 := range z.Paths { + o = msgp.AppendString(o, z.Paths[za0001]) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReadPartsReq) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "p": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Paths") + return + } + if cap(z.Paths) >= int(zb0002) { + z.Paths = (z.Paths)[:zb0002] + } else { + z.Paths = make([]string, zb0002) + } + for za0001 := range z.Paths { + z.Paths[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Paths", za0001) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReadPartsReq) Msgsize() (s int) { + s = 1 + 2 + msgp.ArrayHeaderSize + for za0001 := range z.Paths { + s += msgp.StringPrefixSize + len(z.Paths[za0001]) + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *ReadPartsResp) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + 
zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "is": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Infos") + return + } + if cap(z.Infos) >= int(zb0002) { + z.Infos = (z.Infos)[:zb0002] + } else { + z.Infos = make([]*ObjectPartInfo, zb0002) + } + for za0001 := range z.Infos { + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "Infos", za0001) + return + } + z.Infos[za0001] = nil + } else { + if z.Infos[za0001] == nil { + z.Infos[za0001] = new(ObjectPartInfo) + } + err = z.Infos[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Infos", za0001) + return + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReadPartsResp) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "is" + err = en.Append(0x81, 0xa2, 0x69, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Infos))) + if err != nil { + err = msgp.WrapError(err, "Infos") + return + } + for za0001 := range z.Infos { + if z.Infos[za0001] == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = z.Infos[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Infos", za0001) + return + } + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReadPartsResp) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "is" + o = append(o, 0x81, 0xa2, 0x69, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Infos))) + for za0001 := range z.Infos { + if z.Infos[za0001] == nil { + o = msgp.AppendNil(o) + } else { + o, err = z.Infos[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Infos", za0001) + return + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReadPartsResp) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "is": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Infos") + return + } + if cap(z.Infos) >= int(zb0002) { + z.Infos = (z.Infos)[:zb0002] + } else { + z.Infos = make([]*ObjectPartInfo, zb0002) + } + for za0001 := range z.Infos { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.Infos[za0001] = nil + } else { + if z.Infos[za0001] == nil { + z.Infos[za0001] = new(ObjectPartInfo) + } + bts, err = z.Infos[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Infos", za0001) + return + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReadPartsResp) Msgsize() (s int) { + s = 1 + 3 + msgp.ArrayHeaderSize + for za0001 := range z.Infos { + if 
z.Infos[za0001] == nil { + s += msgp.NilSize + } else { + s += z.Infos[za0001].Msgsize() + } + } + return +} + // DecodeMsg implements msgp.Decodable func (z *RenameDataHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -5757,6 +6083,234 @@ func (z *RenameOptions) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *RenamePartHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "id": + z.DiskID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "DiskID") + return + } + case "sv": + z.SrcVolume, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "SrcVolume") + return + } + case "sp": + z.SrcFilePath, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "SrcFilePath") + return + } + case "dv": + z.DstVolume, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "DstVolume") + return + } + case "dp": + z.DstFilePath, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "DstFilePath") + return + } + case "m": + z.Meta, err = dc.ReadBytes(z.Meta) + if err != nil { + err = msgp.WrapError(err, "Meta") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *RenamePartHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 + // write "id" + err = en.Append(0x86, 0xa2, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteString(z.DiskID) + if err != nil { + err = msgp.WrapError(err, "DiskID") + return + } + // write "sv" + err = en.Append(0xa2, 0x73, 0x76) + if err != nil { + return + } + err = en.WriteString(z.SrcVolume) + if err != nil { + err = msgp.WrapError(err, "SrcVolume") + return + } + // write "sp" + err = en.Append(0xa2, 0x73, 0x70) + if err != nil { + return + } + err = en.WriteString(z.SrcFilePath) + if err != nil { + err = msgp.WrapError(err, "SrcFilePath") + return + } + // write "dv" + err = en.Append(0xa2, 0x64, 0x76) + if err != nil { + return + } + err = en.WriteString(z.DstVolume) + if err != nil { + err = msgp.WrapError(err, "DstVolume") + return + } + // write "dp" + err = en.Append(0xa2, 0x64, 0x70) + if err != nil { + return + } + err = en.WriteString(z.DstFilePath) + if err != nil { + err = msgp.WrapError(err, "DstFilePath") + return + } + // write "m" + err = en.Append(0xa1, 0x6d) + if err != nil { + return + } + err = en.WriteBytes(z.Meta) + if err != nil { + err = msgp.WrapError(err, "Meta") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *RenamePartHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "id" + o = append(o, 0x86, 0xa2, 0x69, 0x64) + o = msgp.AppendString(o, z.DiskID) + // string "sv" + o = append(o, 0xa2, 0x73, 0x76) + o = msgp.AppendString(o, z.SrcVolume) + // string "sp" + o = append(o, 0xa2, 0x73, 0x70) + o = msgp.AppendString(o, z.SrcFilePath) + // string "dv" + o = append(o, 0xa2, 0x64, 0x76) + o = msgp.AppendString(o, z.DstVolume) + // string "dp" + o = append(o, 0xa2, 0x64, 0x70) + o = msgp.AppendString(o, z.DstFilePath) + // string "m" + o = append(o, 0xa1, 0x6d) + 
o = msgp.AppendBytes(o, z.Meta) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RenamePartHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "id": + z.DiskID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DiskID") + return + } + case "sv": + z.SrcVolume, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SrcVolume") + return + } + case "sp": + z.SrcFilePath, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SrcFilePath") + return + } + case "dv": + z.DstVolume, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DstVolume") + return + } + case "dp": + z.DstFilePath, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DstFilePath") + return + } + case "m": + z.Meta, bts, err = msgp.ReadBytesBytes(bts, z.Meta) + if err != nil { + err = msgp.WrapError(err, "Meta") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *RenamePartHandlerParams) Msgsize() (s int) { + s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 3 + msgp.StringPrefixSize + len(z.SrcVolume) + 3 + msgp.StringPrefixSize + len(z.SrcFilePath) + 3 + msgp.StringPrefixSize + len(z.DstVolume) + 3 + msgp.StringPrefixSize + len(z.DstFilePath) + 2 + msgp.BytesPrefixSize + len(z.Meta) + return +} + // DecodeMsg implements msgp.Decodable func (z *UpdateMetadataOpts) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index 09c795e48..ffc09b53f 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -2382,6 +2382,232 @@ func BenchmarkDecodeReadMultipleResp(b *testing.B) { } } +func TestMarshalUnmarshalReadPartsReq(t *testing.T) { + v := ReadPartsReq{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReadPartsReq(b *testing.B) { + v := ReadPartsReq{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReadPartsReq(b *testing.B) { + v := ReadPartsReq{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReadPartsReq(b *testing.B) { + v := ReadPartsReq{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func 
TestEncodeDecodeReadPartsReq(t *testing.T) { + v := ReadPartsReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReadPartsReq Msgsize() is inaccurate") + } + + vn := ReadPartsReq{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReadPartsReq(b *testing.B) { + v := ReadPartsReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReadPartsReq(b *testing.B) { + v := ReadPartsReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalReadPartsResp(t *testing.T) { + v := ReadPartsResp{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReadPartsResp(b *testing.B) { + v := ReadPartsResp{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReadPartsResp(b *testing.B) { + v := ReadPartsResp{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReadPartsResp(b *testing.B) { + v := ReadPartsResp{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReadPartsResp(t *testing.T) { + v := ReadPartsResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReadPartsResp Msgsize() is inaccurate") + } + + vn := ReadPartsResp{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReadPartsResp(b *testing.B) { + v := ReadPartsResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReadPartsResp(b *testing.B) { + v := ReadPartsResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalRenameDataHandlerParams(t *testing.T) { v := RenameDataHandlerParams{} bts, err := 
v.MarshalMsg(nil) @@ -2947,6 +3173,119 @@ func BenchmarkDecodeRenameOptions(b *testing.B) { } } +func TestMarshalUnmarshalRenamePartHandlerParams(t *testing.T) { + v := RenamePartHandlerParams{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRenamePartHandlerParams(b *testing.B) { + v := RenamePartHandlerParams{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRenamePartHandlerParams(b *testing.B) { + v := RenamePartHandlerParams{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRenamePartHandlerParams(b *testing.B) { + v := RenamePartHandlerParams{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRenamePartHandlerParams(t *testing.T) { + v := RenamePartHandlerParams{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRenamePartHandlerParams Msgsize() is inaccurate") + } + + vn := RenamePartHandlerParams{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRenamePartHandlerParams(b *testing.B) { + v := RenamePartHandlerParams{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRenamePartHandlerParams(b *testing.B) { + v := RenamePartHandlerParams{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalUpdateMetadataOpts(t *testing.T) { v := UpdateMetadataOpts{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 1a98c548b..13400cfc8 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -95,10 +95,12 @@ type StorageAPI interface { CreateFile(ctx context.Context, origvolume, olume, path string, size int64, reader io.Reader) error ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error + RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) error CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (*CheckPartsResp, error) Delete(ctx context.Context, volume string, path string, opts DeleteOptions) (err error) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) 
(*CheckPartsResp, error) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) + ReadParts(ctx context.Context, bucket string, partMetaPaths ...string) ([]*ObjectPartInfo, error) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error CleanAbandonedData(ctx context.Context, volume string, path string) error diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 3f7e63e44..cc0a60485 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -757,6 +757,55 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri return errs } +// RenamePart - renames multipart part file +func (client *storageRESTClient) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) { + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + + _, err = storageRenamePartRPC.Call(ctx, client.gridConn, &RenamePartHandlerParams{ + DiskID: *client.diskID.Load(), + SrcVolume: srcVolume, + SrcFilePath: srcPath, + DstVolume: dstVolume, + DstFilePath: dstPath, + Meta: meta, + }) + return toStorageErr(err) +} + +// ReadParts - reads various part.N.meta paths from a drive remotely and returns object part info for each of those part.N.meta if found +func (client *storageRESTClient) ReadParts(ctx context.Context, volume string, partMetaPaths ...string) ([]*ObjectPartInfo, error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + + rp := &ReadPartsReq{Paths: partMetaPaths} + buf, err := rp.MarshalMsg(nil) + if err != nil { + return nil, err + } + + respBody, err := client.call(ctx, storageRESTMethodReadParts, values, bytes.NewReader(buf), -1) + defer xhttp.DrainBody(respBody) + if err != nil { + return nil, err + } + + respReader, err := waitForHTTPResponse(respBody) + if err != nil { + return nil, toStorageErr(err) + } + + rd := msgpNewReader(respReader) + defer readMsgpReaderPoolPut(rd) + + readPartsResp := &ReadPartsResp{} + if err = readPartsResp.DecodeMsg(rd); err != nil { + return nil, toStorageErr(err) + } + + return readPartsResp.Infos, nil +} + // RenameFile - renames a file. func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) { ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 25401ce94..361045de2 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -20,7 +20,7 @@ package cmd //go:generate msgp -file $GOFILE -unexported const ( - storageRESTVersion = "v62" // Introduce DeleteBulk internode API. 
+ storageRESTVersion = "v63" // Introduce RenamePart and ReadParts API storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -44,6 +44,7 @@ const ( storageRESTMethodReadMultiple = "/rmpl" storageRESTMethodCleanAbandoned = "/cln" storageRESTMethodDeleteBulk = "/dblk" + storageRESTMethodReadParts = "/rps" ) const ( diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index a4727d2ee..923651d30 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -68,6 +68,7 @@ var ( storageRenameDataRPC = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData2, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} }) storageRenameDataInlineRPC = grid.NewSingleHandler[*RenameDataInlineHandlerParams, *RenameDataResp](grid.HandlerRenameDataInline, newRenameDataInlineHandlerParams, func() *RenameDataResp { return &RenameDataResp{} }).AllowCallRequestPool(false) storageRenameFileRPC = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) + storageRenamePartRPC = grid.NewSingleHandler[*RenamePartHandlerParams, grid.NoPayload](grid.HandlerRenamePart, func() *RenamePartHandlerParams { return &RenamePartHandlerParams{} }, grid.NewNoPayload) storageStatVolRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} }) storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) storageWriteMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) @@ -525,6 +526,31 @@ func (s *storageRESTServer) ReadXLHandlerWS(params *grid.MSS) (*RawFileInfo, *gr return &rf, nil } +// ReadPartsHandler - read section of a file. +func (s *storageRESTServer) ReadPartsHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + volume := r.Form.Get(storageRESTVolume) + + var preq ReadPartsReq + if err := msgp.Decode(r.Body, &preq); err != nil { + s.writeErrorResponse(w, err) + return + } + + done := keepHTTPResponseAlive(w) + infos, err := s.getStorage().ReadParts(r.Context(), volume, preq.Paths...) + done(nil) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + presp := &ReadPartsResp{Infos: infos} + storageLogIf(r.Context(), msgp.Encode(w, presp)) +} + // ReadFileHandler - read section of a file. func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -692,6 +718,14 @@ func (s *storageRESTServer) RenameFileHandler(p *RenameFileHandlerParams) (grid. 
return grid.NewNPErr(s.getStorage().RenameFile(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath)) } +// RenamePartHandler - rename a multipart part from source to destination +func (s *storageRESTServer) RenamePartHandler(p *RenamePartHandlerParams) (grid.NoPayload, *grid.RemoteErr) { + if !s.checkID(p.DiskID) { + return grid.NewNPErr(errDiskNotFound) + } + return grid.NewNPErr(s.getStorage().RenamePart(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath, p.Meta)) +} + // CleanAbandonedDataHandler - Clean unused data directories. func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1333,6 +1367,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteBulk).HandlerFunc(h(server.DeleteBulkHandler)) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadParts).HandlerFunc(h(server.ReadPartsHandler)) subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler)) subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler)) @@ -1343,6 +1378,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin logger.FatalIf(storageReadAllRPC.Register(gm, server.ReadAllHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameFileRPC.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler") + logger.FatalIf(storageRenamePartRPC.Register(gm, server.RenamePartHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameDataRPC.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameDataInlineRPC.Register(gm, server.RenameDataInlineHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageDeleteFileRPC.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler") diff --git a/cmd/storagemetric_string.go b/cmd/storagemetric_string.go index cb0d02032..794781329 100644 --- a/cmd/storagemetric_string.go +++ b/cmd/storagemetric_string.go @@ -37,12 +37,14 @@ func _() { _ = x[storageMetricDeleteAbandonedParts-26] _ = x[storageMetricDiskInfo-27] _ = x[storageMetricDeleteBulk-28] - _ = x[storageMetricLast-29] + _ = x[storageMetricRenamePart-29] + _ = x[storageMetricReadParts-30] + _ = x[storageMetricLast-31] } -const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoDeleteBulkLast" +const _storageMetric_name = 
"MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoDeleteBulkRenamePartReadPartsLast" -var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 292, 296} +var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 292, 302, 311, 315} func (i storageMetric) String() string { if i >= storageMetric(len(_storageMetric_index)-1) { diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 19c624fb0..02257e47f 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "math/rand" + "path" "runtime" "strconv" "strings" @@ -71,6 +72,8 @@ const ( storageMetricDeleteAbandonedParts storageMetricDiskInfo storageMetricDeleteBulk + storageMetricRenamePart + storageMetricReadParts // .... add more @@ -453,6 +456,17 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path }) } +func (p *xlStorageDiskIDCheck) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) { + ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenamePart, srcVolume, srcPath, dstVolume, dstPath) + if err != nil { + return err + } + defer done(0, &err) + + w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()) + return w.Run(func() error { return p.storage.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta) }) +} + func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameFile, srcVolume, srcPath, dstVolume, dstPath) if err != nil { @@ -699,6 +713,16 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st return p.storage.StatInfoFile(ctx, volume, path, glob) } +func (p *xlStorageDiskIDCheck) ReadParts(ctx context.Context, volume string, partMetaPaths ...string) ([]*ObjectPartInfo, error) { + ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadParts, volume, path.Dir(partMetaPaths[0])) + if err != nil { + return nil, err + } + defer done(0, &err) + + return p.storage.ReadParts(ctx, volume, partMetaPaths...) +} + // ReadMultiple will read multiple files and send each files as response. // Files are read and returned in the given order. // The resp channel is closed before the call returns. diff --git a/cmd/xl-storage-format-v1.go b/cmd/xl-storage-format-v1.go index 4d9da5565..77690423e 100644 --- a/cmd/xl-storage-format-v1.go +++ b/cmd/xl-storage-format-v1.go @@ -159,13 +159,14 @@ const ( // ObjectPartInfo Info of each part kept in the multipart metadata // file after CompleteMultipartUpload() is called. type ObjectPartInfo struct { - ETag string `json:"etag,omitempty"` - Number int `json:"number"` - Size int64 `json:"size"` // Size of the part on the disk. - ActualSize int64 `json:"actualSize"` // Original size of the part without compression or encryption bytes. - ModTime time.Time `json:"modTime"` // Date and time at which the part was uploaded. 
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 19c624fb0..02257e47f 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"io"
 	"math/rand"
+	"path"
 	"runtime"
 	"strconv"
 	"strings"
@@ -71,6 +72,8 @@ const (
 	storageMetricDeleteAbandonedParts
 	storageMetricDiskInfo
 	storageMetricDeleteBulk
+	storageMetricRenamePart
+	storageMetricReadParts
 
 	// .... add more
 
@@ -453,6 +456,17 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
 	})
 }
 
+func (p *xlStorageDiskIDCheck) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenamePart, srcVolume, srcPath, dstVolume, dstPath)
+	if err != nil {
+		return err
+	}
+	defer done(0, &err)
+
+	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
+	return w.Run(func() error { return p.storage.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta) })
+}
+
 func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
 	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameFile, srcVolume, srcPath, dstVolume, dstPath)
 	if err != nil {
@@ -699,6 +713,16 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st
 	return p.storage.StatInfoFile(ctx, volume, path, glob)
 }
 
+func (p *xlStorageDiskIDCheck) ReadParts(ctx context.Context, volume string, partMetaPaths ...string) ([]*ObjectPartInfo, error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadParts, volume, path.Dir(partMetaPaths[0]))
+	if err != nil {
+		return nil, err
+	}
+	defer done(0, &err)
+
+	return p.storage.ReadParts(ctx, volume, partMetaPaths...)
+}
+
 // ReadMultiple will read multiple files and send each file as response.
 // Files are read and returned in the given order.
 // The resp channel is closed before the call returns.
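Both new wrappers above apply the same decoration that every xlStorageDiskIDCheck method uses. The pattern, isolated for clarity; doSomeIO is a placeholder, and the comments summarize the MinIO internals as I understand them rather than guarantee their semantics:

```go
// Sketch of the wrapper pattern, under the assumptions noted above.
func (p *xlStorageDiskIDCheck) exampleOp(ctx context.Context, volume, path string) (err error) {
	// Fail fast if the drive is already considered unhealthy, and start timing.
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadParts, volume, path)
	if err != nil {
		return err
	}
	defer done(0, &err) // reports outcome and latency for this metric

	// Bound the call so a hung drive cannot block the caller indefinitely.
	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return doSomeIO(ctx) })
}
```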
diff --git a/cmd/xl-storage-format-v1.go b/cmd/xl-storage-format-v1.go
index 4d9da5565..77690423e 100644
--- a/cmd/xl-storage-format-v1.go
+++ b/cmd/xl-storage-format-v1.go
@@ -159,13 +159,14 @@ const (
 // ObjectPartInfo Info of each part kept in the multipart metadata
 // file after CompleteMultipartUpload() is called.
 type ObjectPartInfo struct {
-	ETag       string            `json:"etag,omitempty"`
-	Number     int               `json:"number"`
-	Size       int64             `json:"size"`       // Size of the part on the disk.
-	ActualSize int64             `json:"actualSize"` // Original size of the part without compression or encryption bytes.
-	ModTime    time.Time         `json:"modTime"`    // Date and time at which the part was uploaded.
-	Index      []byte            `json:"index,omitempty" msg:"index,omitempty"`
-	Checksums  map[string]string `json:"crc,omitempty" msg:"crc,omitempty"` // Content Checksums
+	ETag       string            `json:"etag,omitempty" msg:"e"`
+	Number     int               `json:"number" msg:"n"`
+	Size       int64             `json:"size" msg:"s"`        // Size of the part on the disk.
+	ActualSize int64             `json:"actualSize" msg:"as"` // Original size of the part without compression or encryption bytes.
+	ModTime    time.Time         `json:"modTime" msg:"mt"`    // Date and time at which the part was uploaded.
+	Index      []byte            `json:"index,omitempty" msg:"i,omitempty"`
+	Checksums  map[string]string `json:"crc,omitempty" msg:"crc,omitempty"`   // Content Checksums
+	Error      string            `json:"error,omitempty" msg:"err,omitempty"` // only set while reading part meta from drive.
 }
 
 // ChecksumInfo - carries checksums of individual scattered parts per disk.
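The shortened msg tags matter because ObjectPartInfo is now serialized into every part.N.meta on every drive, so field-name bytes are paid once per part per disk. A standalone back-of-the-envelope check (msgpack stores each short field name as one header byte plus the name itself, so the saving is just the difference in name lengths):

```go
package main

import "fmt"

func main() {
	// Old msgp field names versus the new ones from the struct above
	// ("crc" is unchanged and the new "err" field is usually omitted).
	renames := map[string]string{
		"ETag": "e", "Number": "n", "Size": "s",
		"ActualSize": "as", "ModTime": "mt", "index": "i",
	}
	saved := 0
	for oldName, newName := range renames {
		saved += len(oldName) - len(newName)
	}
	fmt.Println("field-name bytes saved per serialized part:", saved) // 28
}
```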
diff --git a/cmd/xl-storage-format-v1_gen.go b/cmd/xl-storage-format-v1_gen.go
index 444db638b..2c84ef661 100644
--- a/cmd/xl-storage-format-v1_gen.go
+++ b/cmd/xl-storage-format-v1_gen.go
@@ -569,37 +569,37 @@ func (z *ObjectPartInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 			return
 		}
 		switch msgp.UnsafeString(field) {
-		case "ETag":
+		case "e":
 			z.ETag, err = dc.ReadString()
 			if err != nil {
 				err = msgp.WrapError(err, "ETag")
 				return
 			}
-		case "Number":
+		case "n":
 			z.Number, err = dc.ReadInt()
 			if err != nil {
 				err = msgp.WrapError(err, "Number")
 				return
 			}
-		case "Size":
+		case "s":
 			z.Size, err = dc.ReadInt64()
 			if err != nil {
 				err = msgp.WrapError(err, "Size")
 				return
 			}
-		case "ActualSize":
+		case "as":
 			z.ActualSize, err = dc.ReadInt64()
 			if err != nil {
 				err = msgp.WrapError(err, "ActualSize")
 				return
 			}
-		case "ModTime":
+		case "mt":
 			z.ModTime, err = dc.ReadTime()
 			if err != nil {
 				err = msgp.WrapError(err, "ModTime")
 				return
 			}
-		case "index":
+		case "i":
 			z.Index, err = dc.ReadBytes(z.Index)
 			if err != nil {
 				err = msgp.WrapError(err, "Index")
@@ -635,6 +635,12 @@ func (z *ObjectPartInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 				}
 				z.Checksums[za0001] = za0002
 			}
+		case "err":
+			z.Error, err = dc.ReadString()
+			if err != nil {
+				err = msgp.WrapError(err, "Error")
+				return
+			}
 		default:
 			err = dc.Skip()
 			if err != nil {
@@ -649,8 +655,8 @@ func (z *ObjectPartInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 // EncodeMsg implements msgp.Encodable
 func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 	// check for omitted fields
-	zb0001Len := uint32(7)
-	var zb0001Mask uint8 /* 7 bits */
+	zb0001Len := uint32(8)
+	var zb0001Mask uint8 /* 8 bits */
 	_ = zb0001Mask
 	if z.Index == nil {
 		zb0001Len--
@@ -660,6 +666,10 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		zb0001Len--
 		zb0001Mask |= 0x40
 	}
+	if z.Error == "" {
+		zb0001Len--
+		zb0001Mask |= 0x80
+	}
 	// variable map header, size zb0001Len
 	err = en.Append(0x80 | uint8(zb0001Len))
 	if err != nil {
@@ -668,8 +678,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 	if zb0001Len == 0 {
 		return
 	}
-	// write "ETag"
-	err = en.Append(0xa4, 0x45, 0x54, 0x61, 0x67)
+	// write "e"
+	err = en.Append(0xa1, 0x65)
 	if err != nil {
 		return
 	}
@@ -678,8 +688,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "ETag")
 		return
 	}
-	// write "Number"
-	err = en.Append(0xa6, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72)
+	// write "n"
+	err = en.Append(0xa1, 0x6e)
 	if err != nil {
 		return
 	}
@@ -688,8 +698,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Number")
 		return
 	}
-	// write "Size"
-	err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65)
+	// write "s"
+	err = en.Append(0xa1, 0x73)
 	if err != nil {
 		return
 	}
@@ -698,8 +708,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Size")
 		return
 	}
-	// write "ActualSize"
-	err = en.Append(0xaa, 0x41, 0x63, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65)
+	// write "as"
+	err = en.Append(0xa2, 0x61, 0x73)
 	if err != nil {
 		return
 	}
@@ -708,8 +718,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "ActualSize")
 		return
 	}
-	// write "ModTime"
-	err = en.Append(0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65)
+	// write "mt"
+	err = en.Append(0xa2, 0x6d, 0x74)
 	if err != nil {
 		return
 	}
@@ -719,8 +729,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		return
 	}
 	if (zb0001Mask & 0x20) == 0 { // if not omitted
-		// write "index"
-		err = en.Append(0xa5, 0x69, 0x6e, 0x64, 0x65, 0x78)
+		// write "i"
+		err = en.Append(0xa1, 0x69)
 		if err != nil {
 			return
 		}
@@ -754,6 +764,18 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 			}
 		}
 	}
+	if (zb0001Mask & 0x80) == 0 { // if not omitted
+		// write "err"
+		err = en.Append(0xa3, 0x65, 0x72, 0x72)
+		if err != nil {
+			return
+		}
+		err = en.WriteString(z.Error)
+		if err != nil {
+			err = msgp.WrapError(err, "Error")
+			return
+		}
+	}
 	return
 }
 
@@ -761,8 +783,8 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
 func (z *ObjectPartInfo) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
 	// check for omitted fields
-	zb0001Len := uint32(7)
-	var zb0001Mask uint8 /* 7 bits */
+	zb0001Len := uint32(8)
+	var zb0001Mask uint8 /* 8 bits */
 	_ = zb0001Mask
 	if z.Index == nil {
 		zb0001Len--
@@ -772,29 +794,33 @@ func (z *ObjectPartInfo) MarshalMsg(b []byte) (o []byte, err error) {
 		zb0001Len--
 		zb0001Mask |= 0x40
 	}
+	if z.Error == "" {
+		zb0001Len--
+		zb0001Mask |= 0x80
+	}
 	// variable map header, size zb0001Len
 	o = append(o, 0x80|uint8(zb0001Len))
 	if zb0001Len == 0 {
 		return
 	}
-	// string "ETag"
-	o = append(o, 0xa4, 0x45, 0x54, 0x61, 0x67)
+	// string "e"
+	o = append(o, 0xa1, 0x65)
 	o = msgp.AppendString(o, z.ETag)
-	// string "Number"
-	o = append(o, 0xa6, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72)
+	// string "n"
+	o = append(o, 0xa1, 0x6e)
 	o = msgp.AppendInt(o, z.Number)
-	// string "Size"
-	o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65)
+	// string "s"
+	o = append(o, 0xa1, 0x73)
 	o = msgp.AppendInt64(o, z.Size)
-	// string "ActualSize"
-	o = append(o, 0xaa, 0x41, 0x63, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65)
+	// string "as"
+	o = append(o, 0xa2, 0x61, 0x73)
 	o = msgp.AppendInt64(o, z.ActualSize)
-	// string "ModTime"
-	o = append(o, 0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65)
+	// string "mt"
+	o = append(o, 0xa2, 0x6d, 0x74)
 	o = msgp.AppendTime(o, z.ModTime)
 	if (zb0001Mask & 0x20) == 0 { // if not omitted
-		// string "index"
-		o = append(o, 0xa5, 0x69, 0x6e, 0x64, 0x65, 0x78)
+		// string "i"
+		o = append(o, 0xa1, 0x69)
 		o = msgp.AppendBytes(o, z.Index)
 	}
 	if (zb0001Mask & 0x40) == 0 { // if not omitted
@@ -806,6 +832,11 @@ func (z *ObjectPartInfo) MarshalMsg(b []byte) (o []byte, err error) {
 			o = msgp.AppendString(o, za0002)
 		}
 	}
+	if (zb0001Mask & 0x80) == 0 { // if not omitted
+		// string "err"
+		o = append(o, 0xa3, 0x65, 0x72, 0x72)
+		o = msgp.AppendString(o, z.Error)
+	}
 	return
 }
 
@@ -827,37 +858,37 @@ func (z *ObjectPartInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 			return
 		}
 		switch msgp.UnsafeString(field) {
-		case "ETag":
+		case "e":
 			z.ETag, bts, err = msgp.ReadStringBytes(bts)
 			if err != nil {
 				err = msgp.WrapError(err, "ETag")
 				return
 			}
-		case "Number":
+		case "n":
 			z.Number, bts, err = msgp.ReadIntBytes(bts)
 			if err != nil {
 				err = msgp.WrapError(err, "Number")
 				return
 			}
-		case "Size":
+		case "s":
 			z.Size, bts, err = msgp.ReadInt64Bytes(bts)
 			if err != nil {
 				err = msgp.WrapError(err, "Size")
 				return
 			}
-		case "ActualSize":
+		case "as":
 			z.ActualSize, bts, err = msgp.ReadInt64Bytes(bts)
 			if err != nil {
 				err = msgp.WrapError(err, "ActualSize")
 				return
 			}
-		case "ModTime":
+		case "mt":
 			z.ModTime, bts, err = msgp.ReadTimeBytes(bts)
 			if err != nil {
 				err = msgp.WrapError(err, "ModTime")
 				return
 			}
-		case "index":
+		case "i":
 			z.Index, bts, err = msgp.ReadBytesBytes(bts, z.Index)
 			if err != nil {
 				err = msgp.WrapError(err, "Index")
@@ -893,6 +924,12 @@ func (z *ObjectPartInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				}
 				z.Checksums[za0001] = za0002
 			}
+		case "err":
+			z.Error, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Error")
+				return
+			}
 		default:
 			bts, err = msgp.Skip(bts)
 			if err != nil {
@@ -907,13 +944,14 @@ func (z *ObjectPartInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *ObjectPartInfo) Msgsize() (s int) {
-	s = 1 + 5 + msgp.StringPrefixSize + len(z.ETag) + 7 + msgp.IntSize + 5 + msgp.Int64Size + 11 + msgp.Int64Size + 8 + msgp.TimeSize + 6 + msgp.BytesPrefixSize + len(z.Index) + 4 + msgp.MapHeaderSize
+	s = 1 + 2 + msgp.StringPrefixSize + len(z.ETag) + 2 + msgp.IntSize + 2 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.TimeSize + 2 + msgp.BytesPrefixSize + len(z.Index) + 4 + msgp.MapHeaderSize
 	if z.Checksums != nil {
 		for za0001, za0002 := range z.Checksums {
 			_ = za0002
 			s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
 		}
 	}
+	s += 4 + msgp.StringPrefixSize + len(z.Error)
 	return
 }
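The generated encoders above all share one idiom worth spelling out: each omittable field owns a bit in zb0001Mask, a set bit means "omitted", the map header is shrunk accordingly, and a field is written only while its bit is clear. A standalone illustration of the same logic (the variable names are stand-ins, not the generated identifiers):

```go
package main

import "fmt"

func main() {
	indexEmpty, crcEmpty, errEmpty := true, false, true // stand-ins for z.Index == nil, etc.

	fields := uint32(8) // total fields in ObjectPartInfo after this patch
	var mask uint8
	if indexEmpty {
		fields--
		mask |= 0x20
	}
	if crcEmpty {
		fields--
		mask |= 0x40
	}
	if errEmpty {
		fields--
		mask |= 0x80
	}
	fmt.Printf("map header count: %d, omit mask: %08b\n", fields, mask)
	if mask&0x80 == 0 { // bit clear: the "err" field would be encoded
		fmt.Println("would write the err field")
	}
}
```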
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 725615368..c75e94c15 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -1085,13 +1085,13 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
 	var legacyJSON bool
 	buf, err := xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
-		buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
+		buf, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
 		if err != nil && !errors.Is(err, errFileNotFound) {
 			return nil, err
 		}
 		if errors.Is(err, errFileNotFound) && legacy {
-			buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
+			buf, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
 			if err != nil {
 				return nil, err
 			}
@@ -1270,6 +1270,13 @@ func (s *xlStorage) moveToTrashNoDeadline(filePath string, recursive, immediateP
 	return nil
 }
 
+func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string) (buf []byte, err error) {
+	return xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
+		data, _, err := s.readAllDataWithDMTime(ctx, volume, volumeDir, filePath)
+		return data, err
+	})
+}
+
 func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
 	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
 	return w.Run(func() (err error) {
@@ -1299,7 +1306,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 	}
 
 	var legacyJSON bool
-	buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile))
+	buf, err := s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile))
 	if err != nil {
 		if !errors.Is(err, errFileNotFound) {
 			return err
@@ -1467,8 +1474,8 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, origvolume, volume, path
 		// First writes for special situations do not write to stable storage.
 		// this is currently used by
 		// - ephemeral objects such as objects created during listObjects() calls
-		// - newMultipartUpload() call..
-		return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, false, "")
+		ok := volume == minioMetaMultipartBucket // newMultipartUpload() calls must be synced to drives.
+		return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, ok, "")
 	}
 
 	buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
@@ -1564,7 +1571,7 @@ func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath str
 
 	xlPath := pathJoin(filePath, xlStorageFormatFile)
 	if readData {
-		buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, xlPath)
+		buf, dmTime, err = s.readAllDataWithDMTime(ctx, volume, volumeDir, xlPath)
 	} else {
 		buf, dmTime, err = s.readMetadataWithDMTime(ctx, xlPath)
 		if err != nil {
@@ -1584,7 +1591,7 @@ func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath str
 	s.RUnlock()
 
 	if err != nil && errors.Is(err, errFileNotFound) && legacy {
-		buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1))
+		buf, dmTime, err = s.readAllDataWithDMTime(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1))
 		if err != nil {
 			return nil, time.Time{}, err
 		}
@@ -1721,7 +1728,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v
 		canInline := fi.ShardFileSize(fi.Parts[0].ActualSize) <= inlineBlock
 		if canInline {
 			dataPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number))
-			fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath)
+			fi.Data, err = s.readAllData(ctx, volume, volumeDir, dataPath)
 			if err != nil {
 				return FileInfo{}, err
 			}
@@ -1732,7 +1739,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v
 	return fi, nil
 }
 
-func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string) (buf []byte, dmTime time.Time, err error) {
+func (s *xlStorage) readAllDataWithDMTime(ctx context.Context, volume, volumeDir string, filePath string) (buf []byte, dmTime time.Time, err error) {
 	if filePath == "" {
 		return nil, dmTime, errFileNotFound
 	}
@@ -1827,8 +1834,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
 		return nil, err
 	}
 
-	buf, _, err = s.readAllData(ctx, volume, volumeDir, filePath)
-	return buf, err
+	return s.readAllData(ctx, volume, volumeDir, filePath)
 }
 
 // ReadFile reads exactly len(buf) bytes into buf. It returns the
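The WriteMetadata change above means a first-time xl.meta written under the multipart bucket is now flushed to stable storage instead of living only in the page cache. In plain os-package terms, "synced" amounts to something like the following; this is an illustration of the idea, not MinIO's writeAll, and writeFileSynced is a hypothetical helper:

```go
package main

import "os"

// writeFileSynced writes data and fsyncs before reporting success,
// so the file survives an immediate crash or restart.
func writeFileSynced(path string, data []byte) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return err
	}
	if _, err = f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err = f.Sync(); err != nil { // durable before we return nil
		f.Close()
		return err
	}
	return f.Close()
}
```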
@@ -2112,10 +2118,10 @@ func (s *xlStorage) CreateFile(ctx context.Context, origvolume, volume, path str
 		}
 	}()
 
-	return s.writeAllDirect(ctx, filePath, fileSize, r, os.O_CREATE|os.O_WRONLY|os.O_EXCL, volumeDir)
+	return s.writeAllDirect(ctx, filePath, fileSize, r, os.O_CREATE|os.O_WRONLY|os.O_EXCL, volumeDir, false)
 }
 
-func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSize int64, r io.Reader, flags int, skipParent string) (err error) {
+func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSize int64, r io.Reader, flags int, skipParent string, truncate bool) (err error) {
 	if contextCanceled(ctx) {
 		return ctx.Err()
 	}
@@ -2165,9 +2171,15 @@ func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSiz
 	}
 
 	if written < fileSize && fileSize >= 0 {
+		if truncate {
+			w.Truncate(0) // zero out the file size to indicate that it is unreadable
+		}
 		w.Close()
 		return errLessData
 	} else if written > fileSize && fileSize >= 0 {
+		if truncate {
+			w.Truncate(0) // zero out the file size to indicate that it is unreadable
+		}
 		w.Close()
 		return errMoreData
 	}
@@ -2215,7 +2227,7 @@ func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b
 		// This is an optimization mainly to ensure faster I/O.
 		if len(b) > xioutil.DirectioAlignSize {
 			r := bytes.NewReader(b)
-			return s.writeAllDirect(ctx, filePath, r.Size(), r, flags, skipParent)
+			return s.writeAllDirect(ctx, filePath, r.Size(), r, flags, skipParent, true)
 		}
 		w, err = s.openFileSync(filePath, flags, skipParent)
 	} else {
@@ -2232,6 +2244,7 @@ func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b
 	}
 
 	if n != len(b) {
+		w.Truncate(0) // to indicate that we did a partial write.
 		w.Close()
 		return io.ErrShortWrite
 	}
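Truncating to zero on a short or over-long write gives later readers an unambiguous signal: a zero-length part.N or xl.meta can never look like plausible data, so it fails immediately instead of passing size checks. A hypothetical reader-side sketch of the effect (not MinIO's actual read path; assumes os and errors are imported, and uses the generated ObjectPartInfo.UnmarshalMsg from the hunks above):

```go
// readPartMeta is illustrative only.
func readPartMeta(path string) (*ObjectPartInfo, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	if len(data) == 0 {
		// Deliberately zeroed by an interrupted writer: treat as unreadable.
		return nil, errors.New("part meta truncated by a failed write")
	}
	pinfo := &ObjectPartInfo{}
	if _, err = pinfo.UnmarshalMsg(data); err != nil {
		return nil, err // garbage fails the msgp decode as well
	}
	return pinfo, nil
}
```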
@@ -2859,6 +2872,96 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 	return res, nil
 }
 
+// RenamePart - rename part path to destination path atomically.
+func (s *xlStorage) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) {
+	srcVolumeDir, err := s.getVolDir(srcVolume)
+	if err != nil {
+		return err
+	}
+	dstVolumeDir, err := s.getVolDir(dstVolume)
+	if err != nil {
+		return err
+	}
+	if !skipAccessChecks(srcVolume) {
+		// Stat a volume entry.
+		if err = Access(srcVolumeDir); err != nil {
+			if osIsNotExist(err) {
+				return errVolumeNotFound
+			} else if isSysErrIO(err) {
+				return errFaultyDisk
+			}
+			return err
+		}
+	}
+	if !skipAccessChecks(dstVolume) {
+		if err = Access(dstVolumeDir); err != nil {
+			if osIsNotExist(err) {
+				return errVolumeNotFound
+			} else if isSysErrIO(err) {
+				return errFaultyDisk
+			}
+			return err
+		}
+	}
+	srcIsDir := HasSuffix(srcPath, SlashSeparator)
+	dstIsDir := HasSuffix(dstPath, SlashSeparator)
+	// Either src and dst have to be directories or files, else return error.
+	if !(srcIsDir && dstIsDir || !srcIsDir && !dstIsDir) {
+		return errFileAccessDenied
+	}
+	srcFilePath := pathutil.Join(srcVolumeDir, srcPath)
+	if err = checkPathLength(srcFilePath); err != nil {
+		return err
+	}
+	dstFilePath := pathutil.Join(dstVolumeDir, dstPath)
+	if err = checkPathLength(dstFilePath); err != nil {
+		return err
+	}
+	if srcIsDir {
+		// If the source is a directory, we expect the destination to be non-existent,
+		// but we still need to allow overwriting an empty directory since it
+		// represents an empty-directory object.
+		dirInfo, err := Lstat(dstFilePath)
+		if isSysErrIO(err) {
+			return errFaultyDisk
+		}
+		if err != nil {
+			if !osIsNotExist(err) {
+				return err
+			}
+		} else {
+			if !dirInfo.IsDir() {
+				return errFileAccessDenied
+			}
+			if err = Remove(dstFilePath); err != nil {
+				if isSysErrNotEmpty(err) || isSysErrNotDir(err) {
+					return errFileAccessDenied
+				} else if isSysErrIO(err) {
+					return errFaultyDisk
+				}
+				return err
+			}
+		}
+	}
+
+	if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
+		if isSysErrNotEmpty(err) || isSysErrNotDir(err) {
+			return errFileAccessDenied
+		}
+		return osErrToFileErr(err)
+	}
+
+	if err = s.WriteAll(ctx, dstVolume, dstPath+".meta", meta); err != nil {
+		return osErrToFileErr(err)
+	}
+
+	// Remove the parent dir of the source file if it is empty.
+	parentDir := pathutil.Dir(srcFilePath)
+	s.deleteFile(srcVolumeDir, parentDir, false, false)
+
+	return nil
+}
+
 // RenameFile - rename source path to destination path atomically.
 func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
 	srcVolumeDir, err := s.getVolDir(srcVolume)
@@ -3002,6 +3105,40 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
 	return &resp, nil
 }
 
+func (s *xlStorage) ReadParts(ctx context.Context, volume string, partMetaPaths ...string) ([]*ObjectPartInfo, error) {
+	volumeDir, err := s.getVolDir(volume)
+	if err != nil {
+		return nil, err
+	}
+
+	parts := make([]*ObjectPartInfo, len(partMetaPaths))
+	for idx, partMetaPath := range partMetaPaths {
+		var partNumber int
+		fmt.Sscanf(pathutil.Base(partMetaPath), "part.%d.meta", &partNumber)
+
+		if contextCanceled(ctx) {
+			parts[idx] = &ObjectPartInfo{Error: ctx.Err().Error(), Number: partNumber}
+			continue
+		}
+		data, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, partMetaPath))
+		if err != nil {
+			parts[idx] = &ObjectPartInfo{
+				Error:  err.Error(),
+				Number: partNumber,
+			}
+			continue
+		}
+		pinfo := &ObjectPartInfo{}
+		if _, err = pinfo.UnmarshalMsg(data); err != nil {
+			parts[idx] = &ObjectPartInfo{Error: err.Error(), Number: partNumber}
+			continue
+		}
+		parts[idx] = pinfo
+	}
+	diskHealthCheckOK(ctx, nil)
+	return parts, nil
+}
+
 // ReadMultiple will read multiple files and send each back as response.
 // Files are read and returned in the given order.
 // The resp channel is closed before the call returns.
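RenamePart and ReadParts above are two halves of one protocol: the upload path moves part.N into place and drops part.N.meta beside it, and the commit path reads those metas back from every drive, trusting a part only when enough drives agree on it. The real arbitration lives in cmd/erasure-multipart.go; the sketch below only shows the shape of the per-part check, with readQuorum and the gathered per-drive infos as assumed inputs:

```go
// partHasQuorum reports whether enough drives returned a readable
// part.N.meta whose ETag matches the one the client supplied.
func partHasQuorum(infos []*ObjectPartInfo, wantETag string, readQuorum int) bool {
	agree := 0
	for _, pi := range infos {
		if pi == nil || pi.Error != "" {
			continue // drive offline, or part meta missing/unreadable there
		}
		if pi.ETag == wantETag {
			agree++
		}
	}
	return agree >= readQuorum
}
```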
@@ -3020,15 +3157,17 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
 			Prefix: req.Prefix,
 			File:   f,
 		}
+		var data []byte
 		var mt time.Time
+		fullPath := pathJoin(volumeDir, req.Prefix, f)
 		w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
 		if err := w.Run(func() (err error) {
 			if req.MetadataOnly {
 				data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
 			} else {
-				data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath)
+				data, mt, err = s.readAllDataWithDMTime(ctx, req.Bucket, volumeDir, fullPath)
 			}
 			return err
 		}); err != nil {
@@ -3131,7 +3270,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
 	}
 	baseDir := pathJoin(volumeDir, path+slashSeparator)
 	metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
-	buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath)
+	buf, err := s.readAllData(ctx, volume, volumeDir, metaPath)
 	if err != nil {
 		return err
 	}
diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go
index 9569e0f38..a978639d4 100644
--- a/internal/grid/handlers.go
+++ b/internal/grid/handlers.go
@@ -113,6 +113,7 @@ const (
 	HandlerRenameDataInline
 	HandlerRenameData2
 	HandlerCheckParts2
+	HandlerRenamePart
 
 	// Add more above here ^^^
 	// If all handlers are used, the type of Handler can be changed.
@@ -194,6 +195,7 @@ var handlerPrefixes = [handlerLast]string{
 	HandlerRenameDataInline: storagePrefix,
 	HandlerRenameData2:      storagePrefix,
 	HandlerCheckParts2:      storagePrefix,
+	HandlerRenamePart:       storagePrefix,
 }
 
 const (
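HandlerRenamePart is appended immediately before the "Add more above here" marker because HandlerID values are wire-visible: inserting it any earlier would renumber already-released handlers and break mixed-version clusters, while here only the internal handlerTest/handlerTest2/handlerLast values shift. The regenerated _string.go below doubles as a compile-time guard for exactly this; its minimal form is:

```go
// If the enum is ever reordered, the constant arithmetic stops matching
// and this fails to compile with an invalid array index.
func _() {
	var x [1]struct{}
	_ = x[HandlerRenamePart-75]
}
```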
"handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlineRenameData2CheckParts2RenameParthandlerTesthandlerTest2handlerLast" -var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 926, 937, 949, 960} +var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 926, 936, 947, 959, 970} func (i HandlerID) String() string { if i >= HandlerID(len(_HandlerID_index)-1) {