fix: support healing older content (#10076)

This PR adds support for healing older
content i.e from 2yrs, 1yr. Also handles
other situations where our config was
not encrypted yet.

This PR also ensures that our Listing
is consistent and quorum friendly,
such that we don't list partial objects
This commit is contained in:
Harshavardhana
2020-07-17 17:41:29 -07:00
committed by GitHub
parent 3fe27c8411
commit 17747db93f
10 changed files with 66 additions and 48 deletions

View File

@@ -681,10 +681,9 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
endWalkCh := make(chan struct{})
defer close(endWalkCh)
const ndisks = 3
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, ndisks))
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.drivesPerSet))
}
var objInfos []ObjectInfo
@@ -709,8 +708,8 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
break
}
if quorumCount < ndisks-1 {
// Skip entries which are not found on upto ndisks.
if quorumCount < z.zones[0].drivesPerSet/2 {
// Skip entries which are not found on upto ndisks/2.
continue
}
@@ -797,18 +796,17 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
const ndisks = 3
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, ndisks)
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.drivesPerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks)
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet)
if len(entries.Files) == 0 {
return loi, nil
}
@@ -888,18 +886,17 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
const ndisks = 3
for _, zone := range z.zones {
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, ndisks)
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks)
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet)
if len(entries.Files) == 0 {
return loi, nil
}
@@ -1118,8 +1115,8 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i
break
}
if quorumCount < ndisks-1 {
// Skip entries which are not found on upto ndisks.
if quorumCount < ndisks/2 {
// Skip entries which are not found on upto ndisks/2.
continue
}
@@ -1149,8 +1146,8 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int)
break
}
if quorumCount < ndisks-1 {
// Skip entries which are not found on upto ndisks.
if quorumCount < ndisks/2 {
// Skip entries which are not found on upto ndisks/2.
continue
}
@@ -1292,18 +1289,17 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
var zonesEntryChs [][]FileInfoVersionsCh
var zonesEndWalkCh []chan struct{}
const ndisks = 3
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, ndisks)
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
}
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, ndisks)
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet)
if len(entries.FilesVersions) == 0 {
return loi, nil
}