diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index f413138e4..1cc32d08d 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -153,6 +153,8 @@ func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultiple
 		errFileVersionNotFound,
 		io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
 		io.EOF,              // some times we would read without locks, ignore these errors
+		context.DeadlineExceeded,
+		context.Canceled,
 	}
 	ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
 
diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go
index dcdfee994..42c503a95 100644
--- a/cmd/erasure-metadata.go
+++ b/cmd/erasure-metadata.go
@@ -403,7 +403,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, origbucket, bu
 			if fi.IsValid() {
 				return disks[index].WriteMetadata(ctx, origbucket, bucket, prefix, fi)
 			}
-			return errCorruptedFormat
+			return errFileCorrupt
 		}, index)
 	}
 
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index d888f8d92..536b0f3db 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -739,6 +739,15 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
 	onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
 	if err != nil {
+		if errors.Is(err, errFileNotFound) {
+			// An in-quorum errFileNotFound means that client stream
+			// prematurely closed and we do not find any xl.meta or
+			// part.1's - in such a scenario we must return as if client
+			// disconnected. This means that erasure.Encode() CreateFile()
+			// did not do anything.
+			return pi, IncompleteBody{Bucket: bucket, Object: object}
+		}
+
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}
 
@@ -1314,11 +1323,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 
 	onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, bucket, object, writeQuorum)
 	if err != nil {
-		return oi, toObjectErr(err, bucket, object)
+		return oi, toObjectErr(err, bucket, object, uploadID)
 	}
 
 	if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks); err != nil {
-		return ObjectInfo{}, toObjectErr(err, bucket, object)
+		return ObjectInfo{}, toObjectErr(err, bucket, object, uploadID)
 	}
 
 	if !opts.Speedtest && len(versions) > 0 {
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index ade16abd1..1195c03d5 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -562,7 +562,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
 func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVers, allParts bool) (FileInfo, error) {
 	var xl xlMetaV2
 	if err := xl.LoadOrConvert(ri.Buf); err != nil {
-		return FileInfo{}, err
+		return FileInfo{}, errFileCorrupt
 	}
 
 	fi, err := xl.ToFileInfo(bucket, object, "", inclFreeVers, allParts)
@@ -571,7 +571,7 @@ func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVe
 	}
 
 	if !fi.IsValid() {
-		return FileInfo{}, errCorruptedFormat
+		return FileInfo{}, errFileCorrupt
 	}
 
 	versionID := fi.VersionID
