/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "bytes" "context" "io/ioutil" "math/rand" "os" "path" "reflect" "testing" "time" humanize "github.com/dustin/go-humanize" "github.com/minio/minio/pkg/madmin" ) func TestRepeatPutObjectPart(t *testing.T) { var objLayer ObjectLayer var disks []string var err error var opts ObjectOptions objLayer, disks, err = prepareXL16() if err != nil { t.Fatal(err) } // cleaning up of temporary test directories defer removeRoots(disks) err = objLayer.MakeBucketWithLocation(context.Background(), "bucket1", "") if err != nil { t.Fatal(err) } uploadID, err := objLayer.NewMultipartUpload(context.Background(), "bucket1", "mpartObj1", opts) if err != nil { t.Fatal(err) } fiveMBBytes := bytes.Repeat([]byte("a"), 5*humanize.MiByte) md5Hex := getMD5Hash(fiveMBBytes) _, err = objLayer.PutObjectPart(context.Background(), "bucket1", "mpartObj1", uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(fiveMBBytes), 5*humanize.MiByte, md5Hex, ""), opts) if err != nil { t.Fatal(err) } // PutObjectPart should succeed even if part already exists. ref: https://github.com/minio/minio/issues/1930 _, err = objLayer.PutObjectPart(context.Background(), "bucket1", "mpartObj1", uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(fiveMBBytes), 5*humanize.MiByte, md5Hex, ""), opts) if err != nil { t.Fatal(err) } } func TestXLDeleteObjectBasic(t *testing.T) { testCases := []struct { bucket string object string expectedErr error }{ {".test", "dir/obj", BucketNameInvalid{Bucket: ".test"}}, {"----", "dir/obj", BucketNameInvalid{Bucket: "----"}}, {"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}}, {"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}}, {"bucket", "dir/doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "dir/doesnotexist"}}, {"bucket", "dir", ObjectNotFound{Bucket: "bucket", Object: "dir"}}, {"bucket", "dir/", ObjectNotFound{Bucket: "bucket", Object: "dir/"}}, {"bucket", "dir/obj", nil}, } // Create an instance of xl backend xl, fsDirs, err := prepareXL16() if err != nil { t.Fatal(err) } err = xl.MakeBucketWithLocation(context.Background(), "bucket", "") if err != nil { t.Fatal(err) } // Create object "dir/obj" under bucket "bucket" for Test 7 to pass _, err = xl.PutObject(context.Background(), "bucket", "dir/obj", mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{}) if err != nil { t.Fatalf("XL Object upload failed: %s", err) } for i, test := range testCases { actualErr := xl.DeleteObject(context.Background(), test.bucket, test.object) if test.expectedErr != nil && actualErr != test.expectedErr { t.Errorf("Test %d: Expected to fail with %s, but failed with %s", i+1, test.expectedErr, actualErr) } if test.expectedErr == nil && actualErr != nil { t.Errorf("Test %d: Expected to pass, but failed with %s", i+1, actualErr) } } // Cleanup backend directories removeRoots(fsDirs) } func TestXLDeleteObjectDiskNotFound(t *testing.T) { // Reset global storage class flags resetGlobalStorageEnvs() // Create an instance of xl backend. obj, fsDirs, err := prepareXL16() if err != nil { t.Fatal(err) } xl := obj.(*xlObjects) // Create "bucket" err = obj.MakeBucketWithLocation(context.Background(), "bucket", "") if err != nil { t.Fatal(err) } bucket := "bucket" object := "object" opts := ObjectOptions{} // Create object "obj" under bucket "bucket". _, err = obj.PutObject(context.Background(), bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) if err != nil { t.Fatal(err) } // for a 16 disk setup, quorum is 9. To simulate disks not found yet // quorum is available, we remove disks leaving quorum disks behind. for i := range xl.storageDisks[:7] { xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i], nil, errFaultyDisk) } err = obj.DeleteObject(context.Background(), bucket, object) if err != nil { t.Fatal(err) } // Create "obj" under "bucket". _, err = obj.PutObject(context.Background(), bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) if err != nil { t.Fatal(err) } // Remove one more disk to 'lose' quorum, by setting it to nil. xl.storageDisks[7] = nil xl.storageDisks[8] = nil err = obj.DeleteObject(context.Background(), bucket, object) // since majority of disks are not available, metaquorum is not achieved and hence errXLReadQuorum error if err != toObjectErr(errXLReadQuorum, bucket, object) { t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errXLReadQuorum, bucket, object), err) } // Cleanup backend directories removeRoots(fsDirs) } func TestGetObjectNoQuorum(t *testing.T) { // Create an instance of xl backend. obj, fsDirs, err := prepareXL16() if err != nil { t.Fatal(err) } xl := obj.(*xlObjects) // Create "bucket" err = obj.MakeBucketWithLocation(context.Background(), "bucket", "") if err != nil { t.Fatal(err) } bucket := "bucket" object := "object" opts := ObjectOptions{} // Create "object" under "bucket". _, err = obj.PutObject(context.Background(), bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) if err != nil { t.Fatal(err) } // Make 9 disks offline, which leaves less than quorum number of disks // in a 16 disk XL setup. The original disks are 'replaced' with // naughtyDisks that fail after 'f' successful StorageAPI method // invocations, where f - [0,2) for f := 0; f < 2; f++ { diskErrors := make(map[int]error) for i := 0; i <= f; i++ { diskErrors[i] = nil } for i := range xl.storageDisks[:9] { switch diskType := xl.storageDisks[i].(type) { case *naughtyDisk: xl.storageDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk) default: xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i], diskErrors, errFaultyDisk) } } // Fetch object from store. err = xl.GetObject(context.Background(), bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts) if err != toObjectErr(errXLReadQuorum, bucket, object) { t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) } } // Cleanup backend directories. removeRoots(fsDirs) } func TestPutObjectNoQuorum(t *testing.T) { // Create an instance of xl backend. obj, fsDirs, err := prepareXL16() if err != nil { t.Fatal(err) } xl := obj.(*xlObjects) // Create "bucket" err = obj.MakeBucketWithLocation(context.Background(), "bucket", "") if err != nil { t.Fatal(err) } bucket := "bucket" object := "object" opts := ObjectOptions{} // Create "object" under "bucket". _, err = obj.PutObject(context.Background(), bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) if err != nil { t.Fatal(err) } // Make 9 disks offline, which leaves less than quorum number of disks // in a 16 disk XL setup. The original disks are 'replaced' with // naughtyDisks that fail after 'f' successful StorageAPI method // invocations, where f - [0,3) for f := 0; f < 3; f++ { diskErrors := make(map[int]error) for i := 0; i <= f; i++ { diskErrors[i] = nil } for i := range xl.storageDisks[:9] { switch diskType := xl.storageDisks[i].(type) { case *naughtyDisk: xl.storageDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk) default: xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i], diskErrors, errFaultyDisk) } } // Upload new content to same object "object" _, err = obj.PutObject(context.Background(), bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) if err != toObjectErr(errXLWriteQuorum, bucket, object) { t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) } } // Cleanup backend directories. removeRoots(fsDirs) } // Tests both object and bucket healing. func TestHealing(t *testing.T) { obj, fsDirs, err := prepareXL16() if err != nil { t.Fatal(err) } defer removeRoots(fsDirs) xl := obj.(*xlObjects) // Create "bucket" err = obj.MakeBucketWithLocation(context.Background(), "bucket", "") if err != nil { t.Fatal(err) } bucket := "bucket" object := "object" data := make([]byte, 1*humanize.MiByte) length := int64(len(data)) _, err = rand.Read(data) if err != nil { t.Fatal(err) } _, err = obj.PutObject(context.Background(), bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), length, "", ""), ObjectOptions{}) if err != nil { t.Fatal(err) } disk := xl.storageDisks[0] xlMetaPreHeal, err := readXLMeta(context.Background(), disk, bucket, object) if err != nil { t.Fatal(err) } // Remove the object - to simulate the case where the disk was down when the object // was created. err = os.RemoveAll(path.Join(fsDirs[0], bucket, object)) if err != nil { t.Fatal(err) } _, err = xl.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan) if err != nil { t.Fatal(err) } xlMetaPostHeal, err := readXLMeta(context.Background(), disk, bucket, object) if err != nil { t.Fatal(err) } // After heal the meta file should be as expected. if !reflect.DeepEqual(xlMetaPreHeal, xlMetaPostHeal) { t.Fatal("HealObject failed") } err = os.RemoveAll(path.Join(fsDirs[0], bucket, object, "xl.json")) if err != nil { t.Fatal(err) } // Write xl.json with different modtime to simulate the case where a disk had // gone down when an object was replaced by a new object. xlMetaOutDated := xlMetaPreHeal xlMetaOutDated.Stat.ModTime = time.Now() err = writeXLMetadata(context.Background(), disk, bucket, object, xlMetaOutDated) if err != nil { t.Fatal(err) } _, err = xl.HealObject(context.Background(), bucket, object, false, false, madmin.HealDeepScan) if err != nil { t.Fatal(err) } xlMetaPostHeal, err = readXLMeta(context.Background(), disk, bucket, object) if err != nil { t.Fatal(err) } // After heal the meta file should be as expected. if !reflect.DeepEqual(xlMetaPreHeal, xlMetaPostHeal) { t.Fatal("HealObject failed") } // Remove the bucket - to simulate the case where bucket was // created when the disk was down. err = os.RemoveAll(path.Join(fsDirs[0], bucket)) if err != nil { t.Fatal(err) } // This would create the bucket. _, err = xl.HealBucket(context.Background(), bucket, false, false) if err != nil { t.Fatal(err) } // Stat the bucket to make sure that it was created. _, err = xl.storageDisks[0].StatVol(bucket) if err != nil { t.Fatal(err) } }