fix: list object versions in distributed setup (#9958)

Remove calls to `WalkVersions` was calling the wrong endpoint, 
so unless quorum could be reached with local disks no results 
would ever be returned.
This commit is contained in:
Klaus Post 2020-07-02 10:29:50 -07:00 committed by GitHub
parent 04de19c870
commit abd999f64a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 18 deletions

View File

@ -942,6 +942,7 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
}
entryCh, err := disk.WalkVersions(bucket, prefix, marker, recursive, endWalkCh)
if err != nil {
logger.LogIf(ctx, err)
// Disk walk returned error, ignore it.
continue
}

View File

@ -454,7 +454,7 @@ func (client *storageRESTClient) WalkVersions(volume, dirPath, marker string, re
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTMarkerPath, marker)
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
respBody, err := client.call(storageRESTMethodWalk, values, nil, -1)
respBody, err := client.call(storageRESTMethodWalkVersions, values, nil, -1)
if err != nil {
return nil, err
}
@ -469,6 +469,9 @@ func (client *storageRESTClient) WalkVersions(volume, dirPath, marker string, re
var fi FileInfoVersions
if gerr := decoder.Decode(&fi); gerr != nil {
// Upon error return
if gerr != io.EOF {
logger.LogIf(context.Background(), gerr)
}
return
}
select {
@ -476,7 +479,6 @@ func (client *storageRESTClient) WalkVersions(volume, dirPath, marker string, re
case <-endWalkCh:
return
}
}
}()

View File

@ -526,7 +526,6 @@ func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Req
for fi := range fch {
encoder.Encode(&fi)
}
w.(http.Flusher).Flush()
}
// WalkVersionsHandler - remote caller to start walking at a requested directory path.
@ -553,9 +552,8 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R
return
}
for fi := range fch {
encoder.Encode(&fi)
logger.LogIf(r.Context(), encoder.Encode(&fi))
}
w.(http.Flusher).Flush()
}
// WalkHandler - remote caller to start walking at a requested directory path.
@ -582,9 +580,8 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
return
}
for fi := range fch {
encoder.Encode(&fi)
logger.LogIf(r.Context(), encoder.Encode(&fi))
}
w.(http.Flusher).Flush()
}
// ListDirHandler - list a directory.
@ -889,7 +886,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...)
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler)).
Queries(restQueries(storageRESTVolume, storageRESTTotalVersions)...)

View File

@ -901,11 +901,7 @@ func (s *xlStorage) WalkVersions(volume, dirPath, marker string, recursive bool,
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, endWalkCh)
for {
walkResult, ok := <-walkResultCh
if !ok {
return
}
for walkResult := range walkResultCh {
var fiv FileInfoVersions
if HasSuffix(walkResult.entry, SlashSeparator) {
fiv = FileInfoVersions{
@ -981,11 +977,7 @@ func (s *xlStorage) Walk(volume, dirPath, marker string, recursive bool, endWalk
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, endWalkCh)
for {
walkResult, ok := <-walkResultCh
if !ok {
return
}
for walkResult := range walkResultCh {
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
fi = FileInfo{