diff --git a/.github/workflows/replication.yaml b/.github/workflows/replication.yaml
index 6a1f435d0..69cc04485 100644
--- a/.github/workflows/replication.yaml
+++ b/.github/workflows/replication.yaml
@@ -1,4 +1,4 @@
-name: Multi-site replication tests
+name: MinIO advanced tests
 
 on:
   pull_request:
@@ -16,7 +16,7 @@ permissions:
 
 jobs:
   replication-test:
-    name: Replication Tests with Go ${{ matrix.go-version }}
+    name: Advanced Tests with Go ${{ matrix.go-version }}
     runs-on: ubuntu-latest
 
     strategy:
@@ -37,11 +37,18 @@ jobs:
           key: ${{ runner.os }}-${{ matrix.go-version }}-go-${{ hashFiles('**/go.sum') }}
           restore-keys: |
             ${{ runner.os }}-${{ matrix.go-version }}-go-
+      - name: Test Decom
+        run: |
+          sudo sysctl net.ipv6.conf.all.disable_ipv6=0
+          sudo sysctl net.ipv6.conf.default.disable_ipv6=0
+          make test-decom
+
       - name: Test Replication
         run: |
           sudo sysctl net.ipv6.conf.all.disable_ipv6=0
           sudo sysctl net.ipv6.conf.default.disable_ipv6=0
           make test-replication
+
       - name: Test MinIO IDP for automatic site replication
         run: |
           sudo sysctl net.ipv6.conf.all.disable_ipv6=0
diff --git a/Makefile b/Makefile
index efc5dbbec..7d906b667 100644
--- a/Makefile
+++ b/Makefile
@@ -41,6 +41,10 @@ test: verifiers build ## builds minio, runs linters, tests
 	@echo "Running unit tests"
 	@CGO_ENABLED=0 go test -tags kqueue ./...
 
+test-decom: install
+	@echo "Running minio decom tests"
+	@env bash $(PWD)/docs/distributed/decom.sh
+
 test-upgrade: build
 	@echo "Running minio upgrade tests"
 	@(env bash $(PWD)/buildscripts/minio-upgrade.sh)
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index f79ae869e..7ba734640 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -53,6 +53,9 @@ type PoolDecommissionInfo struct {
 
 	// Last bucket/object decommissioned.
 	Bucket string `json:"-" msg:"bkt"`
+	// Captures prefix that is currently being
+	// decommissioned inside the 'Bucket'
+	Prefix string `json:"-" msg:"pfx"`
 	Object string `json:"-" msg:"obj"`
 
 	// Verbose information
@@ -73,6 +76,7 @@ func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
 			// Clear tracker info.
 			if pd.Bucket == bucket {
 				pd.Bucket = "" // empty this out for next bucket
+				pd.Prefix = "" // empty this out for the next bucket
 				pd.Object = "" // empty this out for next object
 			}
 			return
@@ -80,12 +84,6 @@ func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
 	}
 }
 
-func (pd *PoolDecommissionInfo) bucketsToDecommission() []string {
-	queuedBuckets := make([]string, len(pd.QueuedBuckets))
-	copy(queuedBuckets, pd.QueuedBuckets)
-	return queuedBuckets
-}
-
 func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
 	for _, b := range pd.DecommissionedBuckets {
 		if b == bucket {
@@ -95,17 +93,18 @@ func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
 	return false
 }
 
-func (pd *PoolDecommissionInfo) bucketPush(bucket string) {
+func (pd *PoolDecommissionInfo) bucketPush(bucket decomBucketInfo) {
 	for _, b := range pd.QueuedBuckets {
 		if pd.isBucketDecommissioned(b) {
 			return
 		}
-		if b == bucket {
+		if b == bucket.String() {
 			return
 		}
 	}
-	pd.QueuedBuckets = append(pd.QueuedBuckets, bucket)
-	pd.Bucket = bucket
+	pd.QueuedBuckets = append(pd.QueuedBuckets, bucket.String())
+	pd.Bucket = bucket.Name
+	pd.Prefix = bucket.Prefix
 }
 
 // PoolStatus captures current pool status
@@ -183,12 +182,12 @@ func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
 	return p.Pools[idx].Decommission.isBucketDecommissioned(bucket)
 }
 
-func (p *poolMeta) BucketDone(idx int, bucket string) {
+func (p *poolMeta) BucketDone(idx int, bucket decomBucketInfo) {
 	if p.Pools[idx].Decommission == nil {
 		// Decommission not in progress.
 		return
 	}
-	p.Pools[idx].Decommission.bucketPop(bucket)
+	p.Pools[idx].Decommission.bucketPop(bucket.String())
 }
 
 func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
@@ -208,19 +207,38 @@ func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object strin
 	p.Pools[idx].Decommission.Object = object
 }
 
-func (p *poolMeta) PendingBuckets(idx int) []string {
+func (p *poolMeta) PendingBuckets(idx int) []decomBucketInfo {
 	if p.Pools[idx].Decommission == nil {
 		// Decommission not in progress.
 		return nil
 	}
 
-	return p.Pools[idx].Decommission.bucketsToDecommission()
+	decomBuckets := make([]decomBucketInfo, len(p.Pools[idx].Decommission.QueuedBuckets))
+	for i := range decomBuckets {
+		bucket, prefix := path2BucketObject(p.Pools[idx].Decommission.QueuedBuckets[i])
+		decomBuckets[i] = decomBucketInfo{
+			Name:   bucket,
+			Prefix: prefix,
+		}
+	}
+
+	return decomBuckets
 }
 
-func (p *poolMeta) QueueBuckets(idx int, buckets []BucketInfo) {
+//msgp:ignore decomBucketInfo
+type decomBucketInfo struct {
+	Name   string
+	Prefix string
+}
+
+func (db decomBucketInfo) String() string {
+	return pathJoin(db.Name, db.Prefix)
+}
+
+func (p *poolMeta) QueueBuckets(idx int, buckets []decomBucketInfo) {
 	// add new queued buckets
 	for _, bucket := range buckets {
-		p.Pools[idx].Decommission.bucketPush(bucket.Name)
+		p.Pools[idx].Decommission.bucketPush(bucket)
 	}
 }
 
@@ -607,7 +625,7 @@ func (v versionsSorter) reverse() {
 	})
 }
 
-func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
+func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
 	ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
 
 	var wg sync.WaitGroup
@@ -628,7 +646,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			continue
 		}
 
-		vc, _ := globalBucketVersioningSys.Get(bName)
+		vc, _ := globalBucketVersioningSys.Get(bi.Name)
 		decommissionEntry := func(entry metaCacheEntry) {
 			defer func() {
 				<-parallelWorkers
@@ -639,7 +657,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 				return
 			}
 
-			fivs, err := entry.fileInfoVersions(bName)
+			fivs, err := entry.fileInfoVersions(bi.Name)
 			if err != nil {
 				return
 			}
@@ -652,19 +670,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			for _, version := range fivs.Versions {
 				// TODO: Skip transitioned objects for now.
 				if version.IsRemote() {
-					logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bName, version.Name))
+					logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bi.Name, version.Name))
 					continue
 				}
 				// We will skip decommissioning delete markers
 				// with single version, its as good as there
 				// is no data associated with the object.
 				if version.Deleted && len(fivs.Versions) == 1 {
-					logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bName, version.Name))
+					logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bi.Name, version.Name))
 					continue
 				}
 				if version.Deleted {
 					_, err := z.DeleteObject(ctx,
-						bName,
+						bi.Name,
 						version.Name,
 						ObjectOptions{
 							Versioned: vc.PrefixEnabled(version.Name),
@@ -691,7 +709,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 				// gr.Close() is ensured by decommissionObject().
 				for try := 0; try < 3; try++ {
 					gr, err := set.GetObjectNInfo(ctx,
-						bName,
+						bi.Name,
 						encodeDirObject(version.Name),
 						nil,
 						http.Header{},
@@ -708,7 +726,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 						logger.LogIf(ctx, err)
 						continue
 					}
-					if err = z.decommissionObject(ctx, bName, gr); err != nil {
+					if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
 						failure = true
 						logger.LogIf(ctx, err)
 						continue
@@ -728,19 +746,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			// if all versions were decommissioned, then we can delete the object versions.
 			if decommissionedCount == len(fivs.Versions) {
 				_, err := set.DeleteObject(ctx,
-					bName,
+					bi.Name,
 					encodeDirObject(entry.name),
 					ObjectOptions{
 						DeletePrefix: true, // use prefix delete to delete all versions at once.
 					},
 				)
-				auditLogDecom(ctx, "DecomDeleteObject", bName, entry.name, "", err)
+				auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err)
 				if err != nil {
 					logger.LogIf(ctx, err)
 				}
 			}
 			z.poolMetaMutex.Lock()
-			z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
+			z.poolMeta.TrackCurrentBucketObject(idx, bi.Name, entry.name)
 			ok, err := z.poolMeta.updateAfter(ctx, idx, z.serverPools, 30*time.Second)
 			logger.LogIf(ctx, err)
 			if ok {
@@ -753,7 +771,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 		resolver := metadataResolutionParams{
 			dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
 			objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
-			bucket:    bName,
+			bucket:    bi.Name,
 		}
 
 		wg.Add(1)
@@ -761,7 +779,8 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			defer wg.Done()
 			err := listPathRaw(ctx, listPathRawOptions{
 				disks:        disks,
-				bucket:       bName,
+				bucket:       bi.Name,
+				path:         bi.Prefix,
 				recursive:    true,
 				forwardTo:    "",
 				minDisks:     len(disks) / 2, // to capture all quorum ratios
@@ -791,7 +810,7 @@
 func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
 	pool := z.serverPools[idx]
 	for _, bucket := range z.poolMeta.PendingBuckets(idx) {
-		if z.poolMeta.isBucketDecommissioned(idx, bucket) {
+		if z.poolMeta.isBucketDecommissioned(idx, bucket.String()) {
 			if serverDebugLog {
 				console.Debugln("decommission: already done, moving on", bucket)
 			}
@@ -803,7 +822,7 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i
 			continue
 		}
 		if serverDebugLog {
-			console.Debugln("decommission: currently on bucket", bucket)
+			console.Debugln("decommission: currently on bucket", bucket.Name)
 		}
 		if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
 			return err
 		}
@@ -1031,8 +1050,15 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
 		return err
 	}
 
+	decomBuckets := make([]decomBucketInfo, len(buckets))
+	for i := range buckets {
+		decomBuckets[i] = decomBucketInfo{
+			Name: buckets[i].Name,
+		}
+	}
+
 	// TODO: Support decommissioning transition tiers.
-	for _, bucket := range buckets {
+	for _, bucket := range decomBuckets {
 		if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil {
 			if lc.HasTransition() {
 				return decomError{
@@ -1059,11 +1085,13 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
 
 	// Buckets data are dispersed in multiple zones/sets, make
 	// sure to decommission the necessary metadata.
-	buckets = append(buckets, BucketInfo{
-		Name: pathJoin(minioMetaBucket, minioConfigPrefix),
+	decomBuckets = append(decomBuckets, decomBucketInfo{
+		Name:   minioMetaBucket,
+		Prefix: minioConfigPrefix,
 	})
-	buckets = append(buckets, BucketInfo{
-		Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
+	decomBuckets = append(decomBuckets, decomBucketInfo{
+		Name:   minioMetaBucket,
+		Prefix: bucketMetaPrefix,
 	})
 
 	var pool *erasureSets
@@ -1089,7 +1117,7 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
 	if err = z.poolMeta.Decommission(idx, pi); err != nil {
 		return err
 	}
-	z.poolMeta.QueueBuckets(idx, buckets)
+	z.poolMeta.QueueBuckets(idx, decomBuckets)
 	if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
 		return err
 	}
diff --git a/cmd/erasure-server-pool-decom_gen.go b/cmd/erasure-server-pool-decom_gen.go
index df5e3134e..7fed2ce45 100644
--- a/cmd/erasure-server-pool-decom_gen.go
+++ b/cmd/erasure-server-pool-decom_gen.go
@@ -110,6 +110,12 @@ func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "Bucket")
 				return
 			}
+		case "pfx":
+			z.Prefix, err = dc.ReadString()
+			if err != nil {
+				err = msgp.WrapError(err, "Prefix")
+				return
+			}
 		case "obj":
 			z.Object, err = dc.ReadString()
 			if err != nil {
@@ -153,9 +159,9 @@ func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 
 // EncodeMsg implements msgp.Encodable
 func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 15
+	// map header, size 16
 	// write "st"
-	err = en.Append(0x8f, 0xa2, 0x73, 0x74)
+	err = en.Append(0xde, 0x0, 0x10, 0xa2, 0x73, 0x74)
 	if err != nil {
 		return
 	}
@@ -268,6 +274,16 @@ func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Bucket")
 		return
 	}
+	// write "pfx"
+	err = en.Append(0xa3, 0x70, 0x66, 0x78)
+	if err != nil {
+		return
+	}
+	err = en.WriteString(z.Prefix)
+	if err != nil {
+		err = msgp.WrapError(err, "Prefix")
+		return
+	}
 	// write "obj"
 	err = en.Append(0xa3, 0x6f, 0x62, 0x6a)
 	if err != nil {
@@ -324,9 +340,9 @@ func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
 // MarshalMsg implements msgp.Marshaler
 func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 15
+	// map header, size 16
 	// string "st"
-	o = append(o, 0x8f, 0xa2, 0x73, 0x74)
+	o = append(o, 0xde, 0x0, 0x10, 0xa2, 0x73, 0x74)
 	o = msgp.AppendTime(o, z.StartTime)
 	// string "ss"
 	o = append(o, 0xa2, 0x73, 0x73)
@@ -361,6 +377,9 @@ func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
 	// string "bkt"
 	o = append(o, 0xa3, 0x62, 0x6b, 0x74)
 	o = msgp.AppendString(o, z.Bucket)
+	// string "pfx"
+	o = append(o, 0xa3, 0x70, 0x66, 0x78)
+	o = msgp.AppendString(o, z.Prefix)
 	// string "obj"
 	o = append(o, 0xa3, 0x6f, 0x62, 0x6a)
 	o = msgp.AppendString(o, z.Object)
@@ -483,6 +502,12 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 			err = msgp.WrapError(err, "Bucket")
 			return
 		}
+		case "pfx":
+			z.Prefix, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Prefix")
+				return
+			}
 		case "obj":
 			z.Object, bts, err = msgp.ReadStringBytes(bts)
 			if err != nil {
@@ -527,7 +552,7 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *PoolDecommissionInfo) Msgsize() (s int) {
-	s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.ArrayHeaderSize
+	s = 3 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.ArrayHeaderSize
 	for za0001 := range z.QueuedBuckets {
 		s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001])
 	}
@@ -535,7 +560,7 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 	for za0002 := range z.DecommissionedBuckets {
 		s += msgp.StringPrefixSize + len(z.DecommissionedBuckets[za0002])
 	}
-	s += 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size
+	s += 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Prefix) + 4 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size
 	return
 }
diff --git a/docs/distributed/decom.sh b/docs/distributed/decom.sh
new file mode 100755
index 000000000..5618524d7
--- /dev/null
+++ b/docs/distributed/decom.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+if [ -n "$DEBUG" ]; then
+    set -x
+fi
+
+pkill minio
+rm -rf /tmp/xl
+
+wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && \
+    chmod +x mc
+
+export CI=true
+(minio server /tmp/xl/{1...10}/disk{0...1} 2>&1 >/dev/null)&
+pid=$!
+
+sleep 2
+
+export MC_HOST_myminio="http://minioadmin:minioadmin@localhost:9000/"
+
+./mc admin user add myminio/ minio123 minio123
+./mc admin user add myminio/ minio12345 minio12345
+
+./mc admin policy add myminio/ rw ./docs/distributed/rw.json
+./mc admin policy add myminio/ lake ./docs/distributed/rw.json
+
+./mc admin policy set myminio/ rw user=minio123
+./mc admin policy set myminio/ lake,rw user=minio12345
+
+./mc mb -l myminio/versioned
+./mc mirror internal myminio/versioned/ --quiet >/dev/null
+
+user_count=$(./mc admin user list myminio/ | wc -l)
+policy_count=$(./mc admin policy list myminio/ | wc -l)
+
+kill $pid
+(minio server /tmp/xl/{1...10}/disk{0...1} /tmp/xl/{11...30}/disk{0...3} 2>&1 >/dev/null) &
+pid=$!
+
+sleep 2
+
+expanded_user_count=$(./mc admin user list myminio/ | wc -l)
+expanded_policy_count=$(./mc admin policy list myminio/ | wc -l)
+
+if [ $user_count -ne $expanded_user_count ]; then
+    echo "BUG: original user count differs from expanded setup"
+    exit 1
+fi
+
+if [ $policy_count -ne $expanded_policy_count ]; then
+    echo "BUG: original policy count differs from expanded setup"
+    exit 1
+fi
+
+./mc version info myminio/versioned | grep -q "versioning is enabled"
+ret=$?
+if [ $ret -ne 0 ]; then
+    echo "expected versioning enabled after expansion"
+    exit 1
+fi
+
+./mc mirror cmd myminio/versioned/ --quiet >/dev/null
+./mc ls -r myminio/versioned/ > expanded_ns.txt
+
+./mc admin decom start myminio/ /tmp/xl/{1...10}/disk{0...1}
+
+until $(./mc admin decom status myminio/ | grep -q Complete)
+do
+    echo "waiting for decom to finish..."
+    sleep 1
+done
+
+kill $pid
+
+(minio server /tmp/xl/{11...30}/disk{0...3} 2>&1 >/dev/null)&
+pid=$!
+
+sleep 2
+
+decom_user_count=$(./mc admin user list myminio/ | wc -l)
+decom_policy_count=$(./mc admin policy list myminio/ | wc -l)
+
+if [ $user_count -ne $decom_user_count ]; then
+    echo "BUG: original user count differs after decommission"
+    exit 1
+fi
+
+if [ $policy_count -ne $decom_policy_count ]; then
+    echo "BUG: original policy count differs after decommission"
+    exit 1
+fi
+
+./mc version info myminio/versioned | grep -q "versioning is enabled"
+ret=$?
+if [ $ret -ne 0 ]; then
+    echo "BUG: expected versioning enabled after decommission"
+    exit 1
+fi
+
+./mc ls -r myminio/versioned > decommissioned_ns.txt
+
+out=$(diff -qpruN expanded_ns.txt decommissioned_ns.txt)
+ret=$?
+if [ $ret -ne 0 ]; then
+    echo "BUG: expected no missing entries after decommission: $out"
+fi
+
+kill $pid
diff --git a/docs/distributed/rw.json b/docs/distributed/rw.json
new file mode 100644
index 000000000..66171dc9e
--- /dev/null
+++ b/docs/distributed/rw.json
@@ -0,0 +1,14 @@
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "s3:*"
+      ],
+      "Resource": [
+        "arn:aws:s3:::*"
+      ]
+    }
+  ]
+ }
\ No newline at end of file
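The core of the change is that a queued decommission entry is now a bucket name plus an optional prefix: bucketPush flattens the pair into a single "bucket/prefix" string for QueuedBuckets, and PendingBuckets splits it back apart when the background worker resumes. The standalone Go sketch below (not part of the patch) illustrates that round-trip; splitBucketPrefix is a hypothetical stand-in for MinIO's unexported path2BucketObject helper, and pathJoin is modeled here with path.Join.

// Standalone sketch, not MinIO source: shows how a decomBucketInfo-style
// value can be flattened into the queued string form and recovered again.
package main

import (
	"fmt"
	"path"
	"strings"
)

type decomBucketInfo struct {
	Name   string
	Prefix string
}

// String joins bucket and prefix the way the queue stores entries
// (pathJoin is modeled with path.Join).
func (db decomBucketInfo) String() string {
	return path.Join(db.Name, db.Prefix)
}

// splitBucketPrefix is a hypothetical stand-in for path2BucketObject:
// it splits "bucket/prefix" back into its two components.
func splitBucketPrefix(s string) (name, prefix string) {
	s = strings.TrimPrefix(s, "/")
	if i := strings.Index(s, "/"); i >= 0 {
		return s[:i], s[i+1:]
	}
	return s, ""
}

func main() {
	queued := []string{
		decomBucketInfo{Name: "mydata", Prefix: "backup/2022"}.String(), // bucket with prefix
		decomBucketInfo{Name: "versioned"}.String(),                     // plain bucket, no prefix
	}
	for _, q := range queued {
		name, prefix := splitBucketPrefix(q)
		fmt.Printf("queued=%q -> name=%q prefix=%q\n", q, name, prefix)
	}
}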