Add bucket lifecycle expiry feature (#7834)

This commit is contained in:
Anis Elleuch
2019-08-09 18:02:41 +01:00
committed by Harshavardhana
parent a8296445ad
commit 1ce8d2c476
17 changed files with 499 additions and 39 deletions

View File

@@ -24,6 +24,7 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lifecycle"
"github.com/minio/minio/pkg/policy"
)
const (
@@ -47,7 +48,7 @@ func (api objectAPIHandlers) PutBucketLifecycleHandler(w http.ResponseWriter, r
vars := mux.Vars(r)
bucket := vars["bucket"]
if s3Error := checkRequestAuthType(ctx, r, lifecycle.PutBucketLifecycleAction, bucket, ""); s3Error != ErrNone {
if s3Error := checkRequestAuthType(ctx, r, policy.PutBucketLifecycleAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
@@ -97,7 +98,7 @@ func (api objectAPIHandlers) GetBucketLifecycleHandler(w http.ResponseWriter, r
vars := mux.Vars(r)
bucket := vars["bucket"]
if s3Error := checkRequestAuthType(ctx, r, lifecycle.GetBucketLifecycleAction, bucket, ""); s3Error != ErrNone {
if s3Error := checkRequestAuthType(ctx, r, policy.GetBucketLifecycleAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
@@ -140,7 +141,7 @@ func (api objectAPIHandlers) DeleteBucketLifecycleHandler(w http.ResponseWriter,
vars := mux.Vars(r)
bucket := vars["bucket"]
if s3Error := checkRequestAuthType(ctx, r, lifecycle.DeleteBucketLifecycleAction, bucket, ""); s3Error != ErrNone {
if s3Error := checkRequestAuthType(ctx, r, policy.PutBucketLifecycleAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}

164
cmd/daily-lifecycle-ops.go Normal file
View File

@@ -0,0 +1,164 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lifecycle"
)
const (
// bgLifecycleInterval is the threshold compared against the time elapsed
// since the most recent lifecycle activity reported by any node.
bgLifecycleInterval = 24 * time.Hour
// bgLifecycleTick is how long the lifecycle loop sleeps when the last
// activity is more recent than bgLifecycleInterval, or when the
// lifecycle lock could not be acquired.
bgLifecycleTick = time.Hour
)
// lifecycleOps holds the local node's background lifecycle bookkeeping.
type lifecycleOps struct {
// LastActivity is the timestamp reported to peers through
// getLocalBgLifecycleOpsStatus.
LastActivity time.Time
}
// globalLifecycleOps is the process-wide lifecycle status of this node;
// getLocalBgLifecycleOpsStatus reads its LastActivity field.
var globalLifecycleOps = &lifecycleOps{}
// getLocalBgLifecycleOpsStatus returns a snapshot of this node's background
// lifecycle status, copied from globalLifecycleOps.
func getLocalBgLifecycleOpsStatus() BgLifecycleOpsStatus {
	var status BgLifecycleOpsStatus
	status.LastActivity = globalLifecycleOps.LastActivity
	return status
}
// initDailyLifecycle launches startDailyLifecycle in its own goroutine and
// returns immediately. The goroutine lists objects and applies any matching
// bucket lifecycle rules.
func initDailyLifecycle() {
go startDailyLifecycle()
}
// startDailyLifecycle is the long-running loop started by initDailyLifecycle.
// It blocks until the object layer is available, then repeatedly checks the
// most recent lifecycle activity reported by this node (and, in distributed
// XL mode, by its peers) and runs a lifecycle round when one is due.
// It never returns.
func startDailyLifecycle() {
	var objAPI ObjectLayer
	var ctx = context.Background()

	// Wait until the object API is ready.
	for {
		objAPI = newObjectLayerFn()
		if objAPI != nil {
			break
		}
		time.Sleep(time.Second)
	}

	// Calculate the time of the last lifecycle operation across all the
	// given node statuses. Returns the zero time when none reported any.
	computeLastLifecycleActivity := func(status []BgOpsStatus) time.Time {
		var lastAct time.Time
		for _, st := range status {
			if st.LifecycleOps.LastActivity.After(lastAct) {
				lastAct = st.LifecycleOps.LastActivity
			}
		}
		return lastAct
	}

	for {
		// Gather the local status plus, in distributed mode, the peers' status.
		allLifecycleStatus := []BgOpsStatus{
			{LifecycleOps: getLocalBgLifecycleOpsStatus()},
		}
		if globalIsDistXL {
			allLifecycleStatus = append(allLifecycleStatus, globalNotificationSys.BackgroundOpsStatus()...)
		}

		// NOTE(review): nothing in the visible code assigns
		// globalLifecycleOps.LastActivity, so lastAct may always be zero —
		// confirm where it is meant to be updated.
		lastAct := computeLastLifecycleActivity(allLifecycleStatus)
		if !lastAct.IsZero() && time.Since(lastAct) < bgLifecycleInterval {
			// A round already ran within the interval: wait one tick and
			// re-evaluate instead of falling through to another round.
			time.Sleep(bgLifecycleTick)
			continue
		}

		// Perform one lifecycle operation.
		err := lifecycleRound(ctx, objAPI)
		switch err.(type) {
		// Unable to hold a lock means there is another
		// instance doing the lifecycle round.
		case OperationTimedOut:
			time.Sleep(bgLifecycleTick)
		default:
			// Covers both success (nil error) and any other failure.
			logger.LogIf(ctx, err)
			time.Sleep(time.Minute)
		}
	}
}
// lifecycleRound performs a single pass over all buckets and applies their
// lifecycle rules to the objects they contain.
//
// The pass runs under the "system"/"daily-lifecycle-ops" namespace lock so
// that only one node executes it at a time; the lock is requested with a
// one millisecond timeout and the lock error is returned as-is when it
// cannot be acquired. An error from listing buckets is returned as well.
// Errors while listing or deleting objects are logged and do not abort the
// round.
func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
	zeroDuration := time.Millisecond
	zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)

	// Lock to avoid concurrent lifecycle ops from other nodes.
	sweepLock := globalNSMutex.NewNSLock(ctx, "system", "daily-lifecycle-ops")
	if err := sweepLock.GetLock(zeroDynamicTimeout); err != nil {
		return err
	}
	defer sweepLock.Unlock()

	buckets, err := objAPI.ListBuckets(ctx)
	if err != nil {
		return err
	}

	for _, bucket := range buckets {
		// Skip buckets without a configured lifecycle policy.
		l, ok := globalLifecycleSys.Get(bucket.Name)
		if !ok {
			continue
		}

		// Listing under the common prefix of all rule prefixes covers every
		// object that any rule could match.
		prefixes := make([]string, 0, len(l.Rules))
		for _, rule := range l.Rules {
			prefixes = append(prefixes, rule.Filter.Prefix)
		}
		commonPrefix := lcp(prefixes)

		// Page through the objects, 1000 at a time, and compute the
		// lifecycle action from each object's name and modification time.
		marker := ""
		for {
			res, err := objAPI.ListObjects(ctx, bucket.Name, commonPrefix, marker, "", 1000)
			if err != nil {
				// Retrying with the same marker would spin forever on a
				// persistent error while holding the lock; give up on this
				// bucket and move on to the next one.
				logger.LogIf(ctx, err)
				break
			}

			for _, obj := range res.Objects {
				if l.ComputeAction(obj.Name, obj.ModTime) != lifecycle.DeleteAction {
					// No action required for this object.
					continue
				}
				if err := objAPI.DeleteObject(ctx, bucket.Name, obj.Name); err != nil {
					// Best effort: report the failure and keep processing.
					logger.LogIf(ctx, err)
				}
			}

			if !res.IsTruncated {
				break
			}
			marker = res.NextMarker
		}
	}

	return nil
}

View File

@@ -50,6 +50,15 @@ func (sys *LifecycleSys) Set(bucketName string, lifecycle lifecycle.Lifecycle) {
sys.bucketLifecycleMap[bucketName] = lifecycle
}
// Get - looks up the lifecycle configuration stored for bucketName.
// The boolean result reports whether an entry exists for that bucket.
func (sys *LifecycleSys) Get(bucketName string) (lifecycle lifecycle.Lifecycle, ok bool) {
	sys.Lock()
	defer sys.Unlock()

	lifecycle, ok = sys.bucketLifecycleMap[bucketName]
	return lifecycle, ok
}
func saveLifecycleConfig(ctx context.Context, objAPI ObjectLayer, bucketName string, bucketLifecycle *lifecycle.Lifecycle) error {
data, err := xml.Marshal(bucketLifecycle)
if err != nil {

View File

@@ -263,6 +263,24 @@ func (sys *NotificationSys) BackgroundHealStatus() []madmin.BgHealState {
return states
}
// BackgroundOpsStatus - queries every peer for the status of its background
// operations. The result has one slot per peer client, in the same order;
// slots for nil clients or for peers whose call failed keep the zero value,
// and call failures are logged.
func (sys *NotificationSys) BackgroundOpsStatus() []BgOpsStatus {
	statuses := make([]BgOpsStatus, len(sys.peerClients))
	for i := range sys.peerClients {
		peer := sys.peerClients[i]
		if peer == nil {
			continue
		}
		status, err := peer.BackgroundOpsStatus()
		if err != nil {
			logger.LogIf(context.Background(), err)
			continue
		}
		statuses[i] = status
	}
	return statuses
}
// StartProfiling - start profiling on remote peers, by initiating a remote RPC.
func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))

View File

@@ -479,6 +479,32 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
return state, err
}
// BgLifecycleOpsStatus describes the status
// of the background lifecycle operations
// on a single node.
type BgLifecycleOpsStatus struct {
// LastActivity is the lifecycle activity timestamp reported by the node.
LastActivity time.Time
}
// BgOpsStatus describes the status of all operations performed
// in background such as auto-healing and lifecycle.
// Values of this type are gob-encoded when exchanged between peers.
// Notice: We need to increase peer REST API version when adding
// new fields to this struct.
type BgOpsStatus struct {
// LifecycleOps is the status of the background lifecycle operations.
LifecycleOps BgLifecycleOpsStatus
}
// BackgroundOpsStatus asks the remote peer for the status of its background
// operations and gob-decodes the reply. On a call failure it returns the
// zero BgOpsStatus together with the error.
func (client *peerRESTClient) BackgroundOpsStatus() (BgOpsStatus, error) {
	var status BgOpsStatus

	body, err := client.call(peerRESTMethodBackgroundOpsStatus, nil, nil, -1)
	if err != nil {
		return status, err
	}
	defer http.DrainBody(body)

	err = gob.NewDecoder(body).Decode(&status)
	return status, err
}
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
values := make(url.Values)
values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))

View File

@@ -16,7 +16,7 @@
package cmd
const peerRESTVersion = "v2"
const peerRESTVersion = "v3"
const peerRESTPath = minioReservedBucketPath + "/peer/" + peerRESTVersion
const (
@@ -27,6 +27,7 @@ const (
peerRESTMethodDeleteBucket = "deletebucket"
peerRESTMethodSignalService = "signalservice"
peerRESTMethodBackgroundHealStatus = "backgroundhealstatus"
peerRESTMethodBackgroundOpsStatus = "backgroundopsstatus"
peerRESTMethodGetLocks = "getlocks"
peerRESTMethodBucketPolicyRemove = "removebucketpolicy"
peerRESTMethodLoadUser = "loaduser"

View File

@@ -784,6 +784,22 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
// BackgroundOpsStatusHandler replies with the gob-encoded background
// operations status of the local node. Requests rejected by IsValid get an
// error response instead.
func (s *peerRESTServer) BackgroundOpsStatusHandler(w http.ResponseWriter, r *http.Request) {
	if valid := s.IsValid(w, r); !valid {
		s.writeErrorResponse(w, errors.New("invalid request"))
		return
	}

	ctx := newContext(r, w, "BackgroundOpsStatus")

	var status BgOpsStatus
	status.LifecycleOps = getLocalBgLifecycleOpsStatus()

	// Flush the response once the encoded status has been written.
	defer w.(http.Flusher).Flush()
	encodeErr := gob.NewEncoder(w).Encode(status)
	logger.LogIf(ctx, encodeErr)
}
func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
@@ -831,6 +847,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodBucketLifecycleSet).HandlerFunc(httpTraceHdrs(server.SetBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodBucketLifecycleRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodBackgroundOpsStatus).HandlerFunc(server.BackgroundOpsStatusHandler)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)

View File

@@ -385,6 +385,8 @@ func serverMain(ctx *cli.Context) {
// - compression
verifyObjectLayerFeatures("server", newObject)
initDailyLifecycle()
if globalIsXL {
initBackgroundHealing()
initDailyHeal()

View File

@@ -475,3 +475,36 @@ func reverseStringSlice(input []string) {
input[left], input[right] = input[right], input[left]
}
}
// lcp returns the longest common prefix shared by every string in l.
// The comparison is byte-wise rather than rune-wise, so callers that care
// about Unicode equivalence must normalize their input first
// (e.g. see golang.org/x/text/unicode/norm).
// An empty input yields "", and a single string is returned unchanged.
func lcp(l []string) string {
	if len(l) == 0 {
		return ""
	}

	// Start from the first string and shrink the candidate prefix against
	// each remaining string in turn.
	prefix := l[0]
	for _, s := range l[1:] {
		limit := len(prefix)
		if len(s) < limit {
			limit = len(s)
		}
		matched := 0
		for matched < limit && prefix[matched] == s[matched] {
			matched++
		}
		prefix = prefix[:matched]
		if prefix == "" {
			// Nothing in common any more; later strings cannot extend it.
			break
		}
	}
	return prefix
}

View File

@@ -479,3 +479,26 @@ func TestQueries(t *testing.T) {
}
}
}
// TestLCP checks lcp against a table of prefix sets and the longest common
// prefix expected for each of them.
func TestLCP(t *testing.T) {
	type lcpCase struct {
		prefixes     []string
		commonPrefix string
	}

	cases := []lcpCase{
		{prefixes: []string{"", ""}, commonPrefix: ""},
		{prefixes: []string{"a", "b"}, commonPrefix: ""},
		{prefixes: []string{"a", "a"}, commonPrefix: "a"},
		{prefixes: []string{"a/", "a/"}, commonPrefix: "a/"},
		{prefixes: []string{"abcd/", ""}, commonPrefix: ""},
		{prefixes: []string{"abcd/foo/", "abcd/bar/"}, commonPrefix: "abcd/"},
		{prefixes: []string{"abcd/foo/bar/", "abcd/foo/bar/zoo"}, commonPrefix: "abcd/foo/bar/"},
	}

	for idx := range cases {
		tc := cases[idx]
		if got := lcp(tc.prefixes); got != tc.commonPrefix {
			t.Fatalf("Test %d: Common prefix found: `%v`, expected: `%v`", idx+1, got, tc.commonPrefix)
		}
	}
}