heal: isObjectDangling should return false when it cannot decide (#14053)

This can happen in a multi-pool setup while disks are still coming up, or in
a single-pool setup with, say, hundreds of erasure sets over a slow network.

In such situations, when healing is attempted on the `.minio.sys/config`
folder, it can unexpectedly delete some policy files as dangling, due to a
mistake in deciding when `isObjectDangling` should be considered 'true'.

This regression was introduced in commit 30135eed86, where we assumed that a
validMeta with empty ErasureInfo is fully dangling. The flaw is exposed while
the server is starting up.
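
To make the fix concrete, here is a minimal, hedged sketch of the corrected
decision (illustrative only; isDanglingSketch and the trimmed-down FileInfo
below are invented for this example, not the actual MinIO types): when no
drive returns readable metadata, the state is undecided and the object must
not be reported as dangling.

package main

import "fmt"

// Trimmed-down stand-in for the real FileInfo type (illustrative only).
type FileInfo struct {
	Deleted                  bool
	DataBlocks, ParityBlocks int
}

// Simplified validity check; the real IsValid() validates data/parity consistency.
func (fi FileInfo) IsValid() bool { return fi.Deleted || fi.DataBlocks > 0 }

// isDanglingSketch mirrors the shape of the fix: with no readable metadata
// at all, the answer is "cannot decide" (false), never "dangling" by default.
func isDanglingSketch(metaArr []FileInfo) (FileInfo, bool) {
	var validMeta FileInfo
	for _, m := range metaArr {
		if m.IsValid() {
			validMeta = m
			break
		}
	}
	if !validMeta.IsValid() {
		return validMeta, false // undecided: leave the object alone
	}
	// Only with a valid meta is it safe to compare error counts
	// against validMeta.ParityBlocks (see the diff below).
	return validMeta, false
}

func main() {
	_, dangling := isDanglingSketch(make([]FileInfo, 4)) // drives still coming up
	fmt.Println("dangling:", dangling)                   // false
}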

This is most easily seen with multiple-pool setups because of the
disconnected fashion in which pools come up. The decision to purge the
object as dangling is taken incorrectly, before the correct state has been
achieved on each pool: when a drive returns 'errDiskNotFound' a 'delete' is
triggered, and only afterwards does that 'drive' come online, since drives
can come online lazily as part of the startup sequence.

This kind of situation exists because we allow the server to restart with
only (totalDisks/2) drives online.
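
As a worked example with hypothetical numbers (assuming the counting logic
shown in the isObjectDangling diff below): take a 4-drive erasure set with
parity 2 that is restarting while two drives are still offline. Offline
drives (errDiskNotFound) are not counted toward the dangling threshold, so
the object is kept.

package main

import "fmt"

func main() {
	// 4 drives, parity 2; during startup one drive reports file-not-found
	// and two drives are still offline (errDiskNotFound). Offline drives
	// are deliberately excluded from the not-found/corrupted counts.
	const parityBlocks = 2
	notFoundMetaErrs, corruptedMetaErrs := 1, 0
	totalErrs := notFoundMetaErrs + corruptedMetaErrs
	fmt.Println("dangling:", totalErrs > parityBlocks) // 1 > 2, so false: object is kept
}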

The implementation made an incorrect assumption here, leading to policies
getting deleted.

Added tests to capture the implementation requirements.
Harshavardhana 2022-01-07 19:11:54 -08:00 committed by GitHub
parent 0a224654c2
commit b7c5e45fff
7 changed files with 461 additions and 71 deletions


@@ -38,3 +38,9 @@ jobs:
         sudo sysctl net.ipv6.conf.all.disable_ipv6=0
         sudo sysctl net.ipv6.conf.default.disable_ipv6=0
         make test-replication
+
+    - name: Test MinIO IDP for automatic site replication
+      run: |
+        sudo sysctl net.ipv6.conf.all.disable_ipv6=0
+        sudo sysctl net.ipv6.conf.default.disable_ipv6=0
+        make test-site-replication-minio


@@ -67,6 +67,10 @@ test-site-replication-oidc: install ## verify automatic site replication
 	@echo "Running tests for automatic site replication of IAM (with OIDC)"
 	@(env bash $(PWD)/docs/site-replication/run-multi-site-oidc.sh)
 
+test-site-replication-minio: install ## verify automatic site replication
+	@echo "Running tests for automatic site replication of IAM (with MinIO IDP)"
+	@(env bash $(PWD)/docs/site-replication/run-multi-site-minio-idp.sh)
+
 verify: ## verify minio various setups
 	@echo "Verifying build with race"
 	@CGO_ENABLED=1 go build -race -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null


@@ -246,7 +246,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
 	switch {
 	case errors.Is(erErr, errFileNotFound) || errors.Is(erErr, errFileVersionNotFound):
 		return true
-	case errors.Is(erErr, errCorruptedFormat):
+	case errors.Is(erErr, errFileCorrupt):
 		return true
 	}
 	if erErr == nil {
@@ -312,13 +312,13 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
 	if err != nil {
-		return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
+		return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, nil, opts)
 	}
 
 	result.ParityBlocks = result.DiskCount - readQuorum
 	result.DataBlocks = readQuorum
 
-	// List of disks having latest version of the object er.meta
+	// List of disks having latest version of the object xl.meta
 	// (by modtime).
 	onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
@@ -356,10 +356,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 			// If data is sane on any one disk, we can
 			// extract the correct object size.
 			result.ObjectSize = partsMetadata[i].Size
-			if partsMetadata[i].Erasure.ParityBlocks > 0 && partsMetadata[i].Erasure.DataBlocks > 0 {
-				result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks
-				result.DataBlocks = partsMetadata[i].Erasure.DataBlocks
-			}
 		case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound:
 			driveState = madmin.DriveStateOffline
 		case errs[i] == errFileNotFound, errs[i] == errFileVersionNotFound, errs[i] == errVolumeNotFound:
@@ -406,7 +402,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 
 	// If less than read quorum number of disks have all the parts
 	// of the data, we can't reconstruct the erasure-coded data.
-	if numAvailableDisks < result.DataBlocks {
+	if numAvailableDisks < readQuorum {
 		return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, dataErrs, opts)
 	}
 
@@ -466,9 +462,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	copyPartsMetadata = shufflePartsMetadata(copyPartsMetadata, latestMeta.Erasure.Distribution)
 
 	if !latestMeta.Deleted && !latestMeta.IsRemote() {
-		result.DataBlocks = latestMeta.Erasure.DataBlocks
-		result.ParityBlocks = latestMeta.Erasure.ParityBlocks
-
 		// Heal each part. erasureHealFile() will write the healed
 		// part to .minio/tmp/uuid/ which needs to be renamed later to
 		// the final location.
@@ -819,21 +812,20 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
 	// remove we simply delete it from namespace.
 	m, ok := isObjectDangling(metaArr, errs, dataErrs)
 	if ok {
-		parityBlocks := m.Erasure.ParityBlocks
-		if m.Erasure.ParityBlocks == 0 {
-			parityBlocks = er.defaultParityCount
-		}
-		dataBlocks := m.Erasure.DataBlocks
-		if m.Erasure.DataBlocks == 0 {
-			dataBlocks = len(storageDisks) - parityBlocks
+		parityBlocks := er.defaultParityCount
+		dataBlocks := len(storageDisks) - parityBlocks
+		if m.IsValid() {
+			parityBlocks = m.Erasure.ParityBlocks
+			dataBlocks = m.Erasure.DataBlocks
 		}
 		writeQuorum := dataBlocks
 		if dataBlocks == parityBlocks {
 			writeQuorum++
 		}
 		var err error
-		var returnNotFound bool
-		if !opts.DryRun && opts.Remove {
+		if opts.Remove {
 			err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{
 				VersionID: versionID,
 			}, false)
@@ -849,19 +841,10 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
 				m.Size = 0
 			}
-			// Delete successfully purged dangling content, return ObjectNotFound/VersionNotFound instead.
-			if countErrs(errs, nil) == len(errs) {
-				returnNotFound = true
-			}
-		}
-		if returnNotFound {
-			err = toObjectErr(errFileNotFound, bucket, object)
-			if versionID != "" {
-				err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
-			}
-			return er.defaultHealResult(m, storageDisks, storageEndpoints,
-				errs, bucket, object, versionID), err
+			return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
+				errs, bucket, object, versionID), nil
 		}
 		return er.defaultHealResult(m, storageDisks, storageEndpoints,
 			errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID)
 	}
@@ -878,54 +861,64 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
 // files is lesser than number of data blocks.
 func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (validMeta FileInfo, ok bool) {
 	// We can consider an object data not reliable
-	// when er.meta is not found in read quorum disks.
-	// or when er.meta is not readable in read quorum disks.
-	var notFoundErasureMeta, corruptedErasureMeta int
-	for _, readErr := range errs {
-		if errors.Is(readErr, errFileNotFound) || errors.Is(readErr, errFileVersionNotFound) {
-			notFoundErasureMeta++
-		} else if errors.Is(readErr, errCorruptedFormat) {
-			corruptedErasureMeta++
-		}
-	}
-	var notFoundParts int
-	for i := range dataErrs {
-		// Only count part errors, if the error is not
-		// same as er.meta error. This is to avoid
-		// double counting when both parts and er.meta
-		// are not available.
-		if errs[i] != dataErrs[i] {
-			if IsErr(dataErrs[i], []error{
-				errFileNotFound,
-				errFileVersionNotFound,
-			}...) {
-				notFoundParts++
+	// when xl.meta is not found in read quorum disks.
+	// or when xl.meta is not readable in read quorum disks.
+	danglingErrsCount := func(cerrs []error) (int, int) {
+		var (
+			notFoundCount  int
+			corruptedCount int
+		)
+		for _, readErr := range cerrs {
+			if errors.Is(readErr, errFileNotFound) || errors.Is(readErr, errFileVersionNotFound) {
+				notFoundCount++
+			} else if errors.Is(readErr, errFileCorrupt) {
+				corruptedCount++
 			}
 		}
+		return notFoundCount, corruptedCount
 	}
 
+	ndataErrs := make([]error, len(dataErrs))
+	for i := range dataErrs {
+		if errs[i] != dataErrs[i] {
+			// Only count part errors, if the error is not
+			// same as xl.meta error. This is to avoid
+			// double counting when both parts and xl.meta
+			// are not available.
+			ndataErrs[i] = dataErrs[i]
+		}
+	}
+
+	notFoundMetaErrs, corruptedMetaErrs := danglingErrsCount(errs)
+	notFoundPartsErrs, corruptedPartsErrs := danglingErrsCount(ndataErrs)
+
 	for _, m := range metaArr {
-		if !m.IsValid() {
-			continue
+		if m.IsValid() {
+			validMeta = m
+			break
 		}
-		validMeta = m
-		break
 	}
 
-	if validMeta.Deleted || validMeta.IsRemote() {
-		// notFoundParts is ignored since a
+	if !validMeta.IsValid() {
+		// We have no idea what this file is, leave it as is.
+		return validMeta, false
+	}
+
+	if validMeta.Deleted {
+		// notFoundPartsErrs is ignored since
 		// - delete marker does not have any parts
-		// - transition status of complete has no parts
-		return validMeta, corruptedErasureMeta+notFoundErasureMeta > len(errs)/2
+		return validMeta, corruptedMetaErrs+notFoundMetaErrs > len(errs)/2
 	}
 
-	// We couldn't find any valid meta we are indeed corrupted, return true right away.
-	if validMeta.Erasure.DataBlocks == 0 {
-		return validMeta, true
+	totalErrs := notFoundMetaErrs + corruptedMetaErrs + notFoundPartsErrs + corruptedPartsErrs
+	if validMeta.IsRemote() {
+		// notFoundPartsErrs is ignored since
+		// - transition status of complete has no parts
+		totalErrs = notFoundMetaErrs + corruptedMetaErrs
 	}
 
 	// We have valid meta, now verify if we have enough files with parity blocks.
-	return validMeta, corruptedErasureMeta+notFoundErasureMeta+notFoundParts > validMeta.Erasure.ParityBlocks
+	return validMeta, totalErrs > validMeta.Erasure.ParityBlocks
 }
 
 // HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.


@@ -34,6 +34,192 @@ import (
 	"github.com/minio/madmin-go"
 )
 
+// Tests isObjectDangling function
+func TestIsObjectDangling(t *testing.T) {
+	fi := newFileInfo("test-object", 2, 2)
+	fi.Erasure.Index = 1
+
+	testCases := []struct {
+		name             string
+		metaArr          []FileInfo
+		errs             []error
+		dataErrs         []error
+		expectedMeta     FileInfo
+		expectedDangling bool
+	}{
+		{
+			name: "FileInfoExists-case1",
+			metaArr: []FileInfo{
+				{},
+				{},
+				fi,
+				fi,
+			},
+			errs: []error{
+				errFileNotFound,
+				errDiskNotFound,
+				nil,
+				nil,
+			},
+			dataErrs:         nil,
+			expectedMeta:     fi,
+			expectedDangling: false,
+		},
+		{
+			name: "FileInfoExists-case2",
+			metaArr: []FileInfo{
+				{},
+				{},
+				fi,
+				fi,
+			},
+			errs: []error{
+				errFileNotFound,
+				errFileNotFound,
+				nil,
+				nil,
+			},
+			dataErrs:         nil,
+			expectedMeta:     fi,
+			expectedDangling: false,
+		},
+		{
+			name: "FileInfoUndecided-case1",
+			metaArr: []FileInfo{
+				{},
+				{},
+				{},
+				fi,
+			},
+			errs: []error{
+				errFileNotFound,
+				errDiskNotFound,
+				errDiskNotFound,
+				nil,
+			},
+			dataErrs:         nil,
+			expectedMeta:     fi,
+			expectedDangling: false,
+		},
+		{
+			name:    "FileInfoUndecided-case2",
+			metaArr: []FileInfo{},
+			errs: []error{
+				errFileNotFound,
+				errDiskNotFound,
+				errDiskNotFound,
+				errFileNotFound,
+			},
+			dataErrs:         nil,
+			expectedMeta:     FileInfo{},
+			expectedDangling: false,
+		},
+		{
+			name:    "FileInfoUndecided-case3(file deleted)",
+			metaArr: []FileInfo{},
+			errs: []error{
+				errFileNotFound,
+				errFileNotFound,
+				errFileNotFound,
+				errFileNotFound,
+			},
+			dataErrs:         nil,
+			expectedMeta:     FileInfo{},
+			expectedDangling: false,
+		},
+		{
+			name: "FileInfoDecided-case1",
+			metaArr: []FileInfo{
+				{},
+				{},
+				{},
+				fi,
+			},
+			errs: []error{
+				errFileNotFound,
+				errFileNotFound,
+				errFileNotFound,
+				nil,
+			},
+			dataErrs:         nil,
+			expectedMeta:     fi,
+			expectedDangling: true,
+		},
+		{
+			name: "FileInfoDecided-case2",
+			metaArr: []FileInfo{
+				{},
+				{},
+				{},
+				fi,
+			},
+			errs: []error{
+				errFileNotFound,
+				errFileCorrupt,
+				errFileCorrupt,
+				nil,
+			},
+			dataErrs:         nil,
+			expectedMeta:     fi,
+			expectedDangling: true,
+		},
+		{
+			name: "FileInfoDecided-case2-delete-marker",
+			metaArr: []FileInfo{
+				{},
+				{},
+				{},
+				{Deleted: true},
+			},
+			errs: []error{
+				errFileNotFound,
+				errFileCorrupt,
+				errFileCorrupt,
+				nil,
+			},
+			dataErrs:         nil,
+			expectedMeta:     FileInfo{Deleted: true},
+			expectedDangling: true,
+		},
+		{
+			name: "FileInfoDecided-case2-(duplicate data errors)",
+			metaArr: []FileInfo{
+				{},
+				{},
+				{},
+				{Deleted: true},
+			},
+			errs: []error{
+				errFileNotFound,
+				errFileCorrupt,
+				errFileCorrupt,
+				nil,
+			},
+			dataErrs: []error{
+				errFileNotFound,
+				errFileCorrupt,
+				nil,
+				nil,
+			},
+			expectedMeta:     FileInfo{Deleted: true},
+			expectedDangling: true,
+		},
+		// Add new cases as seen
+	}
+	for _, testCase := range testCases {
+		testCase := testCase
+		t.Run(testCase.name, func(t *testing.T) {
+			gotMeta, dangling := isObjectDangling(testCase.metaArr, testCase.errs, testCase.dataErrs)
+			if !gotMeta.Equals(testCase.expectedMeta) {
+				t.Errorf("Expected %#v, got %#v", testCase.expectedMeta, gotMeta)
+			}
+			if dangling != testCase.expectedDangling {
+				t.Errorf("Expected dangling %t, got %t", testCase.expectedDangling, dangling)
+			}
+		})
+	}
+}
+
 // Tests both object and bucket healing.
 func TestHealing(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())


@@ -927,7 +927,7 @@ func (x *xlMetaV2) loadLegacy(buf []byte) error {
 		return errors.New("unknown major metadata version")
 	}
 	if allMeta == nil {
-		return errCorruptedFormat
+		return errFileCorrupt
 	}
 	// bts will shrink as we decode.
 	bts := allMeta


@@ -10,7 +10,7 @@ cleanup() {
 echo "Cleaning up instances of MinIO"
 pkill minio
 pkill -9 minio
-rm -rf /tmp/minio{1,2,3}
+rm -rf /tmp/minio-ldap-idp{1,2,3}
 }
 
 cleanup
@@ -40,9 +40,9 @@ if [ ! -f ./mc ]; then
     && chmod +x mc
 fi
 
-minio server --address ":9001" /tmp/minio1/{1...4} >/tmp/minio1_1.log 2>&1 &
-minio server --address ":9002" /tmp/minio2/{1...4} >/tmp/minio2_1.log 2>&1 &
-minio server --address ":9003" /tmp/minio3/{1...4} >/tmp/minio3_1.log 2>&1 &
+minio server --config-dir /tmp/minio-ldap --address ":9001" /tmp/minio-ldap-idp1/{1...4} >/tmp/minio1_1.log 2>&1 &
+minio server --config-dir /tmp/minio-ldap --address ":9002" /tmp/minio-ldap-idp2/{1...4} >/tmp/minio2_1.log 2>&1 &
+minio server --config-dir /tmp/minio-ldap --address ":9003" /tmp/minio-ldap-idp3/{1...4} >/tmp/minio3_1.log 2>&1 &
 
 sleep 10
@@ -205,3 +205,5 @@ if [ "${enabled_minio1}" != "Enabled" ]; then
 echo "expected bucket to be mirrored with object-lock enabled, exiting..."
 exit_1;
 fi
+
+cleanup


@@ -0,0 +1,199 @@
#!/usr/bin/env bash
# shellcheck disable=SC2120
exit_1() {
cleanup
exit 1
}
cleanup() {
echo "Cleaning up instances of MinIO"
pkill minio
pkill -9 minio
rm -rf /tmp/minio-internal-idp{1,2,3}
}
cleanup
unset MINIO_KMS_KES_CERT_FILE
unset MINIO_KMS_KES_KEY_FILE
unset MINIO_KMS_KES_ENDPOINT
unset MINIO_KMS_KES_KEY_NAME
export MINIO_BROWSER=off
export MINIO_ROOT_USER="minio"
export MINIO_ROOT_PASSWORD="minio123"
export MINIO_KMS_AUTO_ENCRYPTION=off
export MINIO_PROMETHEUS_AUTH_TYPE=public
export MINIO_KMS_SECRET_KEY=my-minio-key:OSMM+vkKUTCvQs9YL/CVMIMt43HFhkUpqJxTmGl6rYw=
if [ ! -f ./mc ]; then
wget -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc \
&& chmod +x mc
fi
minio server --config-dir /tmp/minio-internal --address ":9001" /tmp/minio-internal-idp1/{1...4} >/tmp/minio1_1.log 2>&1 &
minio server --config-dir /tmp/minio-internal --address ":9002" /tmp/minio-internal-idp2/{1...4} >/tmp/minio2_1.log 2>&1 &
minio server --config-dir /tmp/minio-internal --address ":9003" /tmp/minio-internal-idp3/{1...4} >/tmp/minio3_1.log 2>&1 &
sleep 10
export MC_HOST_minio1=http://minio:minio123@localhost:9001
export MC_HOST_minio2=http://minio:minio123@localhost:9002
export MC_HOST_minio3=http://minio:minio123@localhost:9003
./mc admin replicate add minio1 minio2 minio3
./mc admin user add minio1 foobar foo12345
./mc admin policy set minio1 consoleAdmin user=foobar
sleep 5
./mc admin user info minio2 foobar
./mc admin user info minio3 foobar
./mc admin policy add minio1 rw ./docs/site-replication/rw.json
sleep 5
./mc admin policy info minio2 rw >/dev/null 2>&1
./mc admin policy info minio3 rw >/dev/null 2>&1
./mc admin policy remove minio3 rw
sleep 10
./mc admin policy info minio1 rw
if [ $? -eq 0 ]; then
echo "expecting the command to fail, exiting.."
exit_1;
fi
./mc admin policy info minio2 rw
if [ $? -eq 0 ]; then
echo "expecting the command to fail, exiting.."
exit_1;
fi
./mc admin user info minio1 foobar
if [ $? -ne 0 ]; then
echo "policy mapping missing, exiting.."
exit_1;
fi
./mc admin user info minio2 foobar
if [ $? -ne 0 ]; then
echo "policy mapping missing, exiting.."
exit_1;
fi
./mc admin user info minio3 foobar
if [ $? -ne 0 ]; then
echo "policy mapping missing, exiting.."
exit_1;
fi
./mc admin user svcacct add minio2 foobar --access-key testsvc --secret-key testsvc123
if [ $? -ne 0 ]; then
echo "adding svc account failed, exiting.."
exit_1;
fi
sleep 10
./mc admin user svcacct info minio1 testsvc
if [ $? -ne 0 ]; then
echo "svc account not mirrored, exiting.."
exit_1;
fi
./mc admin user svcacct info minio2 testsvc
if [ $? -ne 0 ]; then
echo "svc account not mirrored, exiting.."
exit_1;
fi
./mc admin user svcacct rm minio1 testsvc
if [ $? -ne 0 ]; then
echo "removing svc account failed, exiting.."
exit_1;
fi
sleep 10
./mc admin user svcacct info minio2 testsvc
if [ $? -eq 0 ]; then
echo "svc account found after delete, exiting.."
exit_1;
fi
./mc admin user svcacct info minio3 testsvc
if [ $? -eq 0 ]; then
echo "svc account found after delete, exiting.."
exit_1;
fi
./mc mb minio1/newbucket
sleep 5
./mc stat minio2/newbucket
if [ $? -ne 0 ]; then
echo "expecting bucket to be present. exiting.."
exit_1;
fi
./mc stat minio3/newbucket
if [ $? -ne 0 ]; then
echo "expecting bucket to be present. exiting.."
exit_1;
fi
./mc cp README.md minio2/newbucket/
sleep 5
./mc stat minio1/newbucket/README.md
if [ $? -ne 0 ]; then
echo "expecting object to be present. exiting.."
exit_1;
fi
./mc stat minio3/newbucket/README.md
if [ $? -ne 0 ]; then
echo "expecting object to be present. exiting.."
exit_1;
fi
./mc rm minio3/newbucket/README.md
sleep 5
./mc stat minio2/newbucket/README.md
if [ $? -eq 0 ]; then
echo "expected file to be deleted, exiting.."
exit_1;
fi
./mc stat minio1/newbucket/README.md
if [ $? -eq 0 ]; then
echo "expected file to be deleted, exiting.."
exit_1;
fi
./mc mb --with-lock minio3/newbucket-olock
sleep 5
enabled_minio2=$(./mc stat --json minio2/newbucket-olock| jq -r .metadata.ObjectLock.enabled)
if [ $? -ne 0 ]; then
echo "expected bucket to be mirrored with object-lock but not present, exiting..."
exit_1;
fi
if [ "${enabled_minio2}" != "Enabled" ]; then
echo "expected bucket to be mirrored with object-lock enabled, exiting..."
exit_1;
fi
enabled_minio1=$(./mc stat --json minio1/newbucket-olock| jq -r .metadata.ObjectLock.enabled)
if [ $? -ne 0 ]; then
echo "expected bucket to be mirrored with object-lock but not present, exiting..."
exit_1;
fi
if [ "${enabled_minio1}" != "Enabled" ]; then
echo "expected bucket to be mirrored with object-lock enabled, exiting..."
exit_1;
fi