fix: pass around the correct drives per set (#10097)

this is a precursor change before adding parity
based SLA across zones instead of same stripe size
This commit is contained in:
Harshavardhana 2020-07-20 16:38:40 -07:00 committed by GitHub
parent f9648d3976
commit 11d21d5d1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 14 deletions

View File

@ -677,6 +677,7 @@ func (z *erasureZones) ListObjectsV2(ctx context.Context, bucket, prefix, contin
func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
var zonesEntryChs [][]FileInfoCh
var zonesDrivesPerSet []int
endWalkCh := make(chan struct{})
defer close(endWalkCh)
@ -684,6 +685,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.drivesPerSet))
zonesDrivesPerSet = append(zonesDrivesPerSet, zone.drivesPerSet)
}
var objInfos []ObjectInfo
@ -702,13 +704,13 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
break
}
result, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
result, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
eof = true
break
}
if quorumCount < z.zones[0].drivesPerSet/2 {
if quorumCount < zonesDrivesPerSet[zoneIndex]/2 {
// Skip entries which are not found on upto ndisks/2.
continue
}
@ -795,6 +797,7 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
var drivesPerSets []int
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
@ -804,9 +807,10 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
drivesPerSets = append(drivesPerSets, zone.drivesPerSet)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet)
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets)
if len(entries.Files) == 0 {
return loi, nil
}
@ -885,6 +889,7 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
var drivesPerSets []int
for _, zone := range z.zones {
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
@ -894,9 +899,10 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
drivesPerSets = append(drivesPerSets, zone.drivesPerSet)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet)
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets)
if len(entries.Files) == 0 {
return loi, nil
}
@ -1100,7 +1106,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
}
// mergeZonesEntriesVersionsCh - merges FileInfoVersions channel to entries upto maxKeys.
func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, ndisks int) (entries FilesInfoVersions) {
func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, drivesPerSets []int) (entries FilesInfoVersions) {
var i = 0
var zonesEntriesInfos [][]FileInfoVersions
var zonesEntriesValid [][]bool
@ -1109,13 +1115,13 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
for {
fi, quorumCount, _, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
// We have reached EOF across all entryChs, break the loop.
break
}
if quorumCount < ndisks/2 {
if quorumCount < drivesPerSets[zoneIndex]/2 {
// Skip entries which are not found on upto ndisks/2.
continue
}
@ -1131,7 +1137,7 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i
}
// mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int) (entries FilesInfo) {
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSets []int) (entries FilesInfo) {
var i = 0
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
@ -1140,13 +1146,13 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int)
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
for {
fi, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
// We have reached EOF across all entryChs, break the loop.
break
}
if quorumCount < ndisks/2 {
if quorumCount < drivesPerSets[zoneIndex]/2 {
// Skip entries which are not found on upto ndisks/2.
continue
}
@ -1288,7 +1294,7 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
var zonesEntryChs [][]FileInfoVersionsCh
var zonesEndWalkCh []chan struct{}
var drivesPerSets []int
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
@ -1297,9 +1303,10 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
drivesPerSets = append(drivesPerSets, zone.drivesPerSet)
}
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet)
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, drivesPerSets)
if len(entries.FilesVersions) == 0 {
return loi, nil
}

View File

@ -73,7 +73,6 @@ DIR:
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
EXAMPLES:
1. Start minio server on "/home/shared" directory.
{{.Prompt}} {{.HelpName}} /home/shared
@ -91,7 +90,6 @@ EXAMPLES:
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
{{.Prompt}} {{.HelpName}} http://node{1...16}.example.com/mnt/export{1...32} \
http://node{17...64}.example.com/mnt/export{1...64}
`,
}