mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
ignore typed errors correctly in list cache layer (#10879)
bonus write bucket metadata cache with enough quorum Possible fix for #10868
This commit is contained in:
parent
f86d3538f6
commit
ca88ca753c
@ -440,7 +440,11 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
|
|||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
err := store.GetObject(ctx, dataUsageBucket, name, 0, -1, &buf, "", ObjectOptions{})
|
err := store.GetObject(ctx, dataUsageBucket, name, 0, -1, &buf, "", ObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) && !errors.Is(err, InsufficientReadQuorum{}) {
|
switch err.(type) {
|
||||||
|
case ObjectNotFound:
|
||||||
|
case BucketNotFound:
|
||||||
|
case InsufficientReadQuorum:
|
||||||
|
default:
|
||||||
return toObjectErr(err, dataUsageBucket, name)
|
return toObjectErr(err, dataUsageBucket, name)
|
||||||
}
|
}
|
||||||
*d = dataUsageCache{}
|
*d = dataUsageCache{}
|
||||||
|
@ -102,14 +102,14 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
|
|||||||
err := objAPI.GetObject(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), 0, -1, w, "", ObjectOptions{})
|
err := objAPI.GetObject(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), 0, -1, w, "", ObjectOptions{})
|
||||||
logger.LogIf(ctx, w.CloseWithError(err))
|
logger.LogIf(ctx, w.CloseWithError(err))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) {
|
switch err.(type) {
|
||||||
|
case ObjectNotFound:
|
||||||
err = nil
|
err = nil
|
||||||
} else {
|
case InsufficientReadQuorum:
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
}
|
|
||||||
if errors.Is(err, InsufficientReadQuorum{}) {
|
|
||||||
// Cache is likely lost. Clean up and return new.
|
// Cache is likely lost. Clean up and return new.
|
||||||
return newBucketMetacache(bucket, true), nil
|
return newBucketMetacache(bucket, true), nil
|
||||||
|
default:
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
}
|
}
|
||||||
return newBucketMetacache(bucket, false), err
|
return newBucketMetacache(bucket, false), err
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -29,8 +28,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/cmd/config/storageclass"
|
|
||||||
xhttp "github.com/minio/minio/cmd/http"
|
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/minio/minio/pkg/console"
|
"github.com/minio/minio/pkg/console"
|
||||||
"github.com/minio/minio/pkg/hash"
|
"github.com/minio/minio/pkg/hash"
|
||||||
@ -386,17 +383,23 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{})
|
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errFileNotFound || errors.Is(err, errErasureReadQuorum) || errors.Is(err, InsufficientReadQuorum{}) {
|
switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
|
||||||
|
case ObjectNotFound:
|
||||||
retries++
|
retries++
|
||||||
time.Sleep(retryDelay)
|
time.Sleep(retryDelay)
|
||||||
continue
|
continue
|
||||||
}
|
case InsufficientReadQuorum:
|
||||||
|
retries++
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
continue
|
||||||
|
default:
|
||||||
if debugPrint {
|
if debugPrint {
|
||||||
console.Infoln("first getObjectFileInfo", o.objectPath(0), "returned err:", err)
|
console.Infoln("first getObjectFileInfo", o.objectPath(0), "returned err:", err)
|
||||||
console.Infof("err type: %T\n", err)
|
console.Infof("err type: %T\n", err)
|
||||||
}
|
}
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if fi.Deleted {
|
if fi.Deleted {
|
||||||
return entries, errFileNotFound
|
return entries, errFileNotFound
|
||||||
}
|
}
|
||||||
@ -404,7 +407,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
partN, err := o.findFirstPart(fi)
|
partN, err := o.findFirstPart(fi)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
case io.ErrUnexpectedEOF, errErasureReadQuorum, InsufficientReadQuorum{}:
|
case io.ErrUnexpectedEOF:
|
||||||
if retries == 10 {
|
if retries == 10 {
|
||||||
err := o.checkMetacacheState(ctx, rpc)
|
err := o.checkMetacacheState(ctx, rpc)
|
||||||
if debugPrint {
|
if debugPrint {
|
||||||
@ -462,16 +465,11 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
}
|
}
|
||||||
// Load first part metadata...
|
// Load first part metadata...
|
||||||
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{})
|
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{})
|
||||||
switch err {
|
if err != nil {
|
||||||
case errFileNotFound, errErasureReadQuorum, InsufficientReadQuorum{}:
|
|
||||||
time.Sleep(retryDelay)
|
time.Sleep(retryDelay)
|
||||||
retries++
|
retries++
|
||||||
continue
|
continue
|
||||||
default:
|
}
|
||||||
time.Sleep(retryDelay)
|
|
||||||
retries++
|
|
||||||
continue
|
|
||||||
case nil:
|
|
||||||
loadedPart = partN
|
loadedPart = partN
|
||||||
bi, err := getMetacacheBlockInfo(fi, partN)
|
bi, err := getMetacacheBlockInfo(fi, partN)
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
@ -480,22 +478,26 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
return entries, io.EOF
|
return entries, io.EOF
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if fi.Deleted {
|
if fi.Deleted {
|
||||||
return entries, io.ErrUnexpectedEOF
|
return entries, io.ErrUnexpectedEOF
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, &buf, fi, metaArr, onlineDisks)
|
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, &buf, fi, metaArr, onlineDisks)
|
||||||
switch err {
|
if err != nil {
|
||||||
case errFileNotFound, errErasureReadQuorum, InsufficientReadQuorum{}:
|
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
|
||||||
time.Sleep(retryDelay)
|
case ObjectNotFound:
|
||||||
retries++
|
retries++
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
continue
|
||||||
|
case InsufficientReadQuorum:
|
||||||
|
retries++
|
||||||
|
time.Sleep(retryDelay)
|
||||||
continue
|
continue
|
||||||
default:
|
default:
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
return entries, err
|
return entries, err
|
||||||
case nil:
|
}
|
||||||
}
|
}
|
||||||
tmp, err := newMetacacheReader(&buf)
|
tmp, err := newMetacacheReader(&buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -666,7 +668,6 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
|||||||
r, err := hash.NewReader(bytes.NewBuffer(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
|
r, err := hash.NewReader(bytes.NewBuffer(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
custom := b.headerKV()
|
custom := b.headerKV()
|
||||||
custom[xhttp.AmzStorageClass] = storageclass.RRS
|
|
||||||
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{UserDefined: custom})
|
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{UserDefined: custom})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metaMu.Lock()
|
metaMu.Lock()
|
||||||
|
@ -19,13 +19,13 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
jsoniter "github.com/json-iterator/go"
|
||||||
"github.com/klauspost/compress/s2"
|
"github.com/klauspost/compress/s2"
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/tinylib/msgp/msgp"
|
"github.com/tinylib/msgp/msgp"
|
||||||
@ -816,6 +816,7 @@ type metacacheBlock struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b metacacheBlock) headerKV() map[string]string {
|
func (b metacacheBlock) headerKV() map[string]string {
|
||||||
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
v, err := json.Marshal(b)
|
v, err := json.Marshal(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(context.Background(), err) // Unlikely
|
logger.LogIf(context.Background(), err) // Unlikely
|
||||||
|
Loading…
Reference in New Issue
Block a user