xl: Make it clear when to create delete marker for a non existant object (#11423)

This commit is contained in:
Anis Elleuch 2021-02-03 19:33:43 +01:00 committed by GitHub
parent a71e0483c9
commit b3f81e75f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 27 additions and 20 deletions

View File

@ -709,7 +709,7 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
if versionID == "" {
err = er.deleteObject(ctx, bucket, object, writeQuorum)
} else {
err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID})
err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID}, false)
}
// If Delete was successful, make sure to return the appropriate error

View File

@ -414,7 +414,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
Name: object,
VersionID: opts.VersionID,
})
}, false)
}
}
}
@ -800,7 +800,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
return fi.ToObjectInfo(bucket, object), nil
}
func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo) error {
func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
defer ObjectPathUpdated(pathJoin(bucket, object))
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
@ -810,7 +810,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].DeleteVersion(ctx, bucket, object, fi)
return disks[index].DeleteVersion(ctx, bucket, object, fi, forceDelMarker)
}, index)
}
// return errors if any during deletion
@ -1098,7 +1098,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
// version as delete marker
// Add delete marker, since we don't have any version specified explicitly.
// Or if a particular version id needs to be replicated.
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil {
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
@ -1115,7 +1115,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
DeleteMarkerReplicationStatus: opts.DeleteMarkerReplicationStatus,
VersionPurgeStatus: opts.VersionPurgeStatus,
TransitionStatus: opts.TransitionStatus,
}); err != nil {
}, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}

View File

@ -245,11 +245,11 @@ func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi
return d.disk.WriteMetadata(ctx, volume, path, fi)
}
func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) (err error) {
func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.DeleteVersion(ctx, volume, path, fi)
return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker)
}
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {

View File

@ -55,7 +55,7 @@ type StorageAPI interface {
WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error)
// Metadata operations
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)

View File

@ -332,10 +332,11 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
return err
}
func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error {
func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
values.Set(storageRESTForceDelMarker, strconv.FormatBool(forceDelMarker))
var buffer bytes.Buffer
if err := msgp.Encode(&buffer, &fi); err != nil {

View File

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v26" // Add NumVersions/SuccessorModTime fields in FileInfo
storageRESTVersion = "v27" // Add force-delete-marker to DeleteVersion
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)
@ -58,6 +58,7 @@ const (
storageRESTVolumes = "volumes"
storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path"
storageRESTForceDelMarker = "force-delete-marker"
storageRESTVersionID = "version-id"
storageRESTReadData = "read-data"
storageRESTTotalVersions = "total-versions"

View File

@ -300,6 +300,11 @@ func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http.
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
forceDelMarker, err := strconv.ParseBool(vars[storageRESTForceDelMarker])
if err != nil {
s.writeErrorResponse(w, errInvalidArgument)
return
}
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
@ -312,7 +317,7 @@ func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http.
return
}
err := s.storage.DeleteVersion(r.Context(), volume, filePath, fi)
err = s.storage.DeleteVersion(r.Context(), volume, filePath, fi, forceDelMarker)
if err != nil {
s.writeErrorResponse(w, err)
}
@ -1038,7 +1043,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteMetadata).HandlerFunc(httpTraceHdrs(server.WriteMetadataHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTForceDelMarker)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).

View File

@ -256,12 +256,12 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path
return p.storage.WriteAll(ctx, volume, path, b)
}
func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) (err error) {
func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.DeleteVersion(ctx, volume, path, fi)
return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker)
}
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {

View File

@ -878,7 +878,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error {
errs := make([]error, len(versions))
for i, version := range versions {
if err := s.DeleteVersion(ctx, volume, version.Name, version); err != nil {
if err := s.DeleteVersion(ctx, volume, version.Name, version, false); err != nil {
errs[i] = err
}
}
@ -886,9 +886,9 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
return errs
}
// DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. Create a fresh
// `xl.meta` if it does not exist and we creating a new delete-marker.
func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error {
// DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker
// will force creating a new `xl.meta` to create a new delete marker
func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error {
if HasSuffix(path, SlashSeparator) {
return s.Delete(ctx, volume, path, false)
}
@ -898,7 +898,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
if err != errFileNotFound {
return err
}
if fi.Deleted {
if fi.Deleted && forceDelMarker {
// Create a new xl.meta with a delete marker in it
return s.WriteMetadata(ctx, volume, path, fi)
}