Mirror of https://github.com/minio/minio.git

@@ -19,12 +19,10 @@ package cmd

 import "github.com/minio/madmin-go"

-// GatewayMinioSysTmp prefix is used in the Azure/GCS gateways for saving metadata sent by the Initialize Multipart Upload API.
+// GatewayMinioSysTmp prefix is used in the Azure gateway for saving metadata sent by the Initialize Multipart Upload API.
 const (
     GatewayMinioSysTmp  = "minio.sys.tmp/"
     AzureBackendGateway = "azure"
-    GCSBackendGateway   = "gcs"
-    HDFSBackendGateway  = "hdfs"
     NASBackendGateway   = "nas"
     S3BackendGateway    = "s3"
 )

@@ -27,12 +27,6 @@ import (

     // S3
     _ "github.com/minio/minio/cmd/gateway/s3"

-    // HDFS
-    _ "github.com/minio/minio/cmd/gateway/hdfs"
-
-    // GCS (use only if you must, GCS already supports S3 API)
-    _ "github.com/minio/minio/cmd/gateway/gcs"
-
     // gateway functionality is frozen, no new gateways are being implemented
     // or considered for upstream inclusion at this point in time. if needed
     // please keep a fork of the project.
File diff suppressed because it is too large
@@ -1,500 +0,0 @@
/*
 * MinIO Object Storage (c) 2021 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package gcs

import (
    "fmt"
    "io/ioutil"
    "os"
    "path"
    "reflect"
    "testing"
    "time"

    "cloud.google.com/go/storage"
    "google.golang.org/api/googleapi"

    miniogo "github.com/minio/minio-go/v7"
    minio "github.com/minio/minio/cmd"
)

func TestToGCSPageToken(t *testing.T) {
    testCases := []struct {
        Name  string
        Token string
    }{
        {
            Name:  "A",
            Token: "CgFB",
        },
        {
            Name:  "AAAAAAAAAA",
            Token: "CgpBQUFBQUFBQUFB",
        },
        {
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CmRBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
        },
        {
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpEDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE=",
        },
        {
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpIDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
        },
        {
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpMDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQ==",
        },
        {
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CvQDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE=",
        },
    }

    for i, testCase := range testCases {
        if toGCSPageToken(testCase.Name) != testCase.Token {
            t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.Token, toGCSPageToken(testCase.Name))
        }
    }
}
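
The vectors above decode to a protobuf-style field: byte 0x0a (field 1, wire type 2), a varint length, then the object name, all base64-encoded ("CgFB" is 0x0a 0x01 'A'; "CpED…" starts 0x0a 0x91 0x03, the varint for 401). A minimal sketch consistent with those vectors — reconstructed from the test data, not from the deleted implementation:

package gcs

import "encoding/base64"

// toGCSPageTokenSketch builds a GCS list page token for the given object
// name: tag byte 0x0a, varint-encoded length, then the name, base64-encoded.
func toGCSPageTokenSketch(name string) string {
    b := []byte{0x0a}
    // Varint-encode len(name): 7 bits per byte, least-significant group first.
    n := uint64(len(name))
    for n >= 0x80 {
        b = append(b, byte(n)|0x80)
        n >>= 7
    }
    b = append(b, byte(n))
    b = append(b, name...)
    return base64.StdEncoding.EncodeToString(b)
}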

// TestValidGCSProjectIDFormat tests isValidGCSProjectIDFormat.
func TestValidGCSProjectIDFormat(t *testing.T) {
    testCases := []struct {
        ProjectID string
        Valid     bool
    }{
        {"", false},
        {"a", false},
        {"Abc", false},
        {"1bcd", false},
        // 5 chars
        {"abcdb", false},
        // 6 chars
        {"abcdbz", true},
        // 30 chars
        {"project-id-1-project-id-more-1", true},
        // 31 chars
        {"project-id-1-project-id-more-11", false},
        {"storage.googleapis.com", false},
        {"http://storage.googleapis.com", false},
        {"http://localhost:9000", false},
        {"project-id-1", true},
        {"project-id-1988832", true},
        {"projectid1414", true},
    }

    for i, testCase := range testCases {
        valid := isValidGCSProjectIDFormat(testCase.ProjectID)
        if valid != testCase.Valid {
            t.Errorf("Test %d: Expected %v, got %v", i+1, testCase.Valid, valid)
        }
    }
}
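
Judging from the table, a valid project ID is 6–30 characters, starts with a lowercase letter, and contains only lowercase letters, digits, and hyphens. A plausible sketch of the validator (the deleted implementation may differ in edge cases the table does not exercise, such as trailing hyphens):

package gcs

import "regexp"

// Hypothetical reconstruction: 6-30 chars, leading lowercase letter,
// followed by lowercase letters, digits, or hyphens.
var gcsProjectIDRegexSketch = regexp.MustCompile(`^[a-z][a-z0-9-]{5,29}$`)

func isValidGCSProjectIDFormatSketch(projectID string) bool {
    return gcsProjectIDRegexSketch.MatchString(projectID)
}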

// Test for isGCSMarker.
func TestIsGCSMarker(t *testing.T) {
    testCases := []struct {
        marker   string
        expected bool
    }{
        {
            marker:   "{minio}gcs123",
            expected: true,
        },
        {
            marker:   "{mini_no}tgcs123",
            expected: false,
        },
        {
            marker:   "{minioagainnotgcs123",
            expected: false,
        },
        {
            marker:   "obj1",
            expected: false,
        },
    }

    for i, tc := range testCases {
        if actual := isGCSMarker(tc.marker); actual != tc.expected {
            t.Errorf("Test %d: marker is %s, expected %v but got %v",
                i+1, tc.marker, tc.expected, actual)
        }
    }
}
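
The cases suggest that MinIO-generated continuation markers carry a literal "{minio}" prefix to distinguish them from raw GCS page tokens. A one-line sketch consistent with the table (name suffixed "Sketch" because it is a reconstruction):

package gcs

import "strings"

// isGCSMarkerSketch reports whether a marker is MinIO-generated, i.e.
// whether it starts with the reserved "{minio}" prefix.
func isGCSMarkerSketch(marker string) bool {
    return strings.HasPrefix(marker, "{minio}")
}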

// Test for gcsMultipartMetaName.
func TestGCSMultipartMetaName(t *testing.T) {
    uploadID := "a"
    expected := path.Join(gcsMinioMultipartPathV1, uploadID, gcsMinioMultipartMeta)
    got := gcsMultipartMetaName(uploadID)
    if expected != got {
        t.Errorf("expected: %s, got: %s", expected, got)
    }
}

// Test for gcsMultipartDataName.
func TestGCSMultipartDataName(t *testing.T) {
    var (
        uploadID   = "a"
        etag       = "b"
        partNumber = 1
    )
    expected := path.Join(gcsMinioMultipartPathV1, uploadID, fmt.Sprintf("%05d.%s", partNumber, etag))
    got := gcsMultipartDataName(uploadID, partNumber, etag)
    if expected != got {
        t.Errorf("expected: %s, got: %s", expected, got)
    }
}

func TestFromMinioClientListBucketResultToV2Info(t *testing.T) {
    listBucketResult := miniogo.ListBucketResult{
        IsTruncated:    false,
        Marker:         "testMarker",
        NextMarker:     "testMarker2",
        CommonPrefixes: []miniogo.CommonPrefix{{Prefix: "one"}, {Prefix: "two"}},
        Contents:       []miniogo.ObjectInfo{{Key: "testobj", ContentType: ""}},
    }

    listBucketV2Info := minio.ListObjectsV2Info{
        Prefixes:              []string{"one", "two"},
        Objects:               []minio.ObjectInfo{{Name: "testobj", Bucket: "testbucket", UserDefined: map[string]string{"Content-Type": ""}}},
        IsTruncated:           false,
        ContinuationToken:     "testMarker",
        NextContinuationToken: "testMarker2",
    }

    if got := minio.FromMinioClientListBucketResultToV2Info("testbucket", listBucketResult); !reflect.DeepEqual(got, listBucketV2Info) {
        t.Errorf("FromMinioClientListBucketResultToV2Info() = %v, want %v", got, listBucketV2Info)
    }
}

// Test for gcsParseProjectID.
func TestGCSParseProjectID(t *testing.T) {
    f, err := ioutil.TempFile("", "TestGCSParseProjectID-*")
    if err != nil {
        t.Error(err)
        return
    }
    defer os.Remove(f.Name())

    contents := `
{
    "type": "service_account",
    "project_id": "miniotesting"
}
`
    f.WriteString(contents)
    f.Close()
    projectID, err := gcsParseProjectID(f.Name())
    if err != nil {
        t.Fatal(err)
    }
    if projectID != "miniotesting" {
        t.Errorf(`Expected projectID value to be "miniotesting"`)
    }

    if _, err = gcsParseProjectID("non-existent"); err == nil {
        t.Errorf(`Expected to fail but succeeded reading "non-existent"`)
    }

    contents = `
{
    "type": "service_account",
    "project_id": "miniotesting"
},}
`
    f, err = ioutil.TempFile("", "TestGCSParseProjectID-*")
    if err != nil {
        t.Error(err)
        return
    }
    defer os.Remove(f.Name())
    f.WriteString(contents)
    f.Close()
    if _, err := gcsParseProjectID(f.Name()); err == nil {
        t.Errorf(`Expected to fail reading corrupted credentials file`)
    }
}
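
The test only pins down the observable behavior: read a service-account JSON file and return its "project_id", failing on a missing file or malformed JSON. A minimal sketch along those lines (everything beyond the "project_id" field name is an assumption):

package gcs

import (
    "encoding/json"
    "os"
)

// gcsParseProjectIDSketch extracts "project_id" from a service-account
// credentials JSON file; I/O and decoding errors propagate to the caller.
func gcsParseProjectIDSketch(credsFile string) (string, error) {
    data, err := os.ReadFile(credsFile)
    if err != nil {
        return "", err
    }
    var creds struct {
        ProjectID string `json:"project_id"`
    }
    if err := json.Unmarshal(data, &creds); err != nil {
        return "", err
    }
    return creds.ProjectID, nil
}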

func TestGCSToObjectError(t *testing.T) {
    testCases := []struct {
        params      []string
        gcsErr      error
        expectedErr error
    }{
        {
            []string{}, nil, nil,
        },
        {
            []string{}, fmt.Errorf("Not *Error"), fmt.Errorf("Not *Error"),
        },
        {
            []string{"bucket"},
            fmt.Errorf("storage: bucket doesn't exist"),
            minio.BucketNotFound{
                Bucket: "bucket",
            },
        },
        {
            []string{"bucket", "object"},
            fmt.Errorf("storage: object doesn't exist"),
            minio.ObjectNotFound{
                Bucket: "bucket",
                Object: "object",
            },
        },
        {
            []string{"bucket", "object", "uploadID"},
            fmt.Errorf("storage: object doesn't exist"),
            minio.InvalidUploadID{
                UploadID: "uploadID",
            },
        },
        {
            []string{},
            fmt.Errorf("Unknown error"),
            fmt.Errorf("Unknown error"),
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Message: "No list of errors",
            },
            &googleapi.Error{
                Message: "No list of errors",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason:  "conflict",
                    Message: "You already own this bucket. Please select another name.",
                }},
            },
            minio.BucketAlreadyOwnedByYou{
                Bucket: "bucket",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason:  "conflict",
                    Message: "Sorry, that name is not available. Please try a different one.",
                }},
            },
            minio.BucketAlreadyExists{
                Bucket: "bucket",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "conflict",
                }},
            },
            minio.BucketNotEmpty{Bucket: "bucket"},
        },
        {
            []string{"bucket"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "notFound",
                }},
            },
            minio.BucketNotFound{
                Bucket: "bucket",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "notFound",
                }},
            },
            minio.ObjectNotFound{
                Bucket: "bucket",
                Object: "object",
            },
        },
        {
            []string{"bucket"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "invalid",
                }},
            },
            minio.BucketNameInvalid{
                Bucket: "bucket",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "forbidden",
                }},
            },
            minio.PrefixAccessDenied{
                Bucket: "bucket",
                Object: "object",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "keyInvalid",
                }},
            },
            minio.PrefixAccessDenied{
                Bucket: "bucket",
                Object: "object",
            },
        },
        {
            []string{"bucket", "object"},
            &googleapi.Error{
                Errors: []googleapi.ErrorItem{{
                    Reason: "required",
                }},
            },
            minio.PrefixAccessDenied{
                Bucket: "bucket",
                Object: "object",
            },
        },
    }

    for i, testCase := range testCases {
        actualErr := gcsToObjectError(testCase.gcsErr, testCase.params...)
        if actualErr != nil {
            if actualErr.Error() != testCase.expectedErr.Error() {
                t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.expectedErr, actualErr)
            }
        }
    }
}

func TestS3MetaToGCSAttributes(t *testing.T) {
    headers := map[string]string{
        "accept-encoding":          "gzip",
        "content-encoding":         "gzip",
        "cache-control":            "age: 3600",
        "content-disposition":      "dummy",
        "content-type":             "application/javascript",
        "Content-Language":         "en",
        "X-Amz-Meta-Hdr":           "value",
        "X-Amz-Meta-X-Amz-Key":     "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
        "X-Amz-Meta-X-Amz-Matdesc": "{}",
        "X-Amz-Meta-X-Amz-Iv":      "eWmyryl8kq+EVnnsE7jpOg==",
    }
    // Only X-Amz-Meta-* entries land in attrs.Metadata; the S3 prefix is
    // replaced with the x-goog-meta- prefix.
    expectedHeaders := map[string]string{
        "x-goog-meta-Hdr":           "value",
        "x-goog-meta-X-Amz-Key":     "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
        "x-goog-meta-X-Amz-Matdesc": "{}",
        "x-goog-meta-X-Amz-Iv":      "eWmyryl8kq+EVnnsE7jpOg==",
    }

    attrs := storage.ObjectAttrs{}
    applyMetadataToGCSAttrs(headers, &attrs)

    if !reflect.DeepEqual(attrs.Metadata, expectedHeaders) {
        t.Fatalf("Test failed, expected %#v, got %#v", expectedHeaders, attrs.Metadata)
    }

    if attrs.CacheControl != headers["cache-control"] {
        t.Fatalf("Test failed with Cache-Control mismatch, expected %s, got %s", headers["cache-control"], attrs.CacheControl)
    }
    if attrs.ContentDisposition != headers["content-disposition"] {
        t.Fatalf("Test failed with Content-Disposition mismatch, expected %s, got %s", headers["content-disposition"], attrs.ContentDisposition)
    }
    if attrs.ContentEncoding != headers["content-encoding"] {
        t.Fatalf("Test failed with Content-Encoding mismatch, expected %s, got %s", headers["content-encoding"], attrs.ContentEncoding)
    }
    if attrs.ContentLanguage != headers["Content-Language"] {
        t.Fatalf("Test failed with Content-Language mismatch, expected %s, got %s", headers["Content-Language"], attrs.ContentLanguage)
    }
    if attrs.ContentType != headers["content-type"] {
        t.Fatalf("Test failed with Content-Type mismatch, expected %s, got %s", headers["content-type"], attrs.ContentType)
    }
}

func TestGCSAttrsToObjectInfo(t *testing.T) {
    metadata := map[string]string{
        "x-goog-meta-Hdr":           "value",
        "x-goog-meta-x_amz_key":     "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
        "x-goog-meta-x-amz-matdesc": "{}",
        "x-goog-meta-X-Amz-Iv":      "eWmyryl8kq+EVnnsE7jpOg==",
    }
    expectedMeta := map[string]string{
        "X-Amz-Meta-Hdr":           "value",
        "X-Amz-Meta-X_amz_key":     "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
        "X-Amz-Meta-X-Amz-Matdesc": "{}",
        "X-Amz-Meta-X-Amz-Iv":      "eWmyryl8kq+EVnnsE7jpOg==",
        "Cache-Control":            "max-age: 3600",
        "Content-Disposition":      "dummy",
        "Content-Encoding":         "gzip",
        "Content-Language":         "en",
        "Content-Type":             "application/javascript",
    }

    attrs := storage.ObjectAttrs{
        Name:               "test-obj",
        Bucket:             "test-bucket",
        Updated:            time.Now(),
        Size:               123,
        CRC32C:             45312398,
        CacheControl:       "max-age: 3600",
        ContentDisposition: "dummy",
        ContentEncoding:    "gzip",
        ContentLanguage:    "en",
        ContentType:        "application/javascript",
        Metadata:           metadata,
    }
    expectedETag := minio.ToS3ETag(fmt.Sprintf("%d", attrs.CRC32C))

    objInfo := fromGCSAttrsToObjectInfo(&attrs)
    if !reflect.DeepEqual(objInfo.UserDefined, expectedMeta) {
        t.Fatalf("Test failed, expected %#v, got %#v", expectedMeta, objInfo.UserDefined)
    }

    if objInfo.Name != attrs.Name {
        t.Fatalf("Test failed with Name mismatch, expected %s, got %s", attrs.Name, objInfo.Name)
    }
    if objInfo.Bucket != attrs.Bucket {
        t.Fatalf("Test failed with Bucket mismatch, expected %s, got %s", attrs.Bucket, objInfo.Bucket)
    }
    if !objInfo.ModTime.Equal(attrs.Updated) {
        t.Fatalf("Test failed with ModTime mismatch, expected %s, got %s", attrs.Updated, objInfo.ModTime)
    }
    if objInfo.Size != attrs.Size {
        t.Fatalf("Test failed with Size mismatch, expected %d, got %d", attrs.Size, objInfo.Size)
    }
    if objInfo.ETag != expectedETag {
        t.Fatalf("Test failed with ETag mismatch, expected %s, got %s", expectedETag, objInfo.ETag)
    }
}
@@ -1,67 +0,0 @@
/*
 * MinIO Object Storage (c) 2021 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package hdfs

import (
    "strings"

    "github.com/minio/minio-go/v7/pkg/s3utils"
    minio "github.com/minio/minio/cmd"
)

const (
    // MinIO meta bucket.
    minioMetaBucket = ".minio.sys"

    // MinIO tmp meta prefix.
    minioMetaTmpBucket = minioMetaBucket + "/tmp"

    // MinIO reserved bucket name.
    minioReservedBucket = "minio"
)

// Ignores all reserved or invalid bucket names.
func isReservedOrInvalidBucket(bucketEntry string, strict bool) bool {
    bucketEntry = strings.TrimSuffix(bucketEntry, minio.SlashSeparator)
    if strict {
        if err := s3utils.CheckValidBucketNameStrict(bucketEntry); err != nil {
            return true
        }
    } else {
        if err := s3utils.CheckValidBucketName(bucketEntry); err != nil {
            return true
        }
    }
    return isMinioMetaBucket(bucketEntry) || isMinioReservedBucket(bucketEntry)
}

// Returns true if the input bucket is the reserved meta bucket '.minio.sys'.
func isMinioMetaBucket(bucketName string) bool {
    return bucketName == minioMetaBucket
}

// Returns true if the input bucket is the reserved bucket 'minio'.
func isMinioReservedBucket(bucketName string) bool {
    return bucketName == minioReservedBucket
}

// byBucketName is a collection satisfying sort.Interface.
type byBucketName []minio.BucketInfo

func (d byBucketName) Len() int           { return len(d) }
func (d byBucketName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name }

@@ -1,887 +0,0 @@
/*
 * MinIO Object Storage (c) 2021 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package hdfs

import (
    "context"
    "errors"
    "fmt"
    "io"
    "net"
    "net/http"
    "os"
    "os/user"
    "path"
    "sort"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/colinmarc/hdfs/v2"
    "github.com/colinmarc/hdfs/v2/hadoopconf"
    krb "github.com/jcmturner/gokrb5/v8/client"
    "github.com/jcmturner/gokrb5/v8/config"
    "github.com/jcmturner/gokrb5/v8/credentials"
    "github.com/jcmturner/gokrb5/v8/keytab"
    "github.com/minio/cli"
    "github.com/minio/madmin-go"
    "github.com/minio/minio-go/v7/pkg/s3utils"
    minio "github.com/minio/minio/cmd"
    "github.com/minio/minio/internal/logger"
    "github.com/minio/pkg/env"
    xnet "github.com/minio/pkg/net"
)

const (
    hdfsSeparator = minio.SlashSeparator
)

func init() {
    const hdfsGatewayTemplate = `NAME:
  {{.HelpName}} - {{.Usage}}

USAGE:
  {{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} HDFS-NAMENODE [HDFS-NAMENODE...]
{{if .VisibleFlags}}
FLAGS:
  {{range .VisibleFlags}}{{.}}
  {{end}}{{end}}
HDFS-NAMENODE:
  HDFS namenode URI

EXAMPLES:
  1. Start minio gateway server for HDFS backend
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}accesskey
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}secretkey
     {{.Prompt}} {{.HelpName}} hdfs://namenode:8200

  2. Start minio gateway server for HDFS with edge caching enabled
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}accesskey
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}secretkey
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4"
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*,*.png"
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85
     {{.Prompt}} {{.HelpName}} hdfs://namenode:8200
`

    minio.RegisterGatewayCommand(cli.Command{
        Name:               minio.HDFSBackendGateway,
        Usage:              "Hadoop Distributed File System (HDFS)",
        Action:             hdfsGatewayMain,
        CustomHelpTemplate: hdfsGatewayTemplate,
        HideHelpCommand:    true,
    })
}

// Handler for 'minio gateway hdfs' command line.
func hdfsGatewayMain(ctx *cli.Context) {
    // Validate gateway arguments.
    if ctx.Args().First() == "help" {
        cli.ShowCommandHelpAndExit(ctx, minio.HDFSBackendGateway, 1)
    }

    minio.StartGateway(ctx, &HDFS{args: ctx.Args()})
}

// HDFS implements Gateway.
type HDFS struct {
    args []string
}

// Name implements Gateway interface.
func (g *HDFS) Name() string {
    return minio.HDFSBackendGateway
}

func getKerberosClient() (*krb.Client, error) {
    cfg, err := config.Load(env.Get("KRB5_CONFIG", "/etc/krb5.conf"))
    if err != nil {
        return nil, err
    }

    u, err := user.Current()
    if err != nil {
        return nil, err
    }

    keytabPath := env.Get("KRB5KEYTAB", "")
    if keytabPath != "" {
        kt, err := keytab.Load(keytabPath)
        if err != nil {
            return nil, err
        }

        username := env.Get("KRB5USERNAME", "")
        realm := env.Get("KRB5REALM", "")
        if username == "" || realm == "" {
            return nil, errors.New("empty KRB5USERNAME or KRB5REALM")
        }

        return krb.NewWithKeytab(username, realm, kt, cfg), nil
    }

    // Determine the ccache location from the environment, falling back to the default location.
    ccachePath := env.Get("KRB5CCNAME", fmt.Sprintf("/tmp/krb5cc_%s", u.Uid))
    if strings.Contains(ccachePath, ":") {
        if strings.HasPrefix(ccachePath, "FILE:") {
            ccachePath = strings.TrimPrefix(ccachePath, "FILE:")
        } else {
            return nil, fmt.Errorf("unable to use kerberos ccache: %s", ccachePath)
        }
    }

    ccache, err := credentials.LoadCCache(ccachePath)
    if err != nil {
        return nil, err
    }

    return krb.NewFromCCache(ccache, cfg)
}

// NewGatewayLayer returns an HDFS gateway layer.
func (g *HDFS) NewGatewayLayer(creds madmin.Credentials) (minio.ObjectLayer, error) {
    dialFunc := (&net.Dialer{
        Timeout:   30 * time.Second,
        KeepAlive: 30 * time.Second,
        DualStack: true,
    }).DialContext

    hconfig, err := hadoopconf.LoadFromEnvironment()
    if err != nil {
        return nil, err
    }

    opts := hdfs.ClientOptionsFromConf(hconfig)
    opts.NamenodeDialFunc = dialFunc
    opts.DatanodeDialFunc = dialFunc

    // No addresses found in the Hadoop configuration, load them from the command line.
    var commonPath string
    if len(opts.Addresses) == 0 {
        var addresses []string
        for _, s := range g.args {
            u, err := xnet.ParseURL(s)
            if err != nil {
                return nil, err
            }
            if u.Scheme != "hdfs" {
                return nil, fmt.Errorf("unsupported scheme %s, only supports hdfs://", u)
            }
            if commonPath != "" && commonPath != u.Path {
                return nil, fmt.Errorf("all namenode paths should be the same %s", g.args)
            }
            if commonPath == "" {
                commonPath = u.Path
            }
            addresses = append(addresses, u.Host)
        }
        opts.Addresses = addresses
    }

    u, err := user.Current()
    if err != nil {
        return nil, fmt.Errorf("unable to lookup local user: %s", err)
    }

    if opts.KerberosClient != nil {
        opts.KerberosClient, err = getKerberosClient()
        if err != nil {
            return nil, fmt.Errorf("unable to initialize kerberos client: %s", err)
        }
    } else {
        opts.User = env.Get("HADOOP_USER_NAME", u.Username)
    }

    clnt, err := hdfs.NewClient(opts)
    if err != nil {
        return nil, fmt.Errorf("unable to initialize hdfsClient: %v", err)
    }

    if err = clnt.MkdirAll(minio.PathJoin(commonPath, hdfsSeparator, minioMetaTmpBucket), os.FileMode(0o755)); err != nil {
        return nil, err
    }

    return &hdfsObjects{clnt: clnt, subPath: commonPath, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil
}

func (n *hdfsObjects) Shutdown(ctx context.Context) error {
    return n.clnt.Close()
}

func (n *hdfsObjects) LocalStorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) {
    return n.StorageInfo(ctx)
}

func (n *hdfsObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) {
    fsInfo, err := n.clnt.StatFs()
    if err != nil {
        return minio.StorageInfo{}, []error{err}
    }
    si.Disks = []madmin.Disk{{
        UsedSpace: fsInfo.Used,
    }}
    si.Backend.Type = madmin.Gateway
    si.Backend.GatewayOnline = true
    return si, nil
}

// hdfsObjects implements the gateway object layer for MinIO and S3-compatible object storage servers.
type hdfsObjects struct {
    minio.GatewayUnsupported
    clnt     *hdfs.Client
    subPath  string
    listPool *minio.TreeWalkPool
}

func hdfsToObjectErr(ctx context.Context, err error, params ...string) error {
    if err == nil {
        return nil
    }
    bucket := ""
    object := ""
    uploadID := ""
    switch len(params) {
    case 3:
        uploadID = params[2]
        fallthrough
    case 2:
        object = params[1]
        fallthrough
    case 1:
        bucket = params[0]
    }

    switch {
    case os.IsNotExist(err):
        if uploadID != "" {
            return minio.InvalidUploadID{
                UploadID: uploadID,
            }
        }
        if object != "" {
            return minio.ObjectNotFound{Bucket: bucket, Object: object}
        }
        return minio.BucketNotFound{Bucket: bucket}
    case os.IsExist(err):
        if object != "" {
            return minio.PrefixAccessDenied{Bucket: bucket, Object: object}
        }
        return minio.BucketAlreadyOwnedByYou{Bucket: bucket}
    case errors.Is(err, syscall.ENOTEMPTY):
        if object != "" {
            return minio.PrefixAccessDenied{Bucket: bucket, Object: object}
        }
        return minio.BucketNotEmpty{Bucket: bucket}
    default:
        logger.LogIf(ctx, err)
        return err
    }
}

// hdfsIsValidBucketName verifies whether a bucket name is valid.
func hdfsIsValidBucketName(bucket string) bool {
    return s3utils.CheckValidBucketNameStrict(bucket) == nil
}
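
The `switch len(params)` with `fallthrough` unpacks the optional (bucket, object, uploadID) triple so each error class can be mapped at the right granularity. A self-contained sketch of the same unpacking idiom (names here are illustrative, not from the deleted file):

package main

import "fmt"

// unpack mirrors the fallthrough idiom above: params may carry
// bucket, then object, then uploadID, each optional from the right.
func unpack(params ...string) (bucket, object, uploadID string) {
    switch len(params) {
    case 3:
        uploadID = params[2]
        fallthrough
    case 2:
        object = params[1]
        fallthrough
    case 1:
        bucket = params[0]
    }
    return
}

func main() {
    b, o, u := unpack("bkt", "obj")
    fmt.Printf("%q %q %q\n", b, o, u) // "bkt" "obj" ""
}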

func (n *hdfsObjects) hdfsPathJoin(args ...string) string {
    return minio.PathJoin(append([]string{n.subPath, hdfsSeparator}, args...)...)
}

func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
    if !hdfsIsValidBucketName(bucket) {
        return minio.BucketNameInvalid{Bucket: bucket}
    }
    if opts.Force {
        return hdfsToObjectErr(ctx, n.clnt.RemoveAll(n.hdfsPathJoin(bucket)), bucket)
    }
    return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(bucket)), bucket)
}

func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts minio.BucketOptions) error {
    if opts.LockEnabled || opts.VersioningEnabled {
        return minio.NotImplemented{}
    }

    if !hdfsIsValidBucketName(bucket) {
        return minio.BucketNameInvalid{Bucket: bucket}
    }
    return hdfsToObjectErr(ctx, n.clnt.Mkdir(n.hdfsPathJoin(bucket), os.FileMode(0o755)), bucket)
}

func (n *hdfsObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
    fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return bi, hdfsToObjectErr(ctx, err, bucket)
    }
    // As hdfs.Stat() doesn't carry anything other than ModTime(), use ModTime() as CreatedTime.
    return minio.BucketInfo{
        Name:    bucket,
        Created: fi.ModTime(),
    }, nil
}

func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
    entries, err := n.clnt.ReadDir(n.hdfsPathJoin())
    if err != nil {
        logger.LogIf(ctx, err)
        return nil, hdfsToObjectErr(ctx, err)
    }

    for _, entry := range entries {
        // Ignore all reserved bucket names and invalid bucket names.
        if isReservedOrInvalidBucket(entry.Name(), false) {
            continue
        }
        buckets = append(buckets, minio.BucketInfo{
            Name: entry.Name(),
            // As hdfs.Stat() doesn't carry CreatedTime, use ModTime() as CreatedTime.
            Created: entry.ModTime(),
        })
    }

    // Sort bucket infos by bucket name.
    sort.Sort(byBucketName(buckets))
    return buckets, nil
}

func (n *hdfsObjects) isLeafDir(bucket, leafPath string) bool {
    return n.isObjectDir(context.Background(), bucket, leafPath)
}

func (n *hdfsObjects) isLeaf(bucket, leafPath string) bool {
    return !strings.HasSuffix(leafPath, hdfsSeparator)
}

func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
    // listDir - lists all the entries at a given prefix and given entry in the prefix.
    listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
        f, err := n.clnt.Open(n.hdfsPathJoin(bucket, prefixDir))
        if err != nil {
            if os.IsNotExist(err) {
                err = nil
            }
            logger.LogIf(minio.GlobalContext, err)
            return
        }
        defer f.Close()
        fis, err := f.Readdir(0)
        if err != nil {
            logger.LogIf(minio.GlobalContext, err)
            return
        }
        if len(fis) == 0 {
            return true, nil, false
        }
        for _, fi := range fis {
            if fi.IsDir() {
                entries = append(entries, fi.Name()+hdfsSeparator)
            } else {
                entries = append(entries, fi.Name())
            }
        }
        entries, delayIsLeaf = minio.FilterListEntries(bucket, prefixDir, entries, prefixEntry, n.isLeaf)
        return false, entries, delayIsLeaf
    }

    // Return list factory instance.
    return listDir
}

// ListObjects lists all blobs in an HDFS bucket filtered by prefix.
func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
    var mutex sync.Mutex
    fileInfos := make(map[string]os.FileInfo)
    targetPath := n.hdfsPathJoin(bucket, prefix)

    var targetFileInfo os.FileInfo

    if targetFileInfo, err = n.populateDirectoryListing(targetPath, fileInfos); err != nil {
        return loi, hdfsToObjectErr(ctx, err, bucket)
    }

    // If the user is trying to list a single file, bypass the entire directory-walking code below
    // and just return the single file's information.
    if !targetFileInfo.IsDir() {
        return minio.ListObjectsInfo{
            IsTruncated: false,
            NextMarker:  "",
            Objects: []minio.ObjectInfo{
                fileInfoToObjectInfo(bucket, prefix, targetFileInfo),
            },
            Prefixes: []string{},
        }, nil
    }

    getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) {
        mutex.Lock()
        defer mutex.Unlock()

        filePath := path.Clean(n.hdfsPathJoin(bucket, entry))
        fi, ok := fileInfos[filePath]

        // If the file info is not known, this may be a recursive listing and filePath is a
        // child of a sub-directory. In this case, obtain that sub-directory's listing.
        if !ok {
            parentPath := path.Dir(filePath)

            if _, err := n.populateDirectoryListing(parentPath, fileInfos); err != nil {
                return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket)
            }

            fi, ok = fileInfos[filePath]

            if !ok {
                err = fmt.Errorf("could not get FileInfo for path '%s'", filePath)
                return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry)
            }
        }

        objectInfo := fileInfoToObjectInfo(bucket, entry, fi)

        delete(fileInfos, filePath)

        return objectInfo, nil
    }

    return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), n.isLeaf, n.isLeafDir, getObjectInfo, getObjectInfo)
}

func fileInfoToObjectInfo(bucket string, entry string, fi os.FileInfo) minio.ObjectInfo {
    return minio.ObjectInfo{
        Bucket:  bucket,
        Name:    entry,
        ModTime: fi.ModTime(),
        Size:    fi.Size(),
        IsDir:   fi.IsDir(),
        AccTime: fi.(*hdfs.FileInfo).AccessTime(),
    }
}

// Lists a path's direct, first-level entries and populates them in the `fileInfos` cache which maps
// a path entry to an `os.FileInfo`. It also saves the listed path's `os.FileInfo` in the cache.
func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) (os.FileInfo, error) {
    dirReader, err := n.clnt.Open(filePath)
    if err != nil {
        return nil, err
    }

    dirStat := dirReader.Stat()
    key := path.Clean(filePath)

    if !dirStat.IsDir() {
        return dirStat, nil
    }

    fileInfos[key] = dirStat
    infos, err := dirReader.Readdir(0)
    if err != nil {
        return nil, err
    }

    for _, fileInfo := range infos {
        filePath := minio.PathJoin(filePath, fileInfo.Name())
        fileInfos[filePath] = fileInfo
    }

    return dirStat, nil
}

// deleteObject deletes a file path if it's empty. If it's successfully deleted,
// it will recursively move up the tree, deleting empty parent directories
// until it finds one with files in it. Returns nil for a non-empty directory.
func (n *hdfsObjects) deleteObject(basePath, deletePath string) error {
    if basePath == deletePath {
        return nil
    }

    // Attempt to remove path.
    if err := n.clnt.Remove(deletePath); err != nil {
        if errors.Is(err, syscall.ENOTEMPTY) {
            // Ignore errors if the directory is not empty. The server relies on
            // this functionality, and sometimes uses recursion that should not
            // error on parent directories.
            return nil
        }
        return err
    }

    // Trailing slash is removed when found to ensure
    // slashpath.Dir() works as intended.
    deletePath = strings.TrimSuffix(deletePath, hdfsSeparator)
    deletePath = path.Dir(deletePath)

    // Delete parent directory. Errors for parent directories shouldn't trickle down.
    n.deleteObject(basePath, deletePath)

    return nil
}
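
The same bottom-up pruning can be exercised against a local filesystem, which makes the two stopping conditions (the bucket root, and a non-empty parent) easy to see. A simplified sketch under that assumption, using os in place of the HDFS client and folding all removal failures into "stop silently":

package main

import (
    "os"
    "path"
    "strings"
)

// pruneUp removes deletePath and then walks toward basePath, removing each
// parent directory that became empty. It stops at basePath or at the first
// parent that cannot be removed (os.Remove fails on non-empty directories);
// the real gateway distinguishes ENOTEMPTY from other errors.
func pruneUp(basePath, deletePath string) error {
    if basePath == deletePath {
        return nil
    }
    if err := os.Remove(deletePath); err != nil {
        return nil // non-empty or already gone: stop, like the gateway does
    }
    deletePath = strings.TrimSuffix(deletePath, "/")
    return pruneUp(basePath, path.Dir(deletePath))
}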
|
||||
// ListObjectsV2 lists all blobs in HDFS bucket filtered by prefix
|
||||
func (n *hdfsObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
|
||||
fetchOwner bool, startAfter string,
|
||||
) (loi minio.ListObjectsV2Info, err error) {
|
||||
// fetchOwner is not supported and unused.
|
||||
marker := continuationToken
|
||||
if marker == "" {
|
||||
marker = startAfter
|
||||
}
|
||||
resultV1, err := n.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
||||
if err != nil {
|
||||
return loi, err
|
||||
}
|
||||
return minio.ListObjectsV2Info{
|
||||
Objects: resultV1.Objects,
|
||||
Prefixes: resultV1.Prefixes,
|
||||
ContinuationToken: continuationToken,
|
||||
NextContinuationToken: resultV1.NextMarker,
|
||||
IsTruncated: resultV1.IsTruncated,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
|
||||
err := hdfsToObjectErr(ctx, n.deleteObject(n.hdfsPathJoin(bucket), n.hdfsPathJoin(bucket, object)), bucket, object)
|
||||
return minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) DeleteObjects(ctx context.Context, bucket string, objects []minio.ObjectToDelete, opts minio.ObjectOptions) ([]minio.DeletedObject, []error) {
|
||||
errs := make([]error, len(objects))
|
||||
dobjects := make([]minio.DeletedObject, len(objects))
|
||||
for idx, object := range objects {
|
||||
_, errs[idx] = n.DeleteObject(ctx, bucket, object.ObjectName, opts)
|
||||
if errs[idx] == nil {
|
||||
dobjects[idx] = minio.DeletedObject{
|
||||
ObjectName: object.ObjectName,
|
||||
}
|
||||
}
|
||||
}
|
||||
return dobjects, errs
|
||||
}
|
||||
|
||||

func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
    objInfo, err := n.GetObjectInfo(ctx, bucket, object, opts)
    if err != nil {
        return nil, err
    }

    var startOffset, length int64
    startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
    if err != nil {
        return nil, err
    }

    pr, pw := io.Pipe()
    go func() {
        nerr := n.getObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
        pw.CloseWithError(nerr)
    }()

    // Set up a cleanup function to cause the above goroutine to
    // exit in case of a partial read.
    pipeCloser := func() { pr.Close() }
    return minio.NewGetObjectReaderFromReader(pr, objInfo, opts, pipeCloser)
}

func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) {
    cpSrcDstSame := minio.IsStringEqual(n.hdfsPathJoin(srcBucket, srcObject), n.hdfsPathJoin(dstBucket, dstObject))
    if cpSrcDstSame {
        return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
    }

    return n.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, minio.ObjectOptions{
        ServerSideEncryption: dstOpts.ServerSideEncryption,
        UserDefined:          srcInfo.UserDefined,
    })
}

func (n *hdfsObjects) getObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
    if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil {
        return hdfsToObjectErr(ctx, err, bucket)
    }
    rd, err := n.clnt.Open(n.hdfsPathJoin(bucket, key))
    if err != nil {
        return hdfsToObjectErr(ctx, err, bucket, key)
    }
    defer rd.Close()
    _, err = io.Copy(writer, io.NewSectionReader(rd, startOffset, length))
    if err == io.ErrClosedPipe {
        // The hdfs library doesn't send EOF correctly, so io.Copy attempts
        // to write, which returns io.ErrClosedPipe - just ignore
        // this for now.
        err = nil
    }
    return hdfsToObjectErr(ctx, err, bucket, key)
}

func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bool {
    f, err := n.clnt.Open(n.hdfsPathJoin(bucket, object))
    if err != nil {
        if os.IsNotExist(err) {
            return false
        }
        logger.LogIf(ctx, err)
        return false
    }
    defer f.Close()
    fis, err := f.Readdir(1)
    if err != nil && err != io.EOF {
        logger.LogIf(ctx, err)
        return false
    }
    // Readdir returns io.EOF when len(fis) == 0.
    return len(fis) == 0
}

// GetObjectInfo reads object info and replies back ObjectInfo.
func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return objInfo, hdfsToObjectErr(ctx, err, bucket)
    }
    if strings.HasSuffix(object, hdfsSeparator) && !n.isObjectDir(ctx, bucket, object) {
        return objInfo, hdfsToObjectErr(ctx, os.ErrNotExist, bucket, object)
    }

    fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, object))
    if err != nil {
        return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
    }
    return minio.ObjectInfo{
        Bucket:  bucket,
        Name:    object,
        ModTime: fi.ModTime(),
        Size:    fi.Size(),
        IsDir:   fi.IsDir(),
        AccTime: fi.(*hdfs.FileInfo).AccessTime(),
    }, nil
}

func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return objInfo, hdfsToObjectErr(ctx, err, bucket)
    }

    name := n.hdfsPathJoin(bucket, object)

    // If it's a directory, create a prefix.
    if strings.HasSuffix(object, hdfsSeparator) && r.Size() == 0 {
        if err = n.clnt.MkdirAll(name, os.FileMode(0o755)); err != nil {
            n.deleteObject(n.hdfsPathJoin(bucket), name)
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
    } else {
        tmpname := n.hdfsPathJoin(minioMetaTmpBucket, minio.MustGetUUID())
        var w *hdfs.FileWriter
        w, err = n.clnt.Create(tmpname)
        if err != nil {
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
        defer n.deleteObject(n.hdfsPathJoin(minioMetaTmpBucket), tmpname)
        if _, err = io.Copy(w, r); err != nil {
            w.Close()
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
        dir := path.Dir(name)
        if dir != "" {
            if err = n.clnt.MkdirAll(dir, os.FileMode(0o755)); err != nil {
                w.Close()
                n.deleteObject(n.hdfsPathJoin(bucket), dir)
                return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
            }
        }
        w.Close()
        if err = n.clnt.Rename(tmpname, name); err != nil {
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
    }
    fi, err := n.clnt.Stat(name)
    if err != nil {
        return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
    }
    return minio.ObjectInfo{
        Bucket:  bucket,
        Name:    object,
        ETag:    r.MD5CurrentHexString(),
        ModTime: fi.ModTime(),
        Size:    fi.Size(),
        IsDir:   fi.IsDir(),
        AccTime: fi.(*hdfs.FileInfo).AccessTime(),
    }, nil
}

func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return uploadID, hdfsToObjectErr(ctx, err, bucket)
    }

    uploadID = minio.MustGetUUID()
    if err = n.clnt.CreateEmptyFile(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)); err != nil {
        return uploadID, hdfsToObjectErr(ctx, err, bucket)
    }

    return uploadID, nil
}

func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return lmi, hdfsToObjectErr(ctx, err, bucket)
    }

    // It's decided not to support List Multipart Uploads, hence returning empty result.
    return lmi, nil
}

func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(minioMetaTmpBucket, uploadID))
    if err != nil {
        return hdfsToObjectErr(ctx, err, bucket, object, uploadID)
    }
    return nil
}

// GetMultipartInfo returns multipart info of the uploadID of the object.
func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return result, hdfsToObjectErr(ctx, err, bucket)
    }

    if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
        return result, err
    }

    result.Bucket = bucket
    result.Object = object
    result.UploadID = uploadID
    return result, nil
}

func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return result, hdfsToObjectErr(ctx, err, bucket)
    }

    if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
        return result, err
    }

    // It's decided not to support List Parts, hence returning empty result.
    return result, nil
}

func (n *hdfsObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int,
    startOffset int64, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions,
) (minio.PartInfo, error) {
    return n.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
}

func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return info, hdfsToObjectErr(ctx, err, bucket)
    }

    var w *hdfs.FileWriter
    w, err = n.clnt.Append(n.hdfsPathJoin(minioMetaTmpBucket, uploadID))
    if err != nil {
        return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
    }
    defer w.Close()
    _, err = io.Copy(w, r.Reader)
    if err != nil {
        return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
    }

    info.PartNumber = partID
    info.ETag = r.MD5CurrentHexString()
    info.LastModified = minio.UTCNow()
    info.Size = r.Reader.Size()

    return info, nil
}

func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return objInfo, hdfsToObjectErr(ctx, err, bucket)
    }

    if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
        return objInfo, err
    }

    name := n.hdfsPathJoin(bucket, object)
    dir := path.Dir(name)
    if dir != "" {
        if err = n.clnt.MkdirAll(dir, os.FileMode(0o755)); err != nil {
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
    }

    err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name)
    // "Object already exists" is an error on HDFS:
    // remove it and then create it again.
    if os.IsExist(err) {
        if err = n.clnt.Remove(name); err != nil {
            if dir != "" {
                n.deleteObject(n.hdfsPathJoin(bucket), dir)
            }
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
        if err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name); err != nil {
            if dir != "" {
                n.deleteObject(n.hdfsPathJoin(bucket), dir)
            }
            return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
        }
    }
    fi, err := n.clnt.Stat(name)
    if err != nil {
        return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
    }

    // Calculate an S3-compatible md5sum for the completed multipart upload.
    s3MD5 := minio.ComputeCompleteMultipartMD5(parts)

    return minio.ObjectInfo{
        Bucket:  bucket,
        Name:    object,
        ETag:    s3MD5,
        ModTime: fi.ModTime(),
        Size:    fi.Size(),
        IsDir:   fi.IsDir(),
        AccTime: fi.(*hdfs.FileInfo).AccessTime(),
    }, nil
}

func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (err error) {
    _, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
    if err != nil {
        return hdfsToObjectErr(ctx, err, bucket)
    }
    return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)), bucket, object, uploadID)
}
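
Note the design: each PutObjectPart appends to a single temp file keyed by uploadID, so parts assemble correctly only when uploaded serially and in ascending order, and CompleteMultipartUpload is then a single rename. A local-filesystem sketch of that scheme (illustrative names, not the gateway API):

package main

import "os"

// putPart appends one part's bytes to the upload's temp file; parts must
// arrive serially and in ascending order for the result to be correct.
func putPart(uploadTmp string, part []byte) error {
    f, err := os.OpenFile(uploadTmp, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o644)
    if err != nil {
        return err
    }
    defer f.Close()
    _, err = f.Write(part)
    return err
}

// complete promotes the assembled temp file to its final object path.
func complete(uploadTmp, objectPath string) error {
    return os.Rename(uploadTmp, objectPath)
}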

@@ -191,7 +191,7 @@ func healingMetricsPrometheus(ch chan<- prometheus.Metric) {

 // collects gateway specific metrics for MinIO instance in Prometheus specific format
 // and sends to given channel
 func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
-    if !globalIsGateway || (globalGatewayName != S3BackendGateway && globalGatewayName != AzureBackendGateway && globalGatewayName != GCSBackendGateway) {
+    if !globalIsGateway || (globalGatewayName != S3BackendGateway && globalGatewayName != AzureBackendGateway) {
         return
     }