mirror of
https://github.com/minio/minio.git
synced 2025-04-05 04:10:28 -04:00
remove strict persistence requirements for List() .metacache objects (#17917)
.metacache objects are transient in nature, and are better left to use page-cache effectively to avoid using more IOPs on the disks. this allows for incoming calls to be not taxed heavily due to multiple large batch listings.
This commit is contained in:
parent
62c9e500de
commit
124e28578c
@ -1969,8 +1969,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
|||||||
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateObjectMeta will update the metadata of a file.
|
func (er erasureObjects) updateObjectMetaWithOpts(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks []StorageAPI, opts UpdateMetadataOpts) error {
|
||||||
func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks []StorageAPI) error {
|
|
||||||
if len(fi.Metadata) == 0 {
|
if len(fi.Metadata) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -1984,7 +1983,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
|
|||||||
if onlineDisks[index] == nil {
|
if onlineDisks[index] == nil {
|
||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
return onlineDisks[index].UpdateMetadata(ctx, bucket, object, fi)
|
return onlineDisks[index].UpdateMetadata(ctx, bucket, object, fi, opts)
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1994,6 +1993,11 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
|
|||||||
return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, er.defaultWQuorum())
|
return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, er.defaultWQuorum())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateObjectMeta will update the metadata of a file.
|
||||||
|
func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks []StorageAPI) error {
|
||||||
|
return er.updateObjectMetaWithOpts(ctx, bucket, object, fi, onlineDisks, UpdateMetadataOpts{})
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteObjectTags - delete object tags from an existing object
|
// DeleteObjectTags - delete object tags from an existing object
|
||||||
func (er erasureObjects) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
func (er erasureObjects) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||||
return er.PutObjectTags(ctx, bucket, object, "", opts)
|
return er.PutObjectTags(ctx, bucket, object, "", opts)
|
||||||
|
@ -785,7 +785,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
|
|||||||
for k, v := range meta {
|
for k, v := range meta {
|
||||||
fi.Metadata[k] = v
|
fi.Metadata[k] = v
|
||||||
}
|
}
|
||||||
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi, er.getDisks())
|
err := er.updateObjectMetaWithOpts(ctx, minioMetaBucket, o.objectPath(0), fi, er.getDisks(), UpdateMetadataOpts{NoPersistence: true})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -246,11 +246,11 @@ func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi
|
|||||||
return d.disk.WriteMetadata(ctx, volume, path, fi)
|
return d.disk.WriteMetadata(ctx, volume, path, fi)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) {
|
||||||
if err := d.calcError(); err != nil {
|
if err := d.calcError(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return d.disk.UpdateMetadata(ctx, volume, path, fi)
|
return d.disk.UpdateMetadata(ctx, volume, path, fi, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
|
func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
|
||||||
|
@ -82,7 +82,7 @@ type StorageAPI interface {
|
|||||||
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error
|
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error
|
||||||
DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error
|
DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error
|
||||||
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||||
UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error
|
||||||
ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
|
ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
|
||||||
ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error)
|
ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error)
|
||||||
RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error)
|
RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error)
|
||||||
@ -252,7 +252,7 @@ func (p *unrecognizedDisk) DeleteVersion(ctx context.Context, volume, path strin
|
|||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *unrecognizedDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
func (p *unrecognizedDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) {
|
||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,10 +406,11 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error {
|
func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Set(storageRESTVolume, volume)
|
values.Set(storageRESTVolume, volume)
|
||||||
values.Set(storageRESTFilePath, path)
|
values.Set(storageRESTFilePath, path)
|
||||||
|
values.Set(storageRESTNoPersistence, strconv.FormatBool(opts.NoPersistence))
|
||||||
|
|
||||||
var reader bytes.Buffer
|
var reader bytes.Buffer
|
||||||
if err := msgp.Encode(&reader, &fi); err != nil {
|
if err := msgp.Encode(&reader, &fi); err != nil {
|
||||||
|
@ -84,4 +84,5 @@ const (
|
|||||||
storageRESTGlob = "glob"
|
storageRESTGlob = "glob"
|
||||||
storageRESTScanMode = "scan-mode"
|
storageRESTScanMode = "scan-mode"
|
||||||
storageRESTMetrics = "metrics"
|
storageRESTMetrics = "metrics"
|
||||||
|
storageRESTNoPersistence = "no-persistence"
|
||||||
)
|
)
|
||||||
|
@ -442,6 +442,7 @@ func (s *storageRESTServer) UpdateMetadataHandler(w http.ResponseWriter, r *http
|
|||||||
}
|
}
|
||||||
volume := r.Form.Get(storageRESTVolume)
|
volume := r.Form.Get(storageRESTVolume)
|
||||||
filePath := r.Form.Get(storageRESTFilePath)
|
filePath := r.Form.Get(storageRESTFilePath)
|
||||||
|
noPersistence := r.Form.Get(storageRESTNoPersistence) == "true"
|
||||||
|
|
||||||
if r.ContentLength < 0 {
|
if r.ContentLength < 0 {
|
||||||
s.writeErrorResponse(w, errInvalidArgument)
|
s.writeErrorResponse(w, errInvalidArgument)
|
||||||
@ -454,7 +455,7 @@ func (s *storageRESTServer) UpdateMetadataHandler(w http.ResponseWriter, r *http
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.storage.UpdateMetadata(r.Context(), volume, filePath, fi)
|
err := s.storage.UpdateMetadata(r.Context(), volume, filePath, fi, UpdateMetadataOpts{NoPersistence: noPersistence})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.writeErrorResponse(w, err)
|
s.writeErrorResponse(w, err)
|
||||||
}
|
}
|
||||||
|
@ -526,7 +526,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
|
|||||||
return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) })
|
return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) })
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) {
|
||||||
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricUpdateMetadata, volume, path)
|
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricUpdateMetadata, volume, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -534,7 +534,7 @@ func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path
|
|||||||
defer done(&err)
|
defer done(&err)
|
||||||
|
|
||||||
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
|
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
|
||||||
return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi) })
|
return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi, opts) })
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
||||||
|
@ -964,19 +964,23 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis ...FileInfo) error {
|
func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis ...FileInfo) error {
|
||||||
|
volumeDir, err := s.getVolDir(volume)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
var legacyJSON bool
|
var legacyJSON bool
|
||||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
buf, _, err := s.readAllData(ctx, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, errFileNotFound) {
|
if !errors.Is(err, errFileNotFound) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
metaDataPoolPut(buf) // Never used, return it
|
|
||||||
|
|
||||||
s.RLock()
|
s.RLock()
|
||||||
legacy := s.formatLegacy
|
legacy := s.formatLegacy
|
||||||
s.RUnlock()
|
s.RUnlock()
|
||||||
if legacy {
|
if legacy {
|
||||||
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
|
buf, _, err = s.readAllData(ctx, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -988,11 +992,6 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
|
|||||||
return errFileNotFound
|
return errFileNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeDir, err := s.getVolDir(volume)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if legacyJSON {
|
if legacyJSON {
|
||||||
// Delete the meta file, if there are no more versions the
|
// Delete the meta file, if there are no more versions the
|
||||||
// top level parent is automatically removed.
|
// top level parent is automatically removed.
|
||||||
@ -1210,8 +1209,13 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
|||||||
return s.deleteFile(volumeDir, filePath, true, false)
|
return s.deleteFile(volumeDir, filePath, true, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateMetadataOpts provides an optional input to indicate if xl.meta updates need to be fully synced to disk.
|
||||||
|
type UpdateMetadataOpts struct {
|
||||||
|
NoPersistence bool
|
||||||
|
}
|
||||||
|
|
||||||
// Updates only metadata for a given version.
|
// Updates only metadata for a given version.
|
||||||
func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error {
|
func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error {
|
||||||
if len(fi.Metadata) == 0 {
|
if len(fi.Metadata) == 0 {
|
||||||
return errInvalidArgument
|
return errInvalidArgument
|
||||||
}
|
}
|
||||||
@ -1247,7 +1251,7 @@ func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi
|
|||||||
}
|
}
|
||||||
defer metaDataPoolPut(wbuf)
|
defer metaDataPoolPut(wbuf)
|
||||||
|
|
||||||
return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), wbuf)
|
return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), wbuf, !opts.NoPersistence)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
|
// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
|
||||||
@ -1365,7 +1369,7 @@ func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
|
|||||||
|
|
||||||
func (s *xlStorage) readRaw(ctx context.Context, volumeDir, filePath string, readData bool) (buf []byte, dmTime time.Time, err error) {
|
func (s *xlStorage) readRaw(ctx context.Context, volumeDir, filePath string, readData bool) (buf []byte, dmTime time.Time, err error) {
|
||||||
if readData {
|
if readData {
|
||||||
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFile))
|
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFile), true)
|
||||||
} else {
|
} else {
|
||||||
buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile))
|
buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1380,7 +1384,7 @@ func (s *xlStorage) readRaw(ctx context.Context, volumeDir, filePath string, rea
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errFileNotFound {
|
if err == errFileNotFound {
|
||||||
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFileV1))
|
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, time.Time{}, err
|
return nil, time.Time{}, err
|
||||||
}
|
}
|
||||||
@ -1488,7 +1492,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||||||
len(fi.Parts) == 1 {
|
len(fi.Parts) == 1 {
|
||||||
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
|
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
|
||||||
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
|
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
|
||||||
fi.Data, _, err = s.readAllData(ctx, volumeDir, dataPath)
|
fi.Data, _, err = s.readAllData(ctx, volumeDir, dataPath, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FileInfo{}, err
|
return FileInfo{}, err
|
||||||
}
|
}
|
||||||
@ -1498,14 +1502,14 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||||||
return fi, nil
|
return fi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *xlStorage) readAllData(ctx context.Context, volumeDir string, filePath string) (buf []byte, dmTime time.Time, err error) {
|
func (s *xlStorage) readAllData(ctx context.Context, volumeDir string, filePath string, sync bool) (buf []byte, dmTime time.Time, err error) {
|
||||||
if contextCanceled(ctx) {
|
if contextCanceled(ctx) {
|
||||||
return nil, time.Time{}, ctx.Err()
|
return nil, time.Time{}, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect
|
odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect
|
||||||
var f *os.File
|
var f *os.File
|
||||||
if odirectEnabled {
|
if odirectEnabled && sync {
|
||||||
f, err = OpenFileDirectIO(filePath, readMode, 0o666)
|
f, err = OpenFileDirectIO(filePath, readMode, 0o666)
|
||||||
} else {
|
} else {
|
||||||
f, err = OpenFile(filePath, readMode, 0o666)
|
f, err = OpenFile(filePath, readMode, 0o666)
|
||||||
@ -1606,7 +1610,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf, _, err = s.readAllData(ctx, volumeDir, filePath)
|
buf, _, err = s.readAllData(ctx, volumeDir, filePath, true)
|
||||||
return buf, err
|
return buf, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2705,7 +2709,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
|
|||||||
if req.MetadataOnly {
|
if req.MetadataOnly {
|
||||||
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
|
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
|
||||||
} else {
|
} else {
|
||||||
data, mt, err = s.readAllData(ctx, volumeDir, fullPath)
|
data, mt, err = s.readAllData(ctx, volumeDir, fullPath, false)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
@ -2798,7 +2802,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
|
|||||||
}
|
}
|
||||||
baseDir := pathJoin(volumeDir, path+slashSeparator)
|
baseDir := pathJoin(volumeDir, path+slashSeparator)
|
||||||
metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
|
metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
|
||||||
buf, _, err := s.readAllData(ctx, volumeDir, metaPath)
|
buf, _, err := s.readAllData(ctx, volumeDir, metaPath, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user