fix: cleanup old directory handling code (#10633)

we don't need them anymore, remove legacy code.
This commit is contained in:
Harshavardhana 2020-10-06 12:03:57 -07:00 committed by GitHub
parent 57f0176759
commit 18063bf25c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 148 deletions

View File

@ -820,22 +820,22 @@ var errorCodes = errorCodeMap{
HTTPStatusCode: http.StatusBadRequest, HTTPStatusCode: http.StatusBadRequest,
}, },
ErrReplicationTargetNotFoundError: { ErrReplicationTargetNotFoundError: {
Code: "XminioAdminReplicationTargetNotFoundError", Code: "XMinioAdminReplicationTargetNotFoundError",
Description: "The replication target does not exist", Description: "The replication target does not exist",
HTTPStatusCode: http.StatusNotFound, HTTPStatusCode: http.StatusNotFound,
}, },
ErrReplicationRemoteConnectionError: { ErrReplicationRemoteConnectionError: {
Code: "XminioAdminReplicationRemoteConnectionError", Code: "XMinioAdminReplicationRemoteConnectionError",
Description: "Remote service endpoint or target bucket not available", Description: "Remote service endpoint or target bucket not available",
HTTPStatusCode: http.StatusNotFound, HTTPStatusCode: http.StatusNotFound,
}, },
ErrBucketRemoteIdenticalToSource: { ErrBucketRemoteIdenticalToSource: {
Code: "XminioAdminRemoteIdenticalToSource", Code: "XMinioAdminRemoteIdenticalToSource",
Description: "The remote target cannot be identical to source", Description: "The remote target cannot be identical to source",
HTTPStatusCode: http.StatusBadRequest, HTTPStatusCode: http.StatusBadRequest,
}, },
ErrBucketRemoteAlreadyExists: { ErrBucketRemoteAlreadyExists: {
Code: "XminioAdminBucketRemoteAlreadyExists", Code: "XMinioAdminBucketRemoteAlreadyExists",
Description: "The remote target already exists", Description: "The remote target already exists",
HTTPStatusCode: http.StatusBadRequest, HTTPStatusCode: http.StatusBadRequest,
}, },

View File

