check for errors on bitrotWriter Close() (#20982)

This commit is contained in:
Anis Eleuch 2025-02-26 20:26:13 +01:00 committed by GitHub
parent 7cc0c69228
commit 953a3e2bbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 10 deletions

View File

@ -128,14 +128,20 @@ func closeBitrotReaders(rs []io.ReaderAt) {
} }
// Close all the writers. // Close all the writers.
func closeBitrotWriters(ws []io.Writer) { func closeBitrotWriters(ws []io.Writer) []error {
for _, w := range ws { errs := make([]error, len(ws))
if w != nil { for i, w := range ws {
if bw, ok := w.(io.Closer); ok { if w == nil {
bw.Close() errs[i] = errDiskNotFound
} continue
}
if bw, ok := w.(io.Closer); ok {
errs[i] = bw.Close()
} else {
errs[i] = nil
} }
} }
return errs
} }
// Returns hash sum for whole-bitrot, nil for streaming-bitrot. // Returns hash sum for whole-bitrot, nil for streaming-bitrot.

View File

@ -608,7 +608,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
// later to the final location. // later to the final location.
err = erasure.Heal(ctx, writers, readers, partSize, prefer) err = erasure.Heal(ctx, writers, readers, partSize, prefer)
closeBitrotReaders(readers) closeBitrotReaders(readers)
closeBitrotWriters(writers) closeErrs := closeBitrotWriters(writers)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -628,6 +628,13 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
continue continue
} }
// A non-nil stale disk which got error on Close()
if closeErrs[i] != nil {
outDatedDisks[i] = nil
disksToHealCount--
continue
}
partsMetadata[i].DataDir = dstDataDir partsMetadata[i].DataDir = dstDataDir
partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx, partChecksums) partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx, partChecksums)
if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {

View File

@ -668,10 +668,13 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
} }
n, err := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum) n, err := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
closeBitrotWriters(writers) closeErrs := closeBitrotWriters(writers)
if err != nil { if err != nil {
return pi, toObjectErr(err, bucket, object) return pi, toObjectErr(err, bucket, object)
} }
if closeErr := reduceWriteQuorumErrs(ctx, closeErrs, objectOpIgnoredErrs, writeQuorum); closeErr != nil {
return pi, toObjectErr(closeErr, bucket, object)
}
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.

View File

@ -1178,11 +1178,15 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
} }
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum) n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
closeBitrotWriters(writers) closeErrs := closeBitrotWriters(writers)
if erasureErr != nil { if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key) return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key)
} }
if closeErr := reduceWriteQuorumErrs(ctx, closeErrs, objectOpIgnoredErrs, writeQuorum); closeErr != nil {
return ObjectInfo{}, toObjectErr(closeErr, minioMetaBucket, key)
}
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.
if n < data.Size() { if n < data.Size() {
@ -1425,11 +1429,15 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
bugLogIf(ctx, err) bugLogIf(ctx, err)
} }
n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum) n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
closeBitrotWriters(writers) closeErrs := closeBitrotWriters(writers)
if erasureErr != nil { if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, bucket, object) return ObjectInfo{}, toObjectErr(erasureErr, bucket, object)
} }
if closeErr := reduceWriteQuorumErrs(ctx, closeErrs, objectOpIgnoredErrs, writeQuorum); closeErr != nil {
return ObjectInfo{}, toObjectErr(closeErr, bucket, object)
}
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.
if n < data.Size() { if n < data.Size() {