diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index 6ff68c61d..5e860b0b6 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -159,6 +159,7 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error opts.DatanodeDialFunc = dialFunc // Not addresses found, load it from command line. + var commonPath string if len(opts.Addresses) == 0 { var addresses []string for _, s := range g.args { @@ -166,6 +167,15 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error if err != nil { return nil, err } + if u.Scheme != "hdfs" { + return nil, fmt.Errorf("unsupported scheme %s, only supports hdfs://", u) + } + if commonPath != "" && commonPath != u.Path { + return nil, fmt.Errorf("all namenode paths should be same %s", g.args) + } + if commonPath == "" { + commonPath = u.Path + } addresses = append(addresses, u.Host) } opts.Addresses = addresses @@ -173,13 +183,13 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error u, err := user.Current() if err != nil { - return nil, fmt.Errorf("Unable to lookup local user: %s", err) + return nil, fmt.Errorf("unable to lookup local user: %s", err) } if opts.KerberosClient != nil { opts.KerberosClient, err = getKerberosClient() if err != nil { - return nil, fmt.Errorf("Unable to initialize kerberos client: %s", err) + return nil, fmt.Errorf("unable to initialize kerberos client: %s", err) } } else { opts.User = env.Get("HADOOP_USER_NAME", u.Username) @@ -187,14 +197,14 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error clnt, err := hdfs.NewClient(opts) if err != nil { + return nil, fmt.Errorf("unable to initialize hdfsClient") + } + + if err = clnt.MkdirAll(minio.PathJoin(commonPath, hdfsSeparator, minioMetaTmpBucket), os.FileMode(0755)); err != nil { return nil, err } - if err = clnt.MkdirAll(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket), os.FileMode(0755)); err != nil { - return nil, err - } - - return &hdfsObjects{clnt: clnt, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil + return &hdfsObjects{clnt: clnt, subPath: commonPath, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil } // Production - hdfs gateway is production ready. @@ -223,6 +233,7 @@ func (n *hdfsObjects) StorageInfo(ctx context.Context, _ bool) (si minio.Storage type hdfsObjects struct { minio.GatewayUnsupported clnt *hdfs.Client + subPath string listPool *minio.TreeWalkPool } @@ -276,14 +287,18 @@ func hdfsIsValidBucketName(bucket string) bool { return s3utils.CheckValidBucketNameStrict(bucket) == nil } +func (n *hdfsObjects) hdfsPathJoin(args ...string) string { + return minio.PathJoin(append([]string{n.subPath, hdfsSeparator}, args...)...) +} + func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { if !hdfsIsValidBucketName(bucket) { return minio.BucketNameInvalid{Bucket: bucket} } if forceDelete { - return hdfsToObjectErr(ctx, n.clnt.RemoveAll(minio.PathJoin(hdfsSeparator, bucket)), bucket) + return hdfsToObjectErr(ctx, n.clnt.RemoveAll(n.hdfsPathJoin(bucket)), bucket) } - return hdfsToObjectErr(ctx, n.clnt.Remove(minio.PathJoin(hdfsSeparator, bucket)), bucket) + return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(bucket)), bucket) } func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts minio.BucketOptions) error { @@ -294,11 +309,11 @@ func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string, if !hdfsIsValidBucketName(bucket) { return minio.BucketNameInvalid{Bucket: bucket} } - return hdfsToObjectErr(ctx, n.clnt.Mkdir(minio.PathJoin(hdfsSeparator, bucket), os.FileMode(0755)), bucket) + return hdfsToObjectErr(ctx, n.clnt.Mkdir(n.hdfsPathJoin(bucket), os.FileMode(0755)), bucket) } func (n *hdfsObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) { - fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return bi, hdfsToObjectErr(ctx, err, bucket) } @@ -336,7 +351,7 @@ func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketIn func (n *hdfsObjects) listDirFactory() minio.ListDirFunc { // listDir - lists all the entries at a given prefix and given entry in the prefix. listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) { - f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, prefixDir)) + f, err := n.clnt.Open(n.hdfsPathJoin(bucket, prefixDir)) if err != nil { if os.IsNotExist(err) { err = nil @@ -369,12 +384,12 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc { // ListObjects lists all blobs in HDFS bucket filtered by prefix. func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) { - if _, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)); err != nil { + if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil { return loi, hdfsToObjectErr(ctx, err, bucket) } getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) { - fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket, entry)) + fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, entry)) if err != nil { return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry) } @@ -443,7 +458,7 @@ func (n *hdfsObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continu } func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) { - err := hdfsToObjectErr(ctx, n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), minio.PathJoin(hdfsSeparator, bucket, object)), bucket, object) + err := hdfsToObjectErr(ctx, n.deleteObject(n.hdfsPathJoin(bucket), n.hdfsPathJoin(bucket, object)), bucket, object) return minio.ObjectInfo{ Bucket: bucket, Name: object, @@ -490,7 +505,7 @@ func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, } func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) { - cpSrcDstSame := minio.IsStringEqual(minio.PathJoin(hdfsSeparator, srcBucket, srcObject), minio.PathJoin(hdfsSeparator, dstBucket, dstObject)) + cpSrcDstSame := minio.IsStringEqual(n.hdfsPathJoin(srcBucket, srcObject), n.hdfsPathJoin(dstBucket, dstObject)) if cpSrcDstSame { return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{}) } @@ -502,10 +517,10 @@ func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstB } func (n *hdfsObjects) GetObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error { - if _, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)); err != nil { + if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil { return hdfsToObjectErr(ctx, err, bucket) } - rd, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, key)) + rd, err := n.clnt.Open(n.hdfsPathJoin(bucket, key)) if err != nil { return hdfsToObjectErr(ctx, err, bucket, key) } @@ -521,7 +536,7 @@ func (n *hdfsObjects) GetObject(ctx context.Context, bucket, key string, startOf } func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bool { - f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, object)) + f, err := n.clnt.Open(n.hdfsPathJoin(bucket, object)) if err != nil { if os.IsNotExist(err) { return false @@ -541,7 +556,7 @@ func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bo // GetObjectInfo reads object info and replies back ObjectInfo. func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return objInfo, hdfsToObjectErr(ctx, err, bucket) } @@ -549,7 +564,7 @@ func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, return objInfo, hdfsToObjectErr(ctx, os.ErrNotExist, bucket, object) } - fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket, object)) + fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, object)) if err != nil { return objInfo, hdfsToObjectErr(ctx, err, bucket, object) } @@ -564,27 +579,27 @@ func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, } func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return objInfo, hdfsToObjectErr(ctx, err, bucket) } - name := minio.PathJoin(hdfsSeparator, bucket, object) + name := n.hdfsPathJoin(bucket, object) // If its a directory create a prefix { if strings.HasSuffix(object, hdfsSeparator) && r.Size() == 0 { if err = n.clnt.MkdirAll(name, os.FileMode(0755)); err != nil { - n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), name) + n.deleteObject(n.hdfsPathJoin(bucket), name) return objInfo, hdfsToObjectErr(ctx, err, bucket, object) } } else { - tmpname := minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, minio.MustGetUUID()) + tmpname := n.hdfsPathJoin(minioMetaTmpBucket, minio.MustGetUUID()) var w *hdfs.FileWriter w, err = n.clnt.Create(tmpname) if err != nil { return objInfo, hdfsToObjectErr(ctx, err, bucket, object) } - defer n.deleteObject(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket), tmpname) + defer n.deleteObject(n.hdfsPathJoin(minioMetaTmpBucket), tmpname) if _, err = io.Copy(w, r); err != nil { w.Close() return objInfo, hdfsToObjectErr(ctx, err, bucket, object) @@ -593,7 +608,7 @@ func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object strin if dir != "" { if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil { w.Close() - n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir) + n.deleteObject(n.hdfsPathJoin(bucket), dir) return objInfo, hdfsToObjectErr(ctx, err, bucket, object) } } @@ -618,13 +633,13 @@ func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object strin } func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return uploadID, hdfsToObjectErr(ctx, err, bucket) } uploadID = minio.MustGetUUID() - if err = n.clnt.CreateEmptyFile(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)); err != nil { + if err = n.clnt.CreateEmptyFile(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)); err != nil { return uploadID, hdfsToObjectErr(ctx, err, bucket) } @@ -632,7 +647,7 @@ func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, obj } func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return lmi, hdfsToObjectErr(ctx, err, bucket) } @@ -642,7 +657,7 @@ func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, p } func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)) + _, err = n.clnt.Stat(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)) if err != nil { return hdfsToObjectErr(ctx, err, bucket, object, uploadID) } @@ -651,7 +666,7 @@ func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, u // GetMultipartInfo returns multipart info of the uploadId of the object func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return result, hdfsToObjectErr(ctx, err, bucket) } @@ -667,7 +682,7 @@ func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uplo } func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return result, hdfsToObjectErr(ctx, err, bucket) } @@ -686,13 +701,13 @@ func (n *hdfsObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, } func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return info, hdfsToObjectErr(ctx, err, bucket) } var w *hdfs.FileWriter - w, err = n.clnt.Append(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)) + w, err = n.clnt.Append(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)) if err != nil { return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID) } @@ -711,7 +726,7 @@ func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI } func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return objInfo, hdfsToObjectErr(ctx, err, bucket) } @@ -720,7 +735,7 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec return objInfo, err } - name := minio.PathJoin(hdfsSeparator, bucket, object) + name := n.hdfsPathJoin(bucket, object) dir := path.Dir(name) if dir != "" { if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil { @@ -728,19 +743,19 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec } } - err = n.clnt.Rename(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID), name) + err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name) // Object already exists is an error on HDFS // remove it and then create it again. if os.IsExist(err) { if err = n.clnt.Remove(name); err != nil { if dir != "" { - n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir) + n.deleteObject(n.hdfsPathJoin(bucket), dir) } return objInfo, hdfsToObjectErr(ctx, err, bucket, object) } - if err = n.clnt.Rename(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID), name); err != nil { + if err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name); err != nil { if dir != "" { - n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir) + n.deleteObject(n.hdfsPathJoin(bucket), dir) } return objInfo, hdfsToObjectErr(ctx, err, bucket, object) } @@ -765,11 +780,11 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec } func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) { - _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return hdfsToObjectErr(ctx, err, bucket) } - return hdfsToObjectErr(ctx, n.clnt.Remove(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)), bucket, object, uploadID) + return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)), bucket, object, uploadID) } // IsReady returns whether the layer is ready to take requests.