avoid Access() calls on known bucket paths (#17719)

This commit is contained in:
Harshavardhana 2023-07-26 11:31:40 -07:00 committed by GitHub
parent a7c71e4c6b
commit b28bcad11b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 70 deletions

View File

@ -1065,8 +1065,8 @@ func TestHealObjectCorruptedPools(t *testing.T) {
} }
for i := 0; i < (nfi.Erasure.DataBlocks + nfi.Erasure.ParityBlocks); i++ { for i := 0; i < (nfi.Erasure.DataBlocks + nfi.Erasure.ParityBlocks); i++ {
_, err = erasureDisks[i].StatInfoFile(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false) stats, _ := erasureDisks[i].StatInfoFile(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
if err == nil { if len(stats) != 0 {
t.Errorf("Expected xl.meta file to be not present, but succeeeded") t.Errorf("Expected xl.meta file to be not present, but succeeeded")
} }
} }

View File

@ -23,6 +23,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"path/filepath"
"sync" "sync"
"time" "time"
@ -97,6 +98,12 @@ func bgFormatErasureCleanupTmp(diskPath string) {
err)) err))
} }
// Delete all temporary files created for DirectIO write check
files, _ := filepath.Glob(filepath.Join(diskPath, ".writable-check-*.tmp"))
for _, file := range files {
removeAll(file)
}
// Remove the entire folder in case there are leftovers that didn't get cleaned up before restart. // Remove the entire folder in case there are leftovers that didn't get cleaned up before restart.
go removeAll(pathJoin(diskPath, minioMetaTmpBucket+"-old")) go removeAll(pathJoin(diskPath, minioMetaTmpBucket+"-old"))
// Renames and schedules for purging all bucket metacache. // Renames and schedules for purging all bucket metacache.

View File

