mirror of https://github.com/minio/minio.git
Remove erasureSets and erasureObjects from ObjectLayer (#10442)
This commit is contained in:
parent
e959c5d71c
commit
493c714663
|
@ -354,7 +354,7 @@ func (b *BucketMetadata) Save(ctx context.Context, api ObjectLayer) error {
|
|||
|
||||
// deleteBucketMetadata deletes bucket metadata
|
||||
// If config does not exist no error is returned.
|
||||
func deleteBucketMetadata(ctx context.Context, obj ObjectLayer, bucket string) error {
|
||||
func deleteBucketMetadata(ctx context.Context, obj objectDeleter, bucket string) error {
|
||||
metadataFiles := []string{
|
||||
dataUsageCacheName,
|
||||
bucketMetadataFile,
|
||||
|
|
|
@ -46,7 +46,11 @@ func readConfig(ctx context.Context, objAPI ObjectLayer, configFile string) ([]b
|
|||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
func deleteConfig(ctx context.Context, objAPI ObjectLayer, configFile string) error {
|
||||
type objectDeleter interface {
|
||||
DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
|
||||
}
|
||||
|
||||
func deleteConfig(ctx context.Context, objAPI objectDeleter, configFile string) error {
|
||||
_, err := objAPI.DeleteObject(ctx, minioMetaBucket, configFile, ObjectOptions{})
|
||||
if err != nil && isErrObjectNotFound(err) {
|
||||
return errConfigNotFound
|
||||
|
|
|
@ -428,10 +428,15 @@ func (d *dataUsageCache) merge(other dataUsageCache) {
|
|||
}
|
||||
}
|
||||
|
||||
type objectIO interface {
|
||||
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
|
||||
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
}
|
||||
|
||||
// load the cache content with name from minioMetaBackgroundOpsBucket.
|
||||
// Only backend errors are returned as errors.
|
||||
// If the object is not found or unable to deserialize d is cleared and nil error is returned.
|
||||
func (d *dataUsageCache) load(ctx context.Context, store ObjectLayer, name string) error {
|
||||
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
|
||||
var buf bytes.Buffer
|
||||
err := store.GetObject(ctx, dataUsageBucket, name, 0, -1, &buf, "", ObjectOptions{})
|
||||
if err != nil {
|
||||
|
@ -450,7 +455,7 @@ func (d *dataUsageCache) load(ctx context.Context, store ObjectLayer, name strin
|
|||
}
|
||||
|
||||
// save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
|
||||
func (d *dataUsageCache) save(ctx context.Context, store ObjectLayer, name string) error {
|
||||
func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
|
||||
b := d.serialize()
|
||||
size := int64(len(b))
|
||||
r, err := hash.NewReader(bytes.NewReader(b), size, "", "", size, false)
|
||||
|
|
|
@ -240,7 +240,6 @@ func TestDataUsageUpdate(t *testing.T) {
|
|||
t.Fatal("got nil result")
|
||||
}
|
||||
if w.flatten {
|
||||
t.Log(e.Children)
|
||||
*e = got.flatten(*e)
|
||||
}
|
||||
if e.Size != int64(w.size) {
|
||||
|
|
|
@ -32,6 +32,7 @@ func TestErasureParentDirIsObject(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Unable to initialize 'Erasure' object layer.")
|
||||
}
|
||||
defer obj.Shutdown(context.Background())
|
||||
|
||||
// Remove all disks.
|
||||
for _, disk := range fsDisks {
|
||||
|
|
|
@ -99,6 +99,7 @@ func TestListOnlineDisks(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Prepare Erasure backend failed - %v", err)
|
||||
}
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(disks)
|
||||
|
||||
type tamperKind int
|
||||
|
@ -265,6 +266,7 @@ func TestDisksWithAllParts(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Prepare Erasure backend failed - %v", err)
|
||||
}
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(disks)
|
||||
|
||||
bucket := "bucket"
|
||||
|
|
|
@ -29,16 +29,6 @@ import (
|
|||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
)
|
||||
|
||||
func (er erasureObjects) ReloadFormat(ctx context.Context, dryRun bool) error {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
func (er erasureObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return madmin.HealResultItem{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// Heals a bucket if it doesn't exist on one of the disks, additionally
|
||||
// also heals the missing entries for bucket metadata files
|
||||
// `policy.json, notification.xml, listeners.json`.
|
||||
|
|
|
@ -39,6 +39,7 @@ func TestHealing(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
z := obj.(*erasureZones)
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
||||
// ListObjectVersions - This is not implemented, look for erasure-zones.ListObjectVersions()
|
||||
func (er erasureObjects) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (loi ListObjectVersionsInfo, e error) {
|
||||
return loi, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListObjectsV2 - This is not implemented/needed anymore, look for erasure-zones.ListObjectsV2()
|
||||
func (er erasureObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi ListObjectsV2Info, e error) {
|
||||
return loi, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListObjects - This is not implemented/needed anymore, look for erasure-zones.ListObjects()
|
||||
func (er erasureObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
|
||||
return loi, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListBucketsHeal - This is not implemented/needed anymore, look for erasure-zones.ListBucketHeal()
|
||||
func (er erasureObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
|
||||
return nil, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListObjectsHeal - This is not implemented, look for erasure-zones.ListObjectsHeal()
|
||||
func (er erasureObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
|
||||
return ListObjectsInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// HealObjects - This is not implemented/needed anymore, look for erasure-zones.HealObjects()
|
||||
func (er erasureObjects) HealObjects(ctx context.Context, bucket, prefix string, _ madmin.HealOpts, _ HealObjectFn) (e error) {
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
// Walk - This is not implemented/needed anymore, look for erasure-zones.Walk()
|
||||
func (er erasureObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, _ ObjectOptions) error {
|
||||
return NotImplemented{}
|
||||
}
|
|
@ -42,6 +42,7 @@ func TestRepeatPutObjectPart(t *testing.T) {
|
|||
}
|
||||
|
||||
// cleaning up of temporary test directories
|
||||
defer objLayer.Shutdown(context.Background())
|
||||
defer removeRoots(disks)
|
||||
|
||||
err = objLayer.MakeBucketWithLocation(ctx, "bucket1", BucketOptions{})
|
||||
|
@ -90,6 +91,7 @@ func TestErasureDeleteObjectBasic(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer xl.Shutdown(context.Background())
|
||||
|
||||
err = xl.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||
if err != nil {
|
||||
|
@ -200,6 +202,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
// Cleanup backend directories
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
z := obj.(*erasureZones)
|
||||
|
@ -269,6 +272,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
// Cleanup backend directories.
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
z := obj.(*erasureZones)
|
||||
|
@ -331,6 +335,7 @@ func TestPutObjectNoQuorum(t *testing.T) {
|
|||
}
|
||||
|
||||
// Cleanup backend directories.
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
z := obj.(*erasureZones)
|
||||
|
|
|
@ -488,11 +488,6 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo,
|
|||
return storageInfo, errs
|
||||
}
|
||||
|
||||
func (s *erasureSets) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
|
||||
// Use the zone-level implementation instead.
|
||||
return NotImplemented{API: "CrawlAndGetDataUsage"}
|
||||
}
|
||||
|
||||
// Shutdown shutsdown all erasure coded sets in parallel
|
||||
// returns error upon first error.
|
||||
func (s *erasureSets) Shutdown(ctx context.Context) error {
|
||||
|
@ -510,7 +505,14 @@ func (s *erasureSets) Shutdown(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case _, ok := <-s.disksConnectEvent:
|
||||
if ok {
|
||||
close(s.disksConnectEvent)
|
||||
}
|
||||
default:
|
||||
close(s.disksConnectEvent)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -589,11 +591,6 @@ func (s *erasureSets) GetBucketInfo(ctx context.Context, bucket string) (bucketI
|
|||
return s.getHashedSet("").GetBucketInfo(ctx, bucket)
|
||||
}
|
||||
|
||||
// ListObjectsV2 lists all objects in bucket filtered by prefix
|
||||
func (s *erasureSets) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
|
||||
return result, NotImplemented{}
|
||||
}
|
||||
|
||||
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
|
||||
func (s *erasureSets) IsNotificationSupported() bool {
|
||||
return s.getHashedSet("").IsNotificationSupported()
|
||||
|
@ -1038,22 +1035,6 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
|
|||
return entryChs
|
||||
}
|
||||
|
||||
// ListObjectVersions - implements listing of objects across disks, each disk is indepenently
|
||||
// walked and merged at this layer. Resulting value through the merge process sends
|
||||
// the data in lexically sorted order.
|
||||
func (s *erasureSets) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionIDMarker, delimiter string, maxKeys int) (loi ListObjectVersionsInfo, err error) {
|
||||
// Shouldn't be called directly, caller Zones already has an implementation
|
||||
return loi, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListObjects - implements listing of objects across disks, each disk is indepenently
|
||||
// walked and merged at this layer. Resulting value through the merge process sends
|
||||
// the data in lexically sorted order.
|
||||
func (s *erasureSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
|
||||
// Shouldn't be called directly, caller Zones already has an implementation
|
||||
return loi, NotImplemented{}
|
||||
}
|
||||
|
||||
func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
||||
// In list multipart uploads we are going to treat input prefix as the object,
|
||||
// this means that we are not supporting directory navigation.
|
||||
|
@ -1621,18 +1602,6 @@ func (s *erasureSets) GetObjectTags(ctx context.Context, bucket, object string,
|
|||
return s.getHashedSet(object).GetObjectTags(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
// GetMetrics - no op
|
||||
func (s *erasureSets) GetMetrics(ctx context.Context) (*Metrics, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return &Metrics{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// Health shouldn't be called directly - will panic
|
||||
func (s *erasureSets) Health(ctx context.Context, _ HealthOptions) HealthResult {
|
||||
logger.CriticalIf(ctx, NotImplemented{})
|
||||
return HealthResult{}
|
||||
}
|
||||
|
||||
// maintainMRFList gathers the list of successful partial uploads
|
||||
// from all underlying er.sets and puts them in a global map which
|
||||
// should not have more than 10000 entries.
|
||||
|
|
|
@ -206,6 +206,7 @@ func TestHashedLayer(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal("Unable to initialize 'Erasure' object layer.", err)
|
||||
}
|
||||
defer obj.Shutdown(ctx)
|
||||
|
||||
// Remove all dirs.
|
||||
for _, dir := range fsDirs {
|
||||
|
|
|
@ -40,6 +40,9 @@ type erasureZones struct {
|
|||
GatewayUnsupported
|
||||
|
||||
zones []*erasureSets
|
||||
|
||||
// Shut down async operations
|
||||
shutdown context.CancelFunc
|
||||
}
|
||||
|
||||
func (z *erasureZones) SingleZone() bool {
|
||||
|
@ -79,7 +82,7 @@ func newErasureZones(ctx context.Context, endpointZones EndpointZones) (ObjectLa
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ctx, z.shutdown = context.WithCancel(ctx)
|
||||
go intDataUpdateTracker.start(ctx, localDrives...)
|
||||
return z, nil
|
||||
}
|
||||
|
@ -218,6 +221,7 @@ func (z *erasureZones) getZoneIdx(ctx context.Context, bucket, object string, op
|
|||
}
|
||||
|
||||
func (z *erasureZones) Shutdown(ctx context.Context) error {
|
||||
defer z.shutdown()
|
||||
if z.SingleZone() {
|
||||
return z.zones[0].Shutdown(ctx)
|
||||
}
|
||||
|
@ -237,7 +241,6 @@ func (z *erasureZones) Shutdown(ctx context.Context) error {
|
|||
}
|
||||
// let's the rest shutdown
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -81,6 +81,14 @@ func (er erasureObjects) SetDriveCount() int {
|
|||
func (er erasureObjects) Shutdown(ctx context.Context) error {
|
||||
// Add any object layer shutdown activities here.
|
||||
closeStorageDisks(er.getDisks())
|
||||
select {
|
||||
case _, ok := <-er.mrfOpCh:
|
||||
if ok {
|
||||
close(er.mrfOpCh)
|
||||
}
|
||||
default:
|
||||
close(er.mrfOpCh)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -245,19 +253,6 @@ func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageIn
|
|||
return getStorageInfo(disks, endpoints)
|
||||
}
|
||||
|
||||
// GetMetrics - is not implemented and shouldn't be called.
|
||||
func (er erasureObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return &Metrics{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// CrawlAndGetDataUsage collects usage from all buckets.
|
||||
// updates are sent as different parts of the underlying
|
||||
// structure has been traversed.
|
||||
func (er erasureObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
|
||||
return NotImplemented{API: "CrawlAndGetDataUsage"}
|
||||
}
|
||||
|
||||
// CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
|
||||
// Updates are sent on a regular basis and the caller *must* consume them.
|
||||
func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
|
||||
|
@ -439,9 +434,3 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Health shouldn't be called directly - will panic
|
||||
func (er erasureObjects) Health(ctx context.Context, _ HealthOptions) HealthResult {
|
||||
logger.CriticalIf(ctx, NotImplemented{})
|
||||
return HealthResult{}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ func checkBucketAndObjectNames(ctx context.Context, bucket, object string) error
|
|||
}
|
||||
|
||||
// Checks for all ListObjects arguments validity.
|
||||
func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string, obj ObjectLayer) error {
|
||||
func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string, obj getBucketInfoI) error {
|
||||
// Verify if bucket exists before validating object name.
|
||||
// This is done on purpose since the order of errors is
|
||||
// important here bucket does not exist error should
|
||||
|
@ -173,7 +173,7 @@ func checkObjectArgs(ctx context.Context, bucket, object string, obj ObjectLayer
|
|||
}
|
||||
|
||||
// Checks for PutObject arguments validity, also validates if bucket exists.
|
||||
func checkPutObjectArgs(ctx context.Context, bucket, object string, obj ObjectLayer, size int64) error {
|
||||
func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucketInfoI, size int64) error {
|
||||
// Verify if bucket exists before validating object name.
|
||||
// This is done on purpose since the order of errors is
|
||||
// important here bucket does not exist error should
|
||||
|
@ -197,8 +197,12 @@ func checkPutObjectArgs(ctx context.Context, bucket, object string, obj ObjectLa
|
|||
return nil
|
||||
}
|
||||
|
||||
type getBucketInfoI interface {
|
||||
GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
|
||||
}
|
||||
|
||||
// Checks whether bucket exists and returns appropriate error if not.
|
||||
func checkBucketExist(ctx context.Context, bucket string, obj ObjectLayer) error {
|
||||
func checkBucketExist(ctx context.Context, bucket string, obj getBucketInfoI) error {
|
||||
_, err := obj.GetBucketInfo(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -481,6 +481,7 @@ func newTestConfig(bucketLocation string, obj ObjectLayer) (err error) {
|
|||
func (testServer TestServer) Stop() {
|
||||
testServer.cancel()
|
||||
testServer.Server.Close()
|
||||
testServer.Obj.Shutdown(context.Background())
|
||||
os.RemoveAll(testServer.Root)
|
||||
for _, ep := range testServer.Disks {
|
||||
for _, disk := range ep.Endpoints {
|
||||
|
@ -1838,6 +1839,7 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
|
|||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
||||
}
|
||||
defer objLayer.Shutdown(ctx)
|
||||
|
||||
bucketErasure, erAPIRouter, err := initAPIHandlerTest(objLayer, endpoints)
|
||||
if err != nil {
|
||||
|
@ -1893,6 +1895,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
|||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
||||
}
|
||||
defer objLayer.Shutdown(context.Background())
|
||||
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
|
||||
|
@ -1911,6 +1914,7 @@ func ExecObjectLayerTestWithDirs(t TestErrHandler, objTest objTestTypeWithDirs)
|
|||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
||||
}
|
||||
defer objLayer.Shutdown(ctx)
|
||||
|
||||
// initialize the server and obtain the credentials and root.
|
||||
// credentials are necessary to sign the HTTP request.
|
||||
|
@ -1933,6 +1937,7 @@ func ExecObjectLayerDiskAlteredTest(t *testing.T, objTest objTestDiskNotFoundTyp
|
|||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
||||
}
|
||||
defer objLayer.Shutdown(ctx)
|
||||
|
||||
if err = newTestConfig(globalMinioDefaultRegion, objLayer); err != nil {
|
||||
t.Fatal("Failed to create config directory", err)
|
||||
|
|
Loading…
Reference in New Issue