heal: Add ListBucketsHeal object API (#3563)

ListBucketsHeal will list which buckets that need to be healed:
  * ListBucketsHeal() (buckets []BucketInfo, err error)
This commit is contained in:
Anis Elleuch 2017-01-19 18:34:18 +01:00 committed by Harshavardhana
parent dfc2ef3004
commit 0715032598
16 changed files with 445 additions and 43 deletions

View File

@ -345,6 +345,34 @@ func (adminAPI adminAPIHandlers) ListObjectsHealHandler(w http.ResponseWriter, r
writeSuccessResponseXML(w, encodeResponse(listResponse)) writeSuccessResponseXML(w, encodeResponse(listResponse))
} }
// ListBucketsHealHandler - GET /?heal
func (adminAPI adminAPIHandlers) ListBucketsHealHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Get the list buckets to be healed.
bucketsInfo, err := objLayer.ListBucketsHeal()
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
listResponse := generateListBucketsResponse(bucketsInfo)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(listResponse))
}
// HealBucketHandler - POST /?heal&bucket=mybucket // HealBucketHandler - POST /?heal&bucket=mybucket
// - bucket is mandatory query parameter // - bucket is mandatory query parameter
// Heal a given bucket, if present. // Heal a given bucket, if present.

View File

@ -687,7 +687,7 @@ func TestListObjectsHealHandler(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Test %d - Failed to construct list objects needing heal request - %v", i+1, err) t.Fatalf("Test %d - Failed to construct list objects needing heal request - %v", i+1, err)
} }
req.Header.Set(minioAdminOpHeader, "list") req.Header.Set(minioAdminOpHeader, "list-objects")
cred := serverConfig.GetCredential() cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey) err = signRequestV4(req, cred.AccessKey, cred.SecretKey)

View File

@ -49,7 +49,10 @@ func registerAdminRouter(mux *router.Router) {
/// Heal operations /// Heal operations
// List Objects needing heal. // List Objects needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list").HandlerFunc(adminAPI.ListObjectsHealHandler) adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-objects").HandlerFunc(adminAPI.ListObjectsHealHandler)
// List Buckets needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-buckets").HandlerFunc(adminAPI.ListBucketsHealHandler)
// Heal Buckets. // Heal Buckets.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "bucket").HandlerFunc(adminAPI.HealBucketHandler) adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "bucket").HandlerFunc(adminAPI.HealBucketHandler)
// Heal Objects. // Heal Objects.

View File

@ -183,6 +183,7 @@ type CommonPrefix struct {
type Bucket struct { type Bucket struct {
Name string Name string
CreationDate string // time string of format "2006-01-02T15:04:05.000Z" CreationDate string // time string of format "2006-01-02T15:04:05.000Z"
HealBucketInfo *HealBucketInfo `xml:"HealBucketInfo,omitempty"`
} }
// Object container for object metadata // Object container for object metadata
@ -197,7 +198,7 @@ type Object struct {
// The class of storage used to store the object. // The class of storage used to store the object.
StorageClass string StorageClass string
HealInfo *HealInfo `xml:"HealInfo,omitempty"` HealObjectInfo *HealObjectInfo `xml:"HealObjectInfo,omitempty"`
} }
// CopyObjectResponse container returns ETag and LastModified of the successfully copied object // CopyObjectResponse container returns ETag and LastModified of the successfully copied object
@ -285,6 +286,7 @@ func generateListBucketsResponse(buckets []BucketInfo) ListBucketsResponse {
var listbucket = Bucket{} var listbucket = Bucket{}
listbucket.Name = bucket.Name listbucket.Name = bucket.Name
listbucket.CreationDate = bucket.Created.Format(timeFormatAMZLong) listbucket.CreationDate = bucket.Created.Format(timeFormatAMZLong)
listbucket.HealBucketInfo = bucket.HealBucketInfo
listbuckets = append(listbuckets, listbucket) listbuckets = append(listbuckets, listbucket)
} }
@ -317,8 +319,8 @@ func generateListObjectsV1Response(bucket, prefix, marker, delimiter string, max
content.Size = object.Size content.Size = object.Size
content.StorageClass = globalMinioDefaultStorageClass content.StorageClass = globalMinioDefaultStorageClass
content.Owner = owner content.Owner = owner
// object.HealInfo is non-empty only when resp is constructed in ListObjectsHeal. // object.HealObjectInfo is non-empty only when resp is constructed in ListObjectsHeal.
content.HealInfo = object.HealInfo content.HealObjectInfo = object.HealObjectInfo
contents = append(contents, content) contents = append(contents, content)
} }
// TODO - support EncodingType in xml decoding // TODO - support EncodingType in xml decoding

