feat: Single drive XL implementation (#14970)

Main motivation is move towards a common backend format
for all different types of modes in MinIO, allowing for
a simpler code and predictable behavior across all features.

This PR also brings features such as versioning, replication,
transitioning to single drive setups.
This commit is contained in:
Harshavardhana
2022-05-30 10:58:37 -07:00
committed by GitHub
parent 5792be71fa
commit f1abb92f0c
62 changed files with 4288 additions and 270 deletions

View File

@@ -145,7 +145,7 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
bucket := pathClean(vars["bucket"])
update := r.Form.Get("update") == "true"
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -274,7 +274,8 @@ func (a adminAPIHandlers) ListRemoteTargetsHandler(w http.ResponseWriter, r *htt
vars := mux.Vars(r)
bucket := pathClean(vars["bucket"])
arnType := vars["type"]
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -314,7 +315,7 @@ func (a adminAPIHandlers) RemoveRemoteTargetHandler(w http.ResponseWriter, r *ht
bucket := pathClean(vars["bucket"])
arn := vars["arn"]
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}

View File

@@ -47,10 +47,10 @@ func TestIAMInternalIDPConcurrencyServerSuite(t *testing.T) {
}
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on ErasureSD backend with signature v4.
{serverType: "ErasureSD", signer: signerV4},
// Init and run test on ErasureSD backend, with tls enabled.
{serverType: "ErasureSD", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.

View File

@@ -102,10 +102,10 @@ func (s *TestSuiteIAM) iamSetup(c *check) {
// common to tests.
var iamTestSuites = func() []*TestSuiteIAM {
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on ErasureSD backend with signature v4.
{serverType: "ErasureSD", signer: signerV4},
// Init and run test on ErasureSD backend, with tls enabled.
{serverType: "ErasureSD", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.

View File

@@ -805,8 +805,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Check if this setup has an erasure coded backend.
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
@@ -998,7 +997,7 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
}
// Check if this setup has an erasure coded backend.
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
@@ -1078,7 +1077,7 @@ func (a adminAPIHandlers) ObjectSpeedtestHandler(w http.ResponseWriter, r *http.
return
}
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -1228,7 +1227,7 @@ func (a adminAPIHandlers) DriveSpeedtestHandler(w http.ResponseWriter, r *http.R
return
}
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}

View File

@@ -170,50 +170,48 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
// Set Group Status
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-group-status").HandlerFunc(gz(httpTraceHdrs(adminAPI.SetGroupStatus))).Queries("group", "{group:.*}").Queries("status", "{status:.*}")
if globalIsDistErasure || globalIsErasure {
// GetBucketQuotaConfig
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-quota").HandlerFunc(
gz(httpTraceHdrs(adminAPI.GetBucketQuotaConfigHandler))).Queries("bucket", "{bucket:.*}")
// PutBucketQuotaConfig
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-quota").HandlerFunc(
gz(httpTraceHdrs(adminAPI.PutBucketQuotaConfigHandler))).Queries("bucket", "{bucket:.*}")
// GetBucketQuotaConfig
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-quota").HandlerFunc(
gz(httpTraceHdrs(adminAPI.GetBucketQuotaConfigHandler))).Queries("bucket", "{bucket:.*}")
// PutBucketQuotaConfig
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-quota").HandlerFunc(
gz(httpTraceHdrs(adminAPI.PutBucketQuotaConfigHandler))).Queries("bucket", "{bucket:.*}")
// Bucket replication operations
// GetBucketTargetHandler
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/list-remote-targets").HandlerFunc(
gz(httpTraceHdrs(adminAPI.ListRemoteTargetsHandler))).Queries("bucket", "{bucket:.*}", "type", "{type:.*}")
// SetRemoteTargetHandler
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-remote-target").HandlerFunc(
gz(httpTraceHdrs(adminAPI.SetRemoteTargetHandler))).Queries("bucket", "{bucket:.*}")
// RemoveRemoteTargetHandler
adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-remote-target").HandlerFunc(
gz(httpTraceHdrs(adminAPI.RemoveRemoteTargetHandler))).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}")
// Bucket replication operations
// GetBucketTargetHandler
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/list-remote-targets").HandlerFunc(
gz(httpTraceHdrs(adminAPI.ListRemoteTargetsHandler))).Queries("bucket", "{bucket:.*}", "type", "{type:.*}")
// SetRemoteTargetHandler
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-remote-target").HandlerFunc(
gz(httpTraceHdrs(adminAPI.SetRemoteTargetHandler))).Queries("bucket", "{bucket:.*}")
// RemoveRemoteTargetHandler
adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-remote-target").HandlerFunc(
gz(httpTraceHdrs(adminAPI.RemoveRemoteTargetHandler))).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}")
// Remote Tier management operations
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.AddTierHandler)))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.EditTierHandler)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.ListTierHandler)))
adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.RemoveTierHandler)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.VerifyTierHandler)))
// Tier stats
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier-stats").HandlerFunc(gz(httpTraceHdrs(adminAPI.TierStatsHandler)))
// Remote Tier management operations
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.AddTierHandler)))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.EditTierHandler)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.ListTierHandler)))
adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.RemoveTierHandler)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.VerifyTierHandler)))
// Tier stats
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier-stats").HandlerFunc(gz(httpTraceHdrs(adminAPI.TierStatsHandler)))
// Cluster Replication APIs
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/add").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationAdd)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationRemove)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/metainfo").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationMetaInfo)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/status").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationStatus)))
// Cluster Replication APIs
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/add").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationAdd)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationRemove)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/metainfo").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationMetaInfo)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/status").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationStatus)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/join").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerJoin)))
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/peer/bucket-ops").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerBucketOps))).Queries("bucket", "{bucket:.*}").Queries("operation", "{operation:.*}")
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateIAMItem)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/bucket-meta").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateBucketItem)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/peer/idp-settings").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerGetIDPSettings)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationEdit)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerEdit)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerRemove)))
}
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/join").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerJoin)))
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/peer/bucket-ops").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerBucketOps))).Queries("bucket", "{bucket:.*}").Queries("operation", "{operation:.*}")
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateIAMItem)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/bucket-meta").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateBucketItem)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/peer/idp-settings").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerGetIDPSettings)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationEdit)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerEdit)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerRemove)))
if globalIsDistErasure {
// Top locks

View File

@@ -1364,7 +1364,7 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -1611,7 +1611,7 @@ func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWr
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}

View File

@@ -127,20 +127,11 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF
meta.QuotaConfigJSON = configData
meta.QuotaConfigUpdatedAt = UTCNow()
case objectLockConfig:
if !globalIsErasure && !globalIsDistErasure {
return NotImplemented{}
}
meta.ObjectLockConfigXML = configData
meta.ObjectLockConfigUpdatedAt = UTCNow()
case bucketVersioningConfig:
if !globalIsErasure && !globalIsDistErasure {
return NotImplemented{}
}
meta.VersioningConfigXML = configData
case bucketReplicationConfig:
if !globalIsErasure && !globalIsDistErasure {
return NotImplemented{}
}
meta.ReplicationConfigXML = configData
meta.ReplicationConfigUpdatedAt = UTCNow()
case bucketTargetsFile:

View File

@@ -2187,10 +2187,6 @@ func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo,
if objAPI == nil {
return errServerNotInitialized
}
// replication applies only to erasure coded setups
if !globalIsErasure {
return nil
}
// Load bucket metadata sys in background
go p.loadResync(ctx, buckets, objAPI)
return nil

View File

