From e55926e8cf9032645ac7096ceaff1cedd38f96b2 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Sun, 31 Jul 2016 14:11:14 -0700 Subject: [PATCH] distribute: Make server work with multiple remote disks This change initializes rpc servers associated with disks that are local. It makes object layer initialization on demand, namely on the first request to the object layer. Also adds lock RPC service vendorized minio/dsync --- cmd/api-router.go | 2 +- cmd/bucket-handlers-listobjects.go | 14 +- cmd/bucket-handlers.go | 57 +++- cmd/bucket-notification-handlers.go | 24 +- cmd/bucket-policy-handlers_test.go | 4 +- cmd/bucket-policy.go | 1 - cmd/object-common.go | 52 +++- cmd/object-handlers.go | 90 +++++-- cmd/routers.go | 93 ++++--- cmd/rpc-client.go | 44 ++- cmd/rpc-server.go | 73 ++++- cmd/server-main.go | 7 +- cmd/storage-errors.go | 3 + cmd/test-utils_test.go | 2 +- cmd/web-handlers.go | 49 +++- cmd/web-router.go | 2 +- cmd/xl-v1.go | 4 + lock-rpc-server.go | 107 ++++++++ vendor/github.com/minio/dsync/LICENSE | 202 ++++++++++++++ vendor/github.com/minio/dsync/README.md | 81 ++++++ vendor/github.com/minio/dsync/dmutex.go | 311 ++++++++++++++++++++++ vendor/github.com/minio/dsync/drwmutex.go | 143 ++++++++++ vendor/github.com/minio/dsync/dsync.go | 56 ++++ vendor/vendor.json | 6 + 24 files changed, 1312 insertions(+), 115 deletions(-) create mode 100644 lock-rpc-server.go create mode 100644 vendor/github.com/minio/dsync/LICENSE create mode 100644 vendor/github.com/minio/dsync/README.md create mode 100644 vendor/github.com/minio/dsync/dmutex.go create mode 100644 vendor/github.com/minio/dsync/drwmutex.go create mode 100644 vendor/github.com/minio/dsync/dsync.go diff --git a/cmd/api-router.go b/cmd/api-router.go index 40e282e76..fe2442d14 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -20,7 +20,7 @@ import router "github.com/gorilla/mux" // objectAPIHandler implements and provides http handlers for S3 API. type objectAPIHandlers struct { - ObjectAPI ObjectLayer + ObjectAPI func() ObjectLayer } // registerAPIRouter - registers S3 compatible APIs. diff --git a/cmd/bucket-handlers-listobjects.go b/cmd/bucket-handlers-listobjects.go index dd406e711..f0b80f3ac 100644 --- a/cmd/bucket-handlers-listobjects.go +++ b/cmd/bucket-handlers-listobjects.go @@ -97,10 +97,15 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http writeErrorResponse(w, r, s3Error, r.URL.Path) return } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } // Inititate a list objects operation based on the input params. // On success would return back ListObjectsInfo object to be // marshalled into S3 compatible XML header. - listObjectsInfo, err := api.ObjectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys) + listObjectsInfo, err := objectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys) if err != nil { errorIf(err, "Unable to list objects.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -151,10 +156,15 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http return } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } // Inititate a list objects operation based on the input params. // On success would return back ListObjectsInfo object to be // marshalled into S3 compatible XML header. - listObjectsInfo, err := api.ObjectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys) + listObjectsInfo, err := objectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys) if err != nil { errorIf(err, "Unable to list objects.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 83a21cfcb..0125e5e08 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -82,7 +82,12 @@ func (api objectAPIHandlers) GetBucketLocationHandler(w http.ResponseWriter, r * } } - if _, err := api.ObjectAPI.GetBucketInfo(bucket); err != nil { + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + if _, err := objectAPI.GetBucketInfo(bucket); err != nil { errorIf(err, "Unable to fetch bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return @@ -144,7 +149,12 @@ func (api objectAPIHandlers) ListMultipartUploadsHandler(w http.ResponseWriter, } } - listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + listMultipartsInfo, err := objectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) if err != nil { errorIf(err, "Unable to list multipart uploads.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -170,7 +180,12 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R return } - bucketsInfo, err := api.ObjectAPI.ListBuckets() + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + bucketsInfo, err := objectAPI.ListBuckets() if err != nil { errorIf(err, "Unable to list buckets.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -241,6 +256,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, return } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + var wg = &sync.WaitGroup{} // Allocate a new wait group. var dErrs = make([]error, len(deleteObjects.Objects)) @@ -327,8 +348,13 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req return } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } // Proceed to creating a bucket. - err := api.ObjectAPI.MakeBucket(bucket) + err := objectAPI.MakeBucket(bucket) if err != nil { errorIf(err, "Unable to create a bucket.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -384,7 +410,12 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h metadata := make(map[string]string) // Nothing to store right now. - md5Sum, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, metadata) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + md5Sum, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata) if err != nil { errorIf(err, "Unable to create object.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -405,7 +436,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications. - objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) return @@ -451,7 +482,12 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re } } - if _, err := api.ObjectAPI.GetBucketInfo(bucket); err != nil { + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + if _, err := objectAPI.GetBucketInfo(bucket); err != nil { errorIf(err, "Unable to fetch bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return @@ -470,8 +506,13 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. vars := mux.Vars(r) bucket := vars["bucket"] + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } // Attempt to delete bucket. - if err := api.ObjectAPI.DeleteBucket(bucket); err != nil { + if err := objectAPI.DeleteBucket(bucket); err != nil { errorIf(err, "Unable to delete a bucket.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 73b276491..7d4c53da7 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -47,7 +47,12 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter, vars := mux.Vars(r) bucket := vars["bucket"] // Attempt to successfully load notification config. - nConfig, err := loadNotificationConfig(bucket, api.ObjectAPI) + objAPI := api.ObjectAPI() + if objAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + nConfig, err := loadNotificationConfig(bucket, objAPI) if err != nil && err != errNoSuchNotifications { errorIf(err, "Unable to read notification configuration.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -86,7 +91,12 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, vars := mux.Vars(r) bucket := vars["bucket"] - _, err := api.ObjectAPI.GetBucketInfo(bucket) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + _, err := objectAPI.GetBucketInfo(bucket) if err != nil { errorIf(err, "Unable to find bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -133,7 +143,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, // Proceed to save notification configuration. notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - _, err = api.ObjectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), nil) + _, err = objectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), nil) if err != nil { errorIf(err, "Unable to write bucket notification configuration.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -220,7 +230,13 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit } // Validate if bucket exists. - _, err := api.ObjectAPI.GetBucketInfo(bucket) + objAPI := api.ObjectAPI() + if objAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + + _, err := objAPI.GetBucketInfo(bucket) if err != nil { errorIf(err, "Unable to bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) diff --git a/cmd/bucket-policy-handlers_test.go b/cmd/bucket-policy-handlers_test.go index 0bca9ed4e..829a586ae 100644 --- a/cmd/bucket-policy-handlers_test.go +++ b/cmd/bucket-policy-handlers_test.go @@ -294,13 +294,13 @@ func testPutBucketPolicyHandler(obj ObjectLayer, instanceType string, t TestErrH req, err := newTestSignedRequest("PUT", getPutPolicyURL("", testCase.bucketName), int64(len(bucketPolicyStr)), bytes.NewReader([]byte(bucketPolicyStr)), testCase.accessKey, testCase.secretKey) if err != nil { - t.Fatalf("Test %d: Failed to create HTTP request for PutBucketPolicyHandler: %v", i+1, err) + t.Fatalf("Test %d: %s: Failed to create HTTP request for PutBucketPolicyHandler: %v", i+1, instanceType, err) } // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic ofthe handler. // Call the ServeHTTP to execute the handler. apiRouter.ServeHTTP(rec, req) if rec.Code != testCase.expectedRespStatus { - t.Errorf("Test %d: Expected the response status to be `%d`, but instead found `%d`", i+1, testCase.expectedRespStatus, rec.Code) + t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code) } } } diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index 4678fecf9..c2cc5f6ed 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -169,7 +169,6 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error { if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } - policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil { if _, ok := err.(ObjectNotFound); ok { diff --git a/cmd/object-common.go b/cmd/object-common.go index d9395f1b3..02d6e87fd 100644 --- a/cmd/object-common.go +++ b/cmd/object-common.go @@ -17,7 +17,7 @@ package cmd import ( - "path/filepath" + "net" "strings" "sync" ) @@ -53,13 +53,57 @@ func fsHouseKeeping(storageDisk StorageAPI) error { return nil } +// Check if a network path is local to this node. +func isLocalStorage(networkPath string) bool { + if idx := strings.LastIndex(networkPath, ":"); idx != -1 { + // e.g 10.0.0.1:9000:/mnt/networkPath + netAddr, _ := splitNetPath(networkPath) + var netHost string + var err error + netHost, _, err = net.SplitHostPort(netAddr) + if err != nil { + netHost = netAddr + } + // Resolve host to address to check if the IP is loopback. + // If address resolution fails, assume it's a non-local host. + addrs, err := net.LookupHost(netHost) + if err != nil { + return false + } + for _, addr := range addrs { + if ip := net.ParseIP(addr); ip.IsLoopback() { + return true + } + } + iaddrs, err := net.InterfaceAddrs() + if err != nil { + return false + } + for _, addr := range addrs { + for _, iaddr := range iaddrs { + ip, _, err := net.ParseCIDR(iaddr.String()) + if err != nil { + return false + } + if ip.String() == addr { + return true + } + + } + } + return false + } + return true +} + // Depending on the disk type network or local, initialize storage API. func newStorageAPI(disk string) (storage StorageAPI, err error) { - if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" { - // Initialize filesystem storage API. + if isLocalStorage(disk) { + if idx := strings.LastIndex(disk, ":"); idx != -1 { + return newPosix(disk[idx+1:]) + } return newPosix(disk) } - // Initialize rpc client storage API. return newRPCClient(disk) } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 90d0702f4..d99f4e870 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -102,7 +102,12 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req } } // Fetch object stat info. - objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") apiErr := toAPIErrorCode(err) @@ -161,7 +166,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req }) // Reads the object at startOffset and writes to mw. - if err := api.ObjectAPI.GetObject(bucket, object, startOffset, length, writer); err != nil { + if err := objectAPI.GetObject(bucket, object, startOffset, length, writer); err != nil { errorIf(err, "Unable to write to client.") if !dataWritten { // Error response only if no data has been written to client yet. i.e if @@ -208,7 +213,12 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re } } - objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") apiErr := toAPIErrorCode(err) @@ -289,7 +299,12 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re return } - objInfo, err := api.ObjectAPI.GetObjectInfo(sourceBucket, sourceObject) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + objInfo, err := objectAPI.GetObjectInfo(sourceBucket, sourceObject) if err != nil { errorIf(err, "Unable to fetch object info.") writeErrorResponse(w, r, toAPIErrorCode(err), objectSource) @@ -311,7 +326,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re go func() { startOffset := int64(0) // Read the whole file. // Get the object. - gErr := api.ObjectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter) + gErr := objectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter) if gErr != nil { errorIf(gErr, "Unable to read an object.") pipeWriter.CloseWithError(gErr) @@ -332,7 +347,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // same md5sum as the source. // Create the object. - md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, pipeReader, metadata) + md5Sum, err := objectAPI.PutObject(bucket, object, size, pipeReader, metadata) if err != nil { // Close the this end of the pipe upon error in PutObject. pipeReader.CloseWithError(err) @@ -343,7 +358,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // Explicitly close the reader, before fetching object info. pipeReader.Close() - objInfo, err = api.ObjectAPI.GetObjectInfo(bucket, object) + objInfo, err = objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -418,6 +433,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req // Make sure we hex encode md5sum here. metadata["md5Sum"] = hex.EncodeToString(md5Bytes) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } var md5Sum string switch rAuthType { default: @@ -431,7 +451,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } // Create anonymous object. - md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, metadata) + md5Sum, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata) case authTypeStreamingSigned: // Initialize stream signature verifier. reader, s3Error := newSignV4ChunkedReader(r) @@ -439,12 +459,12 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(w, r, s3Error, r.URL.Path) return } - md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata) + md5Sum, err = objectAPI.PutObject(bucket, object, size, reader, metadata) case authTypePresigned, authTypeSigned: // Initialize signature verifier. reader := newSignVerify(r) // Create object. - md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata) + md5Sum, err = objectAPI.PutObject(bucket, object, size, reader, metadata) } if err != nil { errorIf(err, "Unable to create an object.") @@ -458,7 +478,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications. - objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) return @@ -506,7 +526,12 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r // Extract metadata that needs to be saved. metadata := extractMetadataFromHeader(r.Header) - uploadID, err := api.ObjectAPI.NewMultipartUpload(bucket, object, metadata) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + uploadID, err := objectAPI.NewMultipartUpload(bucket, object, metadata) if err != nil { errorIf(err, "Unable to initiate new multipart upload id.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -574,6 +599,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } var partMD5 string incomingMD5 := hex.EncodeToString(md5Bytes) switch rAuthType { @@ -588,7 +618,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } // No need to verify signature, anonymous request access is already allowed. - partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5) + partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5) case authTypeStreamingSigned: // Initialize stream signature verifier. reader, s3Error := newSignV4ChunkedReader(r) @@ -596,11 +626,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http writeErrorResponse(w, r, s3Error, r.URL.Path) return } - partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5) + partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5) case authTypePresigned, authTypeSigned: // Initialize signature verifier. reader := newSignVerify(r) - partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5) + partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5) } if err != nil { errorIf(err, "Unable to create object part.") @@ -638,8 +668,13 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, } } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } uploadID, _, _, _ := getObjectResources(r.URL.Query()) - if err := api.ObjectAPI.AbortMultipartUpload(bucket, object, uploadID); err != nil { + if err := objectAPI.AbortMultipartUpload(bucket, object, uploadID); err != nil { errorIf(err, "Unable to abort multipart upload.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return @@ -680,7 +715,12 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht writeErrorResponse(w, r, ErrInvalidMaxParts, r.URL.Path) return } - listPartsInfo, err := api.ObjectAPI.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + listPartsInfo, err := objectAPI.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) if err != nil { errorIf(err, "Unable to list uploaded parts.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -761,10 +801,15 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite return } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } doneCh := make(chan struct{}) // Signal that completeMultipartUpload is over via doneCh go func(doneCh chan<- struct{}) { - md5Sum, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) + md5Sum, err = objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) doneCh <- struct{}{} }(doneCh) @@ -799,7 +844,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications. - objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) return @@ -842,10 +887,15 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } } + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } /// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html /// Ignore delete object errors, since we are suppposed to reply /// only 204. - if err := api.ObjectAPI.DeleteObject(bucket, object); err != nil { + if err := objectAPI.DeleteObject(bucket, object); err != nil { writeSuccessNoContent(w) return } diff --git a/cmd/routers.go b/cmd/routers.go index 42503de70..97538040c 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -32,6 +32,7 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) { // Initialize FS object layer. return newFSObjects(exportPath) } + // TODO: use dsync to block other concurrently booting up nodes. // Initialize XL object layer. objAPI, err := newXLObjects(disks, ignoredDisks) if err == errXLWriteQuorum { @@ -40,65 +41,75 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) { return objAPI, err } +func newObjectLayerFactory(disks, ignoredDisks []string) func() ObjectLayer { + var objAPI ObjectLayer + // FIXME: This needs to be go-routine safe. + return func() ObjectLayer { + var err error + if objAPI != nil { + return objAPI + } + objAPI, err = newObjectLayer(disks, ignoredDisks) + if err != nil { + return nil + } + // Migrate bucket policy from configDir to .minio.sys/buckets/ + err = migrateBucketPolicyConfig(objAPI) + fatalIf(err, "Unable to migrate bucket policy from config directory") + + err = cleanupOldBucketPolicyConfigs() + fatalIf(err, "Unable to clean up bucket policy from config directory.") + + // Initialize and monitor shutdown signals. + err = initGracefulShutdown(os.Exit) + fatalIf(err, "Unable to initialize graceful shutdown operation") + + // Register the callback that should be called when the process shuts down. + globalShutdownCBs.AddObjectLayerCB(func() errCode { + if sErr := objAPI.Shutdown(); sErr != nil { + return exitFailure + } + return exitSuccess + }) + + // Initialize a new event notifier. + err = initEventNotifier(objAPI) + fatalIf(err, "Unable to initialize event notification queue") + + // Initialize and load bucket policies. + err = initBucketPolicies(objAPI) + fatalIf(err, "Unable to load all bucket policies") + + return objAPI + } +} + // configureServer handler returns final handler for the http server. func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { - // Initialize name space lock. + // Initialize Namespace locking. initNSLock() - objAPI, err := newObjectLayer(srvCmdConfig.disks, srvCmdConfig.ignoredDisks) - fatalIf(err, "Unable to intialize object layer.") - - // Migrate bucket policy from configDir to .minio.sys/buckets/ - err = migrateBucketPolicyConfig(objAPI) - fatalIf(err, "Unable to migrate bucket policy from config directory") - - err = cleanupOldBucketPolicyConfigs() - fatalIf(err, "Unable to clean up bucket policy from config directory.") - - // Initialize storage rpc server. - storageRPC, err := newRPCServer(srvCmdConfig.disks[0]) // FIXME: should only have one path. + // Initialize storage rpc servers for every disk that is hosted on this node. + storageRPCs, err := newRPCServer(srvCmdConfig) fatalIf(err, "Unable to initialize storage RPC server.") + newObjectLayerFn := newObjectLayerFactory(srvCmdConfig.disks, srvCmdConfig.ignoredDisks) // Initialize API. apiHandlers := objectAPIHandlers{ - ObjectAPI: objAPI, + ObjectAPI: newObjectLayerFn, } // Initialize Web. webHandlers := &webAPIHandlers{ - ObjectAPI: objAPI, + ObjectAPI: newObjectLayerFn, } - // Initialize Controller. - ctrlHandlers := &controllerAPIHandlers{ - ObjectAPI: objAPI, - } - - // Initialize and monitor shutdown signals. - err = initGracefulShutdown(os.Exit) - fatalIf(err, "Unable to initialize graceful shutdown operation") - - // Register the callback that should be called when the process shuts down. - globalShutdownCBs.AddObjectLayerCB(func() errCode { - if sErr := objAPI.Shutdown(); sErr != nil { - return exitFailure - } - return exitSuccess - }) - - // Initialize a new event notifier. - err = initEventNotifier(objAPI) - fatalIf(err, "Unable to initialize event notification queue") - - // Initialize a new bucket policies. - err = initBucketPolicies(objAPI) - fatalIf(err, "Unable to load all bucket policies") - // Initialize router. mux := router.NewRouter() // Register all routers. - registerStorageRPCRouter(mux, storageRPC) + registerStorageRPCRouters(mux, storageRPCs) + initDistributedNSLock(mux, srvCmdConfig) // FIXME: till net/rpc auth is brought in "minio control" can be enabled only though // this env variable. diff --git a/cmd/rpc-client.go b/cmd/rpc-client.go index 391ee2477..a3eb3af8d 100644 --- a/cmd/rpc-client.go +++ b/cmd/rpc-client.go @@ -19,6 +19,8 @@ package cmd import ( "net/http" "net/rpc" + "path" + "strconv" "strings" "time" ) @@ -78,12 +80,6 @@ func newRPCClient(networkPath string) (StorageAPI, error) { // TODO validate netAddr and netPath. netAddr, netPath := splitNetPath(networkPath) - // Dial minio rpc storage http path. - rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, storageRPCPath) - if err != nil { - return nil, err - } - // Initialize http client. httpClient := &http.Client{ // Setting a sensible time out of 6minutes to wait for @@ -93,6 +89,15 @@ func newRPCClient(networkPath string) (StorageAPI, error) { Transport: http.DefaultTransport, } + // Dial minio rpc storage http path. + rpcPath := path.Join(storageRPCPath, netPath) + port := getPort(srvConfig.serverAddr) + rpcAddr := netAddr + ":" + strconv.Itoa(port) + rpcClient, err := rpc.DialHTTPPath("tcp", rpcAddr, rpcPath) + if err != nil { + return nil, err + } + // Initialize network storage. ndisk := &networkStorage{ netScheme: "http", // TODO: fix for ssl rpc support. @@ -108,6 +113,9 @@ func newRPCClient(networkPath string) (StorageAPI, error) { // MakeVol - make a volume. func (n networkStorage) MakeVol(volume string) error { + if n.rpcClient == nil { + return errVolumeBusy + } reply := GenericReply{} if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil { return toStorageErr(err) @@ -117,6 +125,9 @@ func (n networkStorage) MakeVol(volume string) error { // ListVols - List all volumes. func (n networkStorage) ListVols() (vols []VolInfo, err error) { + if n.rpcClient == nil { + return nil, errVolumeBusy + } ListVols := ListVolsReply{} err = n.rpcClient.Call("Storage.ListVolsHandler", "", &ListVols) if err != nil { @@ -127,6 +138,9 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) { // StatVol - get current Stat volume info. func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { + if n.rpcClient == nil { + return VolInfo{}, errVolumeBusy + } if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { return VolInfo{}, toStorageErr(err) } @@ -135,6 +149,9 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - Delete a volume. func (n networkStorage) DeleteVol(volume string) error { + if n.rpcClient == nil { + return errVolumeBusy + } reply := GenericReply{} if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil { return toStorageErr(err) @@ -146,6 +163,9 @@ func (n networkStorage) DeleteVol(volume string) error { // CreateFile - create file. func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { + if n.rpcClient == nil { + return errVolumeBusy + } reply := GenericReply{} if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{ Vol: volume, @@ -184,6 +204,9 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { // ReadFile - reads a file. func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { + if n.rpcClient == nil { + return 0, errVolumeBusy + } if err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{ Vol: volume, Path: path, @@ -197,6 +220,9 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe // ListDir - list all entries at prefix. func (n networkStorage) ListDir(volume, path string) (entries []string, err error) { + if n.rpcClient == nil { + return nil, errVolumeBusy + } if err = n.rpcClient.Call("Storage.ListDirHandler", ListDirArgs{ Vol: volume, Path: path, @@ -209,6 +235,9 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro // DeleteFile - Delete a file at path. func (n networkStorage) DeleteFile(volume, path string) (err error) { + if n.rpcClient == nil { + return errVolumeBusy + } reply := GenericReply{} if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{ Vol: volume, @@ -221,6 +250,9 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) { // RenameFile - Rename file. func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { + if n.rpcClient == nil { + return errVolumeBusy + } reply := GenericReply{} if err = n.rpcClient.Call("Storage.RenameFileHandler", RenameFileArgs{ SrcVol: srcVolume, diff --git a/cmd/rpc-server.go b/cmd/rpc-server.go index 2a32705d1..b9bdb8c97 100644 --- a/cmd/rpc-server.go +++ b/cmd/rpc-server.go @@ -1,7 +1,25 @@ -package cmd +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main import ( "net/rpc" + "path" + "strings" router "github.com/gorilla/mux" ) @@ -10,6 +28,7 @@ import ( // disk over a network. type storageServer struct { storage StorageAPI + path string } /// Volume operations handlers @@ -103,22 +122,50 @@ func (s *storageServer) RenameFileHandler(arg *RenameFileArgs, reply *GenericRep } // Initialize new storage rpc. -func newRPCServer(exportPath string) (*storageServer, error) { +func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err error) { // Initialize posix storage API. - storage, err := newPosix(exportPath) - if err != nil && err != errDiskNotFound { - return nil, err + exports := serverConfig.disks + ignoredExports := serverConfig.ignoredDisks + + // Save ignored disks in a map + skipDisks := make(map[string]bool) + for _, ignoredExport := range ignoredExports { + skipDisks[ignoredExport] = true } - return &storageServer{ - storage: storage, - }, nil + for _, export := range exports { + if skipDisks[export] { + continue + } + // e.g server:/mnt/disk1 + if isLocalStorage(export) { + if idx := strings.LastIndex(export, ":"); idx != -1 { + export = export[idx+1:] + } + var storage StorageAPI + storage, err = newPosix(export) + if err != nil && err != errDiskNotFound { + return nil, err + } + if idx := strings.LastIndex(export, ":"); idx != -1 { + export = export[idx+1:] + } + servers = append(servers, &storageServer{ + storage: storage, + path: export, + }) + } + } + return servers, err } // registerStorageRPCRouter - register storage rpc router. -func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) { +func registerStorageRPCRouters(mux *router.Router, stServers []*storageServer) { storageRPCServer := rpc.NewServer() - storageRPCServer.RegisterName("Storage", stServer) - storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() - // Add minio storage routes. - storageRouter.Path("/storage").Handler(storageRPCServer) + // Create a unique route for each disk exported from this node. + for _, stServer := range stServers { + storageRPCServer.RegisterName("Storage", stServer) + // Add minio storage routes. + storageRouter := mux.PathPrefix(reservedBucket).Subrouter() + storageRouter.Path(path.Join("/storage", stServer.path)).Handler(storageRPCServer) + } } diff --git a/cmd/server-main.go b/cmd/server-main.go index a0ed0307a..da48a067f 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -28,6 +28,7 @@ import ( "github.com/minio/cli" ) +var srvConfig serverCmdConfig var serverCmd = cli.Command{ Name: "server", Usage: "Start object storage server.", @@ -245,11 +246,13 @@ func serverMain(c *cli.Context) { disks := c.Args() // Configure server. - handler := configureServerHandler(serverCmdConfig{ + srvConfig = serverCmdConfig{ serverAddr: serverAddress, disks: disks, ignoredDisks: ignoredDisks, - }) + } + // Configure server. + handler := configureServerHandler(srvConfig) apiServer := NewServerMux(serverAddress, handler) diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index 787febf25..8e6fb65f0 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -59,3 +59,6 @@ var errVolumeAccessDenied = errors.New("volume access denied") // errVolumeAccessDenied - cannot access file, insufficient permissions. var errFileAccessDenied = errors.New("file access denied") + +// errVolumeBusy - remote disk is not connected to yet. +var errVolumeBusy = errors.New("volume is busy") diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index c538fd908..d465762a9 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -888,7 +888,7 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand // All object storage operations are registered as HTTP handlers on `objectAPIHandlers`. // When the handlers get a HTTP request they use the underlyting ObjectLayer to perform operations. api := objectAPIHandlers{ - ObjectAPI: objLayer, + ObjectAPI: func() ObjectLayer { return objLayer }, } // API Router. apiRouter := muxRouter.NewRoute().PathPrefix("/").Subrouter() diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index add91dd9a..8456636d6 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" @@ -124,7 +125,11 @@ func (web *webAPIHandlers) StorageInfo(r *http.Request, args *GenericArgs, reply return &json2.Error{Message: "Unauthorized request"} } reply.UIVersion = miniobrowser.UIVersion - reply.StorageInfo = web.ObjectAPI.StorageInfo() + objectAPI := web.ObjectAPI() + if objectAPI == nil { + return &json2.Error{Message: "Volume not found"} + } + reply.StorageInfo = objectAPI.StorageInfo() return nil } @@ -139,7 +144,11 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep return &json2.Error{Message: "Unauthorized request"} } reply.UIVersion = miniobrowser.UIVersion - if err := web.ObjectAPI.MakeBucket(args.BucketName); err != nil { + objectAPI := web.ObjectAPI() + if objectAPI == nil { + return &json2.Error{Message: "Volume not found"} + } + if err := objectAPI.MakeBucket(args.BucketName); err != nil { return &json2.Error{Message: err.Error()} } return nil @@ -164,7 +173,11 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } - buckets, err := web.ObjectAPI.ListBuckets() + objectAPI := web.ObjectAPI() + if objectAPI == nil { + return &json2.Error{Message: "Volume not found"} + } + buckets, err := objectAPI.ListBuckets() if err != nil { return &json2.Error{Message: err.Error()} } @@ -212,7 +225,11 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r return &json2.Error{Message: "Unauthorized request"} } for { - lo, err := web.ObjectAPI.ListObjects(args.BucketName, args.Prefix, marker, "/", 1000) + objectAPI := web.ObjectAPI() + if objectAPI == nil { + return &json2.Error{Message: "Volume not found"} + } + lo, err := objectAPI.ListObjects(args.BucketName, args.Prefix, marker, "/", 1000) if err != nil { return &json2.Error{Message: err.Error()} } @@ -250,7 +267,11 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, return &json2.Error{Message: "Unauthorized request"} } reply.UIVersion = miniobrowser.UIVersion - if err := web.ObjectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil { + objectAPI := web.ObjectAPI() + if objectAPI == nil { + return &json2.Error{Message: "Volume not found"} + } + if err := objectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil { return &json2.Error{Message: err.Error()} } return nil @@ -384,13 +405,18 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { // Extract incoming metadata if any. metadata := extractMetadataFromHeader(r.Header) - if _, err := web.ObjectAPI.PutObject(bucket, object, -1, r.Body, metadata); err != nil { + objectAPI := web.ObjectAPI() + if objectAPI == nil { + writeWebErrorResponse(w, errors.New("Volume not found")) + return + } + if _, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata); err != nil { writeWebErrorResponse(w, err) return } // Fetch object info for notifications. - objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object) + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) return @@ -435,13 +461,18 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { // Add content disposition. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object))) - objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object) + objectAPI := web.ObjectAPI() + if objectAPI == nil { + writeWebErrorResponse(w, errors.New("Volume not found")) + return + } + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { writeWebErrorResponse(w, err) return } offset := int64(0) - err = web.ObjectAPI.GetObject(bucket, object, offset, objInfo.Size, w) + err = objectAPI.GetObject(bucket, object, offset, objInfo.Size, w) if err != nil { /// No need to print error, response writer already written to. return diff --git a/cmd/web-router.go b/cmd/web-router.go index f2a5e18e4..e5bee0c48 100644 --- a/cmd/web-router.go +++ b/cmd/web-router.go @@ -30,7 +30,7 @@ import ( // webAPI container for Web API. type webAPIHandlers struct { - ObjectAPI ObjectLayer + ObjectAPI func() ObjectLayer } // indexHandler - Handler to serve index.html diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index f80f1a49f..dd030d89f 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -118,6 +118,10 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) { // to handle these errors internally. storageDisks[index], err = newStorageAPI(disk) if err != nil && err != errDiskNotFound { + switch diskType := storageDisks[index].(type) { + case networkStorage: + diskType.rpcClient.Close() + } return nil, err } } diff --git a/lock-rpc-server.go b/lock-rpc-server.go new file mode 100644 index 000000000..785f3e0bd --- /dev/null +++ b/lock-rpc-server.go @@ -0,0 +1,107 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "net/rpc" + "path" + "strings" + "sync" + + router "github.com/gorilla/mux" +) + +const lockRPCPath = "/lock" + +type lockServer struct { + rpcPath string + mutex sync.Mutex + lockMap map[string]struct{} +} + +/// Distributed lock handlers + +// LockHandler - rpc handler for lock operation. +func (l *lockServer) LockHandler(name *string, reply *bool) error { + l.mutex.Lock() + defer l.mutex.Unlock() + _, ok := l.lockMap[*name] + if !ok { + *reply = true + l.lockMap[*name] = struct{}{} + return nil + } + *reply = false + return nil +} + +// UnlockHandler - rpc handler for unlock operation. +func (l *lockServer) UnlockHandler(name *string, reply *bool) error { + l.mutex.Lock() + defer l.mutex.Unlock() + _, ok := l.lockMap[*name] + if !ok { + return fmt.Errorf("Unlock attempted on an un-locked entity: %s", *name) + } + *reply = true + delete(l.lockMap, *name) + return nil +} + +// Initialize distributed lock. +func initDistributedNSLock(mux *router.Router, serverConfig serverCmdConfig) { + lockServers := newLockServers(serverConfig) + registerStorageLockers(mux, lockServers) +} + +// Create one lock server for every local storage rpc server. +func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { + // Initialize posix storage API. + exports := serverConfig.disks + ignoredExports := serverConfig.ignoredDisks + + // Save ignored disks in a map + skipDisks := make(map[string]bool) + for _, ignoredExport := range ignoredExports { + skipDisks[ignoredExport] = true + } + for _, export := range exports { + if skipDisks[export] { + continue + } + if idx := strings.LastIndex(export, ":"); idx != -1 { + export = export[idx+1:] + } + lockServers = append(lockServers, &lockServer{ + rpcPath: export, + mutex: sync.Mutex{}, + lockMap: make(map[string]struct{}), + }) + } + return lockServers +} + +// registerStorageLockers - register locker rpc handlers for valyala/gorpc library clients +func registerStorageLockers(mux *router.Router, lockServers []*lockServer) { + lockRPCServer := rpc.NewServer() + for _, lockServer := range lockServers { + lockRPCServer.RegisterName("Dsync", lockServer) + lockRouter := mux.PathPrefix(reservedBucket).Subrouter() + lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer) + } +} diff --git a/vendor/github.com/minio/dsync/LICENSE b/vendor/github.com/minio/dsync/LICENSE new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/vendor/github.com/minio/dsync/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/minio/dsync/README.md b/vendor/github.com/minio/dsync/README.md new file mode 100644 index 000000000..80326d6f5 --- /dev/null +++ b/vendor/github.com/minio/dsync/README.md @@ -0,0 +1,81 @@ +dsync +===== + +A distributed sync package. + +Introduction +------------ + +`dsync` is a package for doing distributed locks over a network of `n` nodes. It is designed with simplicity in mind and hence offers limited scalability (`n <= 16`). Each node will be connected to all other nodes and lock requests from any node will be broadcast to all connected nodes. A node will succeed in getting the lock if `n/2 + 1` nodes (including itself) respond positively. If the lock is acquired it can be held for some time and needs to be released afterwards. This will cause the release to be broadcast to all nodes after which the lock becomes available again. + +Design goals +------------ + +* Simple design: by keeping the design simple, many tricky edge cases can be avoided. +* No master node: there is no concept of a master node which, if this would be used and the master would be down, causes locking to come to a complete stop. (Unless you have a design with a slave node but this adds yet more complexity.) +* Resilient: if one or more nodes go down, the other nodes should not be affected and can continue to acquire locks (provided not more than `n/2 - 1` nodes are down). +* Automatically reconnect to (restarted) nodes. +* Compatible with `sync/mutex` API. + + +Restrictions +------------ + +* Limited scalability: up to 16 nodes. +* Fixed configuration: changes in the number and/or network names/IP addresses need a restart of all nodes in order to take effect. +* If a down node comes up, it will not in any way (re)acquire any locks that it may have held. +* Not designed for high performance applications such as key/value stores + +Performance +----------- + +* Lock requests (successful) should not take longer than 1ms (provided decent network connection of 1 Gbit or more between the nodes) +* Support up to 4000 locks per node per second. +* Scale linearly with the number of locks. For the maximum size case of 16 nodes this means a maximum of 64K locks/sec (and 2048K lock request & release messages/sec) +* Do not take more than (overall) 10% CPU usage + +Issues +------ + +* In case the node that has the lock goes down, the lock release will not be broadcast: what do we do? (periodically ping 'back' to requesting node from all nodes that have the lock?) Or detect that the network connection has gone down. +* If one of the nodes that participated in the lock goes down, this is not a problem since (when it comes back online) the node that originally acquired the lock will still have it, and a request for a new lock will fail due to only `n/2` being available. +* If two nodes go down and both participated in the lock then there is a chance that a new lock will acquire locks from `n/2 + 1` nodes and will success, so we would have two concurrent locks. One way to counter this would be to monitor the network connections from the nodes that originated the lock, and, upon losing a connection to a node that granted a lock, get a new lock from a free node. +* When two nodes want to acquire the same lock, it is possible for both to just acquire `n` locks and there is no majority winner so both would fail (and presumably fail back to their clients?). This then requires a retry in order to acquire the lock at a later time. +* What if late acquire response still comes in after lock has been obtained (quorum is in) and has already been released again. + +Comparison to other techniques +------------------------------ + +We are well aware that there are more sophisticated systems such as zookeeper, raft, etc but we found that for our limited use case this was adding too much complexity. So if `dsync` does not meet your requirements than you are probably better off using one of those systems. + +Performance +----------- + +``` +benchmark old ns/op new ns/op delta +BenchmarkMutexUncontended-8 4.22 1164018 +27583264.93% +BenchmarkMutex-8 96.5 1223266 +1267533.16% +BenchmarkMutexSlack-8 120 1192900 +993983.33% +BenchmarkMutexWork-8 108 1239893 +1147949.07% +BenchmarkMutexWorkSlack-8 142 1210129 +852103.52% +BenchmarkMutexNoSpin-8 292 319479 +109310.62% +BenchmarkMutexSpin-8 1163 1270066 +109106.02% +``` + +Usage +----- + +Explain usage +``` +``` + + +License +------- + +Released under the Apache License v2.0. You can find the complete text in the file LICENSE. + +Contributing +------------ + +Contributions are welcome, please send PRs for any enhancements. diff --git a/vendor/github.com/minio/dsync/dmutex.go b/vendor/github.com/minio/dsync/dmutex.go new file mode 100644 index 000000000..2212598d5 --- /dev/null +++ b/vendor/github.com/minio/dsync/dmutex.go @@ -0,0 +1,311 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dsync + +import ( + "log" + "math" + "math/rand" + "net/rpc" + "strings" + "sync" + "time" +) + +const DMutexAcquireTimeout = 25 * time.Millisecond + +// A DMutex is a distributed mutual exclusion lock. +type DMutex struct { + Name string + locks []bool // Array of nodes that granted a lock + uids []string // Array of uids for verification of sending correct release messages + m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node + + // TODO: Decide: create per object or create once for whole class + clnts []*rpc.Client +} + +type Granted struct { + index int + locked bool + uid string +} + +func connectLazy(dm *DMutex) { + if dm.clnts == nil { + dm.clnts = make([]*rpc.Client, n) + } + for i := range dm.clnts { + if dm.clnts[i] == nil { + // pass in unique path (as required by server.HandleHTTP() + dm.clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPath+"-"+strings.Split(nodes[i], ":")[1]) + } + } +} + +// Lock locks dm. +// +// If the lock is already in use, the calling goroutine +// blocks until the mutex is available. +func (dm *DMutex) Lock() { + + // Shield Lock() with local mutex in order to prevent more than + // one broadcast going out at the same time from this node + dm.m.Lock() + defer dm.m.Unlock() + + runs, backOff := 1, 1 + + for { + // TODO: Implement reconnect + connectLazy(dm) + + // create temp arrays on stack + locks := make([]bool, n) + ids := make([]string, n) + + // try to acquire the lock + success := lock(dm.clnts, &locks, &ids, dm.Name) + if success { + // if success, copy array to object + dm.locks = make([]bool, n) + copy(dm.locks, locks[:]) + dm.uids = make([]string, n) + copy(dm.uids, ids[:]) + return + } + + // We timed out on the previous lock, incrementally wait for a longer back-off time, + // and try again afterwards + time.Sleep(time.Duration(backOff) * time.Millisecond) + + backOff += int(rand.Float64() * math.Pow(2, float64(runs))) + if backOff > 1024 { + backOff = backOff % 64 + + runs = 1 // reset runs + } else if runs < 10 { + runs++ + } + } +} + +func (dm *DMutex) tryLockTimeout() bool { + + // Shield Lock() with local mutex in order to prevent more than + // one broadcast going out at the same time from this node + dm.m.Lock() + defer dm.m.Unlock() + + // TODO: Implement reconnect + connectLazy(dm) + + // create temp arrays on stack + locks := make([]bool, n) + ids := make([]string, n) + + // try to acquire the lock + success := lock(dm.clnts, &locks, &ids, dm.Name) + if success { + // if success, copy array to object + dm.locks = make([]bool, n) + copy(dm.locks, locks[:]) + dm.uids = make([]string, n) + copy(dm.uids, ids[:]) + } + return success +} + +// lock tries to acquire the distributed lock, returning true or false +// +func lock(clnts []*rpc.Client, locks *[]bool, uids *[]string, lockName string) bool { + + // Create buffered channel of quorum size + ch := make(chan Granted, n/2+1) + + for index, c := range clnts { + + if c == nil { + continue + } + // broadcast lock request to all nodes + go func(index int, c *rpc.Client) { + // All client methods issuing RPCs are thread-safe and goroutine-safe, + // i.e. it is safe to call them from multiple concurrently running go routines. + var status bool + err := c.Call("Dsync.Lock", lockName, &status) + + locked, uid := false, "" + if err == nil { + locked = status + // TODO: Get UIOD again + uid = "" + } else { + // silently ignore error, retry later + } + + ch <- Granted{index: index, locked: locked, uid: uid} + + }(index, c) + } + + var wg sync.WaitGroup + wg.Add(1) + + quorum := false + + go func() { + + // Wait until we have received (minimally) quorum number of responses or timeout + i := 0 + done := false + timeout := time.After(DMutexAcquireTimeout) + + for ; i < n; i++ { + + select { + case grant := <-ch: + if grant.locked { + // Mark that this node has acquired the lock + (*locks)[grant.index] = true + (*uids)[grant.index] = grant.uid + } else { + done = true + //fmt.Println("one lock failed before quorum -- release locks acquired") + releaseAll(clnts, locks, uids, lockName) + } + + case <-timeout: + done = true + // timeout happened, maybe one of the nodes is slow, count + // number of locks to check whether we have quorum or not + if !quorumMet(locks) { + //fmt.Println("timed out -- release locks acquired") + releaseAll(clnts, locks, uids, lockName) + } + } + + if done { + break + } + } + + // Count locks in order to determine whterh we have quorum or not + quorum = quorumMet(locks) + + // Signal that we have the quorum + wg.Done() + + // Wait for the other responses and immediately release the locks + // (do not add them to the locks array because the DMutex could + // already has been unlocked again by the original calling thread) + for ; i < n; i++ { + grantToBeReleased := <-ch + if grantToBeReleased.locked { + // release lock + go sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid) + } + } + }() + + wg.Wait() + + return quorum +} + +// quorumMet determines whether we have acquired n/2+1 underlying locks or not +func quorumMet(locks *[]bool) bool { + + count := 0 + for _, locked := range *locks { + if locked { + count++ + } + } + + return count >= n/2+1 +} + +// releaseAll releases all locks that are marked as locked +func releaseAll(clnts []*rpc.Client, locks *[]bool, ids *[]string, lockName string) { + + for lock := 0; lock < n; lock++ { + if (*locks)[lock] { + go sendRelease(clnts[lock], lockName, (*ids)[lock]) + (*locks)[lock] = false + (*ids)[lock] = "" + } + } + +} + +// hasLock returns whether or not a node participated in granting the lock +func (dm *DMutex) hasLock(node string) bool { + + for index, n := range nodes { + if n == node { + return dm.locks[index] + } + } + + return false +} + +// locked returns whether or not we have met the quorum +func (dm *DMutex) locked() bool { + + locks := make([]bool, n) + copy(locks[:], dm.locks[:]) + + return quorumMet(&locks) +} + +// Unlock unlocks dm. +// +// It is a run-time error if dm is not locked on entry to Unlock. +func (dm *DMutex) Unlock() { + + // Verify that we have the lock or panic otherwise (similar to sync.mutex) + if !dm.locked() { + panic("dsync: unlock of unlocked distributed mutex") + } + + // We don't need to wait until we have released all the locks (or the quorum) + // (a subsequent lock will retry automatically in case it would fail to get + // quorum) + for index, c := range dm.clnts { + + if dm.locks[index] { + // broadcast lock release to all nodes the granted the lock + go sendRelease(c, dm.Name, dm.uids[index]) + + dm.locks[index] = false + } + } +} + +// sendRelease sends a release message to a node that previously granted a lock +func sendRelease(c *rpc.Client, name, uid string) { + + // All client methods issuing RPCs are thread-safe and goroutine-safe, + // i.e. it is safe to call them from multiple concurrently running goroutines. + var status bool + // TODO: Send UID to server + if err := c.Call("Dsync.Unlock", name, &status); err != nil { + log.Fatal("Unlock on %s failed on client %v", name, c) + } +} diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go new file mode 100644 index 000000000..2f9f2d3f3 --- /dev/null +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -0,0 +1,143 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dsync + +import ( + "fmt" + "sync" +) + +const maxReaders = 8 + +type DRWMutex struct { + rArray []*DMutex + rLockedArray []bool + w DMutex // held if there are pending writers + m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node + m2 sync.Mutex // Mutex to prevent multiple simultaneous locks from this node +} + +func NewDRWMutex(name string) (drw *DRWMutex) { + + rArray := make([]*DMutex, maxReaders) + rLockedArray := make([]bool, maxReaders) + + for r := 0; r < maxReaders; r++ { + rArray[r] = &DMutex{Name: fmt.Sprintf("%s-r%d", name, r)} + } + + return &DRWMutex{ + rArray: rArray, + rLockedArray: rLockedArray, + w: DMutex{Name: name + "-w"}} +} + +// RLock locks drw for reading. +func (drw *DRWMutex) RLock() { + + drw.m.Lock() + defer drw.m.Unlock() + + // Check if no write is active, block otherwise + // Can skip this? + drw.w.Lock() + drw.w.Unlock() + + // Lock either one of the reader locks + for i := 0; ; i++ { + drw.rLockedArray[i%maxReaders] = drw.rArray[i%maxReaders].tryLockTimeout() + if drw.rLockedArray[i%maxReaders] { + return + } + } +} + +// RUnlock undoes a single RLock call; +// it does not affect other simultaneous readers. +// It is a run-time error if rw is not locked for reading +// on entry to RUnlock. +func (drw *DRWMutex) RUnlock() { + + drw.m.Lock() + defer drw.m.Unlock() + + // Unlock whichever readlock that was acquired) + for r := 0; r < maxReaders; r++ { + if drw.rLockedArray[r] { + drw.rArray[r].Unlock() + drw.rLockedArray[r] = false + // we only want to release a single read lock at a time + break + } + } +} + +// Lock locks rw for writing. +// If the lock is already locked for reading or writing, +// Lock blocks until the lock is available. +// To ensure that the lock eventually becomes available, +// a blocked Lock call excludes new readers from acquiring +// the lock. +func (drw *DRWMutex) Lock() { + + drw.m.Lock() + defer drw.m.Unlock() + + // First, resolve competition with other writers. + drw.w.Lock() + + // Acquire all read locks. + var wg sync.WaitGroup + wg.Add(maxReaders) + + for r := 0; r < maxReaders; r++ { + go func(r int) { + defer wg.Done() + drw.rArray[r].Lock() + drw.rLockedArray[r] = true + }(r) + } + + wg.Wait() +} + +// Unlock unlocks rw for writing. It is a run-time error if rw is +// not locked for writing on entry to Unlock. +// +// As with Mutexes, a locked RWMutex is not associated with a particular +// goroutine. One goroutine may RLock (Lock) an RWMutex and then +// arrange for another goroutine to RUnlock (Unlock) it. +func (drw *DRWMutex) Unlock() { + + drw.m.Lock() + defer drw.m.Unlock() + + for r := 0; r < maxReaders; r++ { + if !drw.rLockedArray[r] { + panic("dsync: unlock of unlocked distributed rwmutex") + } + } + + // Unlock all read locks + for r := 0; r < maxReaders; r++ { + drw.rArray[r].Unlock() + drw.rLockedArray[r] = false + } + + // Allow other writers to proceed. + drw.w.Unlock() +} diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go new file mode 100644 index 000000000..8977614be --- /dev/null +++ b/vendor/github.com/minio/dsync/dsync.go @@ -0,0 +1,56 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dsync + +import ( + "errors" + "net/rpc" +) + +const RpcPath = "/dsync" +const DebugPath = "/debug" + +const DefaultPath = "/rpc/dsync" + +var n int +var nodes []string +var rpcPath string + +func closeClients(clients []*rpc.Client) { + for _, clnt := range clients { + clnt.Close() + } +} + +// Same as SetNodes, but takes a path argument different from the package-level default. +func SetNodesWithPath(nodeList []string, path string) (err error) { + + // Validate if number of nodes is within allowable range. + if n != 0 { + return errors.New("Cannot reinitialize dsync package") + } else if len(nodeList) < 4 { + return errors.New("Dsync not designed for less than 4 nodes") + } else if len(nodeList) > 16 { + return errors.New("Dsync not designed for more than 16 nodes") + } + + nodes = make([]string, len(nodeList)) + copy(nodes, nodeList[:]) + rpcPath = path + n = len(nodes) + return nil +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 803181ae5..c29dacc65 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -97,6 +97,12 @@ "revision": "c4a07c7b68db77ccd119183fb1d01dd5972434ab", "revisionTime": "2015-11-18T20:00:48-08:00" }, + { + "checksumSHA1": "KCM0UiuvLA5fPiX5I83/HTklxlI=", + "path": "github.com/minio/dsync", + "revision": "c10eebd6b637bb834d502a6574c53e0ea6c64997", + "revisionTime": "2016-08-05T20:56:13Z" + }, { "path": "github.com/minio/go-homedir", "revision": "0b1069c753c94b3633cc06a1995252dbcc27c7a6",