mirror of https://github.com/minio/minio.git
Support bucket versioning (#9377)
- Implement a new xl.json 2.0.0 format to support versioning; this moves the entire marshaling logic to the POSIX layer, and the top layer always consumes a common FileInfo construct, which simplifies metadata reads.
- Implement list object versions.
- Migrate from crchash to siphash for object placement in new deployments.

Fixes #2111
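The siphash migration changes how an object name is mapped to an erasure set. A standalone sketch of the before/after hashing idea (the helper names, the Castagnoli table choice for the CRC variant, and the deployment-ID key are illustrative assumptions, not the exact MinIO code):

package main

import (
    "fmt"
    "hash/crc32"

    "github.com/dchest/siphash"
)

// crcPlacement mirrors the older CRC-style placement: hash the object
// name and take it modulo the number of erasure sets.
func crcPlacement(object string, sets int) int {
    sum := crc32.Checksum([]byte(object), crc32.MakeTable(crc32.Castagnoli))
    return int(sum % uint32(sets))
}

// sipPlacement sketches the SipHash-based placement: the hash is keyed
// (here with an assumed 16-byte per-deployment ID), which gives a stable
// but deployment-specific distribution of objects across sets.
func sipPlacement(object string, sets int, deploymentID [16]byte) int {
    h := siphash.New(deploymentID[:])
    h.Write([]byte(object))
    return int(h.Sum64() % uint64(sets))
}

func main() {
    var id [16]byte
    copy(id[:], "example-deploy-id") // illustrative key, truncated to 16 bytes
    fmt.Println(crcPlacement("photos/2020/cat.png", 16))
    fmt.Println(sipPlacement("photos/2020/cat.png", 16, id))
}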
455 cmd/erasure.go
@@ -1,5 +1,5 @@
/*
 * MinIO Cloud Storage, (C) 2017 MinIO, Inc.
 * MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -18,136 +18,373 @@ package cmd

import (
    "context"
    "fmt"
    "sort"
    "sync"
    "time"

    "github.com/klauspost/reedsolomon"
    "github.com/minio/minio/cmd/logger"
    "github.com/minio/minio/pkg/bpool"
    "github.com/minio/minio/pkg/dsync"
    "github.com/minio/minio/pkg/madmin"
    "github.com/minio/minio/pkg/sync/errgroup"
)

// Erasure - erasure encoding details.
type Erasure struct {
    encoder                  func() reedsolomon.Encoder
    dataBlocks, parityBlocks int
    blockSize                int64
// OfflineDisk represents an unavailable disk.
var OfflineDisk StorageAPI // zero value is nil

// partialUpload is a successful upload of an object
// but not written in all disks (having quorum)
type partialUpload struct {
    bucket    string
    object    string
    failedSet int
}

// NewErasure creates a new ErasureStorage.
func NewErasure(ctx context.Context, dataBlocks, parityBlocks int, blockSize int64) (e Erasure, err error) {
    e = Erasure{
        dataBlocks:   dataBlocks,
        parityBlocks: parityBlocks,
        blockSize:    blockSize,
    }
// erasureObjects - Implements ER object layer.
type erasureObjects struct {
    GatewayUnsupported

    // Check the parameters for sanity now.
    if dataBlocks <= 0 || parityBlocks <= 0 {
        return e, reedsolomon.ErrInvShardNum
    }
    // getDisks returns list of storageAPIs.
    getDisks func() []StorageAPI

    if dataBlocks+parityBlocks > 256 {
        return e, reedsolomon.ErrMaxShardNum
    }
    // getLockers returns list of remote and local lockers.
    getLockers func() []dsync.NetLocker

    // Encoder when needed.
    var enc reedsolomon.Encoder
    var once sync.Once
    e.encoder = func() reedsolomon.Encoder {
        once.Do(func() {
            e, err := reedsolomon.New(dataBlocks, parityBlocks, reedsolomon.WithAutoGoroutines(int(e.ShardSize())))
            if err != nil {
                // Error conditions should be checked above.
                panic(err)
            }
            enc = e
        })
        return enc
    }
    return
    // getEndpoints returns list of endpoint strings belonging this set.
    // some may be local and some remote.
    getEndpoints func() []string

    // Locker mutex map.
    nsMutex *nsLockMap

    // Byte pools used for temporary i/o buffers.
    bp *bpool.BytePoolCap

    mrfUploadCh chan partialUpload
}
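The erasureObjects struct above wires in disks, lockers, and endpoints as function fields rather than fixed slices, so every call re-resolves the current set. A toy sketch of that late-binding layout, with invented types (not MinIO's StorageAPI), under the assumption that the accessor should hand out a copy guarded by a lock:

package main

import (
    "fmt"
    "sync"
)

// disk is a stand-in for a storage backend in this sketch.
type disk struct{ path string }

// objectSet mimics the "accessor as closure" layout: callers always go
// through getDisks(), so the underlying slice can change underneath.
type objectSet struct {
    mu    sync.RWMutex
    disks []*disk

    getDisks func() []*disk
}

func newObjectSet(paths ...string) *objectSet {
    s := &objectSet{}
    for _, p := range paths {
        s.disks = append(s.disks, &disk{path: p})
    }
    s.getDisks = func() []*disk {
        s.mu.RLock()
        defer s.mu.RUnlock()
        // Return a copy so callers never hold the live slice.
        out := make([]*disk, len(s.disks))
        copy(out, s.disks)
        return out
    }
    return s
}

func main() {
    set := newObjectSet("/mnt/disk1", "/mnt/disk2")
    for _, d := range set.getDisks() {
        fmt.Println(d.path)
    }
}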

// EncodeData encodes the given data and returns the erasure-coded data.
// It returns an error if the erasure coding failed.
func (e *Erasure) EncodeData(ctx context.Context, data []byte) ([][]byte, error) {
    if len(data) == 0 {
        return make([][]byte, e.dataBlocks+e.parityBlocks), nil
    }
    encoded, err := e.encoder().Split(data)
    if err != nil {
        logger.LogIf(ctx, err)
        return nil, err
    }
    if err = e.encoder().Encode(encoded); err != nil {
        logger.LogIf(ctx, err)
        return nil, err
    }
    return encoded, nil
// NewNSLock - initialize a new namespace RWLocker instance.
func (er erasureObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
    return er.nsMutex.NewNSLock(ctx, er.getLockers, bucket, objects...)
}

// DecodeDataBlocks decodes the given erasure-coded data.
// It only decodes the data blocks but does not verify them.
// It returns an error if the decoding failed.
func (e *Erasure) DecodeDataBlocks(data [][]byte) error {
    var isZero = 0
    for _, b := range data[:] {
        if len(b) == 0 {
            isZero++
            break
        }
    }
    if isZero == 0 || isZero == len(data) {
        // If all are zero, payload is 0 bytes.
        return nil
    }
    return e.encoder().ReconstructData(data)
}
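EncodeData and DecodeDataBlocks are thin wrappers over the klauspost/reedsolomon encoder. A self-contained sketch of the same Split/Encode/ReconstructData cycle, with arbitrarily chosen shard counts and payload:

package main

import (
    "bytes"
    "fmt"
    "log"

    "github.com/klauspost/reedsolomon"
)

func main() {
    const dataShards, parityShards = 4, 2
    enc, err := reedsolomon.New(dataShards, parityShards)
    if err != nil {
        log.Fatal(err)
    }

    payload := bytes.Repeat([]byte("minio"), 1000)

    // Split pads and slices the payload into dataShards pieces,
    // Encode fills in the parity pieces.
    shards, err := enc.Split(payload)
    if err != nil {
        log.Fatal(err)
    }
    if err := enc.Encode(shards); err != nil {
        log.Fatal(err)
    }

    // Simulate two lost drives (up to parityShards losses are recoverable).
    shards[0], shards[5] = nil, nil

    // ReconstructData restores only the data shards, which is all a read
    // path needs; Reconstruct would also rebuild the parity shards.
    if err := enc.ReconstructData(shards); err != nil {
        log.Fatal(err)
    }

    var buf bytes.Buffer
    if err := enc.Join(&buf, shards, len(payload)); err != nil {
        log.Fatal(err)
    }
    fmt.Println(bytes.Equal(buf.Bytes(), payload)) // true
}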

// DecodeDataAndParityBlocks decodes the given erasure-coded data and verifies it.
// It returns an error if the decoding failed.
func (e *Erasure) DecodeDataAndParityBlocks(ctx context.Context, data [][]byte) error {
    needsReconstruction := false
    for _, b := range data {
        if b == nil {
            needsReconstruction = true
            break
        }
    }
    if !needsReconstruction {
        return nil
    }
    if err := e.encoder().Reconstruct(data); err != nil {
        logger.LogIf(ctx, err)
        return err
    }
// Shutdown function for object storage interface.
func (er erasureObjects) Shutdown(ctx context.Context) error {
    // Add any object layer shutdown activities here.
    closeStorageDisks(er.getDisks())
    return nil
}

// ShardSize - returns actual shard size from erasure blockSize.
func (e *Erasure) ShardSize() int64 {
    return ceilFrac(e.blockSize, int64(e.dataBlocks))
// byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []DiskInfo

func (d byDiskTotal) Len() int      { return len(d) }
func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDiskTotal) Less(i, j int) bool {
    return d[i].Total < d[j].Total
}
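ShardSize and ShardFileSize (whose body continues below, interleaved with the new getDisksInfo code in this diff view) are ceiling arithmetic over blockSize and dataBlocks. A small worked check under assumed parameters (1 MiB block, 4 data blocks), with a simplified ceilFrac that ignores negative inputs:

package main

import "fmt"

// ceilFrac returns numerator/denominator rounded up; a simplified
// version of the helper used above.
func ceilFrac(numerator, denominator int64) int64 {
    if denominator == 0 {
        return 0
    }
    return (numerator + denominator - 1) / denominator
}

func main() {
    const (
        blockSize  = int64(1 << 20) // 1 MiB erasure block, assumed
        dataBlocks = int64(4)       // assumed data block count
    )

    shardSize := ceilFrac(blockSize, dataBlocks)

    // Shard file size for a 2.5 MiB object: two full blocks plus a
    // partial last block, each split across the data blocks.
    totalLength := int64(2*(1<<20) + 512*1024)
    numBlocks := totalLength / blockSize
    lastBlockSize := totalLength % blockSize
    lastShardSize := ceilFrac(lastBlockSize, dataBlocks)
    shardFileSize := numBlocks*shardSize + lastShardSize

    fmt.Println(shardSize)     // 262144 (256 KiB per shard per block)
    fmt.Println(shardFileSize) // 655360 (2*262144 + 131072)
}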

// ShardFileSize - returns final erasure size from original size.
func (e *Erasure) ShardFileSize(totalLength int64) int64 {
    if totalLength == 0 {
        return 0
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
    disksInfo = make([]DiskInfo, len(disks))
    onlineDisks = make(madmin.BackendDisks)
    offlineDisks = make(madmin.BackendDisks)

    for _, disk := range disks {
        if disk == OfflineDisk {
            continue
        }
        peerAddr := disk.Hostname()
        if _, ok := offlineDisks[peerAddr]; !ok {
            offlineDisks[peerAddr] = 0
        }
        if _, ok := onlineDisks[peerAddr]; !ok {
            onlineDisks[peerAddr] = 0
        }
    }
    if totalLength == -1 {
        return -1

    g := errgroup.WithNErrs(len(disks))
    for index := range disks {
        index := index
        g.Go(func() error {
            if disks[index] == OfflineDisk {
                // Storage disk is empty, perhaps ignored disk or not available.
                return errDiskNotFound
            }
            info, err := disks[index].DiskInfo()
            if err != nil {
                if !IsErr(err, baseErrs...) {
                    reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String())
                    ctx := logger.SetReqInfo(GlobalContext, reqInfo)
                    logger.LogIf(ctx, err)
                }
                return err
            }
            disksInfo[index] = info
            return nil
        }, index)
    }
    numShards := totalLength / e.blockSize
    lastBlockSize := totalLength % int64(e.blockSize)
    lastShardSize := ceilFrac(lastBlockSize, int64(e.dataBlocks))
    return numShards*e.ShardSize() + lastShardSize

    errs = g.Wait()
    // Wait for the routines.
    for i, diskInfoErr := range errs {
        if disks[i] == OfflineDisk {
            continue
        }
        if diskInfoErr != nil {
            offlineDisks[disks[i].Hostname()]++
            continue
        }
        onlineDisks[disks[i].Hostname()]++
    }

    // Iterate over the passed endpoints arguments and check
    // if there are still disks missing from the offline/online lists
    // and update them accordingly.
    missingOfflineDisks := make(map[string]int)
    for _, zone := range globalEndpoints {
        for _, endpoint := range zone.Endpoints {
            // if local is set and endpoint is not local
            // we are not interested in remote disks.
            if local && !endpoint.IsLocal {
                continue
            }

            if _, ok := offlineDisks[endpoint.Host]; !ok {
                missingOfflineDisks[endpoint.Host]++
            }
        }
    }
    for missingDisk, n := range missingOfflineDisks {
        onlineDisks[missingDisk] = 0
        offlineDisks[missingDisk] = n
    }

    // Success.
    return disksInfo, errs, onlineDisks, offlineDisks
}
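getDisksInfo fans out one DiskInfo call per disk through MinIO's internal errgroup, then tallies per-host online/offline counts from the index-matched error slice. A rough standard-library-only sketch of that fan-out/collect shape (the diskInfo type and fetch helper are invented for illustration):

package main

import (
    "errors"
    "fmt"
    "sync"
)

type diskInfo struct {
    Host  string
    Total uint64
}

var errDiskOffline = errors.New("disk offline")

// fetch stands in for a per-disk DiskInfo() call.
func fetch(host string) (diskInfo, error) {
    if host == "" {
        return diskInfo{}, errDiskOffline
    }
    return diskInfo{Host: host, Total: 1 << 40}, nil
}

func main() {
    hosts := []string{"node1:9000", "", "node2:9000"}

    infos := make([]diskInfo, len(hosts))
    errs := make([]error, len(hosts))

    // Fan out: one goroutine per disk; results land in index-matched
    // slices, so no synchronization is needed beyond the WaitGroup.
    var wg sync.WaitGroup
    for i, h := range hosts {
        wg.Add(1)
        go func(i int, h string) {
            defer wg.Done()
            infos[i], errs[i] = fetch(h)
        }(i, h)
    }
    wg.Wait()

    // Collect: count online/offline per host, much like getDisksInfo.
    online, offline := map[string]int{}, map[string]int{}
    for i, err := range errs {
        if err != nil {
            offline[hosts[i]]++
            continue
        }
        online[infos[i].Host]++
    }
    fmt.Println(online, offline)
}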

// ShardFileTillOffset - returns the effective offset where erasure reading begins.
func (e *Erasure) ShardFileTillOffset(startOffset, length, totalLength int64) int64 {
    shardSize := e.ShardSize()
    shardFileSize := e.ShardFileSize(totalLength)
    endShard := (startOffset + int64(length)) / e.blockSize
    tillOffset := endShard*shardSize + shardSize
    if tillOffset > shardFileSize {
        tillOffset = shardFileSize
// Get an aggregated storage info across all disks.
func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) {
    disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, local)

    // Sort so that the first element is the smallest.
    sort.Sort(byDiskTotal(disksInfo))

    // Combine all disks to get total usage
    usedList := make([]uint64, len(disksInfo))
    totalList := make([]uint64, len(disksInfo))
    availableList := make([]uint64, len(disksInfo))
    mountPaths := make([]string, len(disksInfo))

    for i, di := range disksInfo {
        usedList[i] = di.Used
        totalList[i] = di.Total
        availableList[i] = di.Free
        mountPaths[i] = di.MountPath
    }
    return tillOffset

    storageInfo := StorageInfo{
        Used:       usedList,
        Total:      totalList,
        Available:  availableList,
        MountPaths: mountPaths,
    }

    storageInfo.Backend.Type = BackendErasure
    storageInfo.Backend.OnlineDisks = onlineDisks
    storageInfo.Backend.OfflineDisks = offlineDisks

    return storageInfo, errs
}

// StorageInfo - returns underlying storage statistics.
func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
    disks := er.getDisks()
    if local {
        var localDisks []StorageAPI
        for _, disk := range disks {
            if disk != nil {
                if disk.IsLocal() {
                    // Append this local disk since local flag is true
                    localDisks = append(localDisks, disk)
                }
            }
        }
        disks = localDisks
    }
    return getStorageInfo(disks, local)
}

// GetMetrics - is not implemented and shouldn't be called.
func (er erasureObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
    logger.LogIf(ctx, NotImplemented{})
    return &Metrics{}, NotImplemented{}
}

// CrawlAndGetDataUsage collects usage from all buckets.
// updates are sent as different parts of the underlying
// structure have been traversed.
func (er erasureObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
    return NotImplemented{API: "CrawlAndGetDataUsage"}
}

// crawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
// Updates are sent on a regular basis and the caller *must* consume them.
func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
    var disks []StorageAPI

    for _, d := range er.getLoadBalancedDisks() {
        if d == nil || !d.IsOnline() {
            continue
        }
        disks = append(disks, d)
    }
    if len(disks) == 0 || len(buckets) == 0 {
        return nil
    }

    // Load bucket totals
    oldCache := dataUsageCache{}
    err := oldCache.load(ctx, er, dataUsageCacheName)
    if err != nil {
        return err
    }

    // New cache..
    cache := dataUsageCache{
        Info: dataUsageCacheInfo{
            Name:      dataUsageRoot,
            NextCycle: oldCache.Info.NextCycle,
        },
        Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)),
    }

    // Put all buckets into channel.
    bucketCh := make(chan BucketInfo, len(buckets))
    // Add new buckets first
    for _, b := range buckets {
        if oldCache.find(b.Name) == nil {
            bucketCh <- b
        }
    }
    // Add existing buckets.
    for _, b := range buckets {
        e := oldCache.find(b.Name)
        if e != nil {
            bucketCh <- b
            cache.replace(b.Name, dataUsageRoot, *e)
        }
    }

    close(bucketCh)
    bucketResults := make(chan dataUsageEntryInfo, len(disks))

    // Start async collector/saver.
    // This goroutine owns the cache.
    var saverWg sync.WaitGroup
    saverWg.Add(1)
    go func() {
        const updateTime = 30 * time.Second
        t := time.NewTicker(updateTime)
        defer t.Stop()
        defer saverWg.Done()
        var lastSave time.Time

    saveLoop:
        for {
            select {
            case <-ctx.Done():
                // Return without saving.
                return
            case <-t.C:
                if cache.Info.LastUpdate.Equal(lastSave) {
                    continue
                }
                logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName))
                updates <- cache.clone()
                lastSave = cache.Info.LastUpdate
            case v, ok := <-bucketResults:
                if !ok {
                    break saveLoop
                }
                cache.replace(v.Name, v.Parent, v.Entry)
                cache.Info.LastUpdate = time.Now()
            }
        }
        // Save final state...
        cache.Info.NextCycle++
        cache.Info.LastUpdate = time.Now()
        logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName))
        updates <- cache
    }()

    // Start one crawler per disk
    var wg sync.WaitGroup
    wg.Add(len(disks))
    for i := range disks {
        go func(i int) {
            defer wg.Done()
            disk := disks[i]

            for bucket := range bucketCh {
                select {
                case <-ctx.Done():
                    return
                default:
                }

                // Load cache for bucket
                cacheName := pathJoin(bucket.Name, dataUsageCacheName)
                cache := dataUsageCache{}
                logger.LogIf(ctx, cache.load(ctx, er, cacheName))
                if cache.Info.Name == "" {
                    cache.Info.Name = bucket.Name
                }
                if cache.Info.Name != bucket.Name {
                    logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
                    cache.Info = dataUsageCacheInfo{
                        Name:       bucket.Name,
                        LastUpdate: time.Time{},
                        NextCycle:  0,
                    }
                }

                // Calc usage
                before := cache.Info.LastUpdate
                cache, err = disk.CrawlAndGetDataUsage(ctx, cache)
                if err != nil {
                    logger.LogIf(ctx, err)
                    if cache.Info.LastUpdate.After(before) {
                        logger.LogIf(ctx, cache.save(ctx, er, cacheName))
                    }
                    continue
                }

                var root dataUsageEntry
                if r := cache.root(); r != nil {
                    root = cache.flatten(*r)
                }
                bucketResults <- dataUsageEntryInfo{
                    Name:   cache.Info.Name,
                    Parent: dataUsageRoot,
                    Entry:  root,
                }
                // Save cache
                logger.LogIf(ctx, cache.save(ctx, er, cacheName))
            }
        }(i)
    }
    wg.Wait()
    close(bucketResults)
    saverWg.Wait()

    return nil
}
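crawlAndGetDataUsage pairs per-disk workers with a single collector goroutine that owns the cache, persists it on a ticker, and writes a final state once the results channel closes. A condensed standard-library sketch of that shape (the result type, timings, and totals map are placeholders):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type result struct {
    Bucket string
    Usage  uint64
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    results := make(chan result)
    totals := map[string]uint64{} // owned by the collector goroutine only

    var collector sync.WaitGroup
    collector.Add(1)
    go func() {
        defer collector.Done()
        t := time.NewTicker(500 * time.Millisecond) // periodic "save"
        defer t.Stop()
        for {
            select {
            case <-ctx.Done():
                return // give up without a final save
            case <-t.C:
                fmt.Println("periodic save:", totals)
            case r, ok := <-results:
                if !ok {
                    fmt.Println("final save:", totals)
                    return
                }
                totals[r.Bucket] += r.Usage
            }
        }
    }()

    // Workers: one per "disk", feeding the collector.
    var workers sync.WaitGroup
    for i := 0; i < 3; i++ {
        workers.Add(1)
        go func(i int) {
            defer workers.Done()
            results <- result{Bucket: fmt.Sprintf("bucket-%d", i), Usage: 1 << 20}
        }(i)
    }
    workers.Wait()
    close(results)
    collector.Wait()
}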

// IsReady - shouldn't be called; will panic.
func (er erasureObjects) IsReady(ctx context.Context) bool {
    logger.CriticalIf(ctx, NotImplemented{})
    return true
}