@@ -661,7 +661,7 @@ func pickLatestQuorumFilesInfo(ctx context.Context, rawFileInfos []RawFileInfo,
 	if !lfi.IsValid() {
 		for i := range errs {
 			if errs[i] == nil {
-				errs[i] = errCorruptedFormat
+				errs[i] = errFileCorrupt
 			}
 		}
 		return metaFileInfos, errs
@@ -1519,7 +1519,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
 	if err != nil {
 		if errors.Is(err, errFileNotFound) {
-			return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
+			// An in-quorum errFileNotFound means that client stream
+			// prematurely closed and we do not find any xl.meta or
+			// part.1's - in such a scenario we must return as if client
+			// disconnected. This means that erasure.Encode() CreateFile()
+			// did not do anything.
+			return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
 		}
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}
diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go
index 30529d58a..0925c38c8 100644
--- a/cmd/object-handlers_test.go
+++ b/cmd/object-handlers_test.go
@@ -1237,6 +1237,7 @@ func testAPIPutObjectStreamSigV4Handler(obj ObjectLayer, instanceType, bucketNam
 		if err != nil {
 			t.Fatalf("Error injecting faults into the request: %v.", err)
 		}
+		// Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler.
 		// Call the ServeHTTP to execute the handler,`func (api objectAPIHandlers) GetObjectHandler` handles the request.
 		apiRouter.ServeHTTP(rec, req)
 
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 3cd4dec9e..294300774 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -54,6 +54,9 @@ func isNetworkError(err error) bool {
 		if down := xnet.IsNetworkOrHostDown(nerr.Err, false); down {
 			return true
 		}
+		if errors.Is(nerr.Err, rest.ErrClientClosed) {
+			return true
+		}
 	}
 	if errors.Is(err, grid.ErrDisconnected) {
 		return true
@@ -61,7 +64,7 @@ func isNetworkError(err error) bool {
 	// More corner cases suitable for storage REST API
 	switch {
 	// A peer node can be in shut down phase and proactively
-	// return 503 server closed error,consider it as an offline node
+	// return 503 server closed error, consider it as an offline node
 	case strings.Contains(err.Error(), http.ErrServerClosed.Error()):
 		return true
 	// Corner case, the server closed the connection with a keep-alive timeout
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index ba829fc1b..a1387f5c0 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -791,9 +791,26 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func
 		defer xioutil.SafeClose(doneCh)
 		// Initiate ticker after body has been read.
 		ticker := time.NewTicker(time.Second * 10)
+		defer ticker.Stop()
+
 		for {
 			select {
 			case <-ticker.C:
+				// The done() might have been called
+				// concurrently, check for it before we
+				// write the filler byte.
+				select {
+				case err := <-doneCh:
+					if err != nil {
+						write([]byte{1})
+						write([]byte(err.Error()))
+					} else {
+						write([]byte{0})
+					}
+					return
+				default:
+				}
+
 				// Response not ready, write a filler byte.
 				write([]byte{32})
 				if canWrite {
@@ -806,7 +823,6 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func
 			} else {
 				write([]byte{0})
 			}
-			ticker.Stop()
 			return
 		}
 	}
@@ -854,6 +870,21 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
 		for {
 			select {
 			case <-ticker.C:
+				// The done() might have been called
+				// concurrently, check for it before we
+				// write the filler byte.
+				select {
+				case err := <-doneCh:
+					if err != nil {
+						write([]byte{1})
+						write([]byte(err.Error()))
+					} else {
+						write([]byte{0})
+					}
+					return
+				default:
+				}
+
 				// Response not ready, write a filler byte.
 				write([]byte{32})
 				if canWrite {
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 0e9d7aab9..b987e5eb2 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -909,7 +909,8 @@ func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string)
 	})
 
 	if err == nil {
-		logger.Event(context.Background(), "node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
+		logger.Event(context.Background(), "healthcheck",
+			"node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
 		p.health.status.Store(diskHealthOK)
 		p.health.waiting.Add(-1)
 		return
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index ee9815856..1050a9e04 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -2558,8 +2558,12 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		}
 	}
 
+	// Preserve all the legacy data, could be slow, but at max there can be 10,000 parts.
+	currentDataPath := pathJoin(dstVolumeDir, dstPath)
+
 	var xlMeta xlMetaV2
 	var legacyPreserved bool
+	var legacyEntries []string
 	if len(dstBuf) > 0 {
 		if isXL2V1Format(dstBuf) {
 			if err = xlMeta.Load(dstBuf); err != nil {
@@ -2590,8 +2594,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 			// from `xl.json` to `xl.meta`, we can avoid
 			// one extra readdir operation here for all
 			// new deployments.
-			currentDataPath := pathJoin(dstVolumeDir, dstPath)
-			entries, err := readDirN(currentDataPath, 1)
+			entries, err := readDir(currentDataPath)
 			if err != nil && err != errFileNotFound {
 				return res, osErrToFileErr(err)
 			}
@@ -2601,6 +2604,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 			}
 			if strings.HasPrefix(entry, "part.") {
 				legacyPreserved = true
+				legacyEntries = entries
 				break
 			}
 		}
@@ -2611,32 +2615,30 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 	if formatLegacy {
 		legacyDataPath = pathJoin(dstVolumeDir, dstPath, legacyDataDir)
 		if legacyPreserved {
-			// Preserve all the legacy data, could be slow, but at max there can be 1res,000 parts.
-			currentDataPath := pathJoin(dstVolumeDir, dstPath)
-			entries, err := readDir(currentDataPath)
-			if err != nil {
-				return res, osErrToFileErr(err)
+			if contextCanceled(ctx) {
+				return res, ctx.Err()
 			}
-			// legacy data dir means its old content, honor system umask.
-			if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
-				// any failed mkdir-calls delete them.
-				s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
-				return res, osErrToFileErr(err)
-			}
-
-			for _, entry := range entries {
-				// Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
-				if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
-					continue
-				}
-
-				if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
-					// Any failed rename calls un-roll previous transaction.
+			if len(legacyEntries) > 0 {
+				// legacy data dir means its old content, honor system umask.
+				if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
+					// any failed mkdir-calls delete them.
 					s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
-
 					return res, osErrToFileErr(err)
 				}
+				for _, entry := range legacyEntries {
+					// Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
+					if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
+						continue
+					}
+
+					if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
+						// Any failed rename calls un-roll previous transaction.
+						s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
+
+						return res, osErrToFileErr(err)
+					}
+				}
 			}
 		}
 	}
 
@@ -2726,6 +2728,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		return res, errFileCorrupt
 	}
 
+	if contextCanceled(ctx) {
+		return res, ctx.Err()
+	}
+
 	if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), newDstBuf); err != nil {
 		if legacyPreserved {
 			s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
@@ -2749,6 +2755,9 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		// on a versioned bucket.
 		s.moveToTrash(legacyDataPath, true, false)
 	}
+	if contextCanceled(ctx) {
+		return res, ctx.Err()
+	}
 	if err = renameAll(srcDataPath, dstDataPath, skipParent); err != nil {
 		if legacyPreserved {
 			// Any failed rename calls un-roll previous transaction.
@@ -2758,11 +2767,16 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 			s.deleteFile(dstVolumeDir, dstDataPath, false, false)
 			return res, osErrToFileErr(err)
 		}
+		diskHealthCheckOK(ctx, err)
 	}
 
 	// If we have oldDataDir then we must preserve current xl.meta
 	// as backup, in-case needing renames().
 	if res.OldDataDir != "" {
+		if contextCanceled(ctx) {
+			return res, ctx.Err()
+		}
+
 		// preserve current xl.meta inside the oldDataDir.
 		if err = s.writeAll(ctx, dstVolume, pathJoin(dstPath, res.OldDataDir, xlStorageFormatFileBackup), dstBuf, true, skipParent); err != nil {
 			if legacyPreserved {
@@ -2773,6 +2787,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		diskHealthCheckOK(ctx, err)
 	}
 
+	if contextCanceled(ctx) {
+		return res, ctx.Err()
+	}
+
 	// Commit meta-file
 	if err = renameAll(srcFilePath, dstFilePath, skipParent); err != nil {
 		if legacyPreserved {
diff --git a/internal/rest/client.go b/internal/rest/client.go
index b143ce673..710d1ffe0 100644
--- a/internal/rest/client.go
+++ b/internal/rest/client.go
@@ -286,12 +286,23 @@ func (c *Client) dumpHTTP(req *http.Request, resp *http.Response) {
 	return
 }
 
+// ErrClientClosed returned when *Client is closed.
+var ErrClientClosed = errors.New("rest client is closed")
+
 // Call - make a REST call with context.
 func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
-	if !c.IsOnline() {
+	switch atomic.LoadInt32(&c.connected) {
+	case closed:
+		// client closed, this is usually a manual process
+		// so return a local error as client is closed
+		return nil, &NetworkError{Err: ErrClientClosed}
+	case offline:
+		// client offline, return last error captured.
 		return nil, &NetworkError{Err: c.LastError()}
 	}
 
+	// client is still connected, attempt the request.
+
 	// Shallow copy. We don't modify the *UserInfo, if set.
 	// All other fields are copied.
 	u := *c.url
@@ -393,8 +404,6 @@ func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
 	clnt := &Client{
 		httpClient:               &http.Client{Transport: tr},
 		url:                      u,
-		lastErr:                  err,
-		lastErrTime:              time.Now(),
 		newAuthToken:             newAuthToken,
 		connected:                connected,
 		lastConn:                 time.Now().UnixNano(),
@@ -402,6 +411,11 @@ func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
 		HealthCheckReconnectUnit: 200 * time.Millisecond,
 		HealthCheckTimeout:       time.Second,
 	}
+	if err != nil {
+		clnt.lastErr = err
+		clnt.lastErrTime = time.Now()
+	}
+
 	if clnt.HealthCheckFn != nil {
 		// make connection pre-emptively.
 		go clnt.HealthCheckFn()
@@ -468,7 +482,7 @@ func (c *Client) runHealthCheck() bool {
 				if atomic.CompareAndSwapInt32(&c.connected, offline, online) {
 					now := time.Now()
 					disconnected := now.Sub(c.LastConn())
-					logger.Event(context.Background(), "Client '%s' re-connected in %s", c.url.String(), disconnected)
+					logger.Event(context.Background(), "healthcheck", "Client '%s' re-connected in %s", c.url.String(), disconnected)
 					atomic.StoreInt64(&c.lastConn, now.UnixNano())
 				}
 				return