diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 72a22ebc1..e46dd8b64 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -25,6 +25,7 @@ import ( "os" "path" "sort" + "strings" "sync" "time" @@ -32,9 +33,13 @@ import ( "github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/policy" ) +// Default etag is used for pre-existing objects. +var defaultEtag = "00000000000000000000000000000000-1" + // FSObjects - Implements fs object layer. type FSObjects struct { // Path to be exported over S3 API. @@ -519,12 +524,48 @@ func (fs *FSObjects) getObject(ctx context.Context, bucket, object string, offse return toObjectErr(err, bucket, object) } +// Create a new fs.json file, if the existing one is corrupt. Should happen very rarely. +func (fs *FSObjects) createFsJSON(object, fsMetaPath string) error { + fsMeta := newFSMetaV1() + fsMeta.Meta = make(map[string]string) + fsMeta.Meta["etag"] = GenETag() + if objectExt := path.Ext(object); objectExt != "" { + if content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]; ok { + fsMeta.Meta["content-type"] = content.ContentType + } else { + fsMeta.Meta["content-type"] = "application/octet-stream" + } + } + wlk, werr := fs.rwPool.Create(fsMetaPath) + if werr == nil { + _, err := fsMeta.WriteTo(wlk) + wlk.Close() + return err + } + return werr +} + +// Used to return default etag values when a pre-existing object's meta data is queried. +func (fs *FSObjects) defaultFsJSON(object string) fsMetaV1 { + fsMeta := newFSMetaV1() + fsMeta.Meta = make(map[string]string) + fsMeta.Meta["etag"] = defaultEtag + if objectExt := path.Ext(object); objectExt != "" { + if content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]; ok { + fsMeta.Meta["content-type"] = content.ContentType + } else { + fsMeta.Meta["content-type"] = "application/octet-stream" + } + } + return fsMeta +} + // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) { fsMeta := fsMetaV1{} fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)) if err != nil && err != errFileAccessDenied { - return oi, toObjectErr(err, bucket, object) + return oi, err } if fi != nil { // If file found and request was with object ending with "/", consider it @@ -532,8 +573,7 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) ( if hasSuffix(object, slashSeparator) { return fsMeta.ToObjectInfo(bucket, object, fi), nil } - logger.LogIf(ctx, errFileNotFound) - return oi, toObjectErr(errFileNotFound, bucket, object) + return oi, errFileNotFound } fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile) @@ -543,34 +583,35 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) ( rlk, err := fs.rwPool.Open(fsMetaPath) if err == nil { // Read from fs metadata only if it exists. - defer fs.rwPool.Close(fsMetaPath) - if _, rerr := fsMeta.ReadFrom(ctx, rlk.LockedFile); rerr != nil { - // `fs.json` can be empty due to previously failed - // PutObject() transaction, if we arrive at such - // a situation we just ignore and continue. - if rerr != io.EOF { - return oi, toObjectErr(rerr, bucket, object) - } + _, rerr := fsMeta.ReadFrom(ctx, rlk.LockedFile) + fs.rwPool.Close(fsMetaPath) + if rerr != nil { + return oi, rerr } } + // Return a default etag and content-type based on the object's extension. + if err == errFileNotFound { + fsMeta = fs.defaultFsJSON(object) + } + // Ignore if `fs.json` is not available, this is true for pre-existing data. if err != nil && err != errFileNotFound { logger.LogIf(ctx, err) - return oi, toObjectErr(err, bucket, object) + return oi, err } // Stat the file to get file size. fi, err = fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)) if err != nil { - return oi, toObjectErr(err, bucket, object) + return oi, err } return fsMeta.ToObjectInfo(bucket, object, fi), nil } -// GetObjectInfo - reads object metadata and replies back ObjectInfo. -func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) { +// getObjectInfoWithLock - reads object metadata and replies back ObjectInfo. +func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) { // Lock the object before reading. objectLock := fs.nsMutex.NewNSLock(bucket, object) if err := objectLock.GetRLock(globalObjectTimeout); err != nil { @@ -589,6 +630,27 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string) ( return fs.getObjectInfo(ctx, bucket, object) } +// GetObjectInfo - reads object metadata and replies back ObjectInfo. +func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) { + oi, err := fs.getObjectInfoWithLock(ctx, bucket, object) + if err == errCorruptedFormat || err == io.EOF { + objectLock := fs.nsMutex.NewNSLock(bucket, object) + if err = objectLock.GetLock(globalObjectTimeout); err != nil { + return oi, toObjectErr(err, bucket, object) + } + + fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile) + err = fs.createFsJSON(object, fsMetaPath) + objectLock.Unlock() + if err != nil { + return oi, toObjectErr(err, bucket, object) + } + + oi, err = fs.getObjectInfoWithLock(ctx, bucket, object) + } + return oi, toObjectErr(err, bucket, object) +} + // This function does the following check, suppose // object is "a/b/c/d", stat makes sure that objects ""a/b/c"" // "a/b" and "a" do not exist.