View File

@ -897,3 +897,8 @@ func (fs fsObjects) HealBucket(bucket string) error {
func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, traceError(NotImplemented{}) return ListObjectsInfo{}, traceError(NotImplemented{})
} }
// ListBucketsHeal - list all buckets to be healed. Valid only for XL
func (fs fsObjects) ListBucketsHeal() ([]BucketInfo, error) {
return []BucketInfo{}, traceError(NotImplemented{})
}

View File

@ -50,6 +50,20 @@ type StorageInfo struct {
} }
} }
type healStatus int
const (
healthy healStatus = iota // Object is healthy
canHeal // Object can be healed
corrupted // Object can't be healed
quorumUnavailable // Object can't be healed until read quorum is available
)
// HealBucketInfo - represents healing related information of a bucket.
type HealBucketInfo struct {
Status healStatus
}
// BucketInfo - represents bucket metadata. // BucketInfo - represents bucket metadata.
type BucketInfo struct { type BucketInfo struct {
// Name of the bucket. // Name of the bucket.
@ -57,18 +71,13 @@ type BucketInfo struct {
// Date and time when the bucket was created. // Date and time when the bucket was created.
Created time.Time Created time.Time
// Healing information
HealBucketInfo *HealBucketInfo `xml:"HealBucketInfo,omitempty"`
} }
type healStatus int // HealObjectInfo - represents healing related information of an object.
type HealObjectInfo struct {
const (
canHeal healStatus = iota // Object can be healed
corrupted // Object can't be healed
quorumUnavailable // Object can't be healed until read quorum is available
)
// HealInfo - represents healing related information of an object.
type HealInfo struct {
Status healStatus Status healStatus
MissingDataCount int MissingDataCount int
MissingPartityCount int MissingPartityCount int
@ -104,7 +113,7 @@ type ObjectInfo struct {
// User-Defined metadata // User-Defined metadata
UserDefined map[string]string UserDefined map[string]string
HealInfo *HealInfo `xml:"HealInfo,omitempty"` HealObjectInfo *HealObjectInfo `xml:"HealObjectInfo,omitempty"`
} }
// ListPartsInfo - represents list of all parts. // ListPartsInfo - represents list of all parts.

View File

@ -48,6 +48,7 @@ type ObjectLayer interface {
// Healing operations. // Healing operations.
HealBucket(bucket string) error HealBucket(bucket string) error
ListBucketsHeal() (buckets []BucketInfo, err error)
HealObject(bucket, object string) error HealObject(bucket, object string) error
ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
} }

View File

@ -141,12 +141,12 @@ func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
// xlHealStat - returns a structure which describes how many data, // xlHealStat - returns a structure which describes how many data,
// parity erasure blocks are missing and if it is possible to heal // parity erasure blocks are missing and if it is possible to heal
// with the blocks present. // with the blocks present.
func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealInfo { func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObjectInfo {
// Less than quorum erasure coded blocks of the object have the same create time. // Less than quorum erasure coded blocks of the object have the same create time.
// This object can't be healed with the information we have. // This object can't be healed with the information we have.
modTime, count := commonTime(listObjectModtimes(partsMetadata, errs)) modTime, count := commonTime(listObjectModtimes(partsMetadata, errs))
if count < xl.readQuorum { if count < xl.readQuorum {
return HealInfo{ return HealObjectInfo{
Status: quorumUnavailable, Status: quorumUnavailable,
MissingDataCount: 0, MissingDataCount: 0,
MissingPartityCount: 0, MissingPartityCount: 0,
@ -156,7 +156,7 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealInfo {
// If there isn't a valid xlMeta then we can't heal the object. // If there isn't a valid xlMeta then we can't heal the object.
xlMeta, err := pickValidXLMeta(partsMetadata, modTime) xlMeta, err := pickValidXLMeta(partsMetadata, modTime)
if err != nil { if err != nil {
return HealInfo{ return HealObjectInfo{
Status: corrupted, Status: corrupted,
MissingDataCount: 0, MissingDataCount: 0,
MissingPartityCount: 0, MissingPartityCount: 0,
@ -183,7 +183,7 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealInfo {
// This object can be healed. We have enough object metadata // This object can be healed. We have enough object metadata
// to reconstruct missing erasure coded blocks. // to reconstruct missing erasure coded blocks.
return HealInfo{ return HealObjectInfo{
Status: canHeal, Status: canHeal,
MissingDataCount: missingDataCount, MissingDataCount: missingDataCount,
MissingPartityCount: missingParityCount, MissingPartityCount: missingParityCount,

View File

@ -19,6 +19,7 @@ package cmd
import ( import (
"fmt" "fmt"
"path" "path"
"sort"
"sync" "sync"
) )
@ -153,9 +154,11 @@ func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int
return healBucketMetaFn(lConfigPath) return healBucketMetaFn(lConfigPath)
} }
// listBucketNames list all bucket names from all disks to heal. // listAllBuckets lists all buckets from all disks. It also
func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}, err error) { // returns the occurrence of each buckets in all disks
bucketNames = make(map[string]struct{}) func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo, bucketsOcc map[string]int, err error) {
buckets = make(map[string]VolInfo)
bucketsOcc = make(map[string]int)
for _, disk := range storageDisks { for _, disk := range storageDisks {
if disk == nil { if disk == nil {
continue continue
@ -173,7 +176,10 @@ func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}
if isMinioMetaBucketName(volInfo.Name) { if isMinioMetaBucketName(volInfo.Name) {
continue continue
} }
bucketNames[volInfo.Name] = struct{}{} // Increase counter per bucket name
bucketsOcc[volInfo.Name]++
// Save volume info under bucket name
buckets[volInfo.Name] = volInfo
} }
continue continue
} }
@ -183,7 +189,101 @@ func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}
} }
break break
} }
return bucketNames, err return buckets, bucketsOcc, err
}
// reduceHealStatus - fetches the worst heal status in a provided slice
func reduceHealStatus(status []healStatus) healStatus {
worstStatus := healthy
for _, st := range status {
if st > worstStatus {
worstStatus = st
}
}
return worstStatus
}
// bucketHealStatus - returns the heal status of the provided bucket. Internally,
// this function lists all object heal status of objects inside meta bucket config
// directory and returns the worst heal status that can be found
func (xl xlObjects) bucketHealStatus(bucketName string) (healStatus, error) {
// A list of all the bucket config files
configFiles := []string{bucketPolicyConfig, bucketNotificationConfig, bucketListenerConfig}
// The status of buckets config files
configsHealStatus := make([]healStatus, len(configFiles))
// The list of errors found during checking heal status of each config file
configsErrs := make([]error, len(configFiles))
// The path of meta bucket that contains all config files
configBucket := path.Join(minioMetaBucket, bucketConfigPrefix, bucketName)
// Check of config files heal status in go-routines
var wg sync.WaitGroup
// Loop over config files
for idx, configFile := range configFiles {
wg.Add(1)
// Compute heal status of current config file
go func(bucket, object string, index int) {
defer wg.Done()
// Check
listObjectsHeal, err := xl.listObjectsHeal(bucket, object, "", "", 1)
// If any error, save and immediately quit
if err != nil {
configsErrs[index] = err
return
}
// Check if current bucket contains any not healthy config file and save heal status
if len(listObjectsHeal.Objects) > 0 {
configsHealStatus[index] = listObjectsHeal.Objects[0].HealObjectInfo.Status
}
}(configBucket, configFile, idx)
}
wg.Wait()
// Return any found error
for _, err := range configsErrs {
if err != nil {
return healthy, err
}
}
// Reduce and return heal status
return reduceHealStatus(configsHealStatus), nil
}
// ListBucketsHeal - Find all buckets that need to be healed
func (xl xlObjects) ListBucketsHeal() ([]BucketInfo, error) {
listBuckets := []BucketInfo{}
// List all buckets that can be found in all disks
buckets, occ, err := listAllBuckets(xl.storageDisks)
if err != nil {
return listBuckets, err
}
// Iterate over all buckets
for _, currBucket := range buckets {
// Check the status of bucket metadata
bucketHealStatus, err := xl.bucketHealStatus(currBucket.Name)
if err != nil {
return []BucketInfo{}, err
}
// If all metadata are sane, check if the bucket directory is present in all disks
if bucketHealStatus == healthy && occ[currBucket.Name] != len(xl.storageDisks) {
// Current bucket is missing in some of the storage disks
bucketHealStatus = canHeal
}
// Add current bucket to the returned result if not healthy
if bucketHealStatus != healthy {
listBuckets = append(listBuckets,
BucketInfo{
Name: currBucket.Name,
Created: currBucket.Created,
HealBucketInfo: &HealBucketInfo{Status: bucketHealStatus},
})
}
}
// Sort found buckets
sort.Sort(byBucketName(listBuckets))
return listBuckets, nil
} }
// This function is meant for all the healing that needs to be done // This function is meant for all the healing that needs to be done
@ -196,7 +296,7 @@ func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}
// - add support for healing dangling `xl.json`. // - add support for healing dangling `xl.json`.
func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error { func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error {
// List all bucket names from all disks. // List all bucket names from all disks.
bucketNames, err := listBucketNames(storageDisks) bucketNames, _, err := listAllBuckets(storageDisks)
if err != nil { if err != nil {
return err return err
} }

View File

@ -423,3 +423,66 @@ func TestQuickHeal(t *testing.T) {
t.Fatal("Got an unexpected error: ", err) t.Fatal("Got an unexpected error: ", err)
} }
} }
// TestListBucketsHeal lists buckets heal result
func TestListBucketsHeal(t *testing.T) {
root, err := newTestConfig("us-east-1")
if err != nil {
t.Fatal(err)
}
defer removeAll(root)
nDisks := 16
fsDirs, err := getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
defer removeRoots(fsDirs)
endpoints, err := parseStorageEndpoints(fsDirs)
if err != nil {
t.Fatal(err)
}
obj, _, err := initObjectLayer(endpoints)
if err != nil {
t.Fatal(err)
}
// Create a bucket that won't get corrupted
saneBucket := "sanebucket"
if err = obj.MakeBucket(saneBucket); err != nil {
t.Fatal(err)
}
// Create a bucket that will be removed in some disks
corruptedBucketName := getRandomBucketName()
if err = obj.MakeBucket(corruptedBucketName); err != nil {
t.Fatal(err)
}
xl := obj.(*xlObjects)
// Remove bucket in disk 0, 1 and 2
for i := 0; i <= 2; i++ {
if err = xl.storageDisks[i].DeleteVol(corruptedBucketName); err != nil {
t.Fatal(err)
}
}
// List the missing buckets.
buckets, err := xl.ListBucketsHeal()
if err != nil {
t.Fatal(err)
}
// Check the number of buckets in list buckets heal result
if len(buckets) != 1 {
t.Fatalf("Length of missing buckets is incorrect, expected: 1, found: %d", len(buckets))
}
// Check the name of bucket in list buckets heal result
if buckets[0].Name != corruptedBucketName {
t.Fatalf("Name of missing bucket is incorrect, expected: %s, found: %s", corruptedBucketName, buckets[0].Name)
}
}

View File

@ -163,7 +163,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
ModTime: objInfo.ModTime, ModTime: objInfo.ModTime,
Size: objInfo.Size, Size: objInfo.Size,
IsDir: false, IsDir: false,
HealInfo: &healStat, HealObjectInfo: &healStat,
}) })
} }
objectLock.RUnlock() objectLock.RUnlock()

