mirror of
https://github.com/minio/minio.git
synced 2025-02-23 03:22:30 -05:00
remove overzealous check during HEAD() (#19940)
due to a historic bug in CopyObject() where an inlined object loses its metadata, the check causes an incorrect fallback verifying data-dir. CopyObject() bug was fixed in ffa91f97942 however the occurrence of this problem is historic, so the aforementioned check is stretching too much. Bonus: simplify fileInfoRaw() to read xl.json as well, also recreate buckets properly.
This commit is contained in:
parent
c91d1ec2e3
commit
7bd1d899bc
@ -576,35 +576,11 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
|
||||
}
|
||||
|
||||
func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVers, allParts bool) (FileInfo, error) {
|
||||
var xl xlMetaV2
|
||||
if err := xl.LoadOrConvert(ri.Buf); err != nil {
|
||||
return FileInfo{}, errFileCorrupt
|
||||
}
|
||||
|
||||
fi, err := xl.ToFileInfo(bucket, object, "", inclFreeVers, allParts)
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
|
||||
if !fi.IsValid() {
|
||||
return FileInfo{}, errFileCorrupt
|
||||
}
|
||||
|
||||
versionID := fi.VersionID
|
||||
if versionID == "" {
|
||||
versionID = nullVersionID
|
||||
}
|
||||
|
||||
fileInfo, err := xl.ToFileInfo(bucket, object, versionID, inclFreeVers, allParts)
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
|
||||
if readData {
|
||||
fileInfo.Data = xl.data.find(versionID)
|
||||
}
|
||||
|
||||
return fileInfo, nil
|
||||
return getFileInfo(ri.Buf, bucket, object, "", fileInfoOpts{
|
||||
Data: readData,
|
||||
InclFreeVersions: inclFreeVers,
|
||||
AllParts: allParts,
|
||||
})
|
||||
}
|
||||
|
||||
func readAllRawFileInfo(ctx context.Context, disks []StorageAPI, bucket, object string, readData bool) ([]RawFileInfo, []error) {
|
||||
@ -756,8 +732,9 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
||||
disks := er.getDisks()
|
||||
|
||||
ropts := ReadOptions{
|
||||
ReadData: readData,
|
||||
Healing: false,
|
||||
ReadData: readData,
|
||||
InclFreeVersions: opts.InclFreeVersions,
|
||||
Healing: false,
|
||||
}
|
||||
|
||||
mrfCheck := make(chan FileInfo)
|
||||
|
@ -163,7 +163,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
}
|
||||
|
||||
for _, bucket := range healBuckets {
|
||||
_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
|
||||
_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
|
||||
Recreate: true,
|
||||
ScanMode: scanMode,
|
||||
})
|
||||
if err != nil {
|
||||
// Log bucket healing error if any, we shall retry again.
|
||||
healingLogIf(ctx, err)
|
||||
@ -262,6 +265,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
// Heal current bucket again in case if it is failed
|
||||
// in the beginning of erasure set healing
|
||||
if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
|
||||
Recreate: true,
|
||||
ScanMode: scanMode,
|
||||
}); err != nil {
|
||||
// Set this such that when we return this function
|
||||
@ -513,7 +517,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
// we let the caller retry this disk again for the
|
||||
// buckets it failed to list.
|
||||
retErr = err
|
||||
healingLogIf(ctx, err)
|
||||
healingLogIf(ctx, fmt.Errorf("listing failed with: %v on bucket: %v", err, bucket))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -259,7 +259,7 @@ func (e *metaCacheEntry) fileInfo(bucket string) (FileInfo, error) {
|
||||
}
|
||||
return e.cached.ToFileInfo(bucket, e.name, "", false, false)
|
||||
}
|
||||
return getFileInfo(e.metadata, bucket, e.name, "", false, false)
|
||||
return getFileInfo(e.metadata, bucket, e.name, "", fileInfoOpts{})
|
||||
}
|
||||
|
||||
// xlmeta returns the decoded metadata.
|
||||
|
@ -138,8 +138,12 @@ func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin
|
||||
poolErrs = append(poolErrs, reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum))
|
||||
}
|
||||
|
||||
opts.Remove = isAllBucketsNotFound(poolErrs)
|
||||
opts.Recreate = !opts.Remove
|
||||
if !opts.Recreate {
|
||||
// when there is no force recreate look for pool
|
||||
// errors to recreate the bucket on all pools.
|
||||
opts.Remove = isAllBucketsNotFound(poolErrs)
|
||||
opts.Recreate = !opts.Remove
|
||||
}
|
||||
|
||||
g = errgroup.WithNErrs(len(sys.peerClients))
|
||||
healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
|
||||
|
@ -123,7 +123,7 @@ func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (
|
||||
g.Wait()
|
||||
}
|
||||
|
||||
// Create the quorum lost volume only if its nor makred for delete
|
||||
// Create the lost volume only if its not marked for delete
|
||||
if !opts.Remove {
|
||||
// Initialize sync waitgroup.
|
||||
g = errgroup.WithNErrs(len(localDrives))
|
||||
|
@ -520,13 +520,14 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, vo
|
||||
// Use websocket when not reading data.
|
||||
if !opts.ReadData {
|
||||
resp, err := storageReadVersionRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
|
||||
storageRESTDiskID: *client.diskID.Load(),
|
||||
storageRESTOrigVolume: origvolume,
|
||||
storageRESTVolume: volume,
|
||||
storageRESTFilePath: path,
|
||||
storageRESTVersionID: versionID,
|
||||
storageRESTReadData: strconv.FormatBool(opts.ReadData),
|
||||
storageRESTHealing: strconv.FormatBool(opts.Healing),
|
||||
storageRESTDiskID: *client.diskID.Load(),
|
||||
storageRESTOrigVolume: origvolume,
|
||||
storageRESTVolume: volume,
|
||||
storageRESTFilePath: path,
|
||||
storageRESTVersionID: versionID,
|
||||
storageRESTInclFreeVersions: strconv.FormatBool(opts.InclFreeVersions),
|
||||
storageRESTReadData: strconv.FormatBool(opts.ReadData),
|
||||
storageRESTHealing: strconv.FormatBool(opts.Healing),
|
||||
}))
|
||||
if err != nil {
|
||||
return fi, toStorageErr(err)
|
||||
@ -539,6 +540,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, vo
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
values.Set(storageRESTVersionID, versionID)
|
||||
values.Set(storageRESTInclFreeVersions, strconv.FormatBool(opts.InclFreeVersions))
|
||||
values.Set(storageRESTReadData, strconv.FormatBool(opts.ReadData))
|
||||
values.Set(storageRESTHealing, strconv.FormatBool(opts.Healing))
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||
// Copyright (c) 2015-2024 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
@ -20,7 +20,7 @@ package cmd
|
||||
//go:generate msgp -file $GOFILE -unexported
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v58" // Change VerifyFile signature
|
||||
storageRESTVersion = "v59" // Change ReadOptions inclFreeVersions
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
@ -46,29 +46,30 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
storageRESTVolume = "volume"
|
||||
storageRESTVolumes = "volumes"
|
||||
storageRESTDirPath = "dir-path"
|
||||
storageRESTFilePath = "file-path"
|
||||
storageRESTVersionID = "version-id"
|
||||
storageRESTReadData = "read-data"
|
||||
storageRESTHealing = "healing"
|
||||
storageRESTTotalVersions = "total-versions"
|
||||
storageRESTSrcVolume = "source-volume"
|
||||
storageRESTSrcPath = "source-path"
|
||||
storageRESTDstVolume = "destination-volume"
|
||||
storageRESTDstPath = "destination-path"
|
||||
storageRESTOffset = "offset"
|
||||
storageRESTLength = "length"
|
||||
storageRESTCount = "count"
|
||||
storageRESTBitrotAlgo = "bitrot-algo"
|
||||
storageRESTBitrotHash = "bitrot-hash"
|
||||
storageRESTDiskID = "disk-id"
|
||||
storageRESTForceDelete = "force-delete"
|
||||
storageRESTGlob = "glob"
|
||||
storageRESTMetrics = "metrics"
|
||||
storageRESTDriveQuorum = "drive-quorum"
|
||||
storageRESTOrigVolume = "orig-volume"
|
||||
storageRESTVolume = "volume"
|
||||
storageRESTVolumes = "volumes"
|
||||
storageRESTDirPath = "dir-path"
|
||||
storageRESTFilePath = "file-path"
|
||||
storageRESTVersionID = "version-id"
|
||||
storageRESTReadData = "read-data"
|
||||
storageRESTHealing = "healing"
|
||||
storageRESTTotalVersions = "total-versions"
|
||||
storageRESTSrcVolume = "source-volume"
|
||||
storageRESTSrcPath = "source-path"
|
||||
storageRESTDstVolume = "destination-volume"
|
||||
storageRESTDstPath = "destination-path"
|
||||
storageRESTOffset = "offset"
|
||||
storageRESTLength = "length"
|
||||
storageRESTCount = "count"
|
||||
storageRESTBitrotAlgo = "bitrot-algo"
|
||||
storageRESTBitrotHash = "bitrot-hash"
|
||||
storageRESTDiskID = "disk-id"
|
||||
storageRESTForceDelete = "force-delete"
|
||||
storageRESTGlob = "glob"
|
||||
storageRESTMetrics = "metrics"
|
||||
storageRESTDriveQuorum = "drive-quorum"
|
||||
storageRESTOrigVolume = "orig-volume"
|
||||
storageRESTInclFreeVersions = "incl-free-versions"
|
||||
)
|
||||
|
||||
type nsScannerOptions struct {
|
||||
|
@ -378,7 +378,16 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, *
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
fi, err := s.getStorage().ReadVersion(context.Background(), origvolume, volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
|
||||
inclFreeVersions, err := strconv.ParseBool(params.Get(storageRESTInclFreeVersions))
|
||||
if err != nil {
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
fi, err := s.getStorage().ReadVersion(context.Background(), origvolume, volume, filePath, versionID, ReadOptions{
|
||||
InclFreeVersions: inclFreeVersions,
|
||||
ReadData: readData,
|
||||
Healing: healing,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
@ -404,7 +413,18 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
fi, err := s.getStorage().ReadVersion(r.Context(), origvolume, volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
|
||||
|
||||
inclFreeVersions, err := strconv.ParseBool(r.Form.Get(storageRESTInclFreeVersions))
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
fi, err := s.getStorage().ReadVersion(r.Context(), origvolume, volume, filePath, versionID, ReadOptions{
|
||||
InclFreeVersions: inclFreeVersions,
|
||||
ReadData: readData,
|
||||
Healing: healing,
|
||||
})
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
|
@ -103,7 +103,13 @@ func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string, allParts bool
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data, allParts bool) (FileInfo, error) {
|
||||
type fileInfoOpts struct {
|
||||
InclFreeVersions bool
|
||||
Data bool
|
||||
AllParts bool
|
||||
}
|
||||
|
||||
func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, opts fileInfoOpts) (FileInfo, error) {
|
||||
var fi FileInfo
|
||||
var err error
|
||||
var inData xlMetaInlineData
|
||||
@ -111,7 +117,7 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data, allPart
|
||||
return FileInfo{}, e
|
||||
} else if buf != nil {
|
||||
inData = data
|
||||
fi, err = buf.ToFileInfo(volume, path, versionID, allParts)
|
||||
fi, err = buf.ToFileInfo(volume, path, versionID, opts.AllParts)
|
||||
if len(buf) != 0 && errors.Is(err, errFileNotFound) {
|
||||
// This special case is needed to handle len(xlMeta.versions) == 0
|
||||
return FileInfo{
|
||||
@ -140,15 +146,16 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data, allPart
|
||||
}, nil
|
||||
}
|
||||
inData = xlMeta.data
|
||||
fi, err = xlMeta.ToFileInfo(volume, path, versionID, false, allParts)
|
||||
fi, err = xlMeta.ToFileInfo(volume, path, versionID, opts.InclFreeVersions, opts.AllParts)
|
||||
}
|
||||
if !data || err != nil {
|
||||
if !opts.Data || err != nil {
|
||||
return fi, err
|
||||
}
|
||||
versionID = fi.VersionID
|
||||
if versionID == "" {
|
||||
versionID = nullVersionID
|
||||
}
|
||||
|
||||
fi.Data = inData.find(versionID)
|
||||
if len(fi.Data) == 0 {
|
||||
// PR #11758 used DataDir, preserve it
|
||||
|
@ -1612,8 +1612,9 @@ func (s *xlStorage) ReadXL(ctx context.Context, volume, path string, readData bo
|
||||
|
||||
// ReadOptions optional inputs for ReadVersion
|
||||
type ReadOptions struct {
|
||||
ReadData bool
|
||||
Healing bool
|
||||
InclFreeVersions bool
|
||||
ReadData bool
|
||||
Healing bool
|
||||
}
|
||||
|
||||
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
|
||||
@ -1657,7 +1658,11 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v
|
||||
return fi, err
|
||||
}
|
||||
|
||||
fi, err = getFileInfo(buf, volume, path, versionID, readData, true)
|
||||
fi, err = getFileInfo(buf, volume, path, versionID, fileInfoOpts{
|
||||
Data: opts.ReadData,
|
||||
InclFreeVersions: opts.InclFreeVersions,
|
||||
AllParts: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fi, err
|
||||
}
|
||||
@ -1710,22 +1715,6 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v
|
||||
}
|
||||
}
|
||||
|
||||
if !skipAccessChecks(volume) && !opts.Healing && fi.TransitionStatus == "" && !fi.InlineData() && len(fi.Data) == 0 && fi.DataDir != "" && fi.DataDir != emptyUUID && fi.VersionPurgeStatus().Empty() {
|
||||
// Verify if the dataDir is present or not when the data
|
||||
// is not inlined to make sure we return correct errors
|
||||
// during HeadObject().
|
||||
|
||||
// Healing must not come here and return error, since healing
|
||||
// deals with dataDirs directly, let healing fix things automatically.
|
||||
if lerr := Access(pathJoin(volumeDir, path, fi.DataDir)); lerr != nil {
|
||||
if os.IsNotExist(lerr) {
|
||||
// Data dir is missing we must return errFileCorrupted
|
||||
return FileInfo{}, errFileCorrupt
|
||||
}
|
||||
return FileInfo{}, osErrToFileErr(lerr)
|
||||
}
|
||||
}
|
||||
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
|
@ -271,7 +271,7 @@ func TestXLStorageReadVersion(t *testing.T) {
|
||||
}
|
||||
|
||||
xlMeta, _ := os.ReadFile("testdata/xl.meta")
|
||||
fi, _ := getFileInfo(xlMeta, "exists", "as-file", "", false, true)
|
||||
fi, _ := getFileInfo(xlMeta, "exists", "as-file", "", fileInfoOpts{Data: false, AllParts: true})
|
||||
|
||||
// Create files for the test cases.
|
||||
if err = xlStorage.MakeVol(context.Background(), "exists"); err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user