Merge branch 'master' into resiliency-test

Shubhendu 2024-10-09 18:22:34 +05:30 committed by GitHub
commit d71b2a7e0b
8 changed files with 136 additions and 1956 deletions


@@ -93,11 +93,18 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 		return
 	}
-	if globalInplaceUpdateDisabled || currentReleaseTime.IsZero() {
+	if globalInplaceUpdateDisabled {
 		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
 		return
 	}
+	if currentReleaseTime.IsZero() || currentReleaseTime.Equal(timeSentinel) {
+		apiErr := errorCodes.ToAPIErr(ErrMethodNotAllowed)
+		apiErr.Description = fmt.Sprintf("unable to perform in-place update, release time is unrecognized: %s", currentReleaseTime)
+		writeErrorResponseJSON(ctx, w, apiErr, r.URL)
+		return
+	}
 	vars := mux.Vars(r)
 	updateURL := vars["updateURL"]
 	dryRun := r.Form.Get("dry-run") == "true"
@@ -110,6 +117,11 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 		}
 	}
+	local := globalLocalNodeName
+	if local == "" {
+		local = "127.0.0.1"
+	}
 	u, err := url.Parse(updateURL)
 	if err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
@@ -128,6 +140,39 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 		return
 	}
+	updateStatus := madmin.ServerUpdateStatusV2{
+		DryRun:  dryRun,
+		Results: make([]madmin.ServerPeerUpdateStatus, 0, len(globalNotificationSys.allPeerClients)),
+	}
+	peerResults := make(map[string]madmin.ServerPeerUpdateStatus, len(globalNotificationSys.allPeerClients))
+	failedClients := make(map[int]bool, len(globalNotificationSys.allPeerClients))
+
+	if lrTime.Sub(currentReleaseTime) <= 0 {
+		updateStatus.Results = append(updateStatus.Results, madmin.ServerPeerUpdateStatus{
+			Host:           local,
+			Err:            fmt.Sprintf("server is running the latest version: %s", Version),
+			CurrentVersion: Version,
+		})
+
+		for _, client := range globalNotificationSys.peerClients {
+			updateStatus.Results = append(updateStatus.Results, madmin.ServerPeerUpdateStatus{
+				Host:           client.String(),
+				Err:            fmt.Sprintf("server is running the latest version: %s", Version),
+				CurrentVersion: Version,
+			})
+		}
+
+		// Marshal API response
+		jsonBytes, err := json.Marshal(updateStatus)
+		if err != nil {
+			writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+			return
+		}
+
+		writeSuccessResponseJSON(w, jsonBytes)
+		return
+	}
+
 	u.Path = path.Dir(u.Path) + SlashSeparator + releaseInfo
 	// Download Binary Once
 	binC, bin, err := downloadBinary(u, mode)
@@ -137,16 +182,6 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 		return
 	}
-	updateStatus := madmin.ServerUpdateStatusV2{DryRun: dryRun}
-	peerResults := make(map[string]madmin.ServerPeerUpdateStatus)
-
-	local := globalLocalNodeName
-	if local == "" {
-		local = "127.0.0.1"
-	}
-
-	failedClients := make(map[int]struct{})
-
 	if globalIsDistErasure {
 		// Push binary to other servers
 		for idx, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, binC) {
@@ -156,7 +191,7 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 					Err:            nerr.Err.Error(),
 					CurrentVersion: Version,
 				}
-				failedClients[idx] = struct{}{}
+				failedClients[idx] = true
 			} else {
 				peerResults[nerr.Host.String()] = madmin.ServerPeerUpdateStatus{
 					Host:           nerr.Host.String(),
@@ -167,25 +202,17 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 			}
 		}
 	}
-	if lrTime.Sub(currentReleaseTime) > 0 {
-		if err = verifyBinary(u, sha256Sum, releaseInfo, mode, bytes.NewReader(bin)); err != nil {
-			peerResults[local] = madmin.ServerPeerUpdateStatus{
-				Host:           local,
-				Err:            err.Error(),
-				CurrentVersion: Version,
-			}
-		} else {
-			peerResults[local] = madmin.ServerPeerUpdateStatus{
-				Host:           local,
-				CurrentVersion: Version,
-				UpdatedVersion: lrTime.Format(MinioReleaseTagTimeLayout),
-			}
-		}
-	} else {
-		peerResults[local] = madmin.ServerPeerUpdateStatus{
-			Host:           local,
-			Err:            fmt.Sprintf("server is already running the latest version: %s", Version),
-			CurrentVersion: Version,
-		}
-	}
+	if err = verifyBinary(u, sha256Sum, releaseInfo, mode, bytes.NewReader(bin)); err != nil {
+		peerResults[local] = madmin.ServerPeerUpdateStatus{
+			Host:           local,
+			Err:            err.Error(),
+			CurrentVersion: Version,
+		}
+	} else {
+		peerResults[local] = madmin.ServerPeerUpdateStatus{
+			Host:           local,
+			CurrentVersion: Version,
+			UpdatedVersion: lrTime.Format(MinioReleaseTagTimeLayout),
+		}
+	}
@@ -193,8 +220,7 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 	if globalIsDistErasure {
 		ng := WithNPeers(len(globalNotificationSys.peerClients))
 		for idx, client := range globalNotificationSys.peerClients {
-			_, ok := failedClients[idx]
-			if ok {
+			if failedClients[idx] {
 				continue
 			}
 			client := client
@@ -240,14 +266,14 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 		startTime := time.Now().Add(restartUpdateDelay)
 		ng := WithNPeers(len(globalNotificationSys.peerClients))
 		for idx, client := range globalNotificationSys.peerClients {
-			_, ok := failedClients[idx]
-			if ok {
+			if failedClients[idx] {
 				continue
 			}
 			client := client
 			ng.Go(ctx, func() error {
 				prs, ok := peerResults[client.String()]
-				if ok && prs.CurrentVersion != prs.UpdatedVersion && prs.UpdatedVersion != "" {
+				// We restart only on success, not for any failures.
+				if ok && prs.Err == "" {
 					return client.SignalService(serviceRestart, "", dryRun, &startTime)
 				}
 				return nil
@@ -284,7 +310,9 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
 	writeSuccessResponseJSON(w, jsonBytes)
 
 	if !dryRun {
-		if lrTime.Sub(currentReleaseTime) > 0 {
+		prs, ok := peerResults[local]
+		// We restart only on success, not for any failures.
+		if ok && prs.Err == "" {
 			globalServiceSignalCh <- serviceRestart
 		}
 	}
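Taken together, these hunks change the restart decision from a release-time comparison to the per-peer update result: a node is only signalled to restart when its entry in peerResults exists and reports no error. A minimal standalone sketch of that decision, using a simplified stand-in for madmin.ServerPeerUpdateStatus (field names taken from the diff above, everything else illustrative):

package main

import "fmt"

// peerUpdateStatus is a simplified stand-in for madmin.ServerPeerUpdateStatus,
// limited to the fields the handler consults.
type peerUpdateStatus struct {
	Host           string
	Err            string
	CurrentVersion string
	UpdatedVersion string
}

// shouldRestart mirrors the new condition in the diff: restart only when a
// result exists for the peer and it carries no error.
func shouldRestart(results map[string]peerUpdateStatus, host string) bool {
	prs, ok := results[host]
	return ok && prs.Err == ""
}

func main() {
	results := map[string]peerUpdateStatus{
		"node1": {Host: "node1", CurrentVersion: "v1", UpdatedVersion: "v2"}, // updated cleanly
		"node2": {Host: "node2", Err: "binary verification failed", CurrentVersion: "v1"},
	}
	fmt.Println(shouldRestart(results, "node1")) // true
	fmt.Println(shouldRestart(results, "node2")) // false: failed peers are skipped
	fmt.Println(shouldRestart(results, "node3")) // false: no result recorded
}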


@@ -57,15 +57,14 @@ type versionsHistogram [dataUsageVersionLen]uint64
 type dataUsageEntry struct {
 	Children dataUsageHashMap `msg:"ch"`
 	// These fields do no include any children.
 	Size          int64             `msg:"sz"`
 	Objects       uint64            `msg:"os"`
 	Versions      uint64            `msg:"vs"` // Versions that are not delete markers.
 	DeleteMarkers uint64            `msg:"dms"`
 	ObjSizes      sizeHistogram     `msg:"szs"`
 	ObjVersions   versionsHistogram `msg:"vh"`
-	ReplicationStats *replicationAllStats `msg:"rs,omitempty"`
-	AllTierStats     *allTierStats        `msg:"ats,omitempty"`
-	Compacted        bool                 `msg:"c"`
+	AllTierStats *allTierStats `msg:"ats,omitempty"`
+	Compacted    bool          `msg:"c"`
 }
// allTierStats is a collection of per-tier stats across all configured remote // allTierStats is a collection of per-tier stats across all configured remote
@@ -135,93 +134,6 @@ func (ts tierStats) add(u tierStats) tierStats {
	}
}
//msgp:tuple replicationStatsV1
type replicationStatsV1 struct {
PendingSize uint64
ReplicatedSize uint64
FailedSize uint64
ReplicaSize uint64
FailedCount uint64
PendingCount uint64
MissedThresholdSize uint64
AfterThresholdSize uint64
MissedThresholdCount uint64
AfterThresholdCount uint64
}
func (rsv1 replicationStatsV1) Empty() bool {
return rsv1.ReplicatedSize == 0 &&
rsv1.FailedSize == 0 &&
rsv1.FailedCount == 0
}
//msgp:tuple replicationStats
type replicationStats struct {
PendingSize uint64
ReplicatedSize uint64
FailedSize uint64
FailedCount uint64
PendingCount uint64
MissedThresholdSize uint64
AfterThresholdSize uint64
MissedThresholdCount uint64
AfterThresholdCount uint64
ReplicatedCount uint64
}
func (rs replicationStats) Empty() bool {
return rs.ReplicatedSize == 0 &&
rs.FailedSize == 0 &&
rs.FailedCount == 0
}
type replicationAllStats struct {
Targets map[string]replicationStats `msg:"t,omitempty"`
ReplicaSize uint64 `msg:"r,omitempty"`
ReplicaCount uint64 `msg:"rc,omitempty"`
}
//msgp:tuple replicationAllStatsV1
type replicationAllStatsV1 struct {
Targets map[string]replicationStats
ReplicaSize uint64 `msg:"ReplicaSize,omitempty"`
ReplicaCount uint64 `msg:"ReplicaCount,omitempty"`
}
// empty returns true if the replicationAllStats is empty (contains no entries).
func (r *replicationAllStats) empty() bool {
if r == nil {
return true
}
if r.ReplicaSize != 0 || r.ReplicaCount != 0 {
return false
}
for _, v := range r.Targets {
if !v.Empty() {
return false
}
}
return true
}
// clone creates a deep-copy clone.
func (r *replicationAllStats) clone() *replicationAllStats {
if r == nil {
return nil
}
// Shallow copy
dst := *r
// Copy individual targets.
dst.Targets = make(map[string]replicationStats, len(r.Targets))
for k, v := range r.Targets {
dst.Targets[k] = v
}
return &dst
}
//msgp:encode ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4 dataUsageEntryV5 dataUsageEntryV6 dataUsageEntryV7
//msgp:marshal ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4 dataUsageEntryV5 dataUsageEntryV6 dataUsageEntryV7
@@ -237,62 +149,54 @@ type dataUsageEntryV2 struct {
 //msgp:tuple dataUsageEntryV3
 type dataUsageEntryV3 struct {
 	// These fields do no include any children.
 	Size     int64
-	ReplicatedSize         uint64
-	ReplicationPendingSize uint64
-	ReplicationFailedSize  uint64
-	ReplicaSize            uint64
-	Objects                uint64
-	ObjSizes               sizeHistogram
-	Children               dataUsageHashMap
+	Objects  uint64
+	ObjSizes sizeHistogram
+	Children dataUsageHashMap
 }
 
 //msgp:tuple dataUsageEntryV4
 type dataUsageEntryV4 struct {
 	Children dataUsageHashMap
 	// These fields do no include any children.
 	Size     int64
 	Objects  uint64
 	ObjSizes sizeHistogram
-	ReplicationStats replicationStatsV1
 }
 
 //msgp:tuple dataUsageEntryV5
 type dataUsageEntryV5 struct {
 	Children dataUsageHashMap
 	// These fields do no include any children.
 	Size     int64
 	Objects  uint64
 	Versions uint64 // Versions that are not delete markers.
 	ObjSizes sizeHistogram
-	ReplicationStats *replicationStatsV1
-	Compacted        bool
+	Compacted bool
 }
 
 //msgp:tuple dataUsageEntryV6
 type dataUsageEntryV6 struct {
 	Children dataUsageHashMap
 	// These fields do no include any children.
 	Size     int64
 	Objects  uint64
 	Versions uint64 // Versions that are not delete markers.
 	ObjSizes sizeHistogram
-	ReplicationStats *replicationAllStatsV1
-	Compacted        bool
+	Compacted bool
 }
 
 type dataUsageEntryV7 struct {
 	Children dataUsageHashMap `msg:"ch"`
 	// These fields do no include any children.
 	Size          int64             `msg:"sz"`
 	Objects       uint64            `msg:"os"`
 	Versions      uint64            `msg:"vs"` // Versions that are not delete markers.
 	DeleteMarkers uint64            `msg:"dms"`
 	ObjSizes      sizeHistogramV1   `msg:"szs"`
 	ObjVersions   versionsHistogram `msg:"vh"`
-	ReplicationStats *replicationAllStats `msg:"rs,omitempty"`
-	AllTierStats     *allTierStats        `msg:"ats,omitempty"`
-	Compacted        bool                 `msg:"c"`
+	AllTierStats *allTierStats `msg:"ats,omitempty"`
+	Compacted    bool          `msg:"c"`
 }
 
 // dataUsageCache contains a cache of data usage entries latest version.
@@ -373,29 +277,6 @@ func (e *dataUsageEntry) addSizes(summary sizeSummary) {
	e.ObjSizes.add(summary.totalSize)
	e.ObjVersions.add(summary.versions)
if e.ReplicationStats == nil {
e.ReplicationStats = &replicationAllStats{
Targets: make(map[string]replicationStats),
}
} else if e.ReplicationStats.Targets == nil {
e.ReplicationStats.Targets = make(map[string]replicationStats)
}
e.ReplicationStats.ReplicaSize += uint64(summary.replicaSize)
e.ReplicationStats.ReplicaCount += uint64(summary.replicaCount)
for arn, st := range summary.replTargetStats {
tgtStat, ok := e.ReplicationStats.Targets[arn]
if !ok {
tgtStat = replicationStats{}
}
tgtStat.PendingSize += uint64(st.pendingSize)
tgtStat.FailedSize += uint64(st.failedSize)
tgtStat.ReplicatedSize += uint64(st.replicatedSize)
tgtStat.ReplicatedCount += uint64(st.replicatedCount)
tgtStat.FailedCount += st.failedCount
tgtStat.PendingCount += st.pendingCount
e.ReplicationStats.Targets[arn] = tgtStat
}
	if len(summary.tiers) != 0 {
		if e.AllTierStats == nil {
			e.AllTierStats = newAllTierStats()
@@ -410,26 +291,6 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) {
	e.Versions += other.Versions
	e.DeleteMarkers += other.DeleteMarkers
	e.Size += other.Size
if other.ReplicationStats != nil {
if e.ReplicationStats == nil {
e.ReplicationStats = &replicationAllStats{Targets: make(map[string]replicationStats)}
} else if e.ReplicationStats.Targets == nil {
e.ReplicationStats.Targets = make(map[string]replicationStats)
}
e.ReplicationStats.ReplicaSize += other.ReplicationStats.ReplicaSize
e.ReplicationStats.ReplicaCount += other.ReplicationStats.ReplicaCount
for arn, stat := range other.ReplicationStats.Targets {
st := e.ReplicationStats.Targets[arn]
e.ReplicationStats.Targets[arn] = replicationStats{
PendingSize: stat.PendingSize + st.PendingSize,
FailedSize: stat.FailedSize + st.FailedSize,
ReplicatedSize: stat.ReplicatedSize + st.ReplicatedSize,
PendingCount: stat.PendingCount + st.PendingCount,
FailedCount: stat.FailedCount + st.FailedCount,
ReplicatedCount: stat.ReplicatedCount + st.ReplicatedCount,
}
}
}
	for i, v := range other.ObjSizes[:] {
		e.ObjSizes[i] += v
@@ -490,10 +351,7 @@ func (e dataUsageEntry) clone() dataUsageEntry {
 		}
 		e.Children = ch
 	}
-	if e.ReplicationStats != nil {
-		// Clone ReplicationStats
-		e.ReplicationStats = e.ReplicationStats.clone()
-	}
 	if e.AllTierStats != nil {
 		e.AllTierStats = e.AllTierStats.clone()
 	}
@@ -931,22 +789,6 @@ func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]Bucke
			ObjectSizesHistogram:    flat.ObjSizes.toMap(),
			ObjectVersionsHistogram: flat.ObjVersions.toMap(),
		}
if flat.ReplicationStats != nil {
bui.ReplicaSize = flat.ReplicationStats.ReplicaSize
bui.ReplicaCount = flat.ReplicationStats.ReplicaCount
bui.ReplicationInfo = make(map[string]BucketTargetUsageInfo, len(flat.ReplicationStats.Targets))
for arn, stat := range flat.ReplicationStats.Targets {
bui.ReplicationInfo[arn] = BucketTargetUsageInfo{
ReplicationPendingSize: stat.PendingSize,
ReplicatedSize: stat.ReplicatedSize,
ReplicationFailedSize: stat.FailedSize,
ReplicationPendingCount: stat.PendingCount,
ReplicationFailedCount: stat.FailedCount,
ReplicatedCount: stat.ReplicatedCount,
}
}
}
		dst[bucket.Name] = bui
	}
	return dst
@@ -959,9 +801,6 @@ func (d *dataUsageCache) sizeRecursive(path string) *dataUsageEntry {
 		return root
 	}
 	flat := d.flatten(*root)
-	if flat.ReplicationStats.empty() {
-		flat.ReplicationStats = nil
-	}
 	return &flat
 }
@@ -1238,20 +1077,6 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
				ObjSizes: v.ObjSizes,
				Children: v.Children,
			}
if v.ReplicatedSize > 0 || v.ReplicaSize > 0 || v.ReplicationFailedSize > 0 || v.ReplicationPendingSize > 0 {
cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
if cfg != nil && cfg.RoleArn != "" {
due.ReplicationStats = &replicationAllStats{
Targets: make(map[string]replicationStats),
}
due.ReplicationStats.ReplicaSize = v.ReplicaSize
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
ReplicatedSize: v.ReplicatedSize,
FailedSize: v.ReplicationFailedSize,
PendingSize: v.ReplicationPendingSize,
}
}
}
			due.Compacted = len(due.Children) == 0 && k != d.Info.Name
			d.Cache[k] = due
@@ -1277,36 +1102,10 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
				ObjSizes: v.ObjSizes,
				Children: v.Children,
			}
empty := replicationStatsV1{}
if v.ReplicationStats != empty {
cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
if cfg != nil && cfg.RoleArn != "" {
due.ReplicationStats = &replicationAllStats{
Targets: make(map[string]replicationStats),
}
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount,
PendingSize: v.ReplicationStats.PendingSize,
PendingCount: v.ReplicationStats.PendingCount,
}
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
}
}
			due.Compacted = len(due.Children) == 0 && k != d.Info.Name
			d.Cache[k] = due
		}
// Populate compacted value and remove unneeded replica stats.
for k, e := range d.Cache {
if e.ReplicationStats != nil && len(e.ReplicationStats.Targets) == 0 {
e.ReplicationStats = nil
}
d.Cache[k] = e
}
		return nil
	case dataUsageCacheVerV5:
		// Zstd compressed.
@@ -1328,36 +1127,10 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
				ObjSizes: v.ObjSizes,
				Children: v.Children,
			}
if v.ReplicationStats != nil && !v.ReplicationStats.Empty() {
cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
if cfg != nil && cfg.RoleArn != "" {
due.ReplicationStats = &replicationAllStats{
Targets: make(map[string]replicationStats),
}
d.Info.replication = replicationConfig{Config: cfg}
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount,
PendingSize: v.ReplicationStats.PendingSize,
PendingCount: v.ReplicationStats.PendingCount,
}
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
}
}
			due.Compacted = len(due.Children) == 0 && k != d.Info.Name
			d.Cache[k] = due
		}
// Populate compacted value and remove unneeded replica stats.
for k, e := range d.Cache {
if e.ReplicationStats != nil && len(e.ReplicationStats.Targets) == 0 {
e.ReplicationStats = nil
}
d.Cache[k] = e
}
		return nil
	case dataUsageCacheVerV6:
		// Zstd compressed.
@@ -1373,22 +1146,13 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
 		d.Info = dold.Info
 		d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
 		for k, v := range dold.Cache {
-			var replicationStats *replicationAllStats
-			if v.ReplicationStats != nil {
-				replicationStats = &replicationAllStats{
-					Targets:      v.ReplicationStats.Targets,
-					ReplicaSize:  v.ReplicationStats.ReplicaSize,
-					ReplicaCount: v.ReplicationStats.ReplicaCount,
-				}
-			}
 			due := dataUsageEntry{
 				Children: v.Children,
 				Size:     v.Size,
 				Objects:  v.Objects,
 				Versions: v.Versions,
 				ObjSizes: v.ObjSizes,
-				ReplicationStats: replicationStats,
-				Compacted:        v.Compacted,
+				Compacted: v.Compacted,
 			}
 			d.Cache[k] = due
 		}
@@ -1410,13 +1174,12 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
 			var szHist sizeHistogram
 			szHist.mergeV1(v.ObjSizes)
 			d.Cache[k] = dataUsageEntry{
 				Children: v.Children,
 				Size:     v.Size,
 				Objects:  v.Objects,
 				Versions: v.Versions,
 				ObjSizes: szHist,
-				ReplicationStats: v.ReplicationStats,
-				Compacted:        v.Compacted,
+				Compacted: v.Compacted,
 			}
 		}
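The deserialize hunks above all follow the same pattern: each legacy on-disk layout (dataUsageEntryV2 through dataUsageEntryV7) is still decoded, but its replication fields are no longer carried into the current dataUsageEntry. A minimal sketch of that migration with simplified stand-in structs (field sets taken from the diff; the real types are msgp-generated and richer):

package main

import "fmt"

// replicationStatsStub stands in for the replication statistics that the
// legacy layouts carried and the new code drops.
type replicationStatsStub struct{ ReplicaSize uint64 }

// entryV7 stands in for the legacy on-disk entry.
type entryV7 struct {
	Size             int64
	Objects          uint64
	Versions         uint64
	Compacted        bool
	ReplicationStats *replicationStatsStub
}

// entry stands in for the current dataUsageEntry after this commit.
type entry struct {
	Size      int64
	Objects   uint64
	Versions  uint64
	Compacted bool
}

// upgradeV7 mirrors the migration in deserialize: copy the surviving fields
// and drop the replication statistics entirely.
func upgradeV7(old map[string]entryV7) map[string]entry {
	out := make(map[string]entry, len(old))
	for k, v := range old {
		out[k] = entry{
			Size:      v.Size,
			Objects:   v.Objects,
			Versions:  v.Versions,
			Compacted: v.Compacted,
		}
	}
	return out
}

func main() {
	old := map[string]entryV7{
		"bucket": {Size: 42, Objects: 3, Versions: 3, Compacted: true,
			ReplicationStats: &replicationStatsStub{ReplicaSize: 7}},
	}
	fmt.Printf("%+v\n", upgradeV7(old)) // usage totals survive, replication stats are gone
}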

File diff suppressed because it is too large.


@@ -519,458 +519,6 @@ func BenchmarkDecodedataUsageEntry(b *testing.B) {
	}
}
func TestMarshalUnmarshalreplicationAllStats(t *testing.T) {
v := replicationAllStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgreplicationAllStats(b *testing.B) {
v := replicationAllStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgreplicationAllStats(b *testing.B) {
v := replicationAllStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalreplicationAllStats(b *testing.B) {
v := replicationAllStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodereplicationAllStats(t *testing.T) {
v := replicationAllStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodereplicationAllStats Msgsize() is inaccurate")
}
vn := replicationAllStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodereplicationAllStats(b *testing.B) {
v := replicationAllStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodereplicationAllStats(b *testing.B) {
v := replicationAllStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalreplicationAllStatsV1(t *testing.T) {
v := replicationAllStatsV1{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgreplicationAllStatsV1(b *testing.B) {
v := replicationAllStatsV1{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgreplicationAllStatsV1(b *testing.B) {
v := replicationAllStatsV1{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalreplicationAllStatsV1(b *testing.B) {
v := replicationAllStatsV1{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodereplicationAllStatsV1(t *testing.T) {
v := replicationAllStatsV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodereplicationAllStatsV1 Msgsize() is inaccurate")
}
vn := replicationAllStatsV1{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodereplicationAllStatsV1(b *testing.B) {
v := replicationAllStatsV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodereplicationAllStatsV1(b *testing.B) {
v := replicationAllStatsV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalreplicationStats(t *testing.T) {
v := replicationStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgreplicationStats(b *testing.B) {
v := replicationStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgreplicationStats(b *testing.B) {
v := replicationStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalreplicationStats(b *testing.B) {
v := replicationStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodereplicationStats(t *testing.T) {
v := replicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodereplicationStats Msgsize() is inaccurate")
}
vn := replicationStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodereplicationStats(b *testing.B) {
v := replicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodereplicationStats(b *testing.B) {
v := replicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalreplicationStatsV1(t *testing.T) {
v := replicationStatsV1{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgreplicationStatsV1(b *testing.B) {
v := replicationStatsV1{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgreplicationStatsV1(b *testing.B) {
v := replicationStatsV1{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalreplicationStatsV1(b *testing.B) {
v := replicationStatsV1{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodereplicationStatsV1(t *testing.T) {
v := replicationStatsV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodereplicationStatsV1 Msgsize() is inaccurate")
}
vn := replicationStatsV1{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodereplicationStatsV1(b *testing.B) {
v := replicationStatsV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodereplicationStatsV1(b *testing.B) {
v := replicationStatsV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalsizeHistogram(t *testing.T) {
	v := sizeHistogram{}
	bts, err := v.MarshalMsg(nil)


@@ -587,17 +587,6 @@ func TestDataUsageCacheSerialize(t *testing.T) {
		t.Fatal(err)
	}
	e := want.find("abucket/dir2")
e.ReplicationStats = &replicationAllStats{
Targets: map[string]replicationStats{
"arn": {
PendingSize: 1,
ReplicatedSize: 2,
FailedSize: 3,
FailedCount: 5,
PendingCount: 6,
},
},
}
	want.replace("abucket/dir2", "", *e)
	var buf bytes.Buffer
	err = want.serializeTo(&buf)


@@ -551,9 +551,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 	var root dataUsageEntry
 	if r := cache.root(); r != nil {
 		root = cache.flatten(*r)
-		if root.ReplicationStats.empty() {
-			root.ReplicationStats = nil
-		}
 	}
 	select {
 	case <-ctx.Done():


@@ -1522,7 +1522,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
 		if !srcTimestamp.IsZero() {
 			ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastTaggingTimestamp)
 			// update tagging metadata only if replica timestamp is newer than what's on disk
-			if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
+			if err != nil || (err == nil && !ondiskTimestamp.After(srcTimestamp)) {
 				srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
 				srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
 			}
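The only behavioural difference between ondiskTimestamp.Before(srcTimestamp) and !ondiskTimestamp.After(srcTimestamp) is the equal-timestamp case: with the new condition, a replica timestamp identical to the on-disk one also triggers the tag update. A small self-contained check of the two predicates (illustrative only, not MinIO code):

package main

import (
	"fmt"
	"time"
)

func main() {
	ondisk := time.Date(2024, 10, 9, 12, 0, 0, 0, time.UTC)
	src := ondisk // identical timestamps

	// Old condition: only strictly newer source timestamps pass.
	fmt.Println(ondisk.Before(src)) // false -> tags would not be updated

	// New condition: equal timestamps pass as well.
	fmt.Println(!ondisk.After(src)) // true -> tags are updated
}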


@@ -636,7 +636,7 @@ func (s *peerRESTServer) VerifyBinaryHandler(w http.ResponseWriter, r *http.Requ
 	}
 	if lrTime.Sub(currentReleaseTime) <= 0 {
-		s.writeErrorResponse(w, fmt.Errorf("server is already running the latest version: %s", Version))
+		s.writeErrorResponse(w, fmt.Errorf("server is running the latest version: %s", Version))
 		return
 	}