mirror of
https://github.com/minio/minio.git
synced 2025-11-09 21:49:46 -05:00
allow writes to pools with inconsistent xl.meta (#17008)
This commit is contained in:
@@ -432,30 +432,37 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
|
||||
for _, pinfo := range poolObjInfos {
|
||||
// skip all objects from suspended pools if asked by the
|
||||
// caller.
|
||||
if z.IsSuspended(pinfo.Index) && opts.SkipDecommissioned {
|
||||
if opts.SkipDecommissioned && z.IsSuspended(pinfo.Index) {
|
||||
continue
|
||||
}
|
||||
// Skip object if it's from pools participating in a rebalance operation.
|
||||
if opts.SkipRebalancing && z.IsPoolRebalancing(pinfo.Index) {
|
||||
continue
|
||||
}
|
||||
|
||||
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
|
||||
return pinfo, pinfo.Err
|
||||
if pinfo.Err == nil {
|
||||
// found a pool
|
||||
return pinfo, nil
|
||||
}
|
||||
if isErrReadQuorum(pinfo.Err) {
|
||||
// read quorum is returned when the object is visibly
|
||||
// present but its unreadable, we simply ask the writes to
|
||||
// schedule to this pool instead. If there is no quorum
|
||||
// it will fail anyways, however if there is quorum available
|
||||
// with enough disks online but sufficiently inconsistent to
|
||||
// break parity threshold, allow them to be overwritten
|
||||
// or allow new versions to be added.
|
||||
return pinfo, nil
|
||||
}
|
||||
defPool = pinfo
|
||||
if isErrObjectNotFound(pinfo.Err) {
|
||||
// No object exists or its a delete marker,
|
||||
// check objInfo to confirm.
|
||||
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
|
||||
return pinfo, nil
|
||||
}
|
||||
// objInfo is not valid, truly the object doesn't
|
||||
// exist proceed to next pool.
|
||||
continue
|
||||
if !isErrObjectNotFound(pinfo.Err) {
|
||||
return pinfo, pinfo.Err
|
||||
}
|
||||
|
||||
return pinfo, nil
|
||||
// No object exists or its a delete marker,
|
||||
// check objInfo to confirm.
|
||||
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
|
||||
return pinfo, nil
|
||||
}
|
||||
}
|
||||
if opts.ReplicationRequest && opts.DeleteMarker && defPool.Index >= 0 {
|
||||
// If the request is a delete marker replication request, return a default pool
|
||||
|
||||
@@ -674,6 +674,12 @@ func isErrBucketNotFound(err error) bool {
|
||||
return errors.As(err, &bkNotFound)
|
||||
}
|
||||
|
||||
// isErrReadQuorum check if the error type is InsufficentReadQuorum
|
||||
func isErrReadQuorum(err error) bool {
|
||||
var rquorum InsufficientReadQuorum
|
||||
return errors.As(err, &rquorum)
|
||||
}
|
||||
|
||||
// isErrObjectNotFound - Check if error type is ObjectNotFound.
|
||||
func isErrObjectNotFound(err error) bool {
|
||||
var objNotFound ObjectNotFound
|
||||
|
||||
Reference in New Issue
Block a user