mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
2d79d6d847
p.writers is a verbatim value of bitrotWriter backed by a pipe() that should never be nil'ed, instead use the captured errors to skip the writes. additionally detect also short writes, and reject them as errors.
111 lines
2.9 KiB
Go
111 lines
2.9 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
|
|
"sync"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
)
|
|
|
|
// Writes in parallel to writers
|
|
type parallelWriter struct {
|
|
writers []io.Writer
|
|
writeQuorum int
|
|
errs []error
|
|
}
|
|
|
|
// Write writes data to writers in parallel.
|
|
func (p *parallelWriter) Write(ctx context.Context, blocks [][]byte) error {
|
|
var wg sync.WaitGroup
|
|
|
|
for i := range p.writers {
|
|
if p.writers[i] == nil {
|
|
p.errs[i] = errDiskNotFound
|
|
continue
|
|
}
|
|
if p.errs[i] != nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
var n int
|
|
n, p.errs[i] = p.writers[i].Write(blocks[i])
|
|
if p.errs[i] == nil {
|
|
if n != len(blocks[i]) {
|
|
p.errs[i] = io.ErrShortWrite
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
// If nilCount >= p.writeQuorum, we return nil. This is because HealFile() uses
|
|
// CreateFile with p.writeQuorum=1 to accommodate healing of single disk.
|
|
// i.e if we do no return here in such a case, reduceWriteQuorumErrs() would
|
|
// return a quorum error to HealFile().
|
|
nilCount := countErrs(p.errs, nil)
|
|
if nilCount >= p.writeQuorum {
|
|
return nil
|
|
}
|
|
return reduceWriteQuorumErrs(ctx, p.errs, objectOpIgnoredErrs, p.writeQuorum)
|
|
}
|
|
|
|
// Encode reads from the reader, erasure-encodes the data and writes to the writers.
|
|
func (e *Erasure) Encode(ctx context.Context, src io.Reader, writers []io.Writer, buf []byte, quorum int) (total int64, err error) {
|
|
writer := ¶llelWriter{
|
|
writers: writers,
|
|
writeQuorum: quorum,
|
|
errs: make([]error, len(writers)),
|
|
}
|
|
|
|
for {
|
|
var blocks [][]byte
|
|
n, err := io.ReadFull(src, buf)
|
|
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
logger.LogIf(ctx, err)
|
|
return 0, err
|
|
}
|
|
eof := err == io.EOF || err == io.ErrUnexpectedEOF
|
|
if n == 0 && total != 0 {
|
|
// Reached EOF, nothing more to be done.
|
|
break
|
|
}
|
|
// We take care of the situation where if n == 0 and total == 0 by creating empty data and parity files.
|
|
blocks, err = e.EncodeData(ctx, buf[:n])
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return 0, err
|
|
}
|
|
|
|
if err = writer.Write(ctx, blocks); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return 0, err
|
|
}
|
|
total += int64(n)
|
|
if eof {
|
|
break
|
|
}
|
|
}
|
|
return total, nil
|
|
}
|