@ -90,8 +90,8 @@ func isValidVolname(volname string) bool {
// xlStorage - implements StorageAPI interface. // xlStorage - implements StorageAPI interface.
type xlStorage struct { type xlStorage struct {
diskPath string drivePath string
endpoint Endpoint endpoint Endpoint
globalSync bool globalSync bool
oDirect bool // indicates if this disk supports ODirect oDirect bool // indicates if this disk supports ODirect
@ -237,7 +237,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
} }
s = &xlStorage{ s = &xlStorage{
diskPath: path, drivePath: path,
endpoint: ep, endpoint: ep,
globalSync: globalFSOSync, globalSync: globalFSOSync,
rootDisk: rootDisk, rootDisk: rootDisk,
@ -253,10 +253,10 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
} }
if cleanUp { if cleanUp {
bgFormatErasureCleanupTmp(s.diskPath) // cleanup any old data. bgFormatErasureCleanupTmp(s.drivePath) // cleanup any old data.
} }
formatData, formatFi, err := formatErasureMigrate(s.diskPath) formatData, formatFi, err := formatErasureMigrate(s.drivePath)
if err != nil && !errors.Is(err, os.ErrNotExist) { if err != nil && !errors.Is(err, os.ErrNotExist) {
if os.IsPermission(err) { if os.IsPermission(err) {
return nil, errDiskAccessDenied return nil, errDiskAccessDenied
@ -268,19 +268,6 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
s.formatData = formatData s.formatData = formatData
s.formatFileInfo = formatFi s.formatFileInfo = formatFi
// Return an error if ODirect is not supported
// unless it is a single erasure disk mode
if err := s.checkODirectDiskSupport(); err == nil {
s.oDirect = true
} else {
// Allow if unsupported platform or single disk.
if errors.Is(err, errUnsupportedDisk) && globalIsErasureSD || !disk.ODirectPlatform {
s.oDirect = false
} else {
return s, err
}
}
if len(s.formatData) == 0 { if len(s.formatData) == 0 {
// Create all necessary bucket folders if possible. // Create all necessary bucket folders if possible.
if err = makeFormatErasureMetaVolumes(s); err != nil { if err = makeFormatErasureMetaVolumes(s); err != nil {
@ -297,14 +284,27 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
s.formatLegacy = format.Erasure.DistributionAlgo == formatErasureVersionV2DistributionAlgoV1 s.formatLegacy = format.Erasure.DistributionAlgo == formatErasureVersionV2DistributionAlgoV1
} }
// Return an error if ODirect is not supported
// unless it is a single erasure disk mode
if err := s.checkODirectDiskSupport(); err == nil {
s.oDirect = true
} else {
// Allow if unsupported platform or single disk.
if errors.Is(err, errUnsupportedDisk) && globalIsErasureSD || !disk.ODirectPlatform {
s.oDirect = false
} else {
return s, err
}
}
// Success. // Success.
return s, nil return s, nil
} }
// getDiskInfo returns given disk information. // getDiskInfo returns given disk information.
func getDiskInfo(diskPath string) (di disk.Info, err error) { func getDiskInfo(drivePath string) (di disk.Info, err error) {
if err = checkPathLength(diskPath); err == nil { if err = checkPathLength(drivePath); err == nil {
di, err = disk.GetInfo(diskPath) di, err = disk.GetInfo(drivePath)
} }
switch { switch {
case osIsNotExist(err): case osIsNotExist(err):
@ -320,7 +320,7 @@ func getDiskInfo(diskPath string) (di disk.Info, err error) {
// Implements stringer compatible interface. // Implements stringer compatible interface.
func (s *xlStorage) String() string { func (s *xlStorage) String() string {
return s.diskPath return s.drivePath
} }
func (s *xlStorage) Hostname() string { func (s *xlStorage) Hostname() string {
@ -366,7 +366,7 @@ func (s *xlStorage) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
} }
func (s *xlStorage) Healing() *healingTracker { func (s *xlStorage) Healing() *healingTracker {
healingFile := pathJoin(s.diskPath, minioMetaBucket, healingFile := pathJoin(s.drivePath, minioMetaBucket,
bucketMetaPrefix, healingTrackerFilename) bucketMetaPrefix, healingTrackerFilename)
b, err := os.ReadFile(healingFile) b, err := os.ReadFile(healingFile)
if err != nil { if err != nil {
@ -388,9 +388,7 @@ func (s *xlStorage) checkODirectDiskSupport() error {
// Check if backend is writable and supports O_DIRECT // Check if backend is writable and supports O_DIRECT
uuid := mustGetUUID() uuid := mustGetUUID()
filePath := pathJoin(s.diskPath, ".writable-check-"+uuid+".tmp") filePath := pathJoin(s.drivePath, minioMetaTmpDeletedBucket, ".writable-check-"+uuid+".tmp")
defer renameAll(filePath, pathJoin(s.diskPath, minioMetaTmpDeletedBucket, uuid))
w, err := s.openFileDirect(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL) w, err := s.openFileDirect(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL)
if err != nil { if err != nil {
return err return err
@ -447,7 +445,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
atomic.AddInt32(&s.scanning, 1) atomic.AddInt32(&s.scanning, 1)
defer atomic.AddInt32(&s.scanning, -1) defer atomic.AddInt32(&s.scanning, -1)
var err error var err error
stopFn := globalScannerMetrics.log(scannerMetricScanBucketDrive, s.diskPath, cache.Info.Name) stopFn := globalScannerMetrics.log(scannerMetricScanBucketDrive, s.drivePath, cache.Info.Name)
defer func() { defer func() {
res := make(map[string]string) res := make(map[string]string)
if err != nil { if err != nil {
@ -499,14 +497,14 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
cache.Info.updates = updates cache.Info.updates = updates
dataUsageInfo, err := scanDataFolder(ctx, disks, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) { dataUsageInfo, err := scanDataFolder(ctx, disks, s.drivePath, cache, func(item scannerItem) (sizeSummary, error) {
// Look for `xl.meta/xl.json' at the leaf. // Look for `xl.meta/xl.json' at the leaf.
if !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFile) && if !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFile) &&
!strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFileV1) { !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFileV1) {
// if no xl.meta/xl.json found, skip the file. // if no xl.meta/xl.json found, skip the file.
return sizeSummary{}, errSkipFile return sizeSummary{}, errSkipFile
} }
stopFn := globalScannerMetrics.log(scannerMetricScanObject, s.diskPath, pathJoin(item.bucket, item.objectPath())) stopFn := globalScannerMetrics.log(scannerMetricScanObject, s.drivePath, pathJoin(item.bucket, item.objectPath()))
res := make(map[string]string, 8) res := make(map[string]string, 8)
defer func() { defer func() {
stopFn(res) stopFn(res)
@ -633,10 +631,10 @@ func (s *xlStorage) DiskInfo(_ context.Context) (info DiskInfo, err error) {
s.diskInfoCache.Update = func() (interface{}, error) { s.diskInfoCache.Update = func() (interface{}, error) {
dcinfo := DiskInfo{ dcinfo := DiskInfo{
RootDisk: s.rootDisk, RootDisk: s.rootDisk,
MountPath: s.diskPath, MountPath: s.drivePath,
Endpoint: s.endpoint.String(), Endpoint: s.endpoint.String(),
} }
di, err := getDiskInfo(s.diskPath) di, err := getDiskInfo(s.drivePath)
if err != nil { if err != nil {
return dcinfo, err return dcinfo, err
} }
@ -674,17 +672,17 @@ func (s *xlStorage) getVolDir(volume string) (string, error) {
if volume == "" || volume == "." || volume == ".." { if volume == "" || volume == "." || volume == ".." {
return "", errVolumeNotFound return "", errVolumeNotFound
} }
volumeDir := pathJoin(s.diskPath, volume) volumeDir := pathJoin(s.drivePath, volume)
return volumeDir, nil return volumeDir, nil
} }
func (s *xlStorage) checkFormatJSON() (os.FileInfo, error) { func (s *xlStorage) checkFormatJSON() (os.FileInfo, error) {
formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile) formatFile := pathJoin(s.drivePath, minioMetaBucket, formatConfigFile)
fi, err := Lstat(formatFile) fi, err := Lstat(formatFile)
if err != nil { if err != nil {
// If the disk is still not initialized. // If the disk is still not initialized.
if osIsNotExist(err) { if osIsNotExist(err) {
if err = Access(s.diskPath); err == nil { if err = Access(s.drivePath); err == nil {
// Disk is present but missing `format.json` // Disk is present but missing `format.json`
return nil, errUnformattedDisk return nil, errUnformattedDisk
} }
@ -731,12 +729,12 @@ func (s *xlStorage) GetDiskID() (string, error) {
return diskID, nil return diskID, nil
} }
formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile) formatFile := pathJoin(s.drivePath, minioMetaBucket, formatConfigFile)
b, err := os.ReadFile(formatFile) b, err := os.ReadFile(formatFile)
if err != nil { if err != nil {
// If the disk is still not initialized. // If the disk is still not initialized.
if osIsNotExist(err) { if osIsNotExist(err) {
if err = Access(s.diskPath); err == nil { if err = Access(s.drivePath); err == nil {
// Disk is present but missing `format.json` // Disk is present but missing `format.json`
return "", errUnformattedDisk return "", errUnformattedDisk
} }
@ -819,10 +817,10 @@ func (s *xlStorage) MakeVol(ctx context.Context, volume string) error {
// ListVols - list volumes. // ListVols - list volumes.
func (s *xlStorage) ListVols(ctx context.Context) (volsInfo []VolInfo, err error) { func (s *xlStorage) ListVols(ctx context.Context) (volsInfo []VolInfo, err error) {
return listVols(ctx, s.diskPath) return listVols(ctx, s.drivePath)
} }
// List all the volumes from diskPath. // List all the volumes from drivePath.
func listVols(ctx context.Context, dirPath string) ([]VolInfo, error) { func listVols(ctx context.Context, dirPath string) ([]VolInfo, error) {
if err := checkPathLength(dirPath); err != nil { if err := checkPathLength(dirPath); err != nil {
return nil, err return nil, err
@ -1062,7 +1060,7 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
func (s *xlStorage) moveToTrash(filePath string, recursive, force bool) error { func (s *xlStorage) moveToTrash(filePath string, recursive, force bool) error {
pathUUID := mustGetUUID() pathUUID := mustGetUUID()
targetPath := pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, pathUUID) targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID)
var renameFn func(source, target string) error var renameFn func(source, target string) error
if recursive { if recursive {
@ -1445,8 +1443,8 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
// Check the data path if there is a part with data. // Check the data path if there is a part with data.
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number) partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
dataPath := pathJoin(path, fi.DataDir, partPath) dataPath := pathJoin(path, fi.DataDir, partPath)
_, err = s.StatInfoFile(ctx, volume, dataPath, false) _, lerr := Lstat(pathJoin(volumeDir, dataPath))
if err != nil { if lerr != nil {
// Set the inline header, our inlined data is fine. // Set the inline header, our inlined data is fine.
fi.SetInlineData() fi.SetInlineData()
return fi, nil return fi, nil
@ -1611,9 +1609,11 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of
var n int var n int
// Stat a volume entry. if !skipAccessChecks(volume) {
if err = Access(volumeDir); err != nil { // Stat a volume entry.
return 0, convertAccessError(err, errFileAccessDenied) if err = Access(volumeDir); err != nil {
return 0, convertAccessError(err, errFileAccessDenied)
}
} }
// Validate effective path length before reading. // Validate effective path length before reading.
@ -1994,9 +1994,11 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string,
return err return err
} }
// Stat a volume entry. if !skipAccessChecks(volume) {
if err = Access(volumeDir); err != nil { // Stat a volume entry.
return convertAccessError(err, errVolumeAccessDenied) if err = Access(volumeDir); err != nil {
return convertAccessError(err, errVolumeAccessDenied)
}
} }
filePath := pathJoin(volumeDir, path) filePath := pathJoin(volumeDir, path)
@ -2032,14 +2034,6 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
return err return err
} }
// Stat a volume entry.
if err = Access(volumeDir); err != nil {
if osIsNotExist(err) {
return errVolumeNotFound
}
return err
}
for _, part := range fi.Parts { for _, part := range fi.Parts {
partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number)) partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
filePath := pathJoin(volumeDir, partPath) filePath := pathJoin(volumeDir, partPath)
@ -2048,6 +2042,15 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
} }
st, err := Lstat(filePath) st, err := Lstat(filePath)
if err != nil { if err != nil {
if osIsNotExist(err) {
// Stat a volume entry.
if verr := Access(volumeDir); verr != nil {
if osIsNotExist(verr) {
return errVolumeNotFound
}
return verr
}
}
return osErrToFileErr(err) return osErrToFileErr(err)
} }
if st.Mode().IsDir() { if st.Mode().IsDir() {
@ -2137,9 +2140,11 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, dele
return err return err
} }
// Stat a volume entry. if !skipAccessChecks(volume) {
if err = Access(volumeDir); err != nil { // Stat a volume entry.
return convertAccessError(err, errVolumeAccessDenied) if err = Access(volumeDir); err != nil {
return convertAccessError(err, errVolumeAccessDenied)
}
} }
// Following code is needed so that we retain SlashSeparator suffix if any in // Following code is needed so that we retain SlashSeparator suffix if any in
@ -2155,10 +2160,10 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, dele
func skipAccessChecks(volume string) (ok bool) { func skipAccessChecks(volume string) (ok bool) {
for _, prefix := range []string{ for _, prefix := range []string{
minioMetaTmpBucket,
minioMetaBucket, minioMetaBucket,
minioMetaMultipartBucket, minioMetaMultipartBucket,
minioMetaTmpDeletedBucket, minioMetaTmpDeletedBucket,
minioMetaTmpBucket,
} { } {
if strings.HasPrefix(volume, prefix) { if strings.HasPrefix(volume, prefix) {
return true return true
@ -2179,7 +2184,8 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
} }
if err != nil && !IsErr(err, ignoredErrs...) && !contextCanceled(ctx) { if err != nil && !IsErr(err, ignoredErrs...) && !contextCanceled(ctx) {
// Only log these errors if context is not yet canceled. // Only log these errors if context is not yet canceled.
logger.LogOnceIf(ctx, fmt.Errorf("srcVolume: %s, srcPath: %s, dstVolume: %s:, dstPath: %s - error %v", logger.LogOnceIf(ctx, fmt.Errorf("drive:%s, srcVolume: %s, srcPath: %s, dstVolume: %s:, dstPath: %s - error %v",
s.drivePath,
srcVolume, srcPath, srcVolume, srcPath,
dstVolume, dstPath, dstVolume, dstPath,
err), "xl-storage-rename-data-"+srcVolume+"-"+dstVolume) err), "xl-storage-rename-data-"+srcVolume+"-"+dstVolume)
@ -2604,9 +2610,11 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
return err return err
} }
// Stat a volume entry. if !skipAccessChecks(volume) {
if err = Access(volumeDir); err != nil { // Stat a volume entry.
return convertAccessError(err, errVolumeAccessDenied) if err = Access(volumeDir); err != nil {
return convertAccessError(err, errVolumeAccessDenied)
}
} }
erasure := fi.Erasure erasure := fi.Erasure
@ -2639,7 +2647,7 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
defer close(resp) defer close(resp)
volumeDir := pathJoin(s.diskPath, req.Bucket) volumeDir := pathJoin(s.drivePath, req.Bucket)
found := 0 found := 0
for _, f := range req.Files { for _, f := range req.Files {
if contextCanceled(ctx) { if contextCanceled(ctx) {
@ -2702,10 +2710,6 @@ func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob
return stat, err return stat, err
} }
// Stat a volume entry.
if err = Access(volumeDir); err != nil {
return stat, convertAccessError(err, errVolumeAccessDenied)
}
files := []string{pathJoin(volumeDir, path)} files := []string{pathJoin(volumeDir, path)}
if glob { if glob {
files, err = filepathx.Glob(filepath.Join(volumeDir, path)) files, err = filepathx.Glob(filepath.Join(volumeDir, path))
@ -2719,6 +2723,10 @@ func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob
} }
st, _ := Lstat(filePath) st, _ := Lstat(filePath)
if st == nil { if st == nil {
// Stat a volume entry.
if verr := Access(volumeDir); verr != nil {
return stat, convertAccessError(verr, errVolumeAccessDenied)
}
return stat, errPathNotFound return stat, errPathNotFound
} }
name, err := filepath.Rel(volumeDir, filePath) name, err := filepath.Rel(volumeDir, filePath)