diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 86b88f2aa..a812a3ab2 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -709,7 +709,7 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object if versionID == "" { err = er.deleteObject(ctx, bucket, object, writeQuorum) } else { - err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID}) + err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID}, false) } // If Delete was successful, make sure to return the appropriate error diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 20b30b452..27091f2bb 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -414,7 +414,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{ Name: object, VersionID: opts.VersionID, - }) + }, false) } } } @@ -800,7 +800,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st return fi.ToObjectInfo(bucket, object), nil } -func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo) error { +func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error { defer ObjectPathUpdated(pathJoin(bucket, object)) disks := er.getDisks() g := errgroup.WithNErrs(len(disks)) @@ -810,7 +810,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object if disks[index] == nil { return errDiskNotFound } - return disks[index].DeleteVersion(ctx, bucket, object, fi) + return disks[index].DeleteVersion(ctx, bucket, object, fi, forceDelMarker) }, index) } // return errors if any during deletion @@ -1098,7 +1098,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string // version as delete marker // Add delete marker, since we don't have any version specified explicitly. // Or if a particular version id needs to be replicated. - if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil { + if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } return fi.ToObjectInfo(bucket, object), nil @@ -1115,7 +1115,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string DeleteMarkerReplicationStatus: opts.DeleteMarkerReplicationStatus, VersionPurgeStatus: opts.VersionPurgeStatus, TransitionStatus: opts.TransitionStatus, - }); err != nil { + }, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index df3a1e2f3..dc412968a 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -245,11 +245,11 @@ func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi return d.disk.WriteMetadata(ctx, volume, path, fi) } -func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) (err error) { +func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { if err := d.calcError(); err != nil { return err } - return d.disk.DeleteVersion(ctx, volume, path, fi) + return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker) } func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 8df8308e3..0531439c7 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -55,7 +55,7 @@ type StorageAPI interface { WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) // Metadata operations - DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error + DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 2417507de..37bb279ce 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -332,10 +332,11 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path return err } -func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error { +func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) + values.Set(storageRESTForceDelMarker, strconv.FormatBool(forceDelMarker)) var buffer bytes.Buffer if err := msgp.Encode(&buffer, &fi); err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index d6f8aaf1e..144dc47c1 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v26" // Add NumVersions/SuccessorModTime fields in FileInfo + storageRESTVersion = "v27" // Add force-delete-marker to DeleteVersion storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -58,6 +58,7 @@ const ( storageRESTVolumes = "volumes" storageRESTDirPath = "dir-path" storageRESTFilePath = "file-path" + storageRESTForceDelMarker = "force-delete-marker" storageRESTVersionID = "version-id" storageRESTReadData = "read-data" storageRESTTotalVersions = "total-versions" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index d56714600..14e93878e 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -300,6 +300,11 @@ func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http. vars := mux.Vars(r) volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] + forceDelMarker, err := strconv.ParseBool(vars[storageRESTForceDelMarker]) + if err != nil { + s.writeErrorResponse(w, errInvalidArgument) + return + } if r.ContentLength < 0 { s.writeErrorResponse(w, errInvalidArgument) @@ -312,7 +317,7 @@ func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http. return } - err := s.storage.DeleteVersion(r.Context(), volume, filePath, fi) + err = s.storage.DeleteVersion(r.Context(), volume, filePath, fi, forceDelMarker) if err != nil { s.writeErrorResponse(w, err) } @@ -1038,7 +1043,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteMetadata).HandlerFunc(httpTraceHdrs(server.WriteMetadataHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTForceDelMarker)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 646e2596d..56625176e 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -256,12 +256,12 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path return p.storage.WriteAll(ctx, volume, path, b) } -func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) (err error) { +func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { if err = p.checkDiskStale(); err != nil { return err } - return p.storage.DeleteVersion(ctx, volume, path, fi) + return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) } func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 65de6229f..299d43cec 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -878,7 +878,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error { errs := make([]error, len(versions)) for i, version := range versions { - if err := s.DeleteVersion(ctx, volume, version.Name, version); err != nil { + if err := s.DeleteVersion(ctx, volume, version.Name, version, false); err != nil { errs[i] = err } } @@ -886,9 +886,9 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions return errs } -// DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. Create a fresh -// `xl.meta` if it does not exist and we creating a new delete-marker. -func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error { +// DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker +// will force creating a new `xl.meta` to create a new delete marker +func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error { if HasSuffix(path, SlashSeparator) { return s.Delete(ctx, volume, path, false) } @@ -898,7 +898,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F if err != errFileNotFound { return err } - if fi.Deleted { + if fi.Deleted && forceDelMarker { // Create a new xl.meta with a delete marker in it return s.WriteMetadata(ctx, volume, path, fi) }