@@ -120,9 +120,6 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
}
if tgt.Type == madmin.ReplicationService {
if !globalIsErasure {
return NotImplemented{Message: "Replication is not implemented in " + getMinioMode()}
}
if !globalBucketVersioningSys.Enabled(bucket) {
return BucketReplicationSourceNotVersioned{Bucket: bucket}
}
@@ -184,9 +181,6 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
if globalIsGateway {
return nil
}
if !globalIsErasure {
return NotImplemented{Message: "Replication is not implemented in " + getMinioMode()}
}
if arnStr == "" {
return BucketRemoteArnInvalid{Bucket: bucket}

View File

@@ -582,7 +582,7 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp
return endpoints, setupType, config.ErrInvalidFSEndpoint(nil).Msg("use path style endpoint for FS setup")
}
endpoints = append(endpoints, endpoint)
setupType = FSSetupType
setupType = ErasureSDSetupType
// Check for cross device mounts if any.
if err = checkCrossDeviceMounts(endpoints); err != nil {

View File

@@ -231,10 +231,10 @@ func TestCreateEndpoints(t *testing.T) {
}{
{"localhost", [][]string{}, "", Endpoints{}, -1, fmt.Errorf("address localhost: missing port in address")},
// FS Setup
// Erasure Single Drive
{"localhost:9000", [][]string{{"http://localhost/d1"}}, "", Endpoints{}, -1, fmt.Errorf("use path style endpoint for FS setup")},
{":443", [][]string{{"/d1"}}, ":443", Endpoints{Endpoint{URL: &url.URL{Path: mustAbs("/d1")}, IsLocal: true}}, FSSetupType, nil},
{"localhost:10000", [][]string{{"/d1"}}, "localhost:10000", Endpoints{Endpoint{URL: &url.URL{Path: mustAbs("/d1")}, IsLocal: true}}, FSSetupType, nil},
{":443", [][]string{{"/d1"}}, ":443", Endpoints{Endpoint{URL: &url.URL{Path: mustAbs("/d1")}, IsLocal: true}}, ErasureSDSetupType, nil},
{"localhost:10000", [][]string{{"/d1"}}, "localhost:10000", Endpoints{Endpoint{URL: &url.URL{Path: mustAbs("/d1")}, IsLocal: true}}, ErasureSDSetupType, nil},
{"localhost:9000", [][]string{{"https://127.0.0.1:9000/d1", "https://localhost:9001/d1", "https://example.com/d1", "https://example.com/d2"}}, "", Endpoints{}, -1, fmt.Errorf("path '/d1' can not be served by different port on same address")},
// Erasure Setup with PathEndpointType

View File

@@ -41,7 +41,7 @@ type Erasure struct {
// NewErasure creates a new ErasureStorage.
func NewErasure(ctx context.Context, dataBlocks, parityBlocks int, blockSize int64) (e Erasure, err error) {
// Check the parameters for sanity now.
if dataBlocks <= 0 || parityBlocks <= 0 {
if dataBlocks <= 0 || parityBlocks < 0 {
return e, reedsolomon.ErrInvShardNum
}

View File

@@ -99,7 +99,7 @@ func (fi FileInfo) IsValid() bool {
fi.Erasure.Index <= dataBlocks+parityBlocks &&
len(fi.Erasure.Distribution) == (dataBlocks+parityBlocks))
return ((dataBlocks >= parityBlocks) &&
(dataBlocks != 0) && (parityBlocks != 0) &&
(dataBlocks > 0) && (parityBlocks >= 0) &&
correctIndexes)
}
@@ -284,7 +284,7 @@ func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIn
func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (FileInfo, error) {
// with less quorum return error.
if quorum < 2 {
if quorum < 1 {
return FileInfo{}, errErasureReadQuorum
}
metaHashes := make([]string, len(metaArr))
@@ -398,6 +398,10 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
// readQuorum is the min required disks to read data.
// writeQuorum is the min required disks to write data.
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
if defaultParityCount == 0 {
return 1, 1, nil
}
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
if err != nil {

View File

@@ -1327,12 +1327,17 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string) error {
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
dirPrefix := encodeDirObject(prefix)
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return nil
}
// Deletes
// - The prefix and its children
// - The prefix__XLDIR__
defer disks[index].Delete(ctx, bucket, dirPrefix, true)
return disks[index].Delete(ctx, bucket, prefix, true)
}, index)
}

View File

@@ -61,6 +61,22 @@ func (z *erasureServerPools) SinglePool() bool {
// Initialize new pool of erasure sets.
func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, error) {
if endpointServerPools.NEndpoints() == 1 {
ep := endpointServerPools[0]
storageDisks, format, err := waitForFormatErasure(true, ep.Endpoints, 1, ep.SetCount, ep.DrivesPerSet, "", "")
if err != nil {
return nil, err
}
objLayer, err := newErasureSingle(ctx, storageDisks[0], format)
if err != nil {
return nil, err
}
globalLocalDrives = storageDisks
return objLayer, nil
}
var (
deploymentID string
distributionAlgo string
@@ -320,7 +336,7 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
nSets[index] = pool.setCount
g.Go(func() error {
// Get the set where it would be placed.
storageInfos[index] = getDiskInfos(ctx, pool.getHashedSet(object).getDisks())
storageInfos[index] = getDiskInfos(ctx, pool.getHashedSet(object).getDisks()...)
return nil
}, index)
}
@@ -933,7 +949,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
object = encodeDirObject(object)
if z.SinglePool() {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()), data.Size()) {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), data.Size()) {
return ObjectInfo{}, toObjectErr(errDiskFull)
}
return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
@@ -1325,7 +1341,7 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
}
if z.SinglePool() {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()), -1) {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), -1) {
return "", toObjectErr(errDiskFull)
}
return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)

View File

@@ -1252,7 +1252,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
defer func(storageDisks []StorageAPI) {
if err != nil {
closeStorageDisks(storageDisks)
closeStorageDisks(storageDisks...)
}
}(storageDisks)

