fix: make sure to honor versioning from browser UI deletes (#10016)

This commit is contained in:
Harshavardhana 2020-07-10 22:21:04 -07:00 committed by GitHub
parent aded0bc81a
commit 2d17c16d93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 170 additions and 39 deletions

View File

@ -196,8 +196,10 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) {
// Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo)
versioned := globalBucketVersioningSys.Enabled(bucket)
// Walk through all objects
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil {
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkVersions: versioned}); err != nil {
logger.LogIf(ctx, err)
continue
}
@ -226,8 +228,6 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) {
scorer.addFileWithObjInfo(obj, 1)
}
versioned := globalBucketVersioningSys.Enabled(bucket)
var objects []ObjectToDelete
numKeys := len(scorer.fileObjInfos())
for i, obj := range scorer.fileObjInfos() {

View File

@ -53,6 +53,6 @@ func (er erasureObjects) HealObjects(ctx context.Context, bucket, prefix string,
}
// Walk - This is not implemented/needed anymore, look for erasure-zones.Walk()
func (er erasureObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
func (er erasureObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, _ ObjectOptions) error {
return NotImplemented{}
}

View File

@ -855,8 +855,72 @@ func (f *FileInfoCh) Push(fi FileInfo) {
f.Valid = true
}
// Calculate least entry across multiple FileInfo channels,
// returns the least common entry and the total number of times
// Calculate lexically least entry across multiple FileInfo channels,
// returns the lexically common entry and the total number of times
// we found this entry. Additionally also returns a boolean
// to indicate if the caller needs to call this function
// again to list the next entry. It is callers responsibility
// if the caller wishes to list N entries to call lexicallySortedEntry
// N times until this boolean is 'false'.
func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
for i := range entryChs {
entries[i], entriesValid[i] = entryChs[i].Pop()
}
var isTruncated = false
for _, valid := range entriesValid {
if !valid {
continue
}
isTruncated = true
break
}
var lentry FileInfo
var found bool
for i, valid := range entriesValid {
if !valid {
continue
}
if !found {
lentry = entries[i]
found = true
continue
}
if entries[i].Name < lentry.Name {
lentry = entries[i]
}
}
// We haven't been able to find any lexically least entry,
// this would mean that we don't have valid entry.
if !found {
return lentry, 0, isTruncated
}
lexicallySortedEntryCount := 0
for i, valid := range entriesValid {
if !valid {
continue
}
// Entries are duplicated across disks,
// we should simply skip such entries.
if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) {
lexicallySortedEntryCount++
continue
}
// Push all entries which are lexically higher
// and will be returned later in Pop()
entryChs[i].Push(entries[i])
}
return lentry, lexicallySortedEntryCount, isTruncated
}
// Calculate lexically least entry across multiple FileInfo channels,
// returns the lexically common entry and the total number of times
// we found this entry. Additionally also returns a boolean
// to indicate if the caller needs to call this function
// again to list the next entry. It is callers responsibility
@ -892,7 +956,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
}
}
// We haven't been able to find any least entry,
// We haven't been able to find any lexically least entry,
// this would mean that we don't have valid entry.
if !found {
return lentry, 0, isTruncated
@ -1508,13 +1572,14 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error)
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
if err := checkListObjsArgs(ctx, bucket, prefix, "", s); err != nil {
// Upon error close the channel.
close(results)
return err
}
if opts.WalkVersions {
entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done())
entriesValid := make([]bool, len(entryChs))
@ -1542,6 +1607,31 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c
return nil
}
entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done())
entriesValid := make([]bool, len(entryChs))
entries := make([]FileInfo, len(entryChs))
go func() {
defer close(results)
for {
entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid)
if !ok {
return
}
if quorumCount >= s.drivesPerSet/2 {
// Read quorum exists proceed
results <- entry.ToObjectInfo(bucket, entry.Name)
}
// skip entries which do not have quorum
}
}()
return nil
}
// HealObjects - Heal all objects recursively at a specified prefix, any
// dangling objects deleted as well automatically.
func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {

View File

@ -1799,13 +1799,14 @@ func (z *erasureZones) HealBucket(ctx context.Context, bucket string, dryRun, re
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
// Upon error close the channel.
close(results)
return err
}
if opts.WalkVersions {
var zonesEntryChs [][]FileInfoVersionsCh
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()))
@ -1847,6 +1848,44 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
return nil
}
var zonesEntryChs [][]FileInfoCh
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs, zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()))
}
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
}
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
go func() {
defer close(results)
for {
entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
// We have reached EOF across all entryChs, break the loop.
return
}
if quorumCount >= zoneDrivesPerSet[zoneIndex]/2 {
// Read quorum exists proceed
results <- entry.ToObjectInfo(bucket, entry.Name)
}
// skip entries which do not have quorum
}
}()
return nil
}
// HealObjectFn closure function heals the object.
type HealObjectFn func(string, string, string) error

