converge SNSD deployments into single code (#15988)
@@ -564,170 +564,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
    }
}

func (es *erasureSingle) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
    retries := 0
    rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix))

    for {
        if contextCanceled(ctx) {
            return entries, ctx.Err()
        }

        // If many failures, check the cache state.
        if retries > 10 {
            err := o.checkMetacacheState(ctx, rpc)
            if err != nil {
                return entries, fmt.Errorf("remote listing canceled: %w", err)
            }
            retries = 1
        }

        const retryDelay = 250 * time.Millisecond
        // All operations are performed without locks, so we must be careful and allow for failures.
        // Read metadata associated with the object from a disk.
        if retries > 0 {
            _, err := es.disk.ReadVersion(ctx, minioMetaBucket,
                o.objectPath(0), "", false)
            if err != nil {
                time.Sleep(retryDelay)
                retries++
                continue
            }
        }

        // Load first part metadata...
        // Read metadata associated with the object from all disks.
        fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
        if err != nil {
            switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
            case ObjectNotFound:
                retries++
                time.Sleep(retryDelay)
                continue
            case InsufficientReadQuorum:
                retries++
                time.Sleep(retryDelay)
                continue
            default:
                return entries, fmt.Errorf("reading first part metadata: %w", err)
            }
        }

        partN, err := o.findFirstPart(fi)
        switch {
        case err == nil:
        case errors.Is(err, io.ErrUnexpectedEOF):
            if retries == 10 {
                err := o.checkMetacacheState(ctx, rpc)
                if err != nil {
                    return entries, fmt.Errorf("remote listing canceled: %w", err)
                }
                retries = -1
            }
            retries++
            time.Sleep(retryDelay)
            continue
        case errors.Is(err, io.EOF):
            return entries, io.EOF
        }

        // We got a stream to start at.
        loadedPart := 0
        for {
            if contextCanceled(ctx) {
                return entries, ctx.Err()
            }

            if partN != loadedPart {
                if retries > 10 {
                    err := o.checkMetacacheState(ctx, rpc)
                    if err != nil {
                        return entries, fmt.Errorf("waiting for next part %d: %w", partN, err)
                    }
                    retries = 1
                }

                if retries > 0 {
                    // Load from one disk only
                    _, err := es.disk.ReadVersion(ctx, minioMetaBucket,
                        o.objectPath(partN), "", false)
                    if err != nil {
                        time.Sleep(retryDelay)
                        retries++
                        continue
                    }
                }

                // Load partN metadata...
                fi, metaArr, onlineDisks, err = es.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
                if err != nil {
                    time.Sleep(retryDelay)
                    retries++
                    continue
                }
                loadedPart = partN
                bi, err := getMetacacheBlockInfo(fi, partN)
                logger.LogIf(ctx, err)
                if err == nil {
                    if bi.pastPrefix(o.Prefix) {
                        return entries, io.EOF
                    }
                }
            }

            pr, pw := io.Pipe()
            go func() {
                werr := es.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0,
                    fi.Size, pw, fi, metaArr, onlineDisks)
                pw.CloseWithError(werr)
            }()

            tmp := newMetacacheReader(pr)
            e, err := tmp.filter(o)
            pr.CloseWithError(err)
            entries.o = append(entries.o, e.o...)
            if o.Limit > 0 && entries.len() > o.Limit {
                entries.truncate(o.Limit)
                return entries, nil
            }
            if err == nil {
                // We stopped within the listing, we are done for now...
                return entries, nil
            }
            if err != nil && err.Error() != io.EOF.Error() {
                switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
                case ObjectNotFound:
                    retries++
                    time.Sleep(retryDelay)
                    continue
                case InsufficientReadQuorum:
                    retries++
                    time.Sleep(retryDelay)
                    continue
                default:
                    logger.LogIf(ctx, err)
                    return entries, err
                }
            }

            // We finished at the end of the block.
            // And should not expect any more results.
            bi, err := getMetacacheBlockInfo(fi, partN)
            logger.LogIf(ctx, err)
            if err != nil || bi.EOS {
                // We are done and there are no more parts.
                return entries, io.EOF
            }
            if bi.endedPrefix(o.Prefix) {
                // Nothing more for prefix.
                return entries, io.EOF
            }
            partN++
            retries = 0
        }
    }
}
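The per-part read above streams each metacache block through io.Pipe so the erasure-decoded bytes can be filtered as they arrive rather than buffered whole. A minimal, self-contained sketch of that producer/consumer pattern (the names below are illustrative only, not MinIO APIs):

package main

import (
    "fmt"
    "io"
    "strings"
)

func main() {
    pr, pw := io.Pipe()

    // Producer: write into the pipe and close it with the write error (nil on
    // success), the same role getObjectWithFileInfo plays for pw above.
    go func() {
        _, err := io.Copy(pw, strings.NewReader("entry-1\nentry-2\nentry-3\n"))
        pw.CloseWithError(err)
    }()

    // Consumer: read and filter as data arrives, then close the read side with
    // the consumer error so the producer unblocks, as tmp.filter and
    // pr.CloseWithError do above.
    data, err := io.ReadAll(pr)
    pr.CloseWithError(err)
    fmt.Printf("read %d bytes, err=%v\n", len(data), err)
}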

// getListQuorum interprets list quorum values and returns appropriate
// acceptable quorum expected for list operations
func getListQuorum(quorum string, driveCount int) int {
@@ -747,60 +583,6 @@ func getListQuorum(quorum string, driveCount int) int {
    return 3
}
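The body of getListQuorum is elided by the hunk above; only its fallback of 3 drives is visible. Purely as a hypothetical illustration of the shape such a mapping can take (the string values and thresholds below are assumptions, not MinIO's actual settings):

// listQuorumOf is an illustrative stand-in for getListQuorum, not the real implementation.
func listQuorumOf(quorum string, driveCount int) int {
    switch quorum {
    case "disk": // assumed value: trust a single drive
        return 1
    case "reduced": // assumed value
        return 2
    case "strict": // assumed value: require every drive to agree
        return driveCount
    }
    // Fallback, matching the visible context line above.
    return 3
}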

// Will return io.EOF if continuing would not yield more results.
func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) {
    defer close(results)
    o.debugf(color.Green("listPath:")+" with options: %#v", o)

    // How to resolve results.
    resolver := metadataResolutionParams{
        dirQuorum: 1,
        objQuorum: 1,
        bucket:    o.Bucket,
    }

    // Maximum versions requested for "latest" object
    // resolution on versioned buckets, this is to be only
    // used when o.Versioned is false
    if !o.Versioned {
        resolver.requestedVersions = 1
    }

    var limit int
    if o.Limit > 0 && o.StopDiskAtLimit {
        // Over-read by 2 to know if we truncate results and not reach false EOF.
        limit = o.Limit + 2
    }

    ctxDone := ctx.Done()
    return listPathRaw(ctx, listPathRawOptions{
        disks:        []StorageAPI{es.disk},
        bucket:       o.Bucket,
        path:         o.BaseDir,
        recursive:    o.Recursive,
        filterPrefix: o.FilterPrefix,
        minDisks:     1,
        forwardTo:    o.Marker,
        perDiskLimit: limit,
        agreed: func(entry metaCacheEntry) {
            select {
            case <-ctxDone:
            case results <- entry:
            }
        },
        partial: func(entries metaCacheEntries, errs []error) {
            // Results Disagree :-(
            entry, ok := entries.resolve(&resolver)
            if ok {
                select {
                case <-ctxDone:
                case results <- *entry:
                }
            }
        },
    })
}
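Functions with the listPathInner contract own the results channel: they close it on return and never send without also watching the context. A generic sketch of how a caller drives such a producer (the types and names here are illustrative, not the MinIO code):

package main

import (
    "context"
    "fmt"
)

// entry stands in for metaCacheEntry in this illustration only.
type entry struct{ name string }

// producer mimics the listPathInner contract: it owns the channel and closes it when done.
func producer(ctx context.Context, results chan<- entry) error {
    defer close(results)
    for i := 0; i < 3; i++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case results <- entry{name: fmt.Sprintf("object-%d", i)}:
        }
    }
    return nil
}

func main() {
    ctx := context.Background()
    results := make(chan entry, 16)
    errCh := make(chan error, 1)
    go func() { errCh <- producer(ctx, results) }()

    for e := range results { // loop ends when the producer closes the channel
        fmt.Println("got:", e.name)
    }
    fmt.Println("producer returned:", <-errCh)
}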

// Will return io.EOF if continuing would not yield more results.
func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) {
    defer close(results)
@@ -899,133 +681,6 @@ func (m *metaCacheRPC) setErr(err string) {
    *m.meta = meta
}

func (es *erasureSingle) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
    o := mc.o
    o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)

    metaMu := &mc.mu
    rpc := mc.rpc
    cancel := mc.cancel
    defer func() {
        o.debugln(color.Green("saveMetaCacheStream:")+"err:", err)
        if err != nil && !errors.Is(err, io.EOF) {
            go mc.setErr(err.Error())
            cancel()
        }
    }()

    defer cancel()
    // Save continuous updates
    go func() {
        var err error
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        var exit bool
        for !exit {
            select {
            case <-ticker.C:
            case <-ctx.Done():
                exit = true
            }
            metaMu.Lock()
            meta := *mc.meta
            meta, err = o.updateMetacacheListing(meta, rpc)
            if err == nil && time.Since(meta.lastHandout) > metacacheMaxClientWait {
                cancel()
                exit = true
                meta.status = scanStateError
                meta.error = fmt.Sprintf("listing canceled since time since last handout was %v ago", time.Since(meta.lastHandout).Round(time.Second))
                o.debugln(color.Green("saveMetaCacheStream: ") + meta.error)
                meta, err = o.updateMetacacheListing(meta, rpc)
            }
            if err == nil {
                *mc.meta = meta
                if meta.status == scanStateError {
                    cancel()
                    exit = true
                }
            }
            metaMu.Unlock()
        }
    }()

    const retryDelay = 200 * time.Millisecond
    const maxTries = 5

    // Keep destination...
    // Write results to disk.
    bw := newMetacacheBlockWriter(entries, func(b *metacacheBlock) error {
        // if the block is 0 bytes and its a first block skip it.
        // skip only this for Transient caches.
        if len(b.data) == 0 && b.n == 0 && o.Transient {
            return nil
        }
        o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
        r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
        logger.LogIf(ctx, err)
        custom := b.headerKV()
        _, err = es.putMetacacheObject(ctx, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
            UserDefined: custom,
        })
        if err != nil {
            mc.setErr(err.Error())
            cancel()
            return err
        }
        if b.n == 0 {
            return nil
        }
        // Update block 0 metadata.
        var retries int
        for {
            meta := b.headerKV()
            fi := FileInfo{
                Metadata: make(map[string]string, len(meta)),
            }
            for k, v := range meta {
                fi.Metadata[k] = v
            }
            err := es.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi, es.disk)
            if err == nil {
                break
            }
            switch err.(type) {
            case ObjectNotFound:
                return err
            case StorageErr:
                return err
            case InsufficientReadQuorum:
            default:
                logger.LogIf(ctx, err)
            }
            if retries >= maxTries {
                return err
            }
            retries++
            time.Sleep(retryDelay)
        }
        return nil
    })

    // Blocks while consuming entries or an error occurs.
    err = bw.Close()
    if err != nil {
        mc.setErr(err.Error())
    }
    metaMu.Lock()
    defer metaMu.Unlock()
    if mc.meta.error != "" {
        return err
    }
    // Save success
    mc.meta.status = scanStateSuccess
    meta, err := o.updateMetacacheListing(*mc.meta, rpc)
    if err == nil {
        *mc.meta = meta
    }
    return nil
}
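The "Save continuous updates" goroutine above is a ticker-driven heartbeat that also makes one final pass after the context is canceled before exiting. A minimal standalone sketch of that pattern (names and intervals are illustrative):

package main

import (
    "context"
    "fmt"
    "time"
)

// periodicUpdate mirrors the updater goroutine above: run the update on every
// tick, and once more after the context is canceled, then exit.
func periodicUpdate(ctx context.Context, interval time.Duration, update func() error) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    var exit bool
    for !exit {
        select {
        case <-ticker.C:
        case <-ctx.Done():
            exit = true
        }
        if err := update(); err != nil {
            fmt.Println("update failed:", err)
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    periodicUpdate(ctx, 100*time.Millisecond, func() error {
        fmt.Println("heartbeat at", time.Now().Format(time.StampMilli))
        return nil
    })
}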

func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
    o := mc.o
    o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)