a bunch of fixes for error handling (#19627)

- handle errFileCorrupt properly
- micro-optimization: send the done() response sooner so the
  keep-alive goroutine exits more quickly.
- fix logger.Event() usage in a couple of places
- make the rest client return a dedicated error, rather than
  lastErr(), when the client has been closed.
Harshavardhana 2024-04-28 10:53:50 -07:00 committed by GitHub
parent 93b2f8a0c5
commit a372c6a377
10 changed files with 121 additions and 37 deletions

@@ -153,6 +153,8 @@ func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultiple
         errFileVersionNotFound,
         io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
         io.EOF,              // some times we would read without locks, ignore these errors
+        context.DeadlineExceeded,
+        context.Canceled,
     }
     ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)

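For context, `ignoredErrs` is consulted when deciding which per-disk errors are benign. A minimal sketch of how such a list is typically checked (the `isIgnored` helper below is illustrative, not MinIO's actual plumbing); using `errors.Is` matters because context errors frequently arrive wrapped:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// isIgnored reports whether err matches any entry in ignoredErrs.
// errors.Is is used so wrapped errors (fmt.Errorf("...: %w", ...))
// still match sentinel values like context.Canceled.
func isIgnored(err error, ignoredErrs []error) bool {
	for _, ignored := range ignoredErrs {
		if errors.Is(err, ignored) {
			return true
		}
	}
	return false
}

func main() {
	ignoredErrs := []error{io.EOF, context.DeadlineExceeded, context.Canceled}
	err := fmt.Errorf("read disk 3: %w", context.Canceled)
	fmt.Println(isIgnored(err, ignoredErrs)) // true
}
```

With context.DeadlineExceeded and context.Canceled in the list, lock-free reads aborted by the caller no longer count against the disks.
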
@@ -403,7 +403,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, origbucket, bu
             if fi.IsValid() {
                 return disks[index].WriteMetadata(ctx, origbucket, bucket, prefix, fi)
             }
-            return errCorruptedFormat
+            return errFileCorrupt
         }, index)
     }

@@ -739,6 +739,15 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
     partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
     onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
     if err != nil {
+        if errors.Is(err, errFileNotFound) {
+            // An in-quorum errFileNotFound means that client stream
+            // prematurely closed and we do not find any xl.meta or
+            // part.1's - in such a scenario we must return as if client
+            // disconnected. This means that erasure.Encode() CreateFile()
+            // did not do anything.
+            return pi, IncompleteBody{Bucket: bucket, Object: object}
+        }
         return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
     }
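
The hunk above translates an in-quorum errFileNotFound into a client-facing IncompleteBody error. A compact, runnable sketch of that errors.Is translation, using stand-in types rather than MinIO's real ones:

```go
package main

import (
	"errors"
	"fmt"
)

var errFileNotFound = errors.New("file not found")

// IncompleteBody is a stand-in for the S3-style API error returned
// when the client disconnected before the upload completed.
type IncompleteBody struct{ Bucket, Object string }

func (e IncompleteBody) Error() string {
	return fmt.Sprintf("incomplete body: %s/%s", e.Bucket, e.Object)
}

func putPart(bucket, object string) error {
	// Simulate renamePart failing because no part data was ever written.
	err := fmt.Errorf("quorum: %w", errFileNotFound)
	if errors.Is(err, errFileNotFound) {
		// Treat an in-quorum "not found" as a premature client disconnect.
		return IncompleteBody{Bucket: bucket, Object: object}
	}
	return err
}

func main() {
	fmt.Println(putPart("photos", "cat.png")) // incomplete body: photos/cat.png
}
```
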
@@ -1314,11 +1323,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
         partsMetadata, bucket, object, writeQuorum)
     if err != nil {
-        return oi, toObjectErr(err, bucket, object)
+        return oi, toObjectErr(err, bucket, object, uploadID)
     }
     if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks); err != nil {
-        return ObjectInfo{}, toObjectErr(err, bucket, object)
+        return ObjectInfo{}, toObjectErr(err, bucket, object, uploadID)
     }
     if !opts.Speedtest && len(versions) > 0 {

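Presumably the extra uploadID argument lets toObjectErr map upload-scoped failures (such as a missing upload ID) to a more precise API error than the generic object error. The variadic-context pattern looks roughly like this hypothetical helper, which is illustrative only and not MinIO's actual toObjectErr:

```go
package main

import (
	"errors"
	"fmt"
)

var errUploadNotFound = errors.New("upload id not found")

// toAPIErr maps an internal error to a client-facing one. Optional
// params add context: params[0]=bucket, params[1]=object, params[2]=uploadID.
func toAPIErr(err error, params ...string) error {
	if errors.Is(err, errUploadNotFound) && len(params) >= 3 {
		// With an uploadID in hand, return an upload-specific error.
		return fmt.Errorf("InvalidUploadID: bucket=%s object=%s uploadId=%s",
			params[0], params[1], params[2])
	}
	return fmt.Errorf("ObjectError: %v (params=%v)", err, params)
}

func main() {
	fmt.Println(toAPIErr(errUploadNotFound, "bucket", "object", "abc123"))
}
```
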
@@ -562,7 +562,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
 func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVers, allParts bool) (FileInfo, error) {
     var xl xlMetaV2
     if err := xl.LoadOrConvert(ri.Buf); err != nil {
-        return FileInfo{}, err
+        return FileInfo{}, errFileCorrupt
     }
     fi, err := xl.ToFileInfo(bucket, object, "", inclFreeVers, allParts)
@@ -571,7 +571,7 @@ func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVe
     }
     if !fi.IsValid() {
-        return FileInfo{}, errCorruptedFormat
+        return FileInfo{}, errFileCorrupt
     }
     versionID := fi.VersionID
@@ -661,7 +661,7 @@ func pickLatestQuorumFilesInfo(ctx context.Context, rawFileInfos []RawFileInfo,
     if !lfi.IsValid() {
         for i := range errs {
             if errs[i] == nil {
-                errs[i] = errCorruptedFormat
+                errs[i] = errFileCorrupt
             }
         }
         return metaFileInfos, errs
@@ -1519,7 +1519,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
     onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
     if err != nil {
         if errors.Is(err, errFileNotFound) {
-            return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
+            // An in-quorum errFileNotFound means that client stream
+            // prematurely closed and we do not find any xl.meta or
+            // part.1's - in such a scenario we must return as if client
+            // disconnected. This means that erasure.Encode() CreateFile()
+            // did not do anything.
+            return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
         }
         return ObjectInfo{}, toObjectErr(err, bucket, object)
     }

@@ -1237,6 +1237,7 @@ func testAPIPutObjectStreamSigV4Handler(obj ObjectLayer, instanceType, bucketNam
         if err != nil {
             t.Fatalf("Error injecting faults into the request: <ERROR> %v.", err)
         }
+        // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler.
         // Call the ServeHTTP to execute the handler,`func (api objectAPIHandlers) GetObjectHandler` handles the request.
         apiRouter.ServeHTTP(rec, req)

@@ -54,6 +54,9 @@ func isNetworkError(err error) bool {
         if down := xnet.IsNetworkOrHostDown(nerr.Err, false); down {
             return true
         }
+        if errors.Is(nerr.Err, rest.ErrClientClosed) {
+            return true
+        }
     }
     if errors.Is(err, grid.ErrDisconnected) {
         return true
@@ -61,7 +64,7 @@ func isNetworkError(err error) bool {
     // More corner cases suitable for storage REST API
     switch {
     // A peer node can be in shut down phase and proactively
-    // return 503 server closed error,consider it as an offline node
+    // return 503 server closed error, consider it as an offline node
     case strings.Contains(err.Error(), http.ErrServerClosed.Error()):
         return true
     // Corner case, the server closed the connection with a keep-alive timeout

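The new check treats a deliberately closed rest client like an offline node. A minimal sketch of why the sentinel comparison works through the wrapper (this NetworkError is a simplified stand-in for the one in internal/rest):

```go
package main

import (
	"errors"
	"fmt"
)

// ErrClientClosed mirrors the sentinel added in internal/rest.
var ErrClientClosed = errors.New("rest client is closed")

// NetworkError is a simplified stand-in wrapping an underlying error.
type NetworkError struct{ Err error }

func (n *NetworkError) Error() string { return n.Err.Error() }
func (n *NetworkError) Unwrap() error { return n.Err } // lets errors.Is/As reach Err

func isNetworkError(err error) bool {
	var nerr *NetworkError
	if errors.As(err, &nerr) {
		// Treat a deliberately closed client like an offline node.
		if errors.Is(nerr.Err, ErrClientClosed) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isNetworkError(&NetworkError{Err: ErrClientClosed})) // true
}
```
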
@@ -791,9 +791,26 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func
     defer xioutil.SafeClose(doneCh)
     // Initiate ticker after body has been read.
     ticker := time.NewTicker(time.Second * 10)
+    defer ticker.Stop()
     for {
         select {
         case <-ticker.C:
+            // The done() might have been called
+            // concurrently, check for it before we
+            // write the filler byte.
+            select {
+            case err := <-doneCh:
+                if err != nil {
+                    write([]byte{1})
+                    write([]byte(err.Error()))
+                } else {
+                    write([]byte{0})
+                }
+                return
+            default:
+            }
+
             // Response not ready, write a filler byte.
             write([]byte{32})
             if canWrite {
@@ -806,7 +823,6 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func
             } else {
                 write([]byte{0})
             }
-            ticker.Stop()
             return
         }
     }
@@ -854,6 +870,21 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
     for {
         select {
         case <-ticker.C:
+            // The done() might have been called
+            // concurrently, check for it before we
+            // write the filler byte.
+            select {
+            case err := <-doneCh:
+                if err != nil {
+                    write([]byte{1})
+                    write([]byte(err.Error()))
+                } else {
+                    write([]byte{0})
+                }
+                return
+            default:
+            }
+
             // Response not ready, write a filler byte.
             write([]byte{32})
             if canWrite {

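Both keep-alive loops now drain doneCh with a non-blocking inner select before emitting a filler byte, so a response that completed between ticks is flushed immediately rather than waiting out the ticker; the defer ticker.Stop() also covers every return path instead of just one. A self-contained sketch of the pattern (the 0/1/32 marker bytes follow the handlers above; everything else is simplified):

```go
package main

import (
	"fmt"
	"time"
)

// keepAlive writes filler bytes until doneCh yields a result, but checks
// doneCh again right before each filler so a concurrent done() wins the race.
func keepAlive(write func(b []byte), doneCh <-chan error, tick time.Duration) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop() // stop on every return path, not just one
	for {
		select {
		case <-ticker.C:
			// done() might have fired between ticks; prefer it over filler.
			select {
			case err := <-doneCh:
				writeResult(write, err)
				return
			default:
			}
			write([]byte{32}) // filler byte: "still working"
		case err := <-doneCh:
			writeResult(write, err)
			return
		}
	}
}

func writeResult(write func(b []byte), err error) {
	if err != nil {
		write([]byte{1}) // 1 = an error message follows
		write([]byte(err.Error()))
		return
	}
	write([]byte{0}) // 0 = success
}

func main() {
	doneCh := make(chan error, 1)
	go func() { time.Sleep(25 * time.Millisecond); doneCh <- nil }()
	keepAlive(func(b []byte) { fmt.Printf("% x ", b) }, doneCh, 10*time.Millisecond)
	// Example output: 20 20 00
}
```

This appears to be the "micro-optimization" from the commit message: the done() response is answered at the next tick boundary at the latest, letting the goroutine exit promptly.
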
@@ -909,7 +909,8 @@ func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string)
     })
     if err == nil {
-        logger.Event(context.Background(), "node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
+        logger.Event(context.Background(), "healthcheck",
+            "node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
         p.health.status.Store(diskHealthOK)
         p.health.waiting.Add(-1)
         return

@@ -2558,8 +2558,12 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
         }
     }
+    // Preserve all the legacy data, could be slow, but at max there can be 10,000 parts.
+    currentDataPath := pathJoin(dstVolumeDir, dstPath)
+
     var xlMeta xlMetaV2
     var legacyPreserved bool
+    var legacyEntries []string
     if len(dstBuf) > 0 {
         if isXL2V1Format(dstBuf) {
             if err = xlMeta.Load(dstBuf); err != nil {
@@ -2590,8 +2594,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
             // from `xl.json` to `xl.meta`, we can avoid
             // one extra readdir operation here for all
             // new deployments.
-            currentDataPath := pathJoin(dstVolumeDir, dstPath)
-            entries, err := readDirN(currentDataPath, 1)
+            entries, err := readDir(currentDataPath)
             if err != nil && err != errFileNotFound {
                 return res, osErrToFileErr(err)
             }
@@ -2601,6 +2604,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
             }
             if strings.HasPrefix(entry, "part.") {
                 legacyPreserved = true
+                legacyEntries = entries
                 break
             }
         }
@@ -2611,32 +2615,30 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
     if formatLegacy {
         legacyDataPath = pathJoin(dstVolumeDir, dstPath, legacyDataDir)
         if legacyPreserved {
-            // Preserve all the legacy data, could be slow, but at max there can be 10,000 parts.
-            currentDataPath := pathJoin(dstVolumeDir, dstPath)
-            entries, err := readDir(currentDataPath)
-            if err != nil {
-                return res, osErrToFileErr(err)
+            if contextCanceled(ctx) {
+                return res, ctx.Err()
             }
-            // legacy data dir means its old content, honor system umask.
-            if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
-                // any failed mkdir-calls delete them.
-                s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
-                return res, osErrToFileErr(err)
-            }
-            for _, entry := range entries {
-                // Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
-                if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
-                    continue
-                }
-                if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
-                    // Any failed rename calls un-roll previous transaction.
+            if len(legacyEntries) > 0 {
+                // legacy data dir means its old content, honor system umask.
+                if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
+                    // any failed mkdir-calls delete them.
+                    s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
+                    return res, osErrToFileErr(err)
+                }
+                for _, entry := range legacyEntries {
+                    // Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
+                    if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
+                        continue
+                    }
+                    if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
+                        // Any failed rename calls un-roll previous transaction.
+                        s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
+                        return res, osErrToFileErr(err)
+                    }
+                }
+            }
         }
     }
@@ -2726,6 +2728,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
         return res, errFileCorrupt
     }
+    if contextCanceled(ctx) {
+        return res, ctx.Err()
+    }
+
     if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), newDstBuf); err != nil {
         if legacyPreserved {
             s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
@@ -2749,6 +2755,9 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
         // on a versioned bucket.
         s.moveToTrash(legacyDataPath, true, false)
     }
+    if contextCanceled(ctx) {
+        return res, ctx.Err()
+    }
     if err = renameAll(srcDataPath, dstDataPath, skipParent); err != nil {
         if legacyPreserved {
             // Any failed rename calls un-roll previous transaction.
@@ -2758,11 +2767,16 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
         s.deleteFile(dstVolumeDir, dstDataPath, false, false)
         return res, osErrToFileErr(err)
     }
+    diskHealthCheckOK(ctx, err)
 }
 // If we have oldDataDir then we must preserve current xl.meta
 // as backup, in-case needing renames().
 if res.OldDataDir != "" {
+    if contextCanceled(ctx) {
+        return res, ctx.Err()
+    }
+
     // preserve current xl.meta inside the oldDataDir.
     if err = s.writeAll(ctx, dstVolume, pathJoin(dstPath, res.OldDataDir, xlStorageFormatFileBackup), dstBuf, true, skipParent); err != nil {
@@ -2773,6 +2787,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
     diskHealthCheckOK(ctx, err)
 }
+if contextCanceled(ctx) {
+    return res, ctx.Err()
+}
+
 // Commit meta-file
 if err = renameAll(srcFilePath, dstFilePath, skipParent); err != nil {
     if legacyPreserved {

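The repeated contextCanceled guards bail out of RenameData between phases of the multi-step rename so a dead request stops driving disk I/O. Assuming the helper is the usual non-blocking probe of ctx.Done() (a sketch, not necessarily MinIO's exact implementation):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// contextCanceled reports whether ctx is already done, without blocking.
func contextCanceled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(contextCanceled(ctx)) // false
	time.Sleep(20 * time.Millisecond)
	fmt.Println(contextCanceled(ctx)) // true
}
```
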
@@ -286,12 +286,23 @@ func (c *Client) dumpHTTP(req *http.Request, resp *http.Response) {
     return
 }
+// ErrClientClosed returned when *Client is closed.
+var ErrClientClosed = errors.New("rest client is closed")
+
 // Call - make a REST call with context.
 func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
-    if !c.IsOnline() {
+    switch atomic.LoadInt32(&c.connected) {
+    case closed:
+        // client closed, this is usually a manual process
+        // so return a local error as client is closed
+        return nil, &NetworkError{Err: ErrClientClosed}
+    case offline:
+        // client offline, return last error captured.
+        return nil, &NetworkError{Err: c.LastError()}
+    }
+
+    // client is still connected, attempt the request.
+
     // Shallow copy. We don't modify the *UserInfo, if set.
     // All other fields are copied.
     u := *c.url
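
The switch replaces a boolean IsOnline() test with a tri-state connection flag, so a client that was deliberately Close()d is distinguishable from one that merely failed a health check and still carries a stale lastErr(). A compact sketch of that state machine (constants and fields simplified from internal/rest):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

const (
	closed  int32 = iota // Close() was called; a local, permanent condition
	offline              // health check failing; lastErr explains why
	online               // usable
)

var ErrClientClosed = errors.New("rest client is closed")

type Client struct {
	connected int32
	lastErr   atomic.Value // holds an error
}

func (c *Client) Call() error {
	switch atomic.LoadInt32(&c.connected) {
	case closed:
		return ErrClientClosed // manual close: don't report a stale network error
	case offline:
		return c.lastErr.Load().(error) // surface the captured failure
	}
	return nil // connected: attempt the request
}

func main() {
	c := &Client{connected: offline}
	c.lastErr.Store(errors.New("dial tcp: connection refused"))
	fmt.Println(c.Call()) // dial tcp: connection refused
	atomic.StoreInt32(&c.connected, closed)
	fmt.Println(c.Call()) // rest client is closed
}
```
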
@@ -393,8 +404,6 @@ func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
     clnt := &Client{
         httpClient:   &http.Client{Transport: tr},
         url:          u,
-        lastErr:      err,
-        lastErrTime:  time.Now(),
         newAuthToken: newAuthToken,
         connected:    connected,
         lastConn:     time.Now().UnixNano(),
@@ -402,6 +411,11 @@ func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
         HealthCheckReconnectUnit: 200 * time.Millisecond,
         HealthCheckTimeout:       time.Second,
     }
+    if err != nil {
+        clnt.lastErr = err
+        clnt.lastErrTime = time.Now()
+    }
+
     if clnt.HealthCheckFn != nil {
         // make connection pre-emptively.
         go clnt.HealthCheckFn()
@@ -468,7 +482,7 @@ func (c *Client) runHealthCheck() bool {
     if atomic.CompareAndSwapInt32(&c.connected, offline, online) {
         now := time.Now()
         disconnected := now.Sub(c.LastConn())
-        logger.Event(context.Background(), "Client '%s' re-connected in %s", c.url.String(), disconnected)
+        logger.Event(context.Background(), "healthcheck", "Client '%s' re-connected in %s", c.url.String(), disconnected)
         atomic.StoreInt64(&c.lastConn, now.UnixNano())
     }
     return