Add immediate inline tiering support (#13298)

This commit is contained in:
Krishnan Parthasarathi
2021-10-01 11:58:17 -07:00
committed by GitHub
parent cfbaf7bf1c
commit f3aeed77e5
10 changed files with 223 additions and 119 deletions

View File

@@ -220,16 +220,31 @@ var errInvalidStorageClass = errors.New("invalid storage class")
func validateTransitionTier(lc *lifecycle.Lifecycle) error {
for _, rule := range lc.Rules {
if rule.Transition.StorageClass == "" {
continue
if rule.Transition.StorageClass != "" {
if valid := globalTierConfigMgr.IsTierValid(rule.Transition.StorageClass); !valid {
return errInvalidStorageClass
}
}
if valid := globalTierConfigMgr.IsTierValid(rule.Transition.StorageClass); !valid {
return errInvalidStorageClass
if rule.NoncurrentVersionTransition.StorageClass != "" {
if valid := globalTierConfigMgr.IsTierValid(rule.NoncurrentVersionTransition.StorageClass); !valid {
return errInvalidStorageClass
}
}
}
return nil
}
// enqueueTransitionImmediate enqueues obj for transition if eligible.
// This is to be called after a successful upload of an object (version).
func enqueueTransitionImmediate(obj ObjectInfo) {
if lc, err := globalLifecycleSys.Get(obj.Bucket); err == nil {
switch lc.ComputeAction(obj.ToLifecycleOpts()) {
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
globalTransitionState.queueTransitionTask(obj)
}
}
}
// expireAction represents different actions to be performed on expiry of a
// restored/transitioned object
type expireAction int
@@ -702,17 +717,16 @@ func isRestoredObjectOnDisk(meta map[string]string) (onDisk bool) {
// ToLifecycleOpts returns lifecycle.ObjectOpts value for oi.
func (oi ObjectInfo) ToLifecycleOpts() lifecycle.ObjectOpts {
return lifecycle.ObjectOpts{
Name: oi.Name,
UserTags: oi.UserTags,
VersionID: oi.VersionID,
ModTime: oi.ModTime,
IsLatest: oi.IsLatest,
NumVersions: oi.NumVersions,
DeleteMarker: oi.DeleteMarker,
SuccessorModTime: oi.SuccessorModTime,
RestoreOngoing: oi.RestoreOngoing,
RestoreExpires: oi.RestoreExpires,
TransitionStatus: oi.TransitionedObject.Status,
RemoteTiersImmediately: globalDebugRemoteTiersImmediately,
Name: oi.Name,
UserTags: oi.UserTags,
VersionID: oi.VersionID,
ModTime: oi.ModTime,
IsLatest: oi.IsLatest,
NumVersions: oi.NumVersions,
DeleteMarker: oi.DeleteMarker,
SuccessorModTime: oi.SuccessorModTime,
RestoreOngoing: oi.RestoreOngoing,
RestoreExpires: oi.RestoreExpires,
TransitionStatus: oi.TransitionedObject.Status,
}
}

View File

@@ -598,10 +598,6 @@ func handleCommonEnvVars() {
}
GlobalKMS = KMS
}
if tiers := env.Get("_MINIO_DEBUG_REMOTE_TIERS_IMMEDIATELY", ""); tiers != "" {
globalDebugRemoteTiersImmediately = strings.Split(tiers, ",")
}
}
func logStartupMessage(msg string) {

View File

@@ -879,18 +879,17 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
versionID := oi.VersionID
action := i.lifeCycle.ComputeAction(
lifecycle.ObjectOpts{
Name: i.objectPath(),
UserTags: oi.UserTags,
ModTime: oi.ModTime,
VersionID: oi.VersionID,
DeleteMarker: oi.DeleteMarker,
IsLatest: oi.IsLatest,
NumVersions: oi.NumVersions,
SuccessorModTime: oi.SuccessorModTime,
RestoreOngoing: oi.RestoreOngoing,
RestoreExpires: oi.RestoreExpires,
TransitionStatus: oi.TransitionedObject.Status,
RemoteTiersImmediately: globalDebugRemoteTiersImmediately,
Name: i.objectPath(),
UserTags: oi.UserTags,
ModTime: oi.ModTime,
VersionID: oi.VersionID,
DeleteMarker: oi.DeleteMarker,
IsLatest: oi.IsLatest,
NumVersions: oi.NumVersions,
SuccessorModTime: oi.SuccessorModTime,
RestoreOngoing: oi.RestoreOngoing,
RestoreExpires: oi.RestoreExpires,
TransitionStatus: oi.TransitionedObject.Status,
})
if i.debug {
if versionID != "" {

View File

@@ -321,7 +321,6 @@ var (
globalConsoleSrv *restapi.Server
globalDebugRemoteTiersImmediately []string
// Add new variable global values here.
)

View File

@@ -1507,6 +1507,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
if !globalTierConfigMgr.Empty() {
// Schedule object for immediate transition if eligible.
enqueueTransitionImmediate(objInfo)
}
}
// PutObjectHandler - PUT Object
@@ -1851,6 +1856,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Remove the transitioned object whose object version is being overwritten.
if !globalTierConfigMgr.Empty() {
// Schedule object for immediate transition if eligible.
enqueueTransitionImmediate(objInfo)
logger.LogIf(ctx, os.Sweep())
}
}
@@ -3292,6 +3299,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
// Remove the transitioned object whose object version is being overwritten.
if !globalTierConfigMgr.Empty() {
// Schedule object for immediate transition if eligible.
enqueueTransitionImmediate(objInfo)
logger.LogIf(ctx, os.Sweep())
}
}