fix: skip access checks further for known buckets (#17934)

This commit is contained in:
Harshavardhana 2023-08-28 15:16:41 -07:00 committed by GitHub
parent 8a57b6bced
commit 7cafdc0512
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 51 deletions

View File

@ -69,6 +69,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return err return err
} }
if !skipAccessChecks(opts.Bucket) {
// Stat a volume entry. // Stat a volume entry.
if err = Access(volumeDir); err != nil { if err = Access(volumeDir); err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
@ -78,6 +79,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
return err return err
} }
}
s.RLock() s.RLock()
legacy := s.formatLegacy legacy := s.formatLegacy

View File

@ -38,8 +38,7 @@ func access(name string) error {
return nil return nil
} }
// Forked from Golang but chooses fast path upon os.Mkdir() // Forked from Golang but chooses to avoid performing lookup
// error to avoid os.Lstat() call.
// //
// osMkdirAll creates a directory named path, // osMkdirAll creates a directory named path,
// along with any necessary parents, and returns nil, // along with any necessary parents, and returns nil,
@ -49,15 +48,6 @@ func access(name string) error {
// If path is already a directory, MkdirAll does nothing // If path is already a directory, MkdirAll does nothing
// and returns nil. // and returns nil.
func osMkdirAll(dirPath string, perm os.FileMode) error { func osMkdirAll(dirPath string, perm os.FileMode) error {
// Fast path: if we can tell whether path is a directory or file, stop with success or error.
err := Access(dirPath)
if err == nil {
return nil
}
if !osIsNotExist(err) {
return &os.PathError{Op: "mkdir", Path: dirPath, Err: err}
}
// Slow path: make sure parent exists and then call Mkdir for path. // Slow path: make sure parent exists and then call Mkdir for path.
i := len(dirPath) i := len(dirPath)
for i > 0 && os.IsPathSeparator(dirPath[i-1]) { // Skip trailing path separator. for i > 0 && os.IsPathSeparator(dirPath[i-1]) { // Skip trailing path separator.
@ -71,13 +61,13 @@ func osMkdirAll(dirPath string, perm os.FileMode) error {
if j > 1 { if j > 1 {
// Create parent. // Create parent.
if err = osMkdirAll(dirPath[:j-1], perm); err != nil { if err := osMkdirAll(dirPath[:j-1], perm); err != nil {
return err return err
} }
} }
// Parent now exists; invoke Mkdir and use its result. // Parent now exists; invoke Mkdir and use its result.
if err = Mkdir(dirPath, perm); err != nil { if err := Mkdir(dirPath, perm); err != nil {
if osIsExist(err) { if osIsExist(err) {
return nil return nil
} }

View File

@ -949,6 +949,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
} }
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
if !skipAccessChecks(volume) {
if ierr := Access(volumeDir); ierr != nil { if ierr := Access(volumeDir); ierr != nil {
if osIsNotExist(ierr) { if osIsNotExist(ierr) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
@ -957,6 +958,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
} }
} }
} }
}
return nil, err return nil, err
} }
@ -970,7 +972,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
} }
var legacyJSON bool var legacyJSON bool
buf, _, err := s.readAllData(ctx, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile), false) buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile), false)
if err != nil { if err != nil {
if !errors.Is(err, errFileNotFound) { if !errors.Is(err, errFileNotFound) {
return err return err
@ -980,7 +982,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
legacy := s.formatLegacy legacy := s.formatLegacy
s.RUnlock() s.RUnlock()
if legacy { if legacy {
buf, _, err = s.readAllData(ctx, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1), false) buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1), false)
if err != nil { if err != nil {
return err return err
} }
@ -1367,24 +1369,26 @@ func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
return nil return nil
} }
func (s *xlStorage) readRaw(ctx context.Context, volumeDir, filePath string, readData bool) (buf []byte, dmTime time.Time, err error) { func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath string, readData bool) (buf []byte, dmTime time.Time, err error) {
if readData { if readData {
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFile), true) buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), true)
} else { } else {
buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile)) buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile))
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
if !skipAccessChecks(volume) {
if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) { if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
return nil, time.Time{}, errVolumeNotFound return nil, time.Time{}, errVolumeNotFound
} }
} }
}
err = osErrToFileErr(err) err = osErrToFileErr(err)
} }
} }
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true) buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true)
if err != nil { if err != nil {
return nil, time.Time{}, err return nil, time.Time{}, err
} }
@ -1414,7 +1418,7 @@ func (s *xlStorage) ReadXL(ctx context.Context, volume, path string, readData bo
return RawFileInfo{}, err return RawFileInfo{}, err
} }
buf, dmTime, err := s.readRaw(ctx, volumeDir, filePath, readData) buf, dmTime, err := s.readRaw(ctx, volume, volumeDir, filePath, readData)
return RawFileInfo{ return RawFileInfo{
Buf: buf, Buf: buf,
DiskMTime: dmTime, DiskMTime: dmTime,
@ -1435,7 +1439,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, err return fi, err
} }
buf, dmTime, err := s.readRaw(ctx, volumeDir, filePath, readData) buf, dmTime, err := s.readRaw(ctx, volume, volumeDir, filePath, readData)
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
if versionID != "" { if versionID != "" {
@ -1492,7 +1496,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
len(fi.Parts) == 1 { len(fi.Parts) == 1 {
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number) partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath) dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
fi.Data, _, err = s.readAllData(ctx, volumeDir, dataPath, true) fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, true)
if err != nil { if err != nil {
return FileInfo{}, err return FileInfo{}, err
} }
@ -1502,7 +1506,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, nil return fi, nil
} }
func (s *xlStorage) readAllData(ctx context.Context, volumeDir string, filePath string, sync bool) (buf []byte, dmTime time.Time, err error) { func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string, sync bool) (buf []byte, dmTime time.Time, err error) {
if contextCanceled(ctx) { if contextCanceled(ctx) {
return nil, time.Time{}, ctx.Err() return nil, time.Time{}, ctx.Err()
} }
@ -1519,9 +1523,11 @@ func (s *xlStorage) readAllData(ctx context.Context, volumeDir string, filePath
case osIsNotExist(err): case osIsNotExist(err):
// Check if the object doesn't exist because its bucket // Check if the object doesn't exist because its bucket
// is missing in order to return the correct error. // is missing in order to return the correct error.
if !skipAccessChecks(volume) {
if err = Access(volumeDir); err != nil && osIsNotExist(err) { if err = Access(volumeDir); err != nil && osIsNotExist(err) {
return nil, dmTime, errVolumeNotFound return nil, dmTime, errVolumeNotFound
} }
}
return nil, dmTime, errFileNotFound return nil, dmTime, errFileNotFound
case osIsPermission(err): case osIsPermission(err):
return nil, dmTime, errFileAccessDenied return nil, dmTime, errFileAccessDenied
@ -1610,7 +1616,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
return nil, err return nil, err
} }
buf, _, err = s.readAllData(ctx, volumeDir, filePath, true) buf, _, err = s.readAllData(ctx, volume, volumeDir, filePath, true)
return buf, err return buf, err
} }
@ -1799,9 +1805,11 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
if err != nil { if err != nil {
switch { switch {
case osIsNotExist(err): case osIsNotExist(err):
if !skipAccessChecks(volume) {
if err = Access(volumeDir); err != nil && osIsNotExist(err) { if err = Access(volumeDir); err != nil && osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
} }
}
return nil, errFileNotFound return nil, errFileNotFound
case osIsPermission(err): case osIsPermission(err):
return nil, errFileAccessDenied return nil, errFileAccessDenied
@ -2082,6 +2090,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
st, err := Lstat(filePath) st, err := Lstat(filePath)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
if !skipAccessChecks(volume) {
// Stat a volume entry. // Stat a volume entry.
if verr := Access(volumeDir); verr != nil { if verr := Access(volumeDir); verr != nil {
if osIsNotExist(verr) { if osIsNotExist(verr) {
@ -2090,6 +2099,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
return verr return verr
} }
} }
}
return osErrToFileErr(err) return osErrToFileErr(err)
} }
if st.Mode().IsDir() { if st.Mode().IsDir() {
@ -2716,7 +2726,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
if req.MetadataOnly { if req.MetadataOnly {
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath) data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
} else { } else {
data, mt, err = s.readAllData(ctx, volumeDir, fullPath, false) data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, false)
} }
return err return err
}); err != nil { }); err != nil {
@ -2774,10 +2784,12 @@ func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob
} }
st, _ := Lstat(filePath) st, _ := Lstat(filePath)
if st == nil { if st == nil {
if !skipAccessChecks(volume) {
// Stat a volume entry. // Stat a volume entry.
if verr := Access(volumeDir); verr != nil { if verr := Access(volumeDir); verr != nil {
return stat, convertAccessError(verr, errVolumeAccessDenied) return stat, convertAccessError(verr, errVolumeAccessDenied)
} }
}
return stat, errPathNotFound return stat, errPathNotFound
} }
name, err := filepath.Rel(volumeDir, filePath) name, err := filepath.Rel(volumeDir, filePath)
@ -2809,7 +2821,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
} }
baseDir := pathJoin(volumeDir, path+slashSeparator) baseDir := pathJoin(volumeDir, path+slashSeparator)
metaPath := pathutil.Join(baseDir, xlStorageFormatFile) metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
buf, _, err := s.readAllData(ctx, volumeDir, metaPath, false) buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, false)
if err != nil { if err != nil {
return err return err
} }