View File

@ -112,3 +112,9 @@
- ErrInvalidBucketName - ErrInvalidBucketName
- ErrInvalidObjectName - ErrInvalidObjectName
- ErrInvalidDuration - ErrInvalidDuration
### Healing
* ListBucketsHeal
- GET /?heal
- x-minio-operation: list-buckets

View File

@ -171,8 +171,8 @@ __Example__
log.Fatalln(err) log.Fatalln(err)
return return
} }
if object.HealInfo != nil { if object.HealObjectInfo != nil {
switch healInfo := *object.HealInfo; healInfo.Status { switch healInfo := *object.HealObjectInfo; healInfo.Status {
case madmin.CanHeal: case madmin.CanHeal:
fmt.Println(object.Key, " can be healed.") fmt.Println(object.Key, " can be healed.")
case madmin.QuorumUnavailable: case madmin.QuorumUnavailable:
@ -185,6 +185,34 @@ __Example__
} }
``` ```
<a name="ListBucketsList"></a>
### ListBucketsList() error
If successful returns information on the list of buckets that need healing.
__Example__
``` go
// List buckets that need healing
healBucketsList, err := madmClnt.ListBucketsHeal()
if err != nil {
fmt.Println(err)
return
}
for bucket := range healBucketsList {
if bucket.HealBucketInfo != nil {
switch healInfo := *object.HealBucketInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(bucket.Key, " can be healed.")
case madmin.QuorumUnavailable:
fmt.Println(bucket.Key, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(bucket.Key, " can't be healed, not enough information.")
}
}
fmt.Println("bucket: ", bucket)
}
```
<a name="HealBucket"></a> <a name="HealBucket"></a>
### HealBucket(bucket string, isDryRun bool) error ### HealBucket(bucket string, isDryRun bool) error
If bucket is successfully healed returns nil, otherwise returns error indicating the reason for failure. If isDryRun is true, then the bucket is not healed, but heal bucket request is validated by the server. e.g, if the bucket exists, if bucket name is valid etc. If bucket is successfully healed returns nil, otherwise returns error indicating the reason for failure. If isDryRun is true, then the bucket is not healed, but heal bucket request is validated by the server. e.g, if the bucket exists, if bucket name is valid etc.

View File

@ -0,0 +1,60 @@
// +build ignore
package main
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import (
"fmt"
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
// List buckets that need healing
healBucketsList, err := madmClnt.ListBucketsHeal()
if err != nil {
log.Fatalln(err)
}
for _, bucket := range healBucketsList {
if bucket.HealBucketInfo != nil {
switch healInfo := *bucket.HealBucketInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(bucket.Name, " can be healed.")
case madmin.QuorumUnavailable:
fmt.Println(bucket.Name, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(bucket.Name, " can't be healed, not enough information.")
}
}
fmt.Println("bucket: ", bucket)
}
}

View File

@ -63,20 +63,65 @@ type commonPrefix struct {
Prefix string Prefix string
} }
// Owner - bucket owner/principal
type Owner struct {
ID string
DisplayName string
}
// Bucket container for bucket metadata
type Bucket struct {
Name string
CreationDate string // time string of format "2006-01-02T15:04:05.000Z"
HealBucketInfo *HealBucketInfo `xml:"HealBucketInfo,omitempty"`
}
// ListBucketsHealResponse - format for list buckets response
type ListBucketsHealResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult" json:"-"`
Owner Owner
// Container for one or more buckets.
Buckets struct {
Buckets []Bucket `xml:"Bucket"`
} // Buckets are nested
}
// HealStatus - represents different states of healing an object could be in. // HealStatus - represents different states of healing an object could be in.
type healStatus int type healStatus int
const ( const (
// Healthy - Object that is already healthy
Healthy healStatus = iota
// CanHeal - Object can be healed // CanHeal - Object can be healed
CanHeal healStatus = iota CanHeal
// Corrupted - Object can't be healed // Corrupted - Object can't be healed
Corrupted Corrupted
// QuorumUnavailable - Object can't be healed until read quorum is available // QuorumUnavailable - Object can't be healed until read quorum is available
QuorumUnavailable QuorumUnavailable
) )
// HealInfo - represents healing related information of an object. // HealBucketInfo - represents healing related information of a bucket.
type HealInfo struct { type HealBucketInfo struct {
Status healStatus
}
// BucketInfo - represents bucket metadata.
type BucketInfo struct {
// Name of the bucket.
Name string
// Date and time when the bucket was created.
Created time.Time
// Healing information
HealBucketInfo *HealBucketInfo `xml:"HealBucketInfo,omitempty"`
}
// HealObjectInfo - represents healing related information of an object.
type HealObjectInfo struct {
Status healStatus Status healStatus
MissingDataCount int MissingDataCount int
MissingPartityCount int MissingPartityCount int
@ -109,7 +154,7 @@ type ObjectInfo struct {
// Error // Error
Err error `json:"-"` Err error `json:"-"`
HealInfo *HealInfo `json:"healInfo,omitempty"` HealObjectInfo *HealObjectInfo `json:"healObjectInfo,omitempty"`
} }
type healQueryKey string type healQueryKey string
@ -143,7 +188,7 @@ func (adm *AdminClient) listObjectsHeal(bucket, prefix, delimiter, marker string
queryVal := mkHealQueryVal(bucket, prefix, marker, delimiter, maxKeyStr) queryVal := mkHealQueryVal(bucket, prefix, marker, delimiter, maxKeyStr)
hdrs := make(http.Header) hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "list") hdrs.Set(minioAdminOpHeader, "list-objects")
reqData := requestData{ reqData := requestData{
queryValues: queryVal, queryValues: queryVal,
@ -240,6 +285,58 @@ func (adm *AdminClient) ListObjectsHeal(bucket, prefix string, recursive bool, d
return objectStatCh, nil return objectStatCh, nil
} }
const timeFormatAMZLong = "2006-01-02T15:04:05.000Z" // Reply date format with nanosecond precision.
// ListBucketsHeal - issues heal bucket list API request
func (adm *AdminClient) ListBucketsHeal() ([]BucketInfo, error) {
queryVal := url.Values{}
queryVal.Set("heal", "")
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "list-buckets")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute GET on /?heal to list objects needing heal.
resp, err := adm.executeMethod("GET", reqData)
defer closeResponse(resp)
if err != nil {
return []BucketInfo{}, err
}
if resp.StatusCode != http.StatusOK {
return []BucketInfo{}, errors.New("Got HTTP Status: " + resp.Status)
}
var listBucketsHealResult ListBucketsHealResponse
err = xml.NewDecoder(resp.Body).Decode(&listBucketsHealResult)
if err != nil {
return []BucketInfo{}, err
}
var bucketsToBeHealed []BucketInfo
for _, bucket := range listBucketsHealResult.Buckets.Buckets {
creationDate, err := time.Parse(timeFormatAMZLong, bucket.CreationDate)
if err != nil {
return []BucketInfo{}, err
}
bucketsToBeHealed = append(bucketsToBeHealed,
BucketInfo{
Name: bucket.Name,
Created: creationDate,
HealBucketInfo: bucket.HealBucketInfo,
})
}
return bucketsToBeHealed, nil
}
// HealBucket - Heal the given bucket // HealBucket - Heal the given bucket
func (adm *AdminClient) HealBucket(bucket string, dryrun bool) error { func (adm *AdminClient) HealBucket(bucket string, dryrun bool) error {
// Construct query params. // Construct query params.