snowball-repl: Add support of immediate tiering (#18508)

Also, fix a possible crash when some fields are not added to the batch
snowball yaml
This commit is contained in:
Anis Eleuch 2023-11-22 16:33:11 -08:00 committed by GitHub
parent fba883839d
commit fbc6f3f6e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 10 deletions

View File

@ -1521,16 +1521,6 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
return
}
// Validate the incoming job request
if err := job.Validate(ctx, objectAPI); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints))
job.User = user
job.Started = time.Now()
// Fill with default values
if job.Replicate != nil {
if job.Replicate.Source.Snowball.Disable == nil {
@ -1553,6 +1543,16 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
}
}
// Validate the incoming job request
if err := job.Validate(ctx, objectAPI); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints))
job.User = user
job.Started = time.Now()
if err := job.save(ctx, objectAPI); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return

View File

@ -2505,12 +2505,22 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
os := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended)
if !globalTierConfigMgr.Empty() {
// Get appropriate object info to identify the remote object to delete
goiOpts := os.GetOpts()
if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
}
// Create the object..
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
if err != nil {
return err
}
origETag := objInfo.ETag
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
@ -2545,6 +2555,14 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
}
sendEvent(evt)
// Remove the transitioned object whose object version is being overwritten.
if !globalTierConfigMgr.Empty() {
objInfo.ETag = origETag
// Schedule object for immediate transition if eligible.
enqueueTransitionImmediate(objInfo, lcEventSrc_s3PutObject)
os.Sweep()
}
return nil
}