mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
- When modifying notification configuration - When modifying listener configuration - When modifying policy configuration With this change we also stop early checking if the bucket exists, since that uses a Read-lock and causes a deadlock due to the outer Write-lock.
This commit is contained in:
parent
0905398459
commit
3977d6b7bd
@ -164,6 +164,13 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI
|
|||||||
return errInvalidArgument
|
return errInvalidArgument
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Acquire a write lock on bucket before modifying its
|
||||||
|
// configuration.
|
||||||
|
opsID := getOpsID()
|
||||||
|
nsMutex.Lock(bucket, "", opsID)
|
||||||
|
// Release lock after notifying peers
|
||||||
|
defer nsMutex.Unlock(bucket, "", opsID)
|
||||||
|
|
||||||
// persist config to disk
|
// persist config to disk
|
||||||
err := persistNotificationConfig(bucket, ncfg, objAPI)
|
err := persistNotificationConfig(bucket, ncfg, objAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -365,6 +372,13 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL
|
|||||||
// add new lid to listeners and persist to object layer.
|
// add new lid to listeners and persist to object layer.
|
||||||
listenerCfgs = append(listenerCfgs, *lcfg)
|
listenerCfgs = append(listenerCfgs, *lcfg)
|
||||||
|
|
||||||
|
// Acquire a write lock on bucket before modifying its
|
||||||
|
// configuration.
|
||||||
|
opsID := getOpsID()
|
||||||
|
nsMutex.Lock(bucket, "", opsID)
|
||||||
|
// Release lock after notifying peers
|
||||||
|
defer nsMutex.Unlock(bucket, "", opsID)
|
||||||
|
|
||||||
// update persistent config
|
// update persistent config
|
||||||
err := persistListenerConfig(bucket, listenerCfgs, objAPI)
|
err := persistListenerConfig(bucket, listenerCfgs, objAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -398,6 +412,13 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Acquire a write lock on bucket before modifying its
|
||||||
|
// configuration.
|
||||||
|
opsID := getOpsID()
|
||||||
|
nsMutex.Lock(bucket, "", opsID)
|
||||||
|
// Release lock after notifying peers
|
||||||
|
defer nsMutex.Unlock(bucket, "", opsID)
|
||||||
|
|
||||||
// update persistent config
|
// update persistent config
|
||||||
err := persistListenerConfig(bucket, updatedLcfgs, objAPI)
|
err := persistListenerConfig(bucket, updatedLcfgs, objAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -200,12 +200,20 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
|
|||||||
// persists it to storage, and notify nodes in the cluster about the
|
// persists it to storage, and notify nodes in the cluster about the
|
||||||
// change. In-memory state is updated in response to the notification.
|
// change. In-memory state is updated in response to the notification.
|
||||||
func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error {
|
func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error {
|
||||||
// FIXME: Race exists between the bucket existence check and
|
// Verify if bucket actually exists. FIXME: Ideally this check
|
||||||
// then updating the bucket policy.
|
// should not be used but is kept here to error out for
|
||||||
|
// invalid and non-existent buckets.
|
||||||
if err := isBucketExist(bucket, objAPI); err != nil {
|
if err := isBucketExist(bucket, objAPI); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Acquire a write lock on bucket before modifying its
|
||||||
|
// configuration.
|
||||||
|
opsID := getOpsID()
|
||||||
|
nsMutex.Lock(bucket, "", opsID)
|
||||||
|
// Release lock after notifying peers
|
||||||
|
defer nsMutex.Unlock(bucket, "", opsID)
|
||||||
|
|
||||||
if pCh.IsRemove {
|
if pCh.IsRemove {
|
||||||
if err := removeBucketPolicy(bucket, objAPI); err != nil {
|
if err := removeBucketPolicy(bucket, objAPI); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -193,11 +193,6 @@ func readBucketPolicy(bucket string, objAPI ObjectLayer) (*bucketPolicy, error)
|
|||||||
// removeBucketPolicy - removes any previously written bucket policy. Returns BucketPolicyNotFound
|
// removeBucketPolicy - removes any previously written bucket policy. Returns BucketPolicyNotFound
|
||||||
// if no policies are found.
|
// if no policies are found.
|
||||||
func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
|
func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
|
||||||
// Verify if bucket actually exists
|
|
||||||
if err := isBucketExist(bucket, objAPI); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
||||||
if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil {
|
if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil {
|
||||||
errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket)
|
errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket)
|
||||||
@ -213,11 +208,6 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
|
|||||||
// writeBucketPolicy - save a bucket policy that is assumed to be
|
// writeBucketPolicy - save a bucket policy that is assumed to be
|
||||||
// validated.
|
// validated.
|
||||||
func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) error {
|
func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) error {
|
||||||
// Verify if bucket actually exists
|
|
||||||
if err := isBucketExist(bucket, objAPI); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
buf, err := json.Marshal(bpy)
|
buf, err := json.Marshal(bpy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err, "Unable to marshal bucket policy '%v' to JSON", *bpy)
|
errorIf(err, "Unable to marshal bucket policy '%v' to JSON", *bpy)
|
||||||
|
@ -399,12 +399,6 @@ func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj Obje
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify bucket exists
|
|
||||||
// FIXME: There is a race between this check and PutObject
|
|
||||||
if err = isBucketExist(bucket, obj); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// build path
|
// build path
|
||||||
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||||
// write object to path
|
// write object to path
|
||||||
@ -425,12 +419,6 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify bucket exists
|
|
||||||
// FIXME: There is a race between this check and PutObject
|
|
||||||
if err = isBucketExist(bucket, obj); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// build path
|
// build path
|
||||||
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||||
// write object to path
|
// write object to path
|
||||||
|
@ -277,15 +277,6 @@ func TestInitEventNotifier(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// write without an existing bucket and check
|
|
||||||
if err := persistNotificationConfig(bucketName, ¬ificationConfig{}, obj); err == nil {
|
|
||||||
t.Fatalf("Did not get an error though bucket does not exist!")
|
|
||||||
}
|
|
||||||
// no bucket write check for listener
|
|
||||||
if err := persistListenerConfig(bucketName, []listenerConfig{}, obj); err == nil {
|
|
||||||
t.Fatalf("Did not get an error though bucket does not exist!")
|
|
||||||
}
|
|
||||||
|
|
||||||
// create bucket
|
// create bucket
|
||||||
if err := obj.MakeBucket(bucketName); err != nil {
|
if err := obj.MakeBucket(bucketName); err != nil {
|
||||||
t.Fatal("Unexpected error:", err)
|
t.Fatal("Unexpected error:", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user