@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -123,10 +122,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
// GetObjectNInfo - returns object info and an object // GetObjectNInfo - returns object info and an object
// Read(Closer). When err != nil, the returned reader is always nil. // Read(Closer). When err != nil, the returned reader is always nil.
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return nil, err
}
var unlockOnDefer bool var unlockOnDefer bool
var nsUnlocker = func() {} var nsUnlocker = func() {}
defer func() { defer func() {
@ -153,17 +148,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
unlockOnDefer = true unlockOnDefer = true
} }
// Handler directory request by returning a reader that
// returns no bytes.
if HasSuffix(object, SlashSeparator) {
var objInfo ObjectInfo
if objInfo, err = er.getObjectInfoDir(ctx, bucket, object); err != nil {
return nil, toObjectErr(err, bucket, object)
}
unlockOnDefer = false
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker)
}
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
@ -208,10 +192,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
// startOffset indicates the starting read location of the object. // startOffset indicates the starting read location of the object.
// length indicates the total length of the object. // length indicates the total length of the object.
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
if err := checkGetObjArgs(ctx, bucket, object); err != nil {
return err
}
// Lock the object before reading. // Lock the object before reading.
lk := er.NewNSLock(ctx, bucket, object) lk := er.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalOperationTimeout); err != nil { if err := lk.GetRLock(globalOperationTimeout); err != nil {
@ -366,43 +346,8 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, etag, opts, fi, metaArr, onlineDisks) return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, etag, opts, fi, metaArr, onlineDisks)
} }
// getObjectInfoDir - This getObjectInfo is specific to object directory lookup.
func (er erasureObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (ObjectInfo, error) {
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
// Prepare object creation in a all disks
for index, disk := range storageDisks {
if disk == nil {
continue
}
index := index
g.Go(func() error {
// Check if 'prefix' is an object on this 'disk'.
entries, err := storageDisks[index].ListDir(ctx, bucket, object, 1)
if err != nil {
return err
}
if len(entries) > 0 {
// Not a directory if not empty.
return errFileNotFound
}
return nil
}, index)
}
readQuorum := getReadQuorum(len(storageDisks))
err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum)
return dirObjectInfo(bucket, object, 0, map[string]string{}), err
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo. // GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return info, err
}
// Lock the object before reading. // Lock the object before reading.
lk := er.NewNSLock(ctx, bucket, object) lk := er.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalOperationTimeout); err != nil { if err := lk.GetRLock(globalOperationTimeout); err != nil {
@ -410,14 +355,6 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
} }
defer lk.RUnlock() defer lk.RUnlock()
if HasSuffix(object, SlashSeparator) {
info, err = er.getObjectInfoDir(ctx, bucket, object)
if err != nil {
return info, toObjectErr(err, bucket, object)
}
return info, nil
}
return er.getObjectInfo(ctx, bucket, object, opts) return er.getObjectInfo(ctx, bucket, object, opts)
} }
@ -584,11 +521,6 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
// writes `xl.meta` which carries the necessary metadata for future // writes `xl.meta` which carries the necessary metadata for future
// object operations. // object operations.
func (er erasureObjects) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (er erasureObjects) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
// Validate put object input args.
if err = checkPutObjectArgs(ctx, bucket, object, er, data.Size()); err != nil {
return ObjectInfo{}, err
}
return er.putObject(ctx, bucket, object, data, opts) return er.putObject(ctx, bucket, object, data, opts)
} }
@ -788,11 +720,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
if disks[index] == nil { if disks[index] == nil {
return errDiskNotFound return errDiskNotFound
} }
err := disks[index].DeleteVersion(ctx, bucket, object, fi) return disks[index].DeleteVersion(ctx, bucket, object, fi)
if err != nil && err != errVolumeNotFound {
return err
}
return nil
}, index) }, index)
} }
@ -853,16 +781,9 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
dobjects := make([]DeletedObject, len(objects)) dobjects := make([]DeletedObject, len(objects))
writeQuorums := make([]int, len(objects)) writeQuorums := make([]int, len(objects))
for i, object := range objects {
errs[i] = checkDelObjArgs(ctx, bucket, object.ObjectName)
}
storageDisks := er.getDisks() storageDisks := er.getDisks()
for i := range objects { for i := range objects {
if errs[i] != nil {
continue
}
// Assume (N/2 + 1) quorums for all objects // Assume (N/2 + 1) quorums for all objects
// this is a theoretical assumption such that // this is a theoretical assumption such that
// for delete's we do not need to honor storage // for delete's we do not need to honor storage
@ -876,7 +797,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
for i := range objects { for i := range objects {
if objects[i].VersionID == "" { if objects[i].VersionID == "" {
if opts.Versioned || opts.VersionSuspended { if opts.Versioned || opts.VersionSuspended {
if !HasSuffix(objects[i].ObjectName, SlashSeparator) {
fi := FileInfo{ fi := FileInfo{
Name: objects[i].ObjectName, Name: objects[i].ObjectName,
ModTime: UTCNow(), ModTime: UTCNow(),
@ -892,7 +812,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
continue continue
} }
} }
}
versions[i] = FileInfo{ versions[i] = FileInfo{
Name: objects[i].ObjectName, Name: objects[i].ObjectName,
VersionID: objects[i].VersionID, VersionID: objects[i].VersionID,
@ -978,10 +897,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
// any error as it is not necessary for the handler to reply back a // any error as it is not necessary for the handler to reply back a
// response to the client request. // response to the client request.
func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
return objInfo, err
}
// Acquire a write lock before deleting the object. // Acquire a write lock before deleting the object.
lk := er.NewNSLock(ctx, bucket, object) lk := er.NewNSLock(ctx, bucket, object)
if err = lk.GetLock(globalOperationTimeout); err != nil { if err = lk.GetLock(globalOperationTimeout); err != nil {
@ -994,7 +909,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
if opts.VersionID == "" { if opts.VersionID == "" {
if opts.Versioned || opts.VersionSuspended { if opts.Versioned || opts.VersionSuspended {
if !HasSuffix(object, SlashSeparator) {
fi := FileInfo{ fi := FileInfo{
Name: object, Name: object,
Deleted: true, Deleted: true,
@ -1015,7 +929,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
return fi.ToObjectInfo(bucket, object), nil return fi.ToObjectInfo(bucket, object), nil
} }
} }
}
// Delete the object version on all disks. // Delete the object version on all disks.
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{ if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{

View File

@ -460,6 +460,10 @@ func (z *erasureZones) MakeBucketWithLocation(ctx context.Context, bucket string
} }
func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return nil, err
}
object = encodeDirObject(object) object = encodeDirObject(object)
for _, zone := range z.zones { for _, zone := range z.zones {
@ -479,6 +483,10 @@ func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string
} }
func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
if err := checkGetObjArgs(ctx, bucket, object); err != nil {
return err
}
object = encodeDirObject(object) object = encodeDirObject(object)
for _, zone := range z.zones { for _, zone := range z.zones {
@ -497,6 +505,10 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta
} }
func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return objInfo, err
}
object = encodeDirObject(object) object = encodeDirObject(object)
for _, zone := range z.zones { for _, zone := range z.zones {
objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts) objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts)
@ -517,6 +529,11 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string,
// PutObject - writes an object to least used erasure zone. // PutObject - writes an object to least used erasure zone.
func (z *erasureZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) { func (z *erasureZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) {
// Validate put object input args.
if err := checkPutObjectArgs(ctx, bucket, object, z); err != nil {
return ObjectInfo{}, err
}
object = encodeDirObject(object) object = encodeDirObject(object)
if z.SingleZone() { if z.SingleZone() {
@ -533,6 +550,10 @@ func (z *erasureZones) PutObject(ctx context.Context, bucket string, object stri
} }
func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
return objInfo, err
}
object = encodeDirObject(object) object = encodeDirObject(object)
if z.SingleZone() { if z.SingleZone() {

View File

@ -653,7 +653,7 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
return fsMeta.ToObjectInfo(srcBucket, srcObject, fi), nil return fsMeta.ToObjectInfo(srcBucket, srcObject, fi), nil
} }
if err := checkPutObjectArgs(ctx, dstBucket, dstObject, fs, srcInfo.PutObjReader.Size()); err != nil { if err := checkPutObjectArgs(ctx, dstBucket, dstObject, fs); err != nil {
return ObjectInfo{}, err return ObjectInfo{}, err
} }
@ -1097,7 +1097,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
return objInfo, NotImplemented{} return objInfo, NotImplemented{}
} }
if err := checkPutObjectArgs(ctx, bucket, object, fs, r.Size()); err != nil { if err := checkPutObjectArgs(ctx, bucket, object, fs); err != nil {
return ObjectInfo{}, err return ObjectInfo{}, err
} }

View File

@ -56,29 +56,6 @@ func isObjectDir(object string, size int64) bool {
return HasSuffix(object, SlashSeparator) && size == 0 return HasSuffix(object, SlashSeparator) && size == 0
} }
// Converts just bucket, object metadata into ObjectInfo datatype.
func dirObjectInfo(bucket, object string, size int64, metadata map[string]string) ObjectInfo {
// This is a special case with size as '0' and object ends with
// a slash separator, we treat it like a valid operation and
// return success.
etag := metadata["etag"]
delete(metadata, "etag")
if etag == "" {
etag = emptyETag
}
return ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: UTCNow(),
ContentType: "application/octet-stream",
IsDir: true,
Size: size,
ETag: etag,
UserDefined: metadata,
}
}
// Depending on the disk type network or local, initialize storage API. // Depending on the disk type network or local, initialize storage API.
func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) { func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
if endpoint.IsLocal { if endpoint.IsLocal {

View File

@ -173,7 +173,7 @@ func checkObjectArgs(ctx context.Context, bucket, object string, obj ObjectLayer
} }
// Checks for PutObject arguments validity, also validates if bucket exists. // Checks for PutObject arguments validity, also validates if bucket exists.
func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucketInfoI, size int64) error { func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucketInfoI) error {
// Verify if bucket exists before validating object name. // Verify if bucket exists before validating object name.
// This is done on purpose since the order of errors is // This is done on purpose since the order of errors is
// important here bucket does not exist error should // important here bucket does not exist error should
@ -187,7 +187,6 @@ func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucke
return err return err
} }
if len(object) == 0 || if len(object) == 0 ||
(HasSuffix(object, SlashSeparator) && size != 0) ||
!IsValidObjectPrefix(object) { !IsValidObjectPrefix(object) {
return ObjectNameInvalid{ return ObjectNameInvalid{
Bucket: bucket, Bucket: bucket,