From e6d740ce09b3020d2c7debfbea445c21b0acb86c Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Mon, 27 Aug 2018 02:58:23 -0700 Subject: [PATCH] Implement GetObjectNInfo object layer call (#6290) This combines calling GetObjectInfo and GetObject while returning a io.ReadCloser for the object's body. This allows the two operations to be under a single lock, fixing a race between getting object info and reading the object body. --- cmd/api-headers.go | 8 +- cmd/disk-cache.go | 71 ++++++++ cmd/dummy-object-layer_test.go | 4 + cmd/encryption-v1.go | 266 +++++++++++++++++++++++++++++ cmd/fs-v1.go | 91 ++++++++++ cmd/gateway/azure/gateway-azure.go | 21 +++ cmd/gateway/b2/gateway-b2.go | 21 +++ cmd/gateway/gcs/gateway-gcs.go | 21 +++ cmd/gateway/manta/gateway-manta.go | 21 +++ cmd/gateway/oss/gateway-oss.go | 23 ++- cmd/gateway/s3/gateway-s3.go | 24 +++ cmd/gateway/sia/gateway-sia.go | 21 +++ cmd/httprange.go | 105 ++++++++++++ cmd/object-api-interface.go | 3 + cmd/object-api-utils.go | 55 ++++++ cmd/object-handlers.go | 156 ++++++++++------- cmd/xl-sets.go | 5 + cmd/xl-v1-object.go | 48 ++++++ pkg/ioutil/ioutil.go | 31 ++++ 19 files changed, 926 insertions(+), 69 deletions(-) diff --git a/cmd/api-headers.go b/cmd/api-headers.go index 1ed996d86..c382a2fd7 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -61,7 +61,7 @@ func encodeResponseJSON(response interface{}) []byte { } // Write object header -func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *httpRange) { +func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSpec) { // set common headers setCommonHeaders(w) @@ -96,9 +96,9 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *h } // for providing ranged content - if contentRange != nil && contentRange.offsetBegin > -1 { + if rs != nil { // Override content-length - w.Header().Set("Content-Length", strconv.FormatInt(contentRange.getLength(), 10)) - w.Header().Set("Content-Range", contentRange.String()) + w.Header().Set("Content-Length", strconv.FormatInt(rs.GetLength(objInfo.Size), 10)) + w.Header().Set("Content-Range", rs.ContentRangeString(objInfo.Size)) } } diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index f44ef7375..650f487bf 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -57,6 +57,7 @@ type cacheObjects struct { // file path patterns to exclude from cache exclude []string // Object functions pointing to the corresponding functions of backend implementation. + GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfoFn func(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) PutObjectFn func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) @@ -88,6 +89,7 @@ type CacheObjectLayer interface { ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) DeleteBucket(ctx context.Context, bucket string) error // Object operations. + GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) @@ -175,6 +177,75 @@ func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string { return metadata } +func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, r io.ReadCloser, err error) { + + bkObjInfo, bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs) + + if c.isCacheExclude(bucket, object) { + return bkObjInfo, bkReader, bkErr + } + + // fetch cacheFSObjects if object is currently cached or nearest available cache drive + dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object) + if err != nil { + return bkObjInfo, bkReader, bkErr + } + + backendDown := backendDownError(bkErr) + if bkErr != nil && !backendDown { + if _, ok := err.(ObjectNotFound); ok { + // Delete the cached entry if backend object was deleted. + dcache.Delete(ctx, bucket, object) + } + return oi, r, bkErr + } + + if !backendDown && filterFromCache(bkObjInfo.UserDefined) { + return bkObjInfo, bkReader, bkErr + } + + cacheObjInfo, cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs) + if cacheErr == nil { + if backendDown { + // If the backend is down, serve the request from cache. + return cacheObjInfo, cacheReader, nil + } + if cacheObjInfo.ETag == bkObjInfo.ETag && !isStaleCache(bkObjInfo) { + return cacheObjInfo, cacheReader, nil + } + dcache.Delete(ctx, bucket, object) + } + + if rs != nil { + // We don't cache partial objects. + return bkObjInfo, bkReader, bkErr + } + if !dcache.diskAvailable(bkObjInfo.Size * cacheSizeMultiplier) { + // cache only objects < 1/100th of disk capacity + return bkObjInfo, bkReader, bkErr + } + + // Initialize pipe. + pipeReader, pipeWriter := io.Pipe() + teeReader := io.TeeReader(bkReader, pipeWriter) + hashReader, herr := hash.NewReader(pipeReader, bkObjInfo.Size, "", "") + if err != nil { + return oi, r, herr + } + + cleanupBackend := func() { bkReader.Close() } + getObjReader := NewGetObjectReader(teeReader, nil, cleanupBackend) + + go func() { + putErr := dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(bkObjInfo)) + // close the write end of the pipe, so the error gets + // propagated to getObjReader + pipeWriter.CloseWithError(putErr) + }() + + return bkObjInfo, getObjReader, nil +} + // Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also // stores it in the cache for serving subsequent requests. func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { diff --git a/cmd/dummy-object-layer_test.go b/cmd/dummy-object-layer_test.go index 0c0a512da..192866cff 100644 --- a/cmd/dummy-object-layer_test.go +++ b/cmd/dummy-object-layer_test.go @@ -59,6 +59,10 @@ func (api *DummyObjectLayer) ListObjectsV2(ctx context.Context, bucket, prefix, return } +func (api *DummyObjectLayer) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { + return +} + func (api *DummyObjectLayer) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { return } diff --git a/cmd/encryption-v1.go b/cmd/encryption-v1.go index f822d6d50..94223dc6a 100644 --- a/cmd/encryption-v1.go +++ b/cmd/encryption-v1.go @@ -312,6 +312,155 @@ func newDecryptWriterWithObjectKey(client io.Writer, objectEncryptionKey []byte, return writer, nil } +// Adding support for reader based interface + +// DecryptRequestWithSequenceNumberR - same as +// DecryptRequestWithSequenceNumber but with a reader +func DecryptRequestWithSequenceNumberR(client io.Reader, r *http.Request, bucket, object string, seqNumber uint32, metadata map[string]string) (io.Reader, error) { + if crypto.S3.IsEncrypted(metadata) { + return newDecryptReader(client, nil, bucket, object, seqNumber, metadata) + } + + key, err := ParseSSECustomerRequest(r) + if err != nil { + return nil, err + } + delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident + return newDecryptReader(client, key, bucket, object, seqNumber, metadata) +} + +// DecryptCopyRequestR - same as DecryptCopyRequest, but with a +// Reader +func DecryptCopyRequestR(client io.Reader, r *http.Request, bucket, object string, metadata map[string]string) (io.Reader, error) { + var ( + key []byte + err error + ) + if crypto.SSECopy.IsRequested(r.Header) { + key, err = ParseSSECopyCustomerRequest(r, metadata) + if err != nil { + return nil, err + } + } + delete(metadata, crypto.SSECopyKey) // make sure we do not save the key by accident + return newDecryptReader(client, key, bucket, object, 0, metadata) +} + +func newDecryptReader(client io.Reader, key []byte, bucket, object string, seqNumber uint32, metadata map[string]string) (io.Reader, error) { + objectEncryptionKey, err := decryptObjectInfo(key, bucket, object, metadata) + if err != nil { + return nil, err + } + return newDecryptReaderWithObjectKey(client, objectEncryptionKey, seqNumber, metadata) +} + +func newDecryptReaderWithObjectKey(client io.Reader, objectEncryptionKey []byte, seqNumber uint32, metadata map[string]string) (io.Reader, error) { + reader, err := sio.DecryptReader(client, sio.Config{ + Key: objectEncryptionKey, + SequenceNumber: seqNumber, + }) + if err != nil { + return nil, crypto.ErrInvalidCustomerKey + } + delete(metadata, crypto.SSEIV) + delete(metadata, crypto.SSESealAlgorithm) + delete(metadata, crypto.SSECSealedKey) + delete(metadata, crypto.SSEMultipart) + delete(metadata, crypto.S3SealedKey) + delete(metadata, crypto.S3KMSSealedKey) + delete(metadata, crypto.S3KMSKeyID) + return reader, nil +} + +// DecryptBlocksRequestR - same as DecryptBlocksRequest but with a +// reader +func DecryptBlocksRequestR(client io.Reader, r *http.Request, bucket, object string, startOffset, length int64, objInfo ObjectInfo, copySource bool) (io.Reader, int64, int64, error) { + var seqNumber uint32 + var encStartOffset, encLength int64 + + if len(objInfo.Parts) == 0 || !crypto.IsMultiPart(objInfo.UserDefined) { + seqNumber, encStartOffset, encLength = getEncryptedSinglePartOffsetLength(startOffset, length, objInfo) + + var reader io.Reader + var err error + if copySource { + reader, err = DecryptCopyRequestR(client, r, bucket, object, objInfo.UserDefined) + } else { + reader, err = DecryptRequestWithSequenceNumberR(client, r, bucket, object, seqNumber, objInfo.UserDefined) + } + if err != nil { + return nil, 0, 0, err + } + return reader, encStartOffset, encLength, nil + } + + seqNumber, encStartOffset, encLength = getEncryptedMultipartsOffsetLength(startOffset, length, objInfo) + var partStartIndex int + var partStartOffset = startOffset + // Skip parts until final offset maps to a particular part offset. + for i, part := range objInfo.Parts { + decryptedSize, err := sio.DecryptedSize(uint64(part.Size)) + if err != nil { + return nil, -1, -1, errObjectTampered + } + + partStartIndex = i + + // Offset is smaller than size we have reached the + // proper part offset, break out we start from + // this part index. + if partStartOffset < int64(decryptedSize) { + break + } + + // Continue to look for next part. + partStartOffset -= int64(decryptedSize) + } + + startSeqNum := partStartOffset / sseDAREPackageBlockSize + partEncRelOffset := int64(startSeqNum) * (sseDAREPackageBlockSize + sseDAREPackageMetaSize) + + w := &DecryptBlocksReader{ + reader: client, + startSeqNum: uint32(startSeqNum), + partEncRelOffset: partEncRelOffset, + parts: objInfo.Parts, + partIndex: partStartIndex, + req: r, + bucket: bucket, + object: object, + customerKeyHeader: r.Header.Get(crypto.SSECKey), + copySource: copySource, + } + + w.metadata = map[string]string{} + // Copy encryption metadata for internal use. + for k, v := range objInfo.UserDefined { + w.metadata[k] = v + } + + // Purge all the encryption headers. + delete(objInfo.UserDefined, crypto.SSEIV) + delete(objInfo.UserDefined, crypto.SSESealAlgorithm) + delete(objInfo.UserDefined, crypto.SSECSealedKey) + delete(objInfo.UserDefined, crypto.SSEMultipart) + + if crypto.S3.IsEncrypted(objInfo.UserDefined) { + delete(objInfo.UserDefined, crypto.S3SealedKey) + delete(objInfo.UserDefined, crypto.S3KMSKeyID) + delete(objInfo.UserDefined, crypto.S3KMSSealedKey) + } + if w.copySource { + w.customerKeyHeader = r.Header.Get(crypto.SSECopyKey) + } + + if err := w.buildDecrypter(w.parts[w.partIndex].Number); err != nil { + return nil, 0, 0, err + } + + return w, encStartOffset, encLength, nil +} + // DecryptRequestWithSequenceNumber decrypts the object with the client provided key. It also removes // the client-side-encryption metadata from the object and sets the correct headers. func DecryptRequestWithSequenceNumber(client io.Writer, r *http.Request, bucket, object string, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) { @@ -333,6 +482,123 @@ func DecryptRequest(client io.Writer, r *http.Request, bucket, object string, me return DecryptRequestWithSequenceNumber(client, r, bucket, object, 0, metadata) } +// DecryptBlocksReader - decrypts multipart parts, while implementing +// a io.Reader compatible interface. +type DecryptBlocksReader struct { + // Source of the encrypted content that will be decrypted + reader io.Reader + // Current decrypter for the current encrypted data block + decrypter io.Reader + // Start sequence number + startSeqNum uint32 + // Current part index + partIndex int + // Parts information + parts []objectPartInfo + req *http.Request + bucket, object string + metadata map[string]string + + partEncRelOffset int64 + + copySource bool + // Customer Key + customerKeyHeader string +} + +func (d *DecryptBlocksReader) buildDecrypter(partID int) error { + m := make(map[string]string) + for k, v := range d.metadata { + m[k] = v + } + // Initialize the first decrypter; new decrypters will be + // initialized in Read() operation as needed. + var key []byte + var err error + if d.copySource { + if crypto.SSEC.IsEncrypted(d.metadata) { + d.req.Header.Set(crypto.SSECopyKey, d.customerKeyHeader) + key, err = ParseSSECopyCustomerRequest(d.req, d.metadata) + } + } else { + if crypto.SSEC.IsEncrypted(d.metadata) { + d.req.Header.Set(crypto.SSECKey, d.customerKeyHeader) + key, err = ParseSSECustomerRequest(d.req) + } + } + if err != nil { + return err + } + + objectEncryptionKey, err := decryptObjectInfo(key, d.bucket, d.object, m) + if err != nil { + return err + } + + var partIDbin [4]byte + binary.LittleEndian.PutUint32(partIDbin[:], uint32(partID)) // marshal part ID + + mac := hmac.New(sha256.New, objectEncryptionKey) // derive part encryption key from part ID and object key + mac.Write(partIDbin[:]) + partEncryptionKey := mac.Sum(nil) + + // make sure we do not save the key by accident + if d.copySource { + delete(m, crypto.SSECopyKey) + } else { + delete(m, crypto.SSECKey) + } + + // make sure to provide a NopCloser such that a Close + // on sio.decryptWriter doesn't close the underlying writer's + // close which perhaps can close the stream prematurely. + decrypter, err := newDecryptReaderWithObjectKey(d.reader, partEncryptionKey, d.startSeqNum, m) + if err != nil { + return err + } + + d.decrypter = decrypter + return nil +} + +func (d *DecryptBlocksReader) Read(p []byte) (int, error) { + var err error + var n1 int + if int64(len(p)) < d.parts[d.partIndex].Size-d.partEncRelOffset { + n1, err = d.decrypter.Read(p) + if err != nil { + return 0, err + } + d.partEncRelOffset += int64(n1) + } else { + n1, err = d.decrypter.Read(p[:d.parts[d.partIndex].Size-d.partEncRelOffset]) + if err != nil { + return 0, err + } + + // We should now proceed to next part, reset all + // values appropriately. + d.partEncRelOffset = 0 + d.startSeqNum = 0 + + d.partIndex++ + + err = d.buildDecrypter(d.partIndex + 1) + if err != nil { + return 0, err + } + + n1, err = d.decrypter.Read(p[n1:]) + if err != nil { + return 0, err + } + + d.partEncRelOffset += int64(n1) + } + + return len(p), nil +} + // DecryptBlocksWriter - decrypts multipart parts, while implementing a io.Writer compatible interface. type DecryptBlocksWriter struct { // Original writer where the plain data will be written diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index d7ea10b59..fb25aec04 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "encoding/hex" "io" @@ -498,6 +499,96 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu return objInfo, nil } +// GetObjectNInfo - returns object info and a reader for object +// content. +func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { + + if err = checkGetObjArgs(ctx, bucket, object); err != nil { + return objInfo, reader, err + } + + if _, err = fs.statBucketDir(ctx, bucket); err != nil { + return objInfo, reader, toObjectErr(err, bucket) + } + + // Lock the object before reading. + lock := fs.nsMutex.NewNSLock(bucket, object) + if err = lock.GetRLock(globalObjectTimeout); err != nil { + logger.LogIf(ctx, err) + return objInfo, reader, err + } + + // For a directory, we need to send an empty body. + if hasSuffix(object, slashSeparator) { + // The lock taken above is released when + // objReader.Close() is called by the caller. + objReader := NewGetObjectReader(bytes.NewBuffer(nil), lock, nil) + return objInfo, objReader, nil + } + + // Otherwise we get the object info + objInfo, err = fs.getObjectInfo(ctx, bucket, object) + err = toObjectErr(err, bucket, object) + if err != nil { + lock.RUnlock() + return objInfo, nil, err + } + + // Take a rwPool lock for NFS gateway type deployment + var cleanUp func() + if bucket != minioMetaBucket { + fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile) + _, err = fs.rwPool.Open(fsMetaPath) + if err != nil && err != errFileNotFound { + logger.LogIf(ctx, err) + lock.RUnlock() + return objInfo, nil, toObjectErr(err, bucket, object) + } + cleanUp = func() { + // Need to clean up lock after getObject is + // completed. + fs.rwPool.Close(fsMetaPath) + } + } + + offset, length := int64(0), objInfo.Size + if rs != nil { + offset, length = rs.GetOffsetLength(objInfo.Size) + } + + // Read the object, doesn't exist returns an s3 compatible error. + fsObjPath := pathJoin(fs.fsPath, bucket, object) + reader, size, err := fsOpenFile(ctx, fsObjPath, offset) + if err != nil { + lock.RUnlock() + cleanUp() + return objInfo, nil, toObjectErr(err, bucket, object) + } + + bufSize := int64(readSizeV1) + if length > 0 && bufSize > length { + bufSize = length + } + + // For negative length we read everything. + if length < 0 { + length = size - offset + } + + // Reply back invalid range if the input offset and length + // fall out of range. + if offset > size || offset+length > size { + err = InvalidRange{offset, length, size} + logger.LogIf(ctx, err) + lock.RUnlock() + cleanUp() + return objInfo, nil, err + } + + objReader := NewGetObjectReader(io.LimitReader(reader, length), lock, cleanUp) + return objInfo, objReader, nil +} + // GetObject - reads an object from the disk. // Supports additional parameters like offset and length // which are synonymous with HTTP Range requests. diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index c52c11699..8f12ee669 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -615,6 +615,27 @@ func (a *azureObjects) ListObjectsV2(ctx context.Context, bucket, prefix, contin return result, nil } +func (a *azureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = a.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := a.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject - reads an object from azure. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/b2/gateway-b2.go b/cmd/gateway/b2/gateway-b2.go index 4c3366f99..ecd0d7455 100644 --- a/cmd/gateway/b2/gateway-b2.go +++ b/cmd/gateway/b2/gateway-b2.go @@ -394,6 +394,27 @@ func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat return loi, nil } +func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = l.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject reads an object from B2. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index cd1135a2b..23924c337 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -732,6 +732,27 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua }, nil } +func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = l.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject - reads an object from GCS. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/manta/gateway-manta.go b/cmd/gateway/manta/gateway-manta.go index 16f51f0a6..d0fd21d62 100644 --- a/cmd/gateway/manta/gateway-manta.go +++ b/cmd/gateway/manta/gateway-manta.go @@ -497,6 +497,27 @@ func (t *tritonObjects) ListObjectsV2(ctx context.Context, bucket, prefix, conti return result, nil } +func (t *tritonObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = t.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := t.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject - Reads an object from Manta. Supports additional parameters like // offset and length which are synonymous with HTTP Range requests. // diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index 1423d6b7a..a8c7d5b17 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -69,7 +69,7 @@ ENVIRONMENT VARIABLES: DOMAIN: MINIO_DOMAIN: To enable virtual-host-style requests, set this value to Minio host domain name. - + CACHE: MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";". MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";". @@ -547,6 +547,27 @@ func ossGetObject(ctx context.Context, client *oss.Client, bucket, key string, s return nil } +func (l *ossObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = l.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject reads an object on OSS. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 851e3a07d..3569b0266 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -301,6 +301,27 @@ func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat return minio.FromMinioClientListBucketV2Result(bucket, result), nil } +func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = l.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject reads an object from S3. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. @@ -313,6 +334,9 @@ func (l *s3Objects) GetObject(ctx context.Context, bucket string, key string, st } opts := miniogo.GetObjectOptions{} + if etag != "" { + opts.SetMatchETag(etag) + } if startOffset >= 0 && length >= 0 { if err := opts.SetRange(startOffset, startOffset+length-1); err != nil { logger.LogIf(ctx, err) diff --git a/cmd/gateway/sia/gateway-sia.go b/cmd/gateway/sia/gateway-sia.go index f2d2f49dd..33bb84634 100644 --- a/cmd/gateway/sia/gateway-sia.go +++ b/cmd/gateway/sia/gateway-sia.go @@ -431,6 +431,27 @@ func (s *siaObjects) ListObjects(ctx context.Context, bucket string, prefix stri return loi, nil } +func (s *siaObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { + objInfo, err = s.GetObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, reader, err + } + + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader := minio.NewGetObjectReader(pr, nil, nil) + go func() { + err := s.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + func (s *siaObjects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { dstFile := path.Join(s.TempDir, minio.MustGetUUID()) defer os.Remove(dstFile) diff --git a/cmd/httprange.go b/cmd/httprange.go index bed49eba8..77eaaad76 100644 --- a/cmd/httprange.go +++ b/cmd/httprange.go @@ -133,3 +133,108 @@ func parseRequestRange(rangeString string, resourceSize int64) (hrange *httpRang return &httpRange{offsetBegin, offsetEnd, resourceSize}, nil } + +// HTTPRangeSpec represents a range specification as supported by S3 GET +// object request. +// +// Case 1: Not present -> represented by a nil RangeSpec +// Case 2: bytes=1-10 (absolute start and end offsets) -> RangeSpec{false, 1, 10} +// Case 3: bytes=10- (absolute start offset with end offset unspecified) -> RangeSpec{false, 10, -1} +// Case 4: bytes=-30 (suffix length specification) -> RangeSpec{true, -30, -1} +type HTTPRangeSpec struct { + // Does the range spec refer to a suffix of the object? + IsSuffixLength bool + + // Start and end offset specified in range spec + Start, End int64 +} + +// ContentRangeString populate range stringer interface +func (h *HTTPRangeSpec) ContentRangeString(resourceSize int64) string { + start, rangeLength := h.GetOffsetLength(resourceSize) + return fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLength-1, resourceSize) +} + +// GetLength - get length of range +func (h *HTTPRangeSpec) GetLength(resourceSize int64) int64 { + switch { + case h.IsSuffixLength: + specifiedLen := -h.Start + if specifiedLen > resourceSize { + specifiedLen = resourceSize + } + return specifiedLen + case h.End > -1: + end := h.End + if resourceSize < end { + end = resourceSize - 1 + } + return end - h.Start + 1 + default: + return resourceSize - h.Start + } +} + +// GetOffsetLength computes the start offset and length of the range +// given the size of the resource +func (h *HTTPRangeSpec) GetOffsetLength(resourceSize int64) (start int64, length int64) { + length = h.GetLength(resourceSize) + start = h.Start + if h.IsSuffixLength { + start = resourceSize + h.Start + } + return +} + +// Parses a range header value into a HTTPRangeSpec +func parseRequestRangeSpec(rangeString string) (hrange *HTTPRangeSpec, err error) { + // Return error if given range string doesn't start with byte range prefix. + if !strings.HasPrefix(rangeString, byteRangePrefix) { + return nil, fmt.Errorf("'%s' does not start with '%s'", rangeString, byteRangePrefix) + } + + // Trim byte range prefix. + byteRangeString := strings.TrimPrefix(rangeString, byteRangePrefix) + + // Check if range string contains delimiter '-', else return error. eg. "bytes=8" + sepIndex := strings.Index(byteRangeString, "-") + if sepIndex == -1 { + return nil, fmt.Errorf("'%s' does not have a valid range value", rangeString) + } + + offsetBeginString := byteRangeString[:sepIndex] + offsetBegin := int64(-1) + // Convert offsetBeginString only if its not empty. + if len(offsetBeginString) > 0 { + if offsetBegin, err = strconv.ParseInt(offsetBeginString, 10, 64); err != nil { + return nil, fmt.Errorf("'%s' does not have a valid first byte position value", rangeString) + } + } + + offsetEndString := byteRangeString[sepIndex+1:] + offsetEnd := int64(-1) + // Convert offsetEndString only if its not empty. + if len(offsetEndString) > 0 { + if offsetEnd, err = strconv.ParseInt(offsetEndString, 10, 64); err != nil { + return nil, fmt.Errorf("'%s' does not have a valid last byte position value", rangeString) + } + } + + switch { + case offsetBegin > -1 && offsetEnd > -1: + if offsetBegin > offsetEnd { + return nil, errInvalidRange + } + return &HTTPRangeSpec{false, offsetBegin, offsetEnd}, nil + case offsetBegin > -1: + return &HTTPRangeSpec{false, offsetBegin, -1}, nil + case offsetEnd > -1: + if offsetEnd == 0 { + return nil, errInvalidRange + } + return &HTTPRangeSpec{true, -offsetEnd, -1}, nil + default: + // rangeString contains first and last byte positions missing. eg. "bytes=-" + return nil, fmt.Errorf("'%s' does not have valid range value", rangeString) + } +} diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 066da89e1..2808309e0 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -40,6 +40,9 @@ type ObjectLayer interface { ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) // Object operations. + + GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) + GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 972868a2b..8e24edbc2 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -20,10 +20,12 @@ import ( "context" "encoding/hex" "fmt" + "io" "math/rand" "path" "runtime" "strings" + "sync" "time" "unicode/utf8" @@ -302,3 +304,56 @@ type byBucketName []BucketInfo func (d byBucketName) Len() int { return len(d) } func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name } + +// GetObjectReader is a type that wraps a reader with a lock to +// provide a ReadCloser interface that unlocks on Close() +type GetObjectReader struct { + lock RWLocker + pr io.Reader + + // register any clean up actions (happens before unlocking) + cleanUp func() + + once sync.Once +} + +// NewGetObjectReader creates a new GetObjectReader. The cleanUp +// action is called on Close() before the lock is unlocked. +func NewGetObjectReader(reader io.Reader, lock RWLocker, cleanUp func()) io.ReadCloser { + return &GetObjectReader{ + lock: lock, + pr: reader, + cleanUp: cleanUp, + } +} + +// Close - calls the cleanup action if provided, and *then* unlocks +// the object. Calling Close multiple times is safe. +func (g *GetObjectReader) Close() error { + // sync.Once is used here to ensure that Close() is + // idempotent. + g.once.Do(func() { + // Unlocking is defer-red - this ensures that + // unlocking happens even if cleanUp panics. + defer func() { + if g.lock != nil { + g.lock.RUnlock() + } + }() + if g.cleanUp != nil { + g.cleanUp() + } + }) + return nil +} + +// Read - to implement Reader interface. +func (g *GetObjectReader) Read(p []byte) (n int, err error) { + n, err = g.pr.Read(p) + if err != nil { + // Calling code may not Close() in case of error, so + // we ensure it. + g.Close() + } + return +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 56497713b..d2b1c9ed3 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -272,53 +272,20 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req bucket := vars["bucket"] object := vars["object"] - getObjectInfo := objectAPI.GetObjectInfo + getObjectNInfo := objectAPI.GetObjectNInfo if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - - if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { - if getRequestAuthType(r) == authTypeAnonymous { - // As per "Permission" section in https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html - // If the object you request does not exist, the error Amazon S3 returns depends on whether you also have the s3:ListBucket permission. - // * If you have the s3:ListBucket permission on the bucket, Amazon S3 will return an HTTP status code 404 ("no such key") error. - // * if you don’t have the s3:ListBucket permission, Amazon S3 will return an HTTP status code 403 ("access denied") error.` - if globalPolicySys.IsAllowed(policy.Args{ - Action: policy.ListBucketAction, - BucketName: bucket, - ConditionValues: getConditionValues(r, ""), - IsOwner: false, - }) { - _, err := getObjectInfo(ctx, bucket, object) - if toAPIErrorCode(err) == ErrNoSuchKey { - s3Error = ErrNoSuchKey - } - } - } - writeErrorResponse(w, s3Error, r.URL) - return - } - - objInfo, err := getObjectInfo(ctx, bucket, object) - if err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) - return - } - - if objectAPI.IsEncryptionSupported() { - if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) - return - } + getObjectNInfo = api.CacheAPI().GetObjectNInfo } // Get request range. - var hrange *httpRange + var rs *HTTPRangeSpec rangeHeader := r.Header.Get("Range") if rangeHeader != "" { - if hrange, err = parseRequestRange(rangeHeader, objInfo.Size); err != nil { - // Handle only errInvalidRange - // Ignore other parse error and treat it as regular Get request like Amazon S3. + var err error + if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { + // Handle only errInvalidRange. Ignore other + // parse error and treat it as regular Get + // request like Amazon S3. if err == errInvalidRange { writeErrorResponse(w, ErrInvalidRange, r.URL) return @@ -329,33 +296,99 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req } } + objInfo, reader, err := getObjectNInfo(ctx, bucket, object, rs) + if reader != nil { + defer reader.Close() + } + // Before check err value above, we need to check the auth + // type to return the correct error (NoSuchKey vs AccessDenied) + if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { + if getRequestAuthType(r) == authTypeAnonymous { + // As per "Permission" section in + // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html + // If the object you request does not exist, + // the error Amazon S3 returns depends on + // whether you also have the s3:ListBucket + // permission. + // * If you have the s3:ListBucket permission + // on the bucket, Amazon S3 will return an + // HTTP status code 404 ("no such key") + // error. + // * if you don’t have the s3:ListBucket + // permission, Amazon S3 will return an HTTP + // status code 403 ("access denied") error.` + if globalPolicySys.IsAllowed(policy.Args{ + Action: policy.ListBucketAction, + BucketName: bucket, + ConditionValues: getConditionValues(r, ""), + IsOwner: false, + }) { + if toAPIErrorCode(err) == ErrNoSuchKey { + s3Error = ErrNoSuchKey + } + } + } + writeErrorResponse(w, s3Error, r.URL) + return + } + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + // If object is encrypted, we avoid the cache layer. + isEncrypted := objectAPI.IsEncryptionSupported() && (crypto.SSEC.IsRequested(r.Header) || + crypto.S3.IsEncrypted(objInfo.UserDefined)) + if isEncrypted && api.CacheAPI() != nil { + // Close the existing reader before re-querying the backend + if reader != nil { + reader.Close() + } + // Query the backend + objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, rs) + if reader != nil { + defer reader.Close() + } + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + } + + if objectAPI.IsEncryptionSupported() { + if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + } + // Validate pre-conditions if any. if checkPreconditions(w, r, objInfo) { return } - // Get the object. - var startOffset int64 - length := objInfo.Size - if hrange != nil { - startOffset = hrange.offsetBegin - length = hrange.getLength() + startOffset, length := int64(0), objInfo.Size + if rs != nil { + startOffset, length = rs.GetOffsetLength(objInfo.Size) } - var writer io.Writer - writer = w + // Get the object. if objectAPI.IsEncryptionSupported() { s3Encrypted := crypto.S3.IsEncrypted(objInfo.UserDefined) if crypto.SSEC.IsRequested(r.Header) || s3Encrypted { - // Response writer should be limited early on for decryption upto required length, - // additionally also skipping mod(offset)64KiB boundaries. - writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) - - writer, startOffset, length, err = DecryptBlocksRequest(writer, r, bucket, object, startOffset, length, objInfo, false) + var encReader io.Reader + encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } + // Resulting reader should be limited early on + // for decryption upto required length, + // additionally also skipping mod(offset)64KiB + // boundaries. + encReader = io.LimitReader(ioutil.NewSkipReader(encReader, startOffset%(64*1024)), length) + cleanUp := func() { reader.Close() } + reader = NewGetObjectReader(encReader, nil, cleanUp) if s3Encrypted { w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) } else { @@ -365,24 +398,19 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req } } - setObjectHeaders(w, objInfo, hrange) + setObjectHeaders(w, objInfo, rs) setHeadGetRespHeaders(w, r.URL.Query()) - getObject := objectAPI.GetObject - if api.CacheAPI() != nil && !crypto.SSEC.IsRequested(r.Header) && !crypto.S3.IsEncrypted(objInfo.UserDefined) { - getObject = api.CacheAPI().GetObject - } - statusCodeWritten := false - httpWriter := ioutil.WriteOnClose(writer) + httpWriter := ioutil.WriteOnClose(w) - if hrange != nil && hrange.offsetBegin > -1 { + if rs != nil { statusCodeWritten = true w.WriteHeader(http.StatusPartialContent) } - // Reads the object at startOffset and writes to mw. - if err = getObject(ctx, bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil { + // Write object content to response body + if _, err = io.Copy(httpWriter, reader); err != nil { if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet writeErrorResponse(w, toAPIErrorCode(err), r.URL) } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index ee65adfff..4cbb727cc 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -550,6 +550,11 @@ func (s *xlSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err err // --- Object Operations --- +// GetObjectNInfo +func (s *xlSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { + return s.getHashedSet(object).GetObjectNInfo(ctx, bucket, object, rs) +} + // GetObject - reads an object from the hashedSet based on the object name. func (s *xlSets) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { return s.getHashedSet(object).GetObject(ctx, bucket, object, startOffset, length, writer, etag) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index d056fb30b..91740970e 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "encoding/hex" "io" @@ -162,6 +163,53 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc return objInfo, nil } +func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { + + // Acquire lock + lock := xl.nsMutex.NewNSLock(bucket, object) + if err = lock.GetRLock(globalObjectTimeout); err != nil { + return objInfo, nil, err + } + objReader := &GetObjectReader{ + lock: lock, + } + + if err = checkGetObjArgs(ctx, bucket, object); err != nil { + return objInfo, objReader, err + } + + if hasSuffix(object, slashSeparator) { + if !xl.isObjectDir(bucket, object) { + return objInfo, objReader, toObjectErr(errFileNotFound, bucket, object) + } + var e error + if objInfo, e = xl.getObjectInfoDir(ctx, bucket, object); e != nil { + return objInfo, objReader, toObjectErr(e, bucket, object) + } + objReader.pr = bytes.NewBuffer(nil) + return objInfo, objReader, nil + } + + objInfo, err = xl.getObjectInfo(ctx, bucket, object) + if err != nil { + return objInfo, objReader, toObjectErr(err, bucket, object) + } + + startOffset, readLength := int64(0), objInfo.Size + if rs != nil { + startOffset, readLength = rs.GetOffsetLength(objInfo.Size) + } + + pr, pw := io.Pipe() + objReader.pr = pr + go func() { + err := xl.getObject(ctx, bucket, object, startOffset, readLength, pw, "") + pw.CloseWithError(err) + }() + + return objInfo, objReader, nil +} + // GetObject - reads an object erasured coded across multiple // disks. Supports additional parameters like offset and length // which are synonymous with HTTP Range requests. diff --git a/pkg/ioutil/ioutil.go b/pkg/ioutil/ioutil.go index 91827eb4f..bcedf84a6 100644 --- a/pkg/ioutil/ioutil.go +++ b/pkg/ioutil/ioutil.go @@ -126,3 +126,34 @@ func (nopCloser) Close() error { return nil } func NopCloser(w io.Writer) io.WriteCloser { return nopCloser{w} } + +// SkipReader skips a given number of bytes and then returns all +// remaining data. +type SkipReader struct { + io.Reader + + skipCount int64 +} + +func (s *SkipReader) Read(p []byte) (int, error) { + l := int64(len(p)) + if l == 0 { + return 0, nil + } + for s.skipCount > 0 { + if l > s.skipCount { + l = s.skipCount + } + n, err := s.Reader.Read(p[:l]) + if err != nil { + return 0, err + } + s.skipCount -= int64(n) + } + return s.Reader.Read(p) +} + +// NewSkipReader - creates a SkipReader +func NewSkipReader(r io.Reader, n int64) io.Reader { + return &SkipReader{r, n} +}