View File

@ -1484,7 +1484,7 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfo, fs.getObjectInfo)
}

View File

@ -57,7 +57,7 @@ func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects ..
}
// Walk - implements common gateway level Walker, to walk on all objects recursively at a prefix
func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
walk := func(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
var marker string
@ -85,7 +85,7 @@ func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results
return nil
}
if err := l.ObjectLayer.Walk(ctx, bucket, prefix, results); err != nil {
if err := l.ObjectLayer.Walk(ctx, bucket, prefix, results, opts); err != nil {
if _, ok := err.(NotImplemented); ok {
return walk(ctx, bucket, prefix, results)
}

View File

@ -186,7 +186,7 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c
}
// Walk - Not implemented stub
func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
return NotImplemented{}
}

View File

@ -38,6 +38,7 @@ type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts Objec
type ObjectOptions struct {
ServerSideEncryption encrypt.ServerSide
Versioned bool // indicates if the bucket is versioned
WalkVersions bool // indicates if the we are interested in walking versions
VersionID string // Specifies the versionID which needs to be overwritten or read
MTime time.Time // Is only set in POST/PUT operations
UserDefined map[string]string // only set in case of POST/PUT operations
@ -80,7 +81,7 @@ type ObjectLayer interface {
ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error)
// Walk lists all objects including versions, delete markers.
Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error
Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error
// Object operations.

View File

@ -655,8 +655,9 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
return nil
}
versioned := globalBucketVersioningSys.Enabled(args.BucketName)
opts := ObjectOptions{
Versioned: globalBucketVersioningSys.Enabled(args.BucketName),
}
var err error
next:
for _, objectName := range args.Objects {
@ -690,7 +691,7 @@ next:
}
}
_, err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, r, ObjectOptions{})
_, err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, r, opts)
logger.LogIf(ctx, err)
}
@ -723,7 +724,7 @@ next:
objInfoCh := make(chan ObjectInfo)
// Walk through all objects
if err = objectAPI.Walk(ctx, args.BucketName, objectName, objInfoCh); err != nil {
if err = objectAPI.Walk(ctx, args.BucketName, objectName, objInfoCh, ObjectOptions{}); err != nil {
break next
}
@ -736,7 +737,6 @@ next:
}
objects = append(objects, ObjectToDelete{
ObjectName: obj.Name,
VersionID: obj.VersionID,
})
}
@ -746,7 +746,7 @@ next:
}
// Deletes a list of objects.
_, errs := deleteObjects(ctx, args.BucketName, objects, ObjectOptions{Versioned: versioned})
_, errs := deleteObjects(ctx, args.BucketName, objects, opts)
for _, err := range errs {
if err != nil {
logger.LogIf(ctx, err)
@ -1030,6 +1030,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
writeWebErrorResponse(w, err)
return
}
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
@ -1052,15 +1053,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return
}
}
pReader = NewPutObjReader(hashReader, nil, nil)
// get gateway encryption options
var opts ObjectOptions
opts, err = putOpts(ctx, r, bucket, object, metadata)
opts, err := putOpts(ctx, r, bucket, object, metadata)
if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
if objectAPI.IsEncryptionSupported() {
if crypto.IsRequested(r.Header) && !HasSuffix(object, SlashSeparator) { // handle SSE requests
rawReader := hashReader
@ -1545,7 +1546,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
objInfoCh := make(chan ObjectInfo)
// Walk through all objects
if err := objectAPI.Walk(ctx, args.BucketName, pathJoin(args.Prefix, object), objInfoCh); err != nil {
if err := objectAPI.Walk(ctx, args.BucketName, pathJoin(args.Prefix, object), objInfoCh, ObjectOptions{}); err != nil {
logger.LogIf(ctx, err)
continue
}