3289
cmd/erasure-single-drive.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -79,7 +79,7 @@ func (er erasureObjects) NewNSLock(bucket string, objects ...string) RWLocker {
// Shutdown function for object storage interface.
func (er erasureObjects) Shutdown(ctx context.Context) error {
// Add any object layer shutdown activities here.
closeStorageDisks(er.getDisks())
closeStorageDisks(er.getDisks()...)
return nil
}

View File

@@ -40,6 +40,9 @@ const (
// Represents Erasure backend.
formatBackendErasure = "xl"
// Represents Erasure backend - single drive
formatBackendErasureSingle = "xl-single"
// formatErasureV1.Erasure.Version - version '1'.
formatErasureVersionV1 = "1"
@@ -146,6 +149,9 @@ func newFormatErasureV3(numSets int, setLen int) *formatErasureV3 {
format := &formatErasureV3{}
format.Version = formatMetaVersionV1
format.Format = formatBackendErasure
if setLen == 1 {
format.Format = formatBackendErasureSingle
}
format.ID = mustGetUUID()
format.Erasure.Version = formatErasureVersionV3
format.Erasure.DistributionAlgo = formatErasureVersionV3DistributionAlgoV3
@@ -170,8 +176,8 @@ func formatGetBackendErasureVersion(b []byte) (string, error) {
if meta.Version != formatMetaVersionV1 {
return "", fmt.Errorf(`format.Version expected: %s, got: %s`, formatMetaVersionV1, meta.Version)
}
if meta.Format != formatBackendErasure {
return "", fmt.Errorf(`found backend type %s, expected %s`, meta.Format, formatBackendErasure)
if meta.Format != formatBackendErasure && meta.Format != formatBackendErasureSingle {
return "", fmt.Errorf(`found backend type %s, expected %s or %s`, meta.Format, formatBackendErasure, formatBackendErasureSingle)
}
// Erasure backend found, proceed to detect version.
format := &formatErasureVersionDetect{}
@@ -291,7 +297,7 @@ func formatErasureMigrateV2ToV3(data []byte, export, version string) ([]byte, er
func countErrs(errs []error, err error) int {
i := 0
for _, err1 := range errs {
if err1 == err {
if err1 == err || errors.Is(err1, err) {
i++
}
}
@@ -410,7 +416,7 @@ func checkFormatErasureValue(formatErasure *formatErasureV3, disk StorageAPI) er
if formatErasure.Version != formatMetaVersionV1 {
return fmt.Errorf("Unsupported version of backend format [%s] found on %s", formatErasure.Version, disk)
}
if formatErasure.Format != formatBackendErasure {
if formatErasure.Format != formatBackendErasure && formatErasure.Format != formatBackendErasureSingle {
return fmt.Errorf("Unsupported backend format [%s] found on %s", formatErasure.Format, disk)
}
if formatErasure.Erasure.Version != formatErasureVersionV3 {
@@ -643,7 +649,7 @@ func saveFormatErasureAll(ctx context.Context, storageDisks []StorageAPI, format
}
// relinquishes the underlying connection for all storage disks.
func closeStorageDisks(storageDisks []StorageAPI) {
func closeStorageDisks(storageDisks ...StorageAPI) {
var wg sync.WaitGroup
for _, disk := range storageDisks {
if disk == nil {

View File

@@ -240,6 +240,9 @@ func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, er
if err != nil {
return nil, err
}
if formatBackend == formatBackendErasureSingle {
return nil, errFreshDisk
}
if formatBackend != formatBackendFS {
return nil, fmt.Errorf(`%s file: expected format-type: %s, found: %s`, formatConfigFile, formatBackendFS, formatBackend)
}
@@ -319,6 +322,10 @@ func formatFSFixDeploymentID(ctx context.Context, fsFormatPath string) error {
rlk.Close()
return err
}
if formatBackend == formatBackendErasureSingle {
rlk.Close()
return errFreshDisk
}
if formatBackend != formatBackendFS {
rlk.Close()
return fmt.Errorf(`%s file: expected format-type: %s, found: %s`, formatConfigFile, formatBackendFS, formatBackend)

View File

@@ -45,6 +45,8 @@ func TestFSV1MetadataObjInfo(t *testing.T) {
// TestReadFSMetadata - readFSMetadata testing with a healthy and faulty disk
func TestReadFSMetadata(t *testing.T) {
t.Skip()
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)
@@ -80,6 +82,7 @@ func TestReadFSMetadata(t *testing.T) {
// TestWriteFSMetadata - tests of writeFSMetadata with healthy disk.
func TestWriteFSMetadata(t *testing.T) {
t.Skip()
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)

View File

@@ -32,6 +32,7 @@ import (
// Tests cleanup multipart uploads for filesystem backend.
func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
t.Skip()
// Prepare for tests
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)
@@ -88,6 +89,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
// TestNewMultipartUploadFaultyDisk - test NewMultipartUpload with faulty disks
func TestNewMultipartUploadFaultyDisk(t *testing.T) {
t.Skip()
// Prepare for tests
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)

View File

@@ -142,6 +142,11 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
return nil, config.ErrUnableToWriteInBackend(err).Hint(hint)
}
fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)
if _, err = fsStat(ctx, fsFormatPath); err != nil && os.IsNotExist(err) {
return nil, errFreshDisk
}
// Assign a new UUID for FS minio mode. Each server instance
// gets its own UUID for temporary file transaction.
fsUUID := mustGetUUID()

View File

@@ -51,6 +51,8 @@ func TestNewFS(t *testing.T) {
// TestFSShutdown - initialize a new FS object layer then calls
// Shutdown to check returned results
func TestFSShutdown(t *testing.T) {
t.Skip()
bucketName := "testbucket"
objectName := "object"
// Create and return an fsObject with its path in the disk
@@ -83,6 +85,8 @@ func TestFSShutdown(t *testing.T) {
// TestFSGetBucketInfo - test GetBucketInfo with healty and faulty disks
func TestFSGetBucketInfo(t *testing.T) {
t.Skip()
// Prepare for testing
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)
@@ -165,6 +169,7 @@ func TestFSPutObject(t *testing.T) {
// TestFSDeleteObject - test fs.DeleteObject() with healthy and corrupted disks
func TestFSDeleteObject(t *testing.T) {
t.Skip()
// Prepare for tests
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)
@@ -209,6 +214,7 @@ func TestFSDeleteObject(t *testing.T) {
// TestFSDeleteBucket - tests for fs DeleteBucket
func TestFSDeleteBucket(t *testing.T) {
t.Skip()
// Prepare for testing
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)
@@ -249,6 +255,7 @@ func TestFSDeleteBucket(t *testing.T) {
// TestFSListBuckets - tests for fs ListBuckets
func TestFSListBuckets(t *testing.T) {
t.Skip()
// Prepare for tests
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk)

View File

@@ -19,6 +19,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
@@ -32,6 +33,7 @@ import (
"github.com/minio/cli"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/certs"
@@ -292,6 +294,9 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
SecretKey: globalActiveCred.SecretKey,
})
if err != nil {
if errors.Is(err, errFreshDisk) {
err = config.ErrInvalidFSValue(err)
}
logger.FatalIf(err, "Unable to initialize gateway backend")
}
newObject = NewGatewayLayerWithLocker(newObject)

View File

@@ -340,7 +340,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// healObject heals given object path in deep to fix bitrot.
func healObject(bucket, object, versionID string, scan madmin.HealScanMode) {
// Get background heal sequence to send elements to heal
globalHealStateLK.Lock()
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
globalHealStateLK.Unlock()
if ok {
bgSeq.queueHealTask(healSource{
bucket: bucket,

View File

@@ -74,6 +74,7 @@ const (
globalWindowsOSName = "windows"
globalMacOSName = "darwin"
globalMinioModeFS = "mode-server-fs"
globalMinioModeErasureSD = "mode-server-xl-single"
globalMinioModeErasure = "mode-server-xl"
globalMinioModeDistErasure = "mode-server-distributed-xl"
globalMinioModeGatewayPrefix = "mode-gateway-"
@@ -141,6 +142,9 @@ var (
// Indicates if the running minio server is an erasure-code backend.
globalIsErasure = false
// Indicates if the running minio server is in single drive XL mode.
globalIsErasureSD = false
// Indicates if the running minio is in gateway mode.
globalIsGateway = false

View File

@@ -116,10 +116,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
// + 2 * 10MiB (default erasure block size v1) + 2 * 1MiB (default erasure block size v2)
blockSize := xioutil.BlockSizeLarge + xioutil.BlockSizeSmall
apiRequestsMaxPerNode = int(maxMem / uint64(maxSetDrives*blockSize+int(blockSizeV1*2+blockSizeV2*2)))
if globalIsErasure {
logger.Info("Automatically configured API requests per node based on available memory on the system: %d", apiRequestsMaxPerNode)
}
logger.Info("Automatically configured API requests per node based on available memory on the system: %d", apiRequestsMaxPerNode)
} else {
apiRequestsMaxPerNode = cfg.RequestsMax
if len(globalEndpoints.Hostnames()) > 0 {

View File

@@ -314,18 +314,20 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
break
}
refreshInterval := sys.iamRefreshInterval
// Set up polling for expired accounts and credentials purging.
switch {
case sys.openIDConfig.ProviderEnabled():
go func() {
timer := time.NewTimer(sys.iamRefreshInterval)
timer := time.NewTimer(refreshInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
sys.purgeExpiredCredentialsForExternalSSO(ctx)
timer.Reset(sys.iamRefreshInterval)
timer.Reset(refreshInterval)
case <-ctx.Done():
return
}
@@ -333,7 +335,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
}()
case sys.ldapConfig.Enabled:
go func() {
timer := time.NewTimer(sys.iamRefreshInterval)
timer := time.NewTimer(refreshInterval)
defer timer.Stop()
for {
@@ -342,7 +344,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
sys.purgeExpiredCredentialsForLDAP(ctx)
sys.updateGroupMembershipsForLDAP(ctx)
timer.Reset(sys.iamRefreshInterval)
timer.Reset(refreshInterval)
case <-ctx.Done():
return
}
@@ -717,7 +719,7 @@ func (sys *IAMSys) SetTempUser(ctx context.Context, accessKey string, cred auth.
return errServerNotInitialized
}
if globalAuthZPlugin != nil {
if newGlobalAuthZPluginFn() != nil {
// If OPA is set, we do not need to set a policy mapping.
policyName = ""
}
@@ -1690,8 +1692,8 @@ func (sys *IAMSys) GetCombinedPolicy(policies ...string) iampolicy.Policy {
// IsAllowed - checks given policy args is allowed to continue the Rest API.
func (sys *IAMSys) IsAllowed(args iampolicy.Args) bool {
// If opa is configured, use OPA always.
if globalAuthZPlugin != nil {
ok, err := globalAuthZPlugin.IsAllowed(args)
if authz := newGlobalAuthZPluginFn(); authz != nil {
ok, err := authz.IsAllowed(args)
if err != nil {
logger.LogIf(GlobalContext, err)
}

View File

@@ -209,22 +209,7 @@ func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Reque
// lockMaintenance loops over all locks and discards locks
// that have not been refreshed for some time.
func lockMaintenance(ctx context.Context) {
// Wait until the object API is ready
// no need to start the lock maintenance
// if ObjectAPI is not initialized.
var objAPI ObjectLayer
for {
objAPI = newObjectLayerFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
if _, ok := objAPI.(*erasureServerPools); !ok {
if !globalIsDistErasure {
return
}

View File

@@ -51,10 +51,12 @@ func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
if cleanup {
// Recursively delete all caches.
objAPI := newObjectLayerFn()
ez, ok := objAPI.(*erasureServerPools)
if ok {
ctx := context.Background()
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
if objAPI != nil {
ez, ok := objAPI.(renameAllStorager)
if ok {
ctx := context.Background()
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
}
return &bucketMetacache{
@@ -207,9 +209,15 @@ func (b *bucketMetacache) cloneCaches() (map[string]metacache, map[string][]stri
// Deletes are performed concurrently.
func (b *bucketMetacache) deleteAll() {
ctx := context.Background()
ez, ok := newObjectLayerFn().(*erasureServerPools)
objAPI := newObjectLayerFn()
if objAPI == nil {
return
}
ez, ok := objAPI.(renameAllStorager)
if !ok {
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasurePools"))
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be 'renameAllStorager'"))
return
}

View File

@@ -56,7 +56,7 @@ func (m *metacacheManager) initManager() {
objAPI = newObjectLayerFn()
}
if !globalIsErasure {
if globalIsGateway {
return
}

View File

@@ -259,6 +259,286 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
return entries, nil
}
// listPath will return the requested entries.
// If no more entries are in the listing io.EOF is returned,
// otherwise nil or an unexpected error is returned.
// The listPathOptions given will be checked and modified internally.
// Required important fields are Bucket, Prefix, Separator.
// Other important fields are Limit, Marker.
// List ID always derived from the Marker.
func (es *erasureSingle) listPath(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, es); err != nil {
return entries, err
}
// Marker is set validate pre-condition.
if o.Marker != "" && o.Prefix != "" {
// Marker not common with prefix is not implemented. Send an empty response
if !HasPrefix(o.Marker, o.Prefix) {
return entries, io.EOF
}
}
// With max keys of zero we have reached eof, return right here.
if o.Limit == 0 {
return entries, io.EOF
}
// For delimiter and prefix as '/' we do not list anything at all
// along // with the prefix. On a flat namespace with 'prefix'
// as '/' we don't have any entries, since all the keys are
// of form 'keyName/...'
if strings.HasPrefix(o.Prefix, SlashSeparator) {
return entries, io.EOF
}
// If delimiter is slashSeparator we must return directories of
// the non-recursive scan unless explicitly requested.
o.IncludeDirectories = o.Separator == slashSeparator
if (o.Separator == slashSeparator || o.Separator == "") && !o.Recursive {
o.Recursive = o.Separator != slashSeparator
o.Separator = slashSeparator
} else {
// Default is recursive, if delimiter is set then list non recursive.
o.Recursive = true
}
// Decode and get the optional list id from the marker.
o.parseMarker()
o.BaseDir = baseDirFromPrefix(o.Prefix)
o.Transient = o.Transient || isReservedOrInvalidBucket(o.Bucket, false)
o.SetFilter()
if o.Transient {
o.Create = false
}
// We have 2 cases:
// 1) Cold listing, just list.
// 2) Returning, but with no id. Start async listing.
// 3) Returning, with ID, stream from list.
//
// If we don't have a list id we must ask the server if it has a cache or create a new.
if o.ID != "" && !o.Transient {
// Create or ping with handout...
rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix))
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var c *metacache
if rpc == nil {
resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o)
c = &resp
} else {
c, err = rpc.GetMetacacheListing(ctx, *o)
}
if err != nil {
if errors.Is(err, context.Canceled) {
// Context is canceled, return at once.
// request canceled, no entries to return
return entries, io.EOF
}
if !errors.Is(err, context.DeadlineExceeded) {
o.debugln("listPath: got error", err)
}
o.Transient = true
o.Create = false
o.ID = mustGetUUID()
} else {
if c.fileNotFound {
// No cache found, no entries found.
return entries, io.EOF
}
if c.status == scanStateError || c.status == scanStateNone {
o.ID = ""
o.Create = false
o.debugln("scan status", c.status, " - waiting a roundtrip to create")
} else {
// Continue listing
o.ID = c.id
go func(meta metacache) {
// Continuously update while we wait.
t := time.NewTicker(metacacheMaxClientWait / 10)
defer t.Stop()
select {
case <-ctx.Done():
// Request is done, stop updating.
return
case <-t.C:
meta.lastHandout = time.Now()
if rpc == nil {
meta, _ = localMetacacheMgr.updateCacheEntry(meta)
}
meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
}
}(*c)
}
}
// We have an existing list ID, continue streaming.
if o.Create {
o.debugln("Creating", o)
entries, err = es.listAndSave(ctx, o)
if err == nil || err == io.EOF {
return entries, err
}
entries.truncate(0)
} else {
o.debugln("Resuming", o)
entries, err = es.streamMetadataParts(ctx, *o)
entries.reuse = true // We read from stream and are not sharing results.
if err == nil {
return entries, nil
}
}
if IsErr(err, []error{
nil,
context.Canceled,
context.DeadlineExceeded,
// io.EOF is expected and should be returned but no need to log it.
io.EOF,
}...) {
// Expected good errors we don't need to return error.
return entries, err
}
entries.truncate(0)
go func() {
rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix))
if rpc != nil {
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
defer cancel()
c, err := rpc.GetMetacacheListing(ctx, *o)
if err == nil {
c.error = "no longer used"
c.status = scanStateError
rpc.UpdateMetacacheListing(ctx, *c)
}
}
}()
o.ID = ""
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Resuming listing from drives failed %w, proceeding to do raw listing", err))
}
}
// Do listing in-place.
// Create output for our results.
// Create filter for results.
o.debugln("Raw List", o)
filterCh := make(chan metaCacheEntry, o.Limit)
listCtx, cancelList := context.WithCancel(ctx)
filteredResults := o.gatherResults(listCtx, filterCh)
var wg sync.WaitGroup
wg.Add(1)
var listErr error
go func(o listPathOptions) {
defer wg.Done()
o.Limit = 0
listErr = es.listMerged(listCtx, o, filterCh)
o.debugln("listMerged returned with", listErr)
}(*o)
entries, err = filteredResults()
cancelList()
wg.Wait()
if listErr != nil && !errors.Is(listErr, context.Canceled) {
return entries, listErr
}
entries.reuse = true
truncated := entries.len() > o.Limit || err == nil
entries.truncate(o.Limit)
if !o.Transient && truncated {
if o.ID == "" {
entries.listID = mustGetUUID()
} else {
entries.listID = o.ID
}
}
if !truncated {
return entries, io.EOF
}
return entries, nil
}
// listMerged will list across all sets and return a merged results stream.
// The result channel is closed when no more results are expected.
func (es *erasureSingle) listMerged(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) error {
var mu sync.Mutex
var wg sync.WaitGroup
var listErr error
var inputs []chan metaCacheEntry
innerResults := make(chan metaCacheEntry, 100)
inputs = append(inputs, innerResults)
mu.Lock()
listCtx, cancelList := context.WithCancel(ctx)
defer cancelList()
wg.Add(1)
go func() {
defer wg.Done()
err := es.listPathInner(listCtx, o, innerResults)
mu.Lock()
defer mu.Unlock()
listErr = err
}()
mu.Unlock()
// Do lifecycle filtering.
if o.Lifecycle != nil {
filterIn := make(chan metaCacheEntry, 10)
go filterLifeCycle(ctx, o.Bucket, *o.Lifecycle, o.Retention, filterIn, results)
// Replace results.
results = filterIn
}
// Gather results to a single channel.
err := mergeEntryChannels(ctx, inputs, results, func(existing, other *metaCacheEntry) (replace bool) {
// Pick object over directory
if existing.isDir() && !other.isDir() {
return true
}
if !existing.isDir() && other.isDir() {
return false
}
eMeta, err := existing.xlmeta()
if err != nil {
return true
}
oMeta, err := other.xlmeta()
if err != nil {
return false
}
// Replace if modtime is newer
if !oMeta.latestModtime().Equal(oMeta.latestModtime()) {
return oMeta.latestModtime().After(eMeta.latestModtime())
}
// Use NumVersions as a final tiebreaker.
return len(oMeta.versions) > len(eMeta.versions)
})
cancelList()
wg.Wait()
if err != nil {
return err
}
if listErr != nil {
if contextCanceled(ctx) {
return nil
}
if listErr.Error() == io.EOF.Error() {
return nil
}
logger.LogIf(ctx, listErr)
return listErr
}
if contextCanceled(ctx) {
return ctx.Err()
}
return nil
}
// listMerged will list across all sets and return a merged results stream.
// The result channel is closed when no more results are expected.
func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) error {
@@ -395,6 +675,73 @@ func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle,
}
}
func (es *erasureSingle) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
// Use ID as the object name...
o.pool = 0
o.set = 0
saver := es
// Disconnect from call above, but cancel on exit.
listCtx, cancel := context.WithCancel(GlobalContext)
saveCh := make(chan metaCacheEntry, metacacheBlockSize)
inCh := make(chan metaCacheEntry, metacacheBlockSize)
outCh := make(chan metaCacheEntry, o.Limit)
filteredResults := o.gatherResults(ctx, outCh)
mc := o.newMetacache()
meta := metaCacheRPC{meta: &mc, cancel: cancel, rpc: globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix)), o: *o}
// Save listing...
go func() {
if err := saver.saveMetaCacheStream(listCtx, &meta, saveCh); err != nil {
meta.setErr(err.Error())
}
cancel()
}()
// Do listing...
go func(o listPathOptions) {
err := es.listMerged(listCtx, o, inCh)
if err != nil {
meta.setErr(err.Error())
}
o.debugln("listAndSave: listing", o.ID, "finished with ", err)
}(*o)
// Keep track of when we return since we no longer have to send entries to output.
var funcReturned bool
var funcReturnedMu sync.Mutex
defer func() {
funcReturnedMu.Lock()
funcReturned = true
funcReturnedMu.Unlock()
}()
// Write listing to results and saver.
go func() {
var returned bool
for entry := range inCh {
if !returned {
funcReturnedMu.Lock()
returned = funcReturned
funcReturnedMu.Unlock()
outCh <- entry
if returned {
close(outCh)
}
}
entry.reusable = returned
saveCh <- entry
}
if !returned {
close(outCh)
}
close(saveCh)
}()
return filteredResults()
}
func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
// Use ID as the object name...
o.pool = z.getAvailablePoolIdx(ctx, minioMetaBucket, o.ID, 10<<20)

View File

@@ -543,6 +543,170 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
}
}
func (es *erasureSingle) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
retries := 0
rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix))
for {
if contextCanceled(ctx) {
return entries, ctx.Err()
}
// If many failures, check the cache state.
if retries > 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("remote listing canceled: %w", err)
}
retries = 1
}
const retryDelay = 250 * time.Millisecond
// All operations are performed without locks, so we must be careful and allow for failures.
// Read metadata associated with the object from a disk.
if retries > 0 {
_, err := es.disk.ReadVersion(ctx, minioMetaBucket,
o.objectPath(0), "", false)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
}
// Load first part metadata...
// Read metadata associated with the object from all disks.
fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
if err != nil {
switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
case ObjectNotFound:
retries++
time.Sleep(retryDelay)
continue
case InsufficientReadQuorum:
retries++
time.Sleep(retryDelay)
continue
default:
return entries, fmt.Errorf("reading first part metadata: %w", err)
}
}
partN, err := o.findFirstPart(fi)
switch {
case err == nil:
case errors.Is(err, io.ErrUnexpectedEOF):
if retries == 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("remote listing canceled: %w", err)
}
retries = -1
}
retries++
time.Sleep(retryDelay)
continue
case errors.Is(err, io.EOF):
return entries, io.EOF
}
// We got a stream to start at.
loadedPart := 0
for {
if contextCanceled(ctx) {
return entries, ctx.Err()
}
if partN != loadedPart {
if retries > 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("waiting for next part %d: %w", partN, err)
}
retries = 1
}
if retries > 0 {
// Load from one disk only
_, err := es.disk.ReadVersion(ctx, minioMetaBucket,
o.objectPath(partN), "", false)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
}
// Load partN metadata...
fi, metaArr, onlineDisks, err = es.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
loadedPart = partN
bi, err := getMetacacheBlockInfo(fi, partN)
logger.LogIf(ctx, err)
if err == nil {
if bi.pastPrefix(o.Prefix) {
return entries, io.EOF
}
}
}
pr, pw := io.Pipe()
go func() {
werr := es.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0,
fi.Size, pw, fi, metaArr, onlineDisks)
pw.CloseWithError(werr)
}()
tmp := newMetacacheReader(pr)
e, err := tmp.filter(o)
pr.CloseWithError(err)
entries.o = append(entries.o, e.o...)
if o.Limit > 0 && entries.len() > o.Limit {
entries.truncate(o.Limit)
return entries, nil
}
if err == nil {
// We stopped within the listing, we are done for now...
return entries, nil
}
if err != nil && err.Error() != io.EOF.Error() {
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
case ObjectNotFound:
retries++
time.Sleep(retryDelay)
continue
case InsufficientReadQuorum:
retries++
time.Sleep(retryDelay)
continue
default:
logger.LogIf(ctx, err)
return entries, err
}
}
// We finished at the end of the block.
// And should not expect any more results.
bi, err := getMetacacheBlockInfo(fi, partN)
logger.LogIf(ctx, err)
if err != nil || bi.EOS {
// We are done and there are no more parts.
return entries, io.EOF
}
if bi.endedPrefix(o.Prefix) {
// Nothing more for prefix.
return entries, io.EOF
}
partN++
retries = 0
}
}
}
// getListQuorum interprets list quorum values and returns appropriate
// acceptable quorum expected for list operations
func getListQuorum(quorum string, driveCount int) int {
@@ -562,6 +726,53 @@ func getListQuorum(quorum string, driveCount int) int {
return 3
}
// Will return io.EOF if continuing would not yield more results.
func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) {
defer close(results)
o.debugf(color.Green("listPath:")+" with options: %#v", o)
// How to resolve results.
resolver := metadataResolutionParams{
dirQuorum: 1,
objQuorum: 1,
bucket: o.Bucket,
}
// Maximum versions requested for "latest" object
// resolution on versioned buckets, this is to be only
// used when o.Versioned is false
if !o.Versioned {
resolver.requestedVersions = 1
}
ctxDone := ctx.Done()
return listPathRaw(ctx, listPathRawOptions{
disks: []StorageAPI{es.disk},
bucket: o.Bucket,
path: o.BaseDir,
recursive: o.Recursive,
filterPrefix: o.FilterPrefix,
minDisks: 1,
forwardTo: o.Marker,
agreed: func(entry metaCacheEntry) {
select {
case <-ctxDone:
case results <- entry:
}
},
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
// Results Disagree :-(
entry, ok := entries.resolve(&resolver)
if ok {
select {
case <-ctxDone:
case results <- *entry:
}
}
},
})
}
// Will return io.EOF if continuing would not yield more results.
func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) {
defer close(results)
@@ -654,6 +865,133 @@ func (m *metaCacheRPC) setErr(err string) {
*m.meta = meta
}
func (es *erasureSingle) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
o := mc.o
o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)
metaMu := &mc.mu
rpc := mc.rpc
cancel := mc.cancel
defer func() {
o.debugln(color.Green("saveMetaCacheStream:")+"err:", err)
if err != nil && !errors.Is(err, io.EOF) {
go mc.setErr(err.Error())
cancel()
}
}()
defer cancel()
// Save continuous updates
go func() {
var err error
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
var exit bool
for !exit {
select {
case <-ticker.C:
case <-ctx.Done():
exit = true
}
metaMu.Lock()
meta := *mc.meta
meta, err = o.updateMetacacheListing(meta, rpc)
if err == nil && time.Since(meta.lastHandout) > metacacheMaxClientWait {
cancel()
exit = true
meta.status = scanStateError
meta.error = fmt.Sprintf("listing canceled since time since last handout was %v ago", time.Since(meta.lastHandout).Round(time.Second))
o.debugln(color.Green("saveMetaCacheStream: ") + meta.error)
meta, err = o.updateMetacacheListing(meta, rpc)
}
if err == nil {
*mc.meta = meta
if meta.status == scanStateError {
cancel()
exit = true
}
}
metaMu.Unlock()
}
}()
const retryDelay = 200 * time.Millisecond
const maxTries = 5
// Keep destination...
// Write results to disk.
bw := newMetacacheBlockWriter(entries, func(b *metacacheBlock) error {
// if the block is 0 bytes and its a first block skip it.
// skip only this for Transient caches.
if len(b.data) == 0 && b.n == 0 && o.Transient {
return nil
}
o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
logger.LogIf(ctx, err)
custom := b.headerKV()
_, err = es.putMetacacheObject(ctx, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
UserDefined: custom,
})
if err != nil {
mc.setErr(err.Error())
cancel()
return err
}
if b.n == 0 {
return nil
}
// Update block 0 metadata.
var retries int
for {
meta := b.headerKV()
fi := FileInfo{
Metadata: make(map[string]string, len(meta)),
}
for k, v := range meta {
fi.Metadata[k] = v
}
err := es.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi, es.disk)
if err == nil {
break
}
switch err.(type) {
case ObjectNotFound:
return err
case StorageErr:
return err
case InsufficientReadQuorum:
default:
logger.LogIf(ctx, err)
}
if retries >= maxTries {
return err
}
retries++
time.Sleep(retryDelay)
}
return nil
})
// Blocks while consuming entries or an error occurs.
err = bw.Close()
if err != nil {
mc.setErr(err.Error())
}
metaMu.Lock()
defer metaMu.Unlock()
if mc.meta.error != "" {
return err
}
// Save success
mc.meta.status = scanStateSuccess
meta, err := o.updateMetacacheListing(*mc.meta, rpc)
if err == nil {
*mc.meta = meta
}
return nil
}
func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
o := mc.o
o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)

View File

@@ -155,9 +155,9 @@ func (m *metacache) delete(ctx context.Context) {
logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
return
}
ez, ok := objAPI.(*erasureServerPools)
ez, ok := objAPI.(renameAllStorager)
if !ok {
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools"))
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be 'renameAllStorager'"))
return
}
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))

View File

@@ -1365,7 +1365,7 @@ func getMinioHealingMetrics() *MetricsGroup {
mg := &MetricsGroup{}
mg.RegisterRead(func(_ context.Context) (metrics []Metric) {
metrics = make([]Metric, 0, 5)
if !globalIsErasure {
if globalIsGateway {
return
}
bgSeq, exists := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
@@ -1817,7 +1817,7 @@ func getClusterStorageMetrics() *MetricsGroup {
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil || !globalIsErasure {
if objLayer == nil || globalIsGateway {
return
}

View File

@@ -132,7 +132,7 @@ func nodeHealthMetricsPrometheus(ch chan<- prometheus.Metric) {
// collects healing specific metrics for MinIO instance in Prometheus specific format
// and sends to given channel
func healingMetricsPrometheus(ch chan<- prometheus.Metric) {
if !globalIsErasure {
if globalIsGateway {
return
}
bgSeq, exists := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)

View File

@@ -35,9 +35,6 @@ func TestListObjectsVersionedFolders(t *testing.T) {
}
func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
if instanceType == FSTestStr {
return
}
t, _ := t1.(*testing.T)
testBuckets := []string{
// This bucket is used for testing ListObject operations.
@@ -317,9 +314,6 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
}
func _testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler, versioned bool) {
if instanceType == FSTestStr && versioned {
return
}
t, _ := t1.(*testing.T)
testBuckets := []string{
// This bucket is used for testing ListObject operations.
@@ -1020,10 +1014,6 @@ func TestDeleteObjectVersionMarker(t *testing.T) {
}
func testDeleteObjectVersion(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
if instanceType == FSTestStr {
return
}
t, _ := t1.(*testing.T)
testBuckets := []string{
@@ -1101,10 +1091,6 @@ func TestListObjectVersions(t *testing.T) {
// Unit test for ListObjectVersions
func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
if instanceType == FSTestStr {
return
}
t, _ := t1.(*testing.T)
testBuckets := []string{
// This bucket is used for testing ListObject operations.
@@ -1886,16 +1872,14 @@ func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestEr
// Initialize FS backend for the benchmark.
func initFSObjectsB(disk string, t *testing.B) (obj ObjectLayer) {
var err error
obj, err = NewFSObjectLayer(disk)
obj, _, err := initObjectLayer(context.Background(), mustGetPoolEndpoints(disk))
if err != nil {
t.Fatal("Unexpected err: ", err)
t.Fatal(err)
}
newTestConfig(globalMinioDefaultRegion, obj)
initAllSubsystems()
return obj
}

View File

@@ -1620,39 +1620,35 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler)
t.Errorf("Test %d: %s: Expected Bucket to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.Bucket, actualResult.Bucket)
}
// ListObjectParts returns empty response always in FS mode
if instanceType != FSTestStr {
// Asserting IsTruncated.
if actualResult.IsTruncated != testCase.expectedResult.IsTruncated {
t.Errorf("Test %d: %s: Expected IsTruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated)
continue
// Asserting IsTruncated.
if actualResult.IsTruncated != testCase.expectedResult.IsTruncated {
t.Errorf("Test %d: %s: Expected IsTruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated)
continue
}
// Asserting NextPartNumberMarker.
if actualResult.NextPartNumberMarker != expectedResult.NextPartNumberMarker {
t.Errorf("Test %d: %s: Expected NextPartNumberMarker to be \"%d\", but instead found it to be \"%d\"", i+1, instanceType, expectedResult.NextPartNumberMarker, actualResult.NextPartNumberMarker)
continue
}
// Asserting the number of Parts.
if len(expectedResult.Parts) != len(actualResult.Parts) {
t.Errorf("Test %d: %s: Expected the result to contain info of %d Parts, but found %d instead", i+1, instanceType, len(expectedResult.Parts), len(actualResult.Parts))
continue
}
// Iterating over the partInfos and asserting the fields.
for j, actualMetaData := range actualResult.Parts {
// Asserting the PartNumber in the PartInfo.
if actualMetaData.PartNumber != expectedResult.Parts[j].PartNumber {
t.Errorf("Test %d: %s: Part %d: Expected PartNumber to be \"%d\", but instead found \"%d\"", i+1, instanceType, j+1, expectedResult.Parts[j].PartNumber, actualMetaData.PartNumber)
}
// Asserting NextPartNumberMarker.
if actualResult.NextPartNumberMarker != expectedResult.NextPartNumberMarker {
t.Errorf("Test %d: %s: Expected NextPartNumberMarker to be \"%d\", but instead found it to be \"%d\"", i+1, instanceType, expectedResult.NextPartNumberMarker, actualResult.NextPartNumberMarker)
continue
// Asserting the Size in the PartInfo.
if actualMetaData.Size != expectedResult.Parts[j].Size {
t.Errorf("Test %d: %s: Part %d: Expected Part Size to be \"%d\", but instead found \"%d\"", i+1, instanceType, j+1, expectedResult.Parts[j].Size, actualMetaData.Size)
}
// Asserting the number of Parts.
if len(expectedResult.Parts) != len(actualResult.Parts) {
t.Errorf("Test %d: %s: Expected the result to contain info of %d Parts, but found %d instead", i+1, instanceType, len(expectedResult.Parts), len(actualResult.Parts))
continue
// Asserting the ETag in the PartInfo.
if actualMetaData.ETag != expectedResult.Parts[j].ETag {
t.Errorf("Test %d: %s: Part %d: Expected Etag to be \"%s\", but instead found \"%s\"", i+1, instanceType, j+1, expectedResult.Parts[j].ETag, actualMetaData.ETag)
}
// Iterating over the partInfos and asserting the fields.
for j, actualMetaData := range actualResult.Parts {
// Asserting the PartNumber in the PartInfo.
if actualMetaData.PartNumber != expectedResult.Parts[j].PartNumber {
t.Errorf("Test %d: %s: Part %d: Expected PartNumber to be \"%d\", but instead found \"%d\"", i+1, instanceType, j+1, expectedResult.Parts[j].PartNumber, actualMetaData.PartNumber)
}
// Asserting the Size in the PartInfo.
if actualMetaData.Size != expectedResult.Parts[j].Size {
t.Errorf("Test %d: %s: Part %d: Expected Part Size to be \"%d\", but instead found \"%d\"", i+1, instanceType, j+1, expectedResult.Parts[j].Size, actualMetaData.Size)
}
// Asserting the ETag in the PartInfo.
if actualMetaData.ETag != expectedResult.Parts[j].ETag {
t.Errorf("Test %d: %s: Part %d: Expected Etag to be \"%s\", but instead found \"%s\"", i+1, instanceType, j+1, expectedResult.Parts[j].ETag, actualMetaData.ETag)
}
}
}
}
}

View File

@@ -950,7 +950,7 @@ func compressSelfTest() {
// getDiskInfos returns the disk information for the provided disks.
// If a disk is nil or an error is returned the result will be nil as well.
func getDiskInfos(ctx context.Context, disks []StorageAPI) []*DiskInfo {
func getDiskInfos(ctx context.Context, disks ...StorageAPI) []*DiskInfo {
res := make([]*DiskInfo, len(disks))
for i, disk := range disks {
if disk == nil {

View File

@@ -1954,7 +1954,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
if err != nil {
t.Fatalf("Test %d: %s: Failed to look for copied object part: <ERROR> %s", i+1, instanceType, err)
}
if instanceType != FSTestStr && len(results.Parts) != 1 {
if len(results.Parts) != 1 {
t.Fatalf("Test %d: %s: Expected only one entry returned %d entries", i+1, instanceType, len(results.Parts))
}
}

View File

@@ -150,13 +150,13 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
defer func(storageDisks []StorageAPI) {
if err != nil {
closeStorageDisks(storageDisks)
closeStorageDisks(storageDisks...)
}
}(storageDisks)
for i, err := range errs {
if err != nil {
if err == errDiskNotFound && verboseLogging {
if err != nil && !errors.Is(err, errXLBackend) {
if errors.Is(err, errDiskNotFound) && verboseLogging {
logger.Error("Unable to connect to %s: %v", endpoints[i], isServerResolvable(endpoints[i], time.Second))
} else {
logger.Error("Unable to use the drive %s: %v", endpoints[i], err)
@@ -173,7 +173,7 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
// Check if we have
for i, sErr := range sErrs {
// print the error, nonetheless, which is perhaps unhandled
if sErr != errUnformattedDisk && sErr != errDiskNotFound && verboseLogging {
if !errors.Is(sErr, errUnformattedDisk) && !errors.Is(sErr, errDiskNotFound) && verboseLogging {
if sErr != nil {
logger.Error("Unable to read 'format.json' from %s: %v\n", endpoints[i], sErr)
}

View File

@@ -222,6 +222,7 @@ func serverHandleCmdArgs(ctx *cli.Context) {
if globalIsDistErasure {
globalIsErasure = true
}
globalIsErasureSD = (setupType == ErasureSDSetupType)
}
func serverHandleEnvVars() {
@@ -232,13 +233,11 @@ func serverHandleEnvVars() {
var globalHealStateLK sync.RWMutex
func initAllSubsystems() {
if globalIsErasure {
globalHealStateLK.Lock()
// New global heal state
globalAllHealState = newHealState(true)
globalBackgroundHealState = newHealState(false)
globalHealStateLK.Unlock()
}
globalHealStateLK.Lock()
// New global heal state
globalAllHealState = newHealState(true)
globalBackgroundHealState = newHealState(false)
globalHealStateLK.Unlock()
// Create new notification system and initialize notification peer targets
globalNotificationSys = NewNotificationSys(globalEndpoints)
@@ -527,11 +526,8 @@ func serverMain(ctx *cli.Context) {
xhttp.SetMinIOVersion(Version)
// Enable background operations for erasure coding
if globalIsErasure {
initAutoHeal(GlobalContext, newObject)
initHealMRF(GlobalContext, newObject)
}
initAutoHeal(GlobalContext, newObject)
initHealMRF(GlobalContext, newObject)
initBackgroundExpiry(GlobalContext, newObject)
if globalActiveCred.Equal(auth.DefaultCredentials) {
@@ -579,21 +575,19 @@ func serverMain(ctx *cli.Context) {
// Background all other operations such as initializing bucket metadata etc.
go func() {
// Initialize transition tier configuration manager
if globalIsErasure {
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
go func() {
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
logger.LogIf(GlobalContext, err)
}
go func() {
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
logger.LogIf(GlobalContext, err)
}
globalTierJournal, err = initTierDeletionJournal(GlobalContext)
if err != nil {
logger.FatalIf(err, "Unable to initialize remote tier pending deletes journal")
}
}()
}
globalTierJournal, err = initTierDeletionJournal(GlobalContext)
if err != nil {
logger.FatalIf(err, "Unable to initialize remote tier pending deletes journal")
}
}()
// Initialize site replication manager.
globalSiteReplicationSys.Init(GlobalContext, newObject)
@@ -664,7 +658,13 @@ func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools
// For FS only, directly use the disk.
if endpointServerPools.NEndpoints() == 1 {
// Initialize new FS object layer.
return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
newObject, err = NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
if err == nil {
return newObject, nil
}
if err != nil && err != errFreshDisk {
return newObject, err
}
}
return newErasureServerPools(ctx, endpointServerPools)

View File

@@ -27,7 +27,7 @@ import (
func TestNewObjectLayer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Tests for FS object layer.
// Tests for ErasureSD object layer.
nDisks := 1
disks, err := getRandomDisks(nDisks)
if err != nil {
@@ -39,7 +39,7 @@ func TestNewObjectLayer(t *testing.T) {
if err != nil {
t.Fatal("Unexpected object layer initialization error", err)
}
_, ok := obj.(*FSObjects)
_, ok := obj.(*erasureSingle)
if !ok {
t.Fatal("Unexpected object layer detected", reflect.TypeOf(obj))
}

View File

@@ -39,7 +39,7 @@ import (
"github.com/minio/pkg/bucket/policy"
)
// API suite container common to both FS and Erasure.
// API suite container common to both ErasureSD and Erasure.
type TestSuiteCommon struct {
serverType string
testServer TestServer
@@ -122,12 +122,12 @@ func runAllTests(suite *TestSuiteCommon, c *check) {
func TestServerSuite(t *testing.T) {
testCases := []*TestSuiteCommon{
// Init and run test on FS backend with signature v4.
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend with signature v2.
{serverType: "FS", signer: signerV2},
// Init and run test on FS backend, with tls enabled.
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on ErasureSD backend with signature v4.
{serverType: "ErasureSD", signer: signerV4},
// Init and run test on ErasureSD backend with signature v2.
{serverType: "ErasureSD", signer: signerV2},
// Init and run test on ErasureSD backend, with tls enabled.
{serverType: "ErasureSD", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.

View File

@@ -27,6 +27,9 @@ const (
// FSSetupType - FS setup type enum.
FSSetupType
// ErasureSDSetupType - Erasure single drive setup enum.
ErasureSDSetupType
// ErasureSetupType - Erasure setup type enum.
ErasureSetupType
@@ -41,6 +44,8 @@ func (setupType SetupType) String() string {
switch setupType {
case FSSetupType:
return globalMinioModeFS
case ErasureSDSetupType:
return globalMinioModeErasureSD
case ErasureSetupType:
return globalMinioModeErasure
case DistErasureSetupType:

View File

@@ -116,6 +116,12 @@ var errDoneForNow = errors.New("done for now")
// to proceed to next entry.
var errSkipFile = errors.New("skip this file")
// Returned by FS drive mode when a fresh disk is specified.
var errFreshDisk = errors.New("FS backend requires existing disk")
// errXLBackend XL drive mode requires fresh deployment.
var errXLBackend = errors.New("XL backend requires fresh disk")
// StorageErr represents error generated by xlStorage call.
type StorageErr string

View File

@@ -1140,6 +1140,10 @@ func checkDiskFatalErrs(errs []error) error {
return errFaultyDisk
}
if countErrs(errs, errXLBackend) == len(errs) {
return errXLBackend
}
return nil
}
@@ -1152,6 +1156,8 @@ func checkDiskFatalErrs(errs []error) error {
// Do not like it :-(
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
switch {
case errors.Is(err, errXLBackend):
logger.Fatal(config.ErrInvalidXLValue(err), "Unable to initialize backend")
case errors.Is(err, errUnsupportedDisk):
var hint string
if endpoint.URL != nil {

View File

@@ -389,7 +389,7 @@ func (sts *stsAPIHandlers) AssumeRoleWithSSO(w http.ResponseWriter, r *http.Requ
policyName = globalIAMSys.CurrentPolicies(policies)
}
if globalAuthZPlugin == nil {
if newGlobalAuthZPluginFn() == nil {
if !ok {
writeSTSErrorResponse(ctx, w, true, ErrSTSInvalidParameterValue,
fmt.Errorf("%s claim missing from the JWT token, credentials will not be generated", iamPolicyClaimNameOpenID()))
@@ -598,7 +598,7 @@ func (sts *stsAPIHandlers) AssumeRoleWithLDAPIdentity(w http.ResponseWriter, r *
// Check if this user or their groups have a policy applied.
ldapPolicies, _ := globalIAMSys.PolicyDBGet(ldapUserDN, false, groupDistNames...)
if len(ldapPolicies) == 0 && globalAuthZPlugin == nil {
if len(ldapPolicies) == 0 && newGlobalAuthZPluginFn() == nil {
writeSTSErrorResponse(ctx, w, true, ErrSTSInvalidParameterValue,
fmt.Errorf("expecting a policy to be set for user `%s` or one of their groups: `%s` - rejecting this request",
ldapUserDN, strings.Join(groupDistNames, "`,`")))

View File

@@ -42,10 +42,10 @@ func runAllIAMSTSTests(suite *TestSuiteIAM, c *check) {
func TestIAMInternalIDPSTSServerSuite(t *testing.T) {
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on ErasureSD backend with signature v4.
{serverType: "ErasureSD", signer: signerV4},
// Init and run test on ErasureSD backend, with tls enabled.
{serverType: "ErasureSD", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.

View File

@@ -78,6 +78,8 @@ func TestMain(m *testing.M) {
// set to 'true' when testing is invoked
globalIsTesting = true
globalIsCICD = globalIsTesting
globalActiveCred = auth.Credentials{
AccessKey: auth.DefaultAccessKey,
SecretKey: auth.DefaultSecretKey,
@@ -191,10 +193,14 @@ func prepareFS() (ObjectLayer, string, error) {
if err != nil {
return nil, "", err
}
obj, err := NewFSObjectLayer(fsDirs[0])
obj, _, err := initObjectLayer(context.Background(), mustGetPoolEndpoints(fsDirs...))
if err != nil {
return nil, "", err
}
initAllSubsystems()
globalIAMSys.Init(context.Background(), obj, globalEtcdClient, 2*time.Second)
return obj, fsDirs[0], nil
}
@@ -221,8 +227,7 @@ func prepareErasure16(ctx context.Context) (ObjectLayer, []string, error) {
// Initialize FS objects.
func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) {
var err error
obj, err = NewFSObjectLayer(disk)
obj, _, err := initObjectLayer(context.Background(), mustGetPoolEndpoints(disk))
if err != nil {
t.Fatal(err)
}
@@ -242,8 +247,8 @@ type TestErrHandler interface {
}
const (
// FSTestStr is the string which is used as notation for Single node ObjectLayer in the unit tests.
FSTestStr string = "FS"
// ErasureSDStr is the string which is used as notation for Single node ObjectLayer in the unit tests.
ErasureSDStr string = "ErasureSD"
// ErasureTestStr is the string which is used as notation for Erasure ObjectLayer in the unit tests.
ErasureTestStr string = "Erasure"
@@ -1469,20 +1474,9 @@ func getRandomDisks(N int) ([]string, error) {
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
func newTestObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error) {
// For FS only, directly use the disk.
if endpointServerPools.NEndpoints() == 1 {
// Initialize new FS object layer.
return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
}
z, err := newErasureServerPools(ctx, endpointServerPools)
if err != nil {
return nil, err
}
initAllSubsystems()
return z, nil
return newErasureServerPools(ctx, endpointServerPools)
}
// initObjectLayer - Instantiates object layer and returns it.
@@ -1750,7 +1744,7 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
credentials := globalActiveCred
// Executing the object layer tests for single node setup.
objAPITest(objLayer, FSTestStr, bucketFS, fsAPIRouter, credentials, t)
objAPITest(objLayer, ErasureSDStr, bucketFS, fsAPIRouter, credentials, t)
objLayer, erasureDisks, err := prepareErasure16(ctx)
if err != nil {
@@ -1816,7 +1810,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
// Executing the object layer tests for single node setup.
objTest(objLayer, FSTestStr, t)
objTest(objLayer, ErasureSDStr, t)
// Call clean up functions
cancel()

View File

@@ -74,7 +74,7 @@ func (api adminAPIHandlers) AddTierHandler(w http.ResponseWriter, r *http.Reques
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -132,7 +132,7 @@ func (api adminAPIHandlers) ListTierHandler(w http.ResponseWriter, r *http.Reque
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -158,7 +158,7 @@ func (api adminAPIHandlers) EditTierHandler(w http.ResponseWriter, r *http.Reque
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -210,7 +210,7 @@ func (api adminAPIHandlers) RemoveTierHandler(w http.ResponseWriter, r *http.Req
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -247,7 +247,7 @@ func (api adminAPIHandlers) VerifyTierHandler(w http.ResponseWriter, r *http.Req
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
@@ -273,7 +273,7 @@ func (api adminAPIHandlers) TierStatsHandler(w http.ResponseWriter, r *http.Requ
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
if globalIsGateway {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}

View File

@@ -906,6 +906,8 @@ func getMinioMode() string {
mode = globalMinioModeErasure
} else if globalIsGateway {
mode = globalMinioModeGatewayPrefix + globalGatewayName
} else if globalIsErasureSD {
mode = globalMinioModeErasureSD
}
return mode
}

View File

@@ -141,6 +141,9 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (F
// Will return -1 for unknown values.
func getXLDiskLoc(diskID string) (poolIdx, setIdx, diskIdx int) {
if api := newObjectLayerFn(); api != nil {
if globalIsErasureSD {
return 0, 0, 0
}
if ep, ok := api.(*erasureServerPools); ok {
if pool, set, disk, err := ep.getPoolAndSet(diskID); err == nil {
return pool, set, disk

View File

@@ -53,7 +53,7 @@ func isXLMetaFormatValid(version, format string) bool {
// Verifies if the backend format metadata is sane by validating
// the ErasureInfo, i.e. data and parity blocks.
func isXLMetaErasureInfoValid(data, parity int) bool {
return ((data >= parity) && (data != 0) && (parity != 0))
return ((data >= parity) && (data > 0) && (parity >= 0))
}
//go:generate msgp -file=$GOFILE -unexported

View File

@@ -62,13 +62,15 @@ func TestIsXLMetaErasureInfoValid(t *testing.T) {
{1, 5, 6, false},
{2, 5, 5, true},
{3, 0, 5, false},
{4, 5, 0, false},
{5, 5, 0, false},
{6, 5, 4, true},
{3, -1, 5, false},
{4, 5, -1, false},
{5, 5, 0, true},
{6, 5, 0, true},
{7, 5, 4, true},
}
for _, tt := range tests {
if got := isXLMetaErasureInfoValid(tt.data, tt.parity); got != tt.want {
t.Errorf("Test %d: Expected %v but received %v", tt.name, got, tt.want)
t.Errorf("Test %d: Expected %v but received %v -> %#v", tt.name, got, tt.want, tt)
}
}
}