From 038fdeea8334753bae0952272ea697c484bd86f4 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 6 Dec 2021 09:45:23 -0800 Subject: [PATCH] snowball: return errors on failures (#13836) Return errors when untar fails at once. Current error handling was quite a mess. Errors are written to the stream, but processing continues. Instead, return errors when they occur and transform internal errors to bad request errors, since it is likely a problem with the input. Fixes #13832 --- cmd/object-handlers.go | 62 +++++++++++++++++++++++------------------- cmd/untar.go | 19 ++++++++++--- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 11d093eba..6fc5a9560 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2021,7 +2021,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h getObjectInfo = api.CacheAPI().GetObjectInfo } - putObjectTar := func(reader io.Reader, info os.FileInfo, object string) { + putObjectTar := func(reader io.Reader, info os.FileInfo, object string) error { size := info.Size() metadata := map[string]string{ xhttp.AmzStorageClass: sc, @@ -2035,8 +2035,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h actualReader, err := hash.NewReader(reader, size, "", "", actualSize) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } // Set compression metrics. @@ -2048,8 +2047,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h hashReader, err := hash.NewReader(reader, size, "", "", actualSize) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } rawReader := hashReader @@ -2057,8 +2055,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.ReplicateObjectAction); s3Err != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) - return + return err } metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String() metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano) @@ -2067,24 +2064,23 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h // get encryption options opts, err := putOpts(ctx, r, bucket, object, metadata) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } opts.MTime = info.ModTime() - retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) - if s3Err == ErrNone && retentionMode.Valid() { + retentionMode, retentionDate, legalHold, s3err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) + if s3err == ErrNone && retentionMode.Valid() { metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) } - if s3Err == ErrNone && legalHold.Status.Valid() { + if s3err == ErrNone && legalHold.Status.Valid() { metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status) } - if s3Err != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) - return + if s3err != ErrNone { + s3Err = s3err + return ObjectLocked{} } if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ @@ -2099,14 +2095,12 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h if objectAPI.IsEncryptionSupported() { if _, ok := crypto.IsRequested(r.Header); ok && !HasSuffix(object, SlashSeparator) { // handle SSE requests if crypto.SSECopy.IsRequested(r.Header) { - writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL) - return + return errInvalidEncryptionParameters } reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } wantSize := int64(-1) @@ -2118,14 +2112,12 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h // do not try to verify encrypted content hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } } } @@ -2136,20 +2128,34 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h // Create the object.. objInfo, err := putObject(ctx, bucket, object, pReader, opts) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + return err } if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ UserDefined: metadata, }, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) - } - + return nil } - untar(hreader, putObjectTar) + if err = untar(hreader, putObjectTar); err != nil { + apiErr := errorCodes.ToAPIErr(s3Err) + // If not set, convert or use BadRequest + if s3Err == ErrNone { + apiErr = toAPIError(ctx, err) + if apiErr.Code == "InternalError" { + // Convert generic internal errors to bad requests. + apiErr = APIError{ + Code: "BadRequest", + Description: err.Error(), + HTTPStatusCode: http.StatusBadRequest, + } + } + } + writeErrorResponse(ctx, w, apiErr, r.URL) + return + } w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`} writeSuccessResponseHeadersOnly(w) diff --git a/cmd/untar.go b/cmd/untar.go index 6492feffb..dd489fb8b 100644 --- a/cmd/untar.go +++ b/cmd/untar.go @@ -97,7 +97,7 @@ var magicHeaders = []struct { }, } -func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string)) error { +func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string) error) error { bf := bufio.NewReader(r) switch f := detect(bf); f { case formatGzip: @@ -130,6 +130,7 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name return fmt.Errorf("Unsupported format %s", f) } tarReader := tar.NewReader(r) + n := 0 for { header, err := tarReader.Next() @@ -141,7 +142,11 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name // return any other error case err != nil: - return err + extra := "" + if n > 0 { + extra = fmt.Sprintf(" after %d successful object(s)", n) + } + return fmt.Errorf("tar file error: %w%s", err, extra) // if the header is nil, just skip it (not sure how this happens) case header == nil: @@ -155,9 +160,15 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name switch header.Typeflag { case tar.TypeDir: // = directory - putObject(tarReader, header.FileInfo(), trimLeadingSlash(pathJoin(name, slashSeparator))) + if err := putObject(tarReader, header.FileInfo(), trimLeadingSlash(pathJoin(name, slashSeparator))); err != nil { + return err + } + n++ case tar.TypeReg, tar.TypeChar, tar.TypeBlock, tar.TypeFifo, tar.TypeGNUSparse: // = regular - putObject(tarReader, header.FileInfo(), trimLeadingSlash(path.Clean(name))) + if err := putObject(tarReader, header.FileInfo(), trimLeadingSlash(path.Clean(name))); err != nil { + return err + } + n++ default: // ignore symlink'ed continue