deprecate/remove global WORM mode (#9436)

global WORM mode is a complex piece for which
the time has passed, with the advent of S3 compatible
object locking and retention implementation global
WORM is sort of deprecated, this has been mentioned
in our documentation for some time, now the time
has come for this to go.
This commit is contained in:
Harshavardhana
2020-04-24 16:37:05 -07:00
committed by GitHub
parent 45e22cf8aa
commit 60d415bb8a
23 changed files with 36 additions and 732 deletions

View File

@@ -66,12 +66,6 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
if r.ContentLength > maxEConfigJSONSize || r.ContentLength == -1 {
// More than maxConfigSize bytes were available
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL)
@@ -112,12 +106,6 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
if r.ContentLength > maxEConfigJSONSize || r.ContentLength == -1 {
// More than maxConfigSize bytes were available
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL)
@@ -358,12 +346,6 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
if r.ContentLength > maxEConfigJSONSize || r.ContentLength == -1 {
// More than maxConfigSize bytes were available
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL)

View File

@@ -60,12 +60,6 @@ func (a adminAPIHandlers) RemoveUser(w http.ResponseWriter, r *http.Request) {
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
vars := mux.Vars(r)
accessKey := vars["accessKey"]
@@ -290,12 +284,6 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request)
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
vars := mux.Vars(r)
accessKey := vars["accessKey"]
status := vars["status"]
@@ -329,12 +317,6 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
vars := mux.Vars(r)
accessKey := vars["accessKey"]
@@ -415,12 +397,6 @@ func (a adminAPIHandlers) AddServiceAccount(w http.ResponseWriter, r *http.Reque
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
newCred, err := globalIAMSys.NewServiceAccount(ctx, cred.AccessKey, createReq.Policy)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
@@ -528,12 +504,6 @@ func (a adminAPIHandlers) DeleteServiceAccount(w http.ResponseWriter, r *http.Re
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
serviceAccount := mux.Vars(r)["accessKey"]
if serviceAccount == "" {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL)
@@ -681,12 +651,6 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ
vars := mux.Vars(r)
policyName := vars["name"]
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
if err := globalIAMSys.DeletePolicy(policyName); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
@@ -713,12 +677,6 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
vars := mux.Vars(r)
policyName := vars["name"]
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
// Error out if Content-Length is missing.
if r.ContentLength <= 0 {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
@@ -771,12 +729,6 @@ func (a adminAPIHandlers) SetPolicyForUserOrGroup(w http.ResponseWriter, r *http
entityName := vars["userOrGroup"]
isGroup := vars["isGroup"] == "true"
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
if !isGroup {
ok, err := globalIAMSys.IsTempUser(entityName)
if err != nil && err != errNoSuchUser {

View File

@@ -920,7 +920,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
}
}
if _, ok := globalBucketObjectLockConfig.Get(bucket); (ok || globalWORMEnabled) && forceDelete {
if _, ok := globalBucketObjectLockConfig.Get(bucket); ok && forceDelete {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -1021,11 +1021,6 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.PutBucketObjectLockConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return

View File

@@ -173,7 +173,14 @@ func handleCommonCmdArgs(ctx *cli.Context) {
}
func handleCommonEnvVars() {
var err error
wormEnabled, err := config.LookupWorm()
if err != nil {
logger.Fatal(config.ErrInvalidWormValue(err), "Invalid worm configuration")
}
if wormEnabled {
logger.Fatal(errors.New("WORM is deprecated"), "global MINIO_WORM support is removed, please downgrade your server or migrate to https://github.com/minio/minio/tree/master/docs/retention")
}
globalBrowserEnabled, err = config.ParseBool(env.Get(config.EnvBrowser, config.EnableOn))
if err != nil {
logger.Fatal(config.ErrInvalidBrowserValue(err), "Invalid MINIO_BROWSER value in environment variable")
@@ -239,12 +246,6 @@ func handleCommonEnvVars() {
os.Unsetenv(config.EnvAccessKeyOld)
os.Unsetenv(config.EnvSecretKeyOld)
}
globalWORMEnabled, err = config.LookupWorm()
if err != nil {
logger.Fatal(config.ErrInvalidWormValue(err), "Invalid worm configuration")
}
}
func logStartupMessage(msg string) {

View File

@@ -51,11 +51,6 @@ func startDailyLifecycle(ctx context.Context, objAPI ObjectLayer) {
}
func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
// No action is expected when WORM is enabled
if globalWORMEnabled {
return nil
}
buckets, err := objAPI.ListBuckets(ctx)
if err != nil {
return err

View File

@@ -681,13 +681,6 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
return oi, toObjectErr(err, bucket, object)
}
// Deny if WORM is enabled
if isWORMEnabled(bucket) {
if _, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}
err = fsRenameFile(ctx, appendFilePath, pathJoin(fs.fsPath, bucket, object))
if err != nil {
logger.LogIf(ctx, err)

View File

@@ -991,12 +991,6 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
// Deny if WORM is enabled
if isWORMEnabled(bucket) {
if _, err := fsStatFile(ctx, fsNSObjPath); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}
if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}

View File

@@ -213,9 +213,6 @@ var (
globalOperationTimeout = newDynamicTimeout(10*time.Minute /*30*/, 600*time.Second) // default timeout for general ops
globalHealingTimeout = newDynamicTimeout(30*time.Minute /*1*/, 30*time.Minute) // timeout for healing related ops
// Is worm enabled
globalWORMEnabled bool
globalBucketObjectLockConfig = objectlock.NewBucketObjectLockConfig()
// Disk cache drives

View File

@@ -1527,14 +1527,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
}
// Validate storage class metadata if present
if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
if !storageclass.IsValid(sc) {
@@ -1725,14 +1717,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
// Deny if global WORM is enabled
if globalWORMEnabled {
if _, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
}
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
@@ -2105,14 +2089,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
rawReader := hashReader
pReader := NewPutObjReader(rawReader, nil, nil)
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
}
isEncrypted := false
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && !isCompressed {
@@ -2430,19 +2406,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
return
}
// Deny if global WORM is enabled
if globalWORMEnabled {
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
}
if _, _, _, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, ErrNone, ErrNone); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
@@ -2627,19 +2590,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
}
}
// Deny if global WORM is enabled
if globalWORMEnabled {
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
}
apiErr := ErrNone
if _, ok := globalBucketObjectLockConfig.Get(bucket); ok {
apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, object, getObjectInfo)
@@ -2864,11 +2814,6 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
return
}
if globalWORMEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrObjectLocked), r.URL, guessIsBrowserReq(r))
return
}
if _, ok := globalBucketObjectLockConfig.Get(bucket); !ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL, guessIsBrowserReq(r))
return

View File

@@ -314,7 +314,7 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
if objInfo, err := getObjectInfoFn(ctx, bucket, object, opts); err == nil {
objExists = true
r := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
if globalWORMEnabled || ((r.Mode == objectlock.RetCompliance) && r.RetainUntilDate.After(t)) {
if r.Mode == objectlock.RetCompliance && r.RetainUntilDate.After(t) {
return mode, retainDate, legalHold, ErrObjectLocked
}
mode = r.Mode

View File

@@ -652,10 +652,3 @@ func iamPolicyClaimNameOpenID() string {
func iamPolicyClaimNameSA() string {
return "sa-policy"
}
func isWORMEnabled(bucket string) bool {
if isMinioMetaBucketName(bucket) {
return false
}
return globalWORMEnabled
}

View File

@@ -745,17 +745,6 @@ next:
}
apiErr := ErrNone
// Deny if global WORM is enabled
if globalWORMEnabled {
opts, err := getOpts(ctx, r, args.BucketName, objectName)
if err != nil {
apiErr = toAPIErrorCode(ctx, err)
} else {
if _, err := getObjectInfo(ctx, args.BucketName, objectName, opts); err == nil {
apiErr = ErrMethodNotAllowed
}
}
}
if _, ok := globalBucketObjectLockConfig.Get(args.BucketName); ok && (apiErr == ErrNone) {
apiErr = enforceRetentionBypassForDeleteWeb(ctx, r, args.BucketName, objectName, getObjectInfo)
if apiErr != ErrNone && apiErr != ErrNoSuchKey {
@@ -909,11 +898,6 @@ func (web *webAPIHandlers) SetAuth(r *http.Request, args *SetAuthArgs, reply *Se
return toJSONError(ctx, authErr)
}
// When WORM is enabled, disallow changing credenatials for owner and user
if globalWORMEnabled {
return toJSONError(ctx, errChangeCredNotAllowed)
}
if owner {
// Owner is not allowed to change credentials through browser.
return toJSONError(ctx, errChangeCredNotAllowed)

View File

@@ -673,13 +673,6 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
if xl.isObject(bucket, object) {
// Deny if WORM is enabled
if isWORMEnabled(bucket) {
if _, err := xl.getObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}
// Rename if an object already exists to temporary location.
newUniqueID := mustGetUUID()

View File

@@ -628,13 +628,6 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
}
if xl.isObject(bucket, object) {
// Deny if WORM is enabled
if isWORMEnabled(bucket) {
if _, err := xl.getObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}
// Rename if an object already exists to temporary location.
newUniqueID := mustGetUUID()