Add XL reader without data (#12351)

Add XL metadata reader that reads metadata only on larger files.

Use for scanning and listing for now.
This commit is contained in:
Klaus Post 2021-05-21 18:10:54 +02:00 committed by GitHub
parent 783ea5eb5c
commit 9d1b6fb37d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 170 additions and 2 deletions

View File

@ -154,11 +154,24 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
f, err := os.OpenFile(pathJoin(volumeDir, current, entry), readMode, 0)
if err != nil {
logger.LogIf(ctx, err)
continue
}
stat, err := f.Stat()
if err != nil {
logger.LogIf(ctx, err)
f.Close()
continue
}
meta.metadata, err = readXLMetaNoData(f, stat.Size())
if err != nil {
logger.LogIf(ctx, err)
f.Close()
continue
}
f.Close()
meta.metadata = xlMetaV2TrimData(meta.metadata)
meta.name = strings.TrimSuffix(entry, xlStorageFormatFile)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator)

View File

@ -22,6 +22,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"sort"
"strings"
"time"
@ -1368,3 +1369,141 @@ func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
return FileInfo{}, errFileVersionNotFound
}
// readXLMetaNoData will load the metadata, but skip data segments.
// This should only be used when data is never interesting.
// If data is not xlv2, it is returned in full.
func readXLMetaNoData(r io.Reader, size int64) ([]byte, error) {
// Read at most this much on initial read.
const readDefault = 4 << 10
initial := size
hasFull := true
if initial > readDefault {
initial = readDefault
hasFull = false
}
buf := make([]byte, initial)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, fmt.Errorf("readXLMetaNoData.ReadFull: %w", err)
}
readMore := func(n int64) error {
has := int64(len(buf))
if has >= n {
return nil
}
if hasFull || n > size {
return io.ErrUnexpectedEOF
}
extra := n - has
buf = append(buf, make([]byte, extra)...)
_, err := io.ReadFull(r, buf[has:])
if err != nil {
if err == io.EOF {
// Returned if we read nothing.
return io.ErrUnexpectedEOF
}
return fmt.Errorf("readXLMetaNoData.readMore: %w", err)
}
return nil
}
tmp, major, minor, err := checkXL2V1(buf)
if err != nil {
err = readMore(size)
return buf, err
}
switch major {
case 1:
switch minor {
case 0:
err = readMore(size)
return buf, err
case 1, 2:
sz, tmp, err := ReadBytesHeader(tmp)
if err != nil {
return nil, err
}
want := int64(sz) + int64(len(buf)-len(tmp))
// v1.1 does not have CRC.
if minor < 2 {
if err := readMore(want); err != nil {
return nil, err
}
return buf[:want], nil
}
// CRC is variable length, so we need to truncate exactly that.
wantMax := want + msgp.Uint32Size
if wantMax > size {
wantMax = size
}
if err := readMore(wantMax); err != nil {
return nil, err
}
tmp = buf[want:]
_, after, err := msgp.ReadUint32Bytes(tmp)
if err != nil {
return nil, err
}
want += int64(len(tmp) - len(after))
return buf[:want], err
default:
return nil, errors.New("unknown minor metadata version")
}
default:
return nil, errors.New("unknown major metadata version")
}
}
// ReadBytesHeader reads the 'bin' header size
// off of 'b' and returns the size and remaining bytes.
// Possible errors:
// - ErrShortBytes (too few bytes)
// - TypeError{} (not a bin object)
// TODO: Replace when https://github.com/tinylib/msgp/pull/289 is merged.
func ReadBytesHeader(b []byte) (sz uint32, o []byte, err error) {
if len(b) < 1 {
return 0, nil, msgp.ErrShortBytes
}
var big = binary.BigEndian
const (
mbin8 uint8 = 0xc4
mbin16 uint8 = 0xc5
mbin32 uint8 = 0xc6
)
switch b[0] {
case mbin8:
if len(b) < 2 {
err = msgp.ErrShortBytes
return
}
sz = uint32(b[1])
o = b[2:]
return
case mbin16:
if len(b) < 3 {
err = msgp.ErrShortBytes
return
}
sz = uint32(big.Uint16(b[1:]))
o = b[3:]
return
case mbin32:
if len(b) < 5 {
err = msgp.ErrShortBytes
return
}
sz = big.Uint32(b[1:])
o = b[5:]
return
default:
err = msgp.TypeError{Method: msgp.BinType, Encoded: msgp.NextType(b)}
return
}
}

View File

@ -421,13 +421,29 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
return sizeSummary{}, errSkipFile
}
buf, err := xioutil.ReadFile(item.Path)
f, err := os.OpenFile(item.Path, readMode, 0)
if err != nil {
if intDataUpdateTracker.debug {
console.Debugf(color.Green("scannerBucket:")+" object path missing: %v: %w\n", item.Path, err)
}
return sizeSummary{}, errSkipFile
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
if intDataUpdateTracker.debug {
console.Debugf(color.Green("scannerBucket:")+" stat failed: %v: %w\n", item.Path, err)
}
return sizeSummary{}, errSkipFile
}
buf, err := readXLMetaNoData(f, stat.Size())
if err != nil {
if intDataUpdateTracker.debug {
console.Debugf(color.Green("scannerBucket:")+" readXLMetaNoData: %v: %w\n", item.Path, err)
}
return sizeSummary{}, errSkipFile
}
// Remove filename which is the meta file.
item.transformMetaDir()