diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 876b7ec49..d43532508 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -79,6 +79,10 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ return err } + s.RLock() + legacy := s.formatLegacy + s.RUnlock() + // Use a small block size to start sending quickly w := newMetacacheWriter(wr, 16<<10) w.reuseBlocks = true // We are not sharing results, so reuse buffers. @@ -114,6 +118,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ metadata, err := s.readMetadata(ctx, pathJoin(volumeDir, opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix, xlStorageFormatFile)) + diskHealthCheckOK(ctx, err) if err == nil { // if baseDir is already a directory object, consider it // as part of the list call, this is AWS S3 specific @@ -245,7 +250,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ return nil } // Check legacy. - if HasSuffix(entry, xlStorageFormatFileV1) { + if HasSuffix(entry, xlStorageFormatFileV1) && legacy { var meta metaCacheEntry meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, current, entry)) diskHealthCheckOK(ctx, err) @@ -331,14 +336,16 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ return err } case osIsNotExist(err), isSysErrIsDir(err): - meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1)) - diskHealthCheckOK(ctx, err) - if err == nil { - // It was an object - if err := send(meta); err != nil { - return err + if legacy { + meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1)) + diskHealthCheckOK(ctx, err) + if err == nil { + // It was an object + if err := send(meta); err != nil { + return err + } + continue } - continue } // NOT an object, append to stack (with slash) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index b301e0b1a..06567e1e2 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -441,7 +441,12 @@ func (s *xlStorage) readMetadataWithDMTime(ctx context.Context, itemPath string) } func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) { - buf, _, err := s.readMetadataWithDMTime(ctx, itemPath) + var buf []byte + err := xioutil.NewDeadlineWorker(diskMaxTimeout).Run(func() error { + var rerr error + buf, _, rerr = s.readMetadataWithDMTime(ctx, itemPath) + return rerr + }) return buf, err } @@ -957,16 +962,21 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis var legacyJSON bool buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { - if err != errFileNotFound { + if !errors.Is(err, errFileNotFound) { return err } metaDataPoolPut(buf) // Never used, return it - buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1)) - if err != nil { - return err + s.RLock() + legacy := s.formatLegacy + s.RUnlock() + if legacy { + buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1)) + if err != nil { + return err + } + legacyJSON = true } - legacyJSON = true } if len(buf) == 0 { @@ -1099,7 +1109,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F var legacyJSON bool buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { - if err != errFileNotFound { + if !errors.Is(err, errFileNotFound) { return err } metaDataPoolPut(buf) // Never used, return it @@ -1108,14 +1118,19 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return s.WriteMetadata(ctx, volume, path, fi) } - buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1)) - if err != nil { - if err == errFileNotFound && fi.VersionID != "" { - return errFileVersionNotFound + s.RLock() + legacy := s.formatLegacy + s.RUnlock() + if legacy { + buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1)) + if err != nil { + if errors.Is(err, errFileNotFound) && fi.VersionID != "" { + return errFileVersionNotFound + } + return err } - return err + legacyJSON = true } - legacyJSON = true } if len(buf) == 0 {