mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
avoid a crash in crawler when lifecycle is not initialized (#11170)
Bonus for static buffers use bytes.NewReader instead of bytes.NewBuffer, to use a more reader friendly implementation
This commit is contained in:
parent
d3c853a3be
commit
c19e6ce773
@ -364,7 +364,7 @@ func TestExtractHealInitParams(t *testing.T) {
|
||||
// Test all combinations!
|
||||
for pIdx, parms := range qParmsArr {
|
||||
for vIdx, vars := range varsArr {
|
||||
_, err := extractHealInitParams(vars, parms, bytes.NewBuffer([]byte(body)))
|
||||
_, err := extractHealInitParams(vars, parms, bytes.NewReader([]byte(body)))
|
||||
isErrCase := false
|
||||
if pIdx < 4 || vIdx < 1 {
|
||||
isErrCase = true
|
||||
|
@ -55,7 +55,7 @@ func runPutObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
// insert the object.
|
||||
objInfo, err := obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i),
|
||||
mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
@ -114,7 +114,7 @@ func runPutObjectPartBenchmark(b *testing.B, obj ObjectLayer, partSize int) {
|
||||
md5hex := getMD5Hash([]byte(textPartData))
|
||||
var partInfo PartInfo
|
||||
partInfo, err = obj.PutObjectPart(context.Background(), bucket, object, uploadID, j,
|
||||
mustGetPutObjReader(b, bytes.NewBuffer(textPartData), int64(len(textPartData)), md5hex, sha256hex), ObjectOptions{})
|
||||
mustGetPutObjReader(b, bytes.NewReader(textPartData), int64(len(textPartData)), md5hex, sha256hex), ObjectOptions{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
@ -200,7 +200,7 @@ func runGetObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
|
||||
// insert the object.
|
||||
var objInfo ObjectInfo
|
||||
objInfo, err = obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i),
|
||||
mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
@ -301,7 +301,7 @@ func runPutObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) {
|
||||
for pb.Next() {
|
||||
// insert the object.
|
||||
objInfo, err := obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i),
|
||||
mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
@ -340,7 +340,7 @@ func runGetObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) {
|
||||
// insert the object.
|
||||
var objInfo ObjectInfo
|
||||
objInfo, err = obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i),
|
||||
mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ func TestRemoveBucketHandler(t *testing.T) {
|
||||
|
||||
func testRemoveBucketHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
|
||||
credentials auth.Credentials, t *testing.T) {
|
||||
_, err := obj.PutObject(GlobalContext, bucketName, "test-object", mustGetPutObjReader(t, bytes.NewBuffer([]byte{}), int64(0), "", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), ObjectOptions{})
|
||||
_, err := obj.PutObject(GlobalContext, bucketName, "test-object", mustGetPutObjReader(t, bytes.NewReader([]byte{}), int64(0), "", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), ObjectOptions{})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Error uploading object: <ERROR> %v", err)
|
||||
@ -669,7 +669,7 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
|
||||
for i := 0; i < 10; i++ {
|
||||
objectName := "test-object-" + strconv.Itoa(i)
|
||||
// uploading the object.
|
||||
_, err = obj.PutObject(GlobalContext, bucketName, objectName, mustGetPutObjReader(t, bytes.NewBuffer(contentBytes), int64(len(contentBytes)), "", sha256sum), ObjectOptions{})
|
||||
_, err = obj.PutObject(GlobalContext, bucketName, objectName, mustGetPutObjReader(t, bytes.NewReader(contentBytes), int64(len(contentBytes)), "", sha256sum), ObjectOptions{})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object %d: Error uploading object: <ERROR> %v", i, err)
|
||||
|
@ -219,7 +219,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
}
|
||||
if len(cache.Info.BloomFilter) > 0 {
|
||||
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
|
||||
_, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter))
|
||||
_, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter))
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
|
||||
s.withFilter = nil
|
||||
|
@ -225,7 +225,7 @@ func TestDataUpdateTracker(t *testing.T) {
|
||||
|
||||
// Rerun test with returned bfr2
|
||||
bf := dut.newBloomFilter()
|
||||
_, err = bf.ReadFrom(bytes.NewBuffer(bfr2.Filter))
|
||||
_, err = bf.ReadFrom(bytes.NewReader(bfr2.Filter))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -163,8 +163,8 @@ func cmpReaders(r1, r2 io.Reader) (bool, string) {
|
||||
|
||||
func TestCmpReaders(t *testing.T) {
|
||||
{
|
||||
r1 := bytes.NewBuffer([]byte("abc"))
|
||||
r2 := bytes.NewBuffer([]byte("abc"))
|
||||
r1 := bytes.NewReader([]byte("abc"))
|
||||
r2 := bytes.NewReader([]byte("abc"))
|
||||
ok, msg := cmpReaders(r1, r2)
|
||||
if !(ok && msg == "") {
|
||||
t.Fatalf("unexpected")
|
||||
@ -172,8 +172,8 @@ func TestCmpReaders(t *testing.T) {
|
||||
}
|
||||
|
||||
{
|
||||
r1 := bytes.NewBuffer([]byte("abc"))
|
||||
r2 := bytes.NewBuffer([]byte("abcd"))
|
||||
r1 := bytes.NewReader([]byte("abc"))
|
||||
r2 := bytes.NewReader([]byte("abcd"))
|
||||
ok, _ := cmpReaders(r1, r2)
|
||||
if ok {
|
||||
t.Fatalf("unexpected")
|
||||
|
@ -19,6 +19,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
@ -37,6 +38,65 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
|
||||
return newDisks
|
||||
}
|
||||
|
||||
type sortSlices struct {
|
||||
disks []StorageAPI
|
||||
infos []DiskInfo
|
||||
}
|
||||
|
||||
type sortByOther sortSlices
|
||||
|
||||
func (sbo sortByOther) Len() int {
|
||||
return len(sbo.disks)
|
||||
}
|
||||
|
||||
func (sbo sortByOther) Swap(i, j int) {
|
||||
sbo.disks[i], sbo.disks[j] = sbo.disks[j], sbo.disks[i]
|
||||
sbo.infos[i], sbo.infos[j] = sbo.infos[j], sbo.infos[i]
|
||||
}
|
||||
|
||||
func (sbo sortByOther) Less(i, j int) bool {
|
||||
return sbo.infos[i].UsedInodes < sbo.infos[j].UsedInodes
|
||||
}
|
||||
|
||||
func (er erasureObjects) getOnlineDisksSortedByUsedInodes() (newDisks []StorageAPI) {
|
||||
disks := er.getDisks()
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var infos []DiskInfo
|
||||
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
|
||||
i := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if disks[i-1] == nil {
|
||||
return
|
||||
}
|
||||
di, err := disks[i-1].DiskInfo(context.Background())
|
||||
if err != nil || di.Healing {
|
||||
|
||||
// - Do not consume disks which are not reachable
|
||||
// unformatted or simply not accessible for some reason.
|
||||
//
|
||||
// - Do not consume disks which are being healed
|
||||
//
|
||||
// - Future: skip busy disks
|
||||
return
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
newDisks = append(newDisks, disks[i-1])
|
||||
infos = append(infos, di)
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
slices := sortSlices{newDisks, infos}
|
||||
sort.Sort(sortByOther(slices))
|
||||
|
||||
return slices.disks
|
||||
}
|
||||
|
||||
func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
|
||||
disks := er.getDisks()
|
||||
var wg sync.WaitGroup
|
||||
|
@ -245,8 +245,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
return nil
|
||||
}
|
||||
|
||||
// Collect disks we can use.
|
||||
disks := er.getOnlineDisks()
|
||||
// Collect disks we can use, sorted by least inode usage.
|
||||
disks := er.getOnlineDisksSortedByUsedInodes()
|
||||
if len(disks) == 0 {
|
||||
logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl")
|
||||
return nil
|
||||
@ -312,7 +312,6 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
defer saverWg.Done()
|
||||
var lastSave time.Time
|
||||
|
||||
saveLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -327,17 +326,17 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
lastSave = cache.Info.LastUpdate
|
||||
case v, ok := <-bucketResults:
|
||||
if !ok {
|
||||
break saveLoop
|
||||
// Save final state...
|
||||
cache.Info.NextCycle++
|
||||
cache.Info.LastUpdate = time.Now()
|
||||
logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName))
|
||||
updates <- cache
|
||||
return
|
||||
}
|
||||
cache.replace(v.Name, v.Parent, v.Entry)
|
||||
cache.Info.LastUpdate = time.Now()
|
||||
}
|
||||
}
|
||||
// Save final state...
|
||||
cache.Info.NextCycle++
|
||||
cache.Info.LastUpdate = time.Now()
|
||||
logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName))
|
||||
updates <- cache
|
||||
}()
|
||||
|
||||
// Start one crawler per disk
|
||||
|
@ -43,7 +43,7 @@ func TestIsValidLocationContraint(t *testing.T) {
|
||||
|
||||
// Corrupted XML
|
||||
malformedReq := &http.Request{
|
||||
Body: ioutil.NopCloser(bytes.NewBuffer([]byte("<>"))),
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("<>"))),
|
||||
ContentLength: int64(len("<>")),
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ func TestIsValidLocationContraint(t *testing.T) {
|
||||
createBucketConfig := createBucketLocationConfiguration{}
|
||||
createBucketConfig.Location = location
|
||||
createBucketConfigBytes, _ := xml.Marshal(createBucketConfig)
|
||||
createBucketConfigBuffer := bytes.NewBuffer(createBucketConfigBytes)
|
||||
createBucketConfigBuffer := bytes.NewReader(createBucketConfigBytes)
|
||||
req.Body = ioutil.NopCloser(createBucketConfigBuffer)
|
||||
req.ContentLength = int64(createBucketConfigBuffer.Len())
|
||||
return req
|
||||
|
@ -33,7 +33,7 @@ func loadMetacacheSample(t testing.TB) *metacacheReader {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r, err := newMetacacheReader(bytes.NewBuffer(b))
|
||||
r, err := newMetacacheReader(bytes.NewReader(b))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -342,7 +342,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
|
||||
logger.LogIf(ctx, zerr)
|
||||
continue
|
||||
}
|
||||
if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil {
|
||||
if _, err = io.Copy(zwriter, bytes.NewReader(data)); err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
|
||||
ctx := logger.SetReqInfo(ctx, reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
@ -387,7 +387,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
|
||||
return profilingDataFound
|
||||
}
|
||||
|
||||
if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil {
|
||||
if _, err = io.Copy(zwriter, bytes.NewReader(data)); err != nil {
|
||||
return profilingDataFound
|
||||
}
|
||||
}
|
||||
@ -443,7 +443,7 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6
|
||||
if err == nil && bfr.Complete {
|
||||
nbf := intDataUpdateTracker.newBloomFilter()
|
||||
bf = &nbf
|
||||
_, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter))
|
||||
_, err = bf.ReadFrom(bytes.NewReader(bfr.Filter))
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
||||
@ -471,7 +471,7 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6
|
||||
}
|
||||
|
||||
var tmp bloom.BloomFilter
|
||||
_, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter))
|
||||
_, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter))
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
bf = nil
|
||||
@ -508,7 +508,7 @@ func (sys *NotificationSys) collectBloomFilter(ctx context.Context, from uint64)
|
||||
if err == nil && bfr.Complete {
|
||||
nbf := intDataUpdateTracker.newBloomFilter()
|
||||
bf = &nbf
|
||||
_, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter))
|
||||
_, err = bf.ReadFrom(bytes.NewReader(bfr.Filter))
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
if !bfr.Complete {
|
||||
@ -540,7 +540,7 @@ func (sys *NotificationSys) collectBloomFilter(ctx context.Context, from uint64)
|
||||
}
|
||||
|
||||
var tmp bloom.BloomFilter
|
||||
_, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter))
|
||||
_, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter))
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
bf = nil
|
||||
|
@ -17,10 +17,10 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@ -91,7 +91,7 @@ func testDeleteObject(obj ObjectLayer, instanceType string, t TestErrHandler) {
|
||||
|
||||
for _, object := range testCase.objectToUploads {
|
||||
md5Bytes := md5.Sum([]byte(object.content))
|
||||
_, err = obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
|
||||
_, err = obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, strings.NewReader(object.content),
|
||||
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
||||
|
@ -85,7 +85,7 @@ func testAPIHeadObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
// iterate through the above set of inputs and upload the object.
|
||||
for i, input := range putObjectInputs {
|
||||
// uploading the object.
|
||||
_, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
_, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
||||
@ -357,7 +357,7 @@ func testAPIGetObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
|
||||
// iterate through the above set of inputs and upload the object.
|
||||
for i, input := range putObjectInputs {
|
||||
// uploading the object.
|
||||
_, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
_, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
||||
@ -1571,7 +1571,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
|
||||
for i, input := range putObjectInputs {
|
||||
// uploading the object.
|
||||
_, err = obj.PutObject(context.Background(), input.bucketName, input.objectName,
|
||||
mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
||||
@ -1681,7 +1681,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
|
||||
// iterate through the above set of inputs and upload the object.
|
||||
for i, input := range putObjectInputs {
|
||||
// uploading the object.
|
||||
_, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
_, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
||||
@ -2018,7 +2018,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
for i, input := range putObjectInputs {
|
||||
// uploading the object.
|
||||
var objInfo ObjectInfo
|
||||
objInfo, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.md5sum, ""), ObjectOptions{UserDefined: input.metaData})
|
||||
objInfo, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.md5sum, ""), ObjectOptions{UserDefined: input.metaData})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
||||
@ -2669,7 +2669,7 @@ func testAPICompleteMultipartHandler(obj ObjectLayer, instanceType, bucketName s
|
||||
// Iterating over creatPartCases to generate multipart chunks.
|
||||
for _, part := range parts {
|
||||
_, err = obj.PutObjectPart(context.Background(), part.bucketName, part.objName, part.uploadID, part.PartID,
|
||||
mustGetPutObjReader(t, bytes.NewBufferString(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts)
|
||||
mustGetPutObjReader(t, strings.NewReader(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts)
|
||||
if err != nil {
|
||||
t.Fatalf("%s : %s", instanceType, err)
|
||||
}
|
||||
@ -3040,7 +3040,7 @@ func testAPIAbortMultipartHandler(obj ObjectLayer, instanceType, bucketName stri
|
||||
// Iterating over createPartCases to generate multipart chunks.
|
||||
for _, part := range parts {
|
||||
_, err = obj.PutObjectPart(context.Background(), part.bucketName, part.objName, part.uploadID, part.PartID,
|
||||
mustGetPutObjReader(t, bytes.NewBufferString(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts)
|
||||
mustGetPutObjReader(t, strings.NewReader(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts)
|
||||
if err != nil {
|
||||
t.Fatalf("%s : %s", instanceType, err)
|
||||
}
|
||||
@ -3177,7 +3177,7 @@ func testAPIDeleteObjectHandler(obj ObjectLayer, instanceType, bucketName string
|
||||
// iterate through the above set of inputs and upload the object.
|
||||
for i, input := range putObjectInputs {
|
||||
// uploading the object.
|
||||
_, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
_, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData})
|
||||
// if object upload fails stop the test.
|
||||
if err != nil {
|
||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
||||
|
@ -25,16 +25,17 @@ import (
|
||||
// DiskInfo is an extended type which returns current
|
||||
// disk usage per path.
|
||||
type DiskInfo struct {
|
||||
Total uint64
|
||||
Free uint64
|
||||
Used uint64
|
||||
FSType string
|
||||
RootDisk bool
|
||||
Healing bool
|
||||
Endpoint string
|
||||
MountPath string
|
||||
ID string
|
||||
Error string // carries the error over the network
|
||||
Total uint64
|
||||
Free uint64
|
||||
Used uint64
|
||||
UsedInodes uint64
|
||||
FSType string
|
||||
RootDisk bool
|
||||
Healing bool
|
||||
Endpoint string
|
||||
MountPath string
|
||||
ID string
|
||||
Error string // carries the error over the network
|
||||
}
|
||||
|
||||
// VolsInfo is a collection of volume(bucket) information
|
||||
|
@ -42,6 +42,12 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err, "Used")
|
||||
return
|
||||
}
|
||||
case "UsedInodes":
|
||||
z.UsedInodes, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "UsedInodes")
|
||||
return
|
||||
}
|
||||
case "FSType":
|
||||
z.FSType, err = dc.ReadString()
|
||||
if err != nil {
|
||||
@ -97,9 +103,9 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// map header, size 10
|
||||
// map header, size 11
|
||||
// write "Total"
|
||||
err = en.Append(0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
|
||||
err = en.Append(0x8b, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -128,6 +134,16 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "Used")
|
||||
return
|
||||
}
|
||||
// write "UsedInodes"
|
||||
err = en.Append(0xaa, 0x55, 0x73, 0x65, 0x64, 0x49, 0x6e, 0x6f, 0x64, 0x65, 0x73)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.UsedInodes)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "UsedInodes")
|
||||
return
|
||||
}
|
||||
// write "FSType"
|
||||
err = en.Append(0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65)
|
||||
if err != nil {
|
||||
@ -204,9 +220,9 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 10
|
||||
// map header, size 11
|
||||
// string "Total"
|
||||
o = append(o, 0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
|
||||
o = append(o, 0x8b, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
|
||||
o = msgp.AppendUint64(o, z.Total)
|
||||
// string "Free"
|
||||
o = append(o, 0xa4, 0x46, 0x72, 0x65, 0x65)
|
||||
@ -214,6 +230,9 @@ func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
// string "Used"
|
||||
o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x64)
|
||||
o = msgp.AppendUint64(o, z.Used)
|
||||
// string "UsedInodes"
|
||||
o = append(o, 0xaa, 0x55, 0x73, 0x65, 0x64, 0x49, 0x6e, 0x6f, 0x64, 0x65, 0x73)
|
||||
o = msgp.AppendUint64(o, z.UsedInodes)
|
||||
// string "FSType"
|
||||
o = append(o, 0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65)
|
||||
o = msgp.AppendString(o, z.FSType)
|
||||
@ -274,6 +293,12 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "Used")
|
||||
return
|
||||
}
|
||||
case "UsedInodes":
|
||||
z.UsedInodes, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "UsedInodes")
|
||||
return
|
||||
}
|
||||
case "FSType":
|
||||
z.FSType, bts, err = msgp.ReadStringBytes(bts)
|
||||
if err != nil {
|
||||
@ -330,7 +355,7 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *DiskInfo) Msgsize() (s int) {
|
||||
s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error)
|
||||
s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 11 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -332,20 +332,22 @@ func (s *xlStorage) Healing() bool {
|
||||
}
|
||||
|
||||
func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
|
||||
var lc *lifecycle.Lifecycle
|
||||
var err error
|
||||
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, err := globalLifecycleSys.Get(cache.Info.Name)
|
||||
if err == nil && lc.HasActiveRules("", true) {
|
||||
cache.Info.lifeCycle = lc
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.Info(color.Green("crawlDisk:") + " lifecycle: Active rules found")
|
||||
if globalLifecycleSys != nil {
|
||||
lc, err = globalLifecycleSys.Get(cache.Info.Name)
|
||||
if err == nil && lc.HasActiveRules("", true) {
|
||||
cache.Info.lifeCycle = lc
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.Info(color.Green("crawlDisk:") + " lifecycle: Active rules found")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get object api
|
||||
// return initialized object layer
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
return cache, errServerNotInitialized
|
||||
}
|
||||
|
||||
globalHealConfigMu.Lock()
|
||||
healOpts := globalHealConfig
|
||||
@ -388,31 +390,33 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
|
||||
successorModTime = fivs.Versions[i-1].ModTime
|
||||
}
|
||||
oi := version.ToObjectInfo(item.bucket, item.objectPath())
|
||||
size := item.applyActions(ctx, objAPI, actionMeta{
|
||||
numVersions: numVersions,
|
||||
successorModTime: successorModTime,
|
||||
oi: oi,
|
||||
})
|
||||
if !version.Deleted {
|
||||
// Bitrot check local data
|
||||
if size > 0 && item.heal && healOpts.Bitrot {
|
||||
// HealObject verifies bitrot requirement internally
|
||||
res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{
|
||||
Remove: healDeleteDangling,
|
||||
ScanMode: madmin.HealDeepScan,
|
||||
})
|
||||
if err != nil {
|
||||
if !errors.Is(err, NotImplemented{}) {
|
||||
logger.LogIf(ctx, err)
|
||||
if objAPI != nil {
|
||||
size := item.applyActions(ctx, objAPI, actionMeta{
|
||||
numVersions: numVersions,
|
||||
successorModTime: successorModTime,
|
||||
oi: oi,
|
||||
})
|
||||
if !version.Deleted {
|
||||
// Bitrot check local data
|
||||
if size > 0 && item.heal && healOpts.Bitrot {
|
||||
// HealObject verifies bitrot requirement internally
|
||||
res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{
|
||||
Remove: healDeleteDangling,
|
||||
ScanMode: madmin.HealDeepScan,
|
||||
})
|
||||
if err != nil {
|
||||
if !errors.Is(err, NotImplemented{}) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
size = 0
|
||||
} else {
|
||||
size = res.ObjectSize
|
||||
}
|
||||
size = 0
|
||||
} else {
|
||||
size = res.ObjectSize
|
||||
}
|
||||
totalSize += size
|
||||
}
|
||||
totalSize += size
|
||||
item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}, &sizeS)
|
||||
}
|
||||
item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}, &sizeS)
|
||||
}
|
||||
sizeS.totalSize = totalSize
|
||||
return sizeS, nil
|
||||
@ -449,6 +453,7 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
|
||||
dcinfo.Total = di.Total
|
||||
dcinfo.Free = di.Free
|
||||
dcinfo.Used = di.Used
|
||||
dcinfo.UsedInodes = di.Files - di.Ffree
|
||||
dcinfo.FSType = di.FSType
|
||||
|
||||
diskID, err := s.GetDiskID()
|
||||
|
Loading…
Reference in New Issue
Block a user