hdfs gateway handle listing single files (#10362)

This commit is contained in:
Jorge Israel Peña 2020-08-26 16:03:53 -07:00 committed by GitHub
parent 7e80afdd7f
commit 0a2e6d58a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -392,12 +392,27 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
// ListObjects lists all blobs in HDFS bucket filtered by prefix.
func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
fileInfos := make(map[string]os.FileInfo)
directoryPath := n.hdfsPathJoin(bucket, prefix)
targetPath := n.hdfsPathJoin(bucket, prefix)
if err = n.populateDirectoryListing(directoryPath, fileInfos); err != nil {
var targetFileInfo os.FileInfo
if targetFileInfo, err = n.populateDirectoryListing(targetPath, fileInfos); err != nil {
return loi, hdfsToObjectErr(ctx, err, bucket)
}
// If the user is trying to list a single file, bypass the entire directory-walking code below
// and just return the single file's information.
if !targetFileInfo.IsDir() {
return minio.ListObjectsInfo{
IsTruncated: false,
NextMarker: "",
Objects: []minio.ObjectInfo{
fileInfoToObjectInfo(bucket, prefix, targetFileInfo),
},
Prefixes: []string{},
}, nil
}
getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) {
filePath := path.Clean(n.hdfsPathJoin(bucket, entry))
fi, ok := fileInfos[filePath]
@ -407,7 +422,7 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
if !ok {
parentPath := path.Dir(filePath)
if err := n.populateDirectoryListing(parentPath, fileInfos); err != nil {
if _, err := n.populateDirectoryListing(parentPath, fileInfos); err != nil {
return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket)
}
@ -419,14 +434,7 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
}
}
objectInfo := minio.ObjectInfo{
Bucket: bucket,
Name: entry,
ModTime: fi.ModTime(),
Size: fi.Size(),
IsDir: fi.IsDir(),
AccTime: fi.(*hdfs.FileInfo).AccessTime(),
}
objectInfo := fileInfoToObjectInfo(bucket, entry, fi)
delete(fileInfos, filePath)
@ -436,23 +444,38 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), n.isLeaf, n.isLeafDir, getObjectInfo, getObjectInfo)
}
func fileInfoToObjectInfo(bucket string, entry string, fi os.FileInfo) minio.ObjectInfo {
return minio.ObjectInfo{
Bucket: bucket,
Name: entry,
ModTime: fi.ModTime(),
Size: fi.Size(),
IsDir: fi.IsDir(),
AccTime: fi.(*hdfs.FileInfo).AccessTime(),
}
}
// Lists a path's direct, first-level entries and populates them in the `fileInfos` cache which maps
// a path entry to an `os.FileInfo`. It also saves the listed path's `os.FileInfo` in the cache.
func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) error {
func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) (os.FileInfo, error) {
dirReader, err := n.clnt.Open(filePath)
if err != nil {
return err
return nil, err
}
dirStat := dirReader.Stat()
key := path.Clean(filePath)
if !dirStat.IsDir() {
return dirStat, nil
}
fileInfos[key] = dirStat
infos, err := dirReader.Readdir(0)
if err != nil {
return err
return nil, err
}
for _, fileInfo := range infos {
@ -460,7 +483,7 @@ func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[st
fileInfos[filePath] = fileInfo
}
return nil
return dirStat, nil
}
// deleteObject deletes a file path if its empty. If it's successfully deleted,