distribute: Make server work with multiple remote disks
This change initializes RPC servers for the disks that are local to this node. Object layer initialization becomes on-demand, happening on the first request that reaches the object layer. It also adds a lock RPC service and vendors minio/dsync.
committed by Harshavardhana
parent f82f535509
commit e55926e8cf
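The heart of this change is visible throughout the diff below: handlers no longer hold a concrete ObjectLayer, they hold a func() ObjectLayer and call it on every request, treating a nil result as "not initialized yet". The stand-alone sketch below illustrates that on-demand pattern. It is illustrative only: ObjectLayer and newObjectLayer are simplified stand-ins, and the mutex is an assumption of this sketch (the committed closure, shown further down, carries a FIXME noting it is not yet go-routine safe).

// Sketch of the on-demand object layer initialization this commit introduces.
// All names here are simplified stand-ins, not the committed implementation.
package main

import (
    "errors"
    "fmt"
    "sync"
)

// ObjectLayer stands in for the real storage-backend interface.
type ObjectLayer interface {
    ListBuckets() ([]string, error)
}

type fakeLayer struct{}

func (fakeLayer) ListBuckets() ([]string, error) { return []string{"images"}, nil }

// newObjectLayer stands in for the real constructor, which may fail while
// remote disks are still coming up.
func newObjectLayer(ready bool) (ObjectLayer, error) {
    if !ready {
        return nil, errors.New("not enough disks online")
    }
    return fakeLayer{}, nil
}

// newObjectLayerFactory returns a func() ObjectLayer that initializes the
// layer on the first call that succeeds and caches it afterwards. A nil
// return tells the handler to reply with an internal error so the client
// can retry later.
func newObjectLayerFactory(ready *bool) func() ObjectLayer {
    var (
        mu     sync.Mutex // assumption: the committed closure is not yet goroutine-safe
        cached ObjectLayer
    )
    return func() ObjectLayer {
        mu.Lock()
        defer mu.Unlock()
        if cached != nil {
            return cached
        }
        layer, err := newObjectLayer(*ready)
        if err != nil {
            return nil // not ready yet; the next request retries
        }
        cached = layer
        return cached
    }
}

func main() {
    ready := false
    getLayer := newObjectLayerFactory(&ready)
    fmt.Println(getLayer() == nil) // true: first request arrives before disks are up
    ready = true
    buckets, _ := getLayer().ListBuckets()
    fmt.Println(buckets) // [images]
}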
@@ -20,7 +20,7 @@ import router "github.com/gorilla/mux"

// objectAPIHandler implements and provides http handlers for S3 API.
type objectAPIHandlers struct {
ObjectAPI ObjectLayer
ObjectAPI func() ObjectLayer
}

// registerAPIRouter - registers S3 compatible APIs.

@@ -97,10 +97,15 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
writeErrorResponse(w, r, s3Error, r.URL.Path)
return
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsInfo object to be
// marshalled into S3 compatible XML header.
listObjectsInfo, err := api.ObjectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
listObjectsInfo, err := objectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
errorIf(err, "Unable to list objects.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -151,10 +156,15 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
return
}

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsInfo object to be
// marshalled into S3 compatible XML header.
listObjectsInfo, err := api.ObjectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
listObjectsInfo, err := objectAPI.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
errorIf(err, "Unable to list objects.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)

@@ -82,7 +82,12 @@ func (api objectAPIHandlers) GetBucketLocationHandler(w http.ResponseWriter, r *
}
}

if _, err := api.ObjectAPI.GetBucketInfo(bucket); err != nil {
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
if _, err := objectAPI.GetBucketInfo(bucket); err != nil {
errorIf(err, "Unable to fetch bucket info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
@@ -144,7 +149,12 @@ func (api objectAPIHandlers) ListMultipartUploadsHandler(w http.ResponseWriter,
}
}

listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
listMultipartsInfo, err := objectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
errorIf(err, "Unable to list multipart uploads.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -170,7 +180,12 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
return
}

bucketsInfo, err := api.ObjectAPI.ListBuckets()
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
bucketsInfo, err := objectAPI.ListBuckets()
if err != nil {
errorIf(err, "Unable to list buckets.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -241,6 +256,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
return
}

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}

var wg = &sync.WaitGroup{} // Allocate a new wait group.
var dErrs = make([]error, len(deleteObjects.Objects))

@@ -327,8 +348,13 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return
}

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
// Proceed to creating a bucket.
err := api.ObjectAPI.MakeBucket(bucket)
err := objectAPI.MakeBucket(bucket)
if err != nil {
errorIf(err, "Unable to create a bucket.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -384,7 +410,12 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
metadata := make(map[string]string)
// Nothing to store right now.

md5Sum, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, metadata)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
md5Sum, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata)
if err != nil {
errorIf(err, "Unable to create object.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -405,7 +436,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h

if globalEventNotifier.IsBucketNotificationSet(bucket) {
// Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
@@ -451,7 +482,12 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re
}
}

if _, err := api.ObjectAPI.GetBucketInfo(bucket); err != nil {
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
if _, err := objectAPI.GetBucketInfo(bucket); err != nil {
errorIf(err, "Unable to fetch bucket info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
@@ -470,8 +506,13 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
vars := mux.Vars(r)
bucket := vars["bucket"]

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
// Attempt to delete bucket.
if err := api.ObjectAPI.DeleteBucket(bucket); err != nil {
if err := objectAPI.DeleteBucket(bucket); err != nil {
errorIf(err, "Unable to delete a bucket.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return

@@ -47,7 +47,12 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter,
vars := mux.Vars(r)
bucket := vars["bucket"]
// Attempt to successfully load notification config.
nConfig, err := loadNotificationConfig(bucket, api.ObjectAPI)
objAPI := api.ObjectAPI()
if objAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
nConfig, err := loadNotificationConfig(bucket, objAPI)
if err != nil && err != errNoSuchNotifications {
errorIf(err, "Unable to read notification configuration.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -86,7 +91,12 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
vars := mux.Vars(r)
bucket := vars["bucket"]

_, err := api.ObjectAPI.GetBucketInfo(bucket)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
_, err := objectAPI.GetBucketInfo(bucket)
if err != nil {
errorIf(err, "Unable to find bucket info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -133,7 +143,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,

// Proceed to save notification configuration.
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
_, err = api.ObjectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), nil)
_, err = objectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), nil)
if err != nil {
errorIf(err, "Unable to write bucket notification configuration.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -220,7 +230,13 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
}

// Validate if bucket exists.
_, err := api.ObjectAPI.GetBucketInfo(bucket)
objAPI := api.ObjectAPI()
if objAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}

_, err := objAPI.GetBucketInfo(bucket)
if err != nil {
errorIf(err, "Unable to bucket info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)

@@ -294,13 +294,13 @@ func testPutBucketPolicyHandler(obj ObjectLayer, instanceType string, t TestErrH
req, err := newTestSignedRequest("PUT", getPutPolicyURL("", testCase.bucketName),
int64(len(bucketPolicyStr)), bytes.NewReader([]byte(bucketPolicyStr)), testCase.accessKey, testCase.secretKey)
if err != nil {
t.Fatalf("Test %d: Failed to create HTTP request for PutBucketPolicyHandler: <ERROR> %v", i+1, err)
t.Fatalf("Test %d: %s: Failed to create HTTP request for PutBucketPolicyHandler: <ERROR> %v", i+1, instanceType, err)
}
// Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic ofthe handler.
// Call the ServeHTTP to execute the handler.
apiRouter.ServeHTTP(rec, req)
if rec.Code != testCase.expectedRespStatus {
t.Errorf("Test %d: Expected the response status to be `%d`, but instead found `%d`", i+1, testCase.expectedRespStatus, rec.Code)
t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code)
}
}
}

@@ -169,7 +169,6 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}

policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil {
if _, ok := err.(ObjectNotFound); ok {

@@ -17,7 +17,7 @@
package cmd

import (
"path/filepath"
"net"
"strings"
"sync"
)
@@ -53,13 +53,57 @@ func fsHouseKeeping(storageDisk StorageAPI) error {
return nil
}

// Check if a network path is local to this node.
func isLocalStorage(networkPath string) bool {
if idx := strings.LastIndex(networkPath, ":"); idx != -1 {
// e.g 10.0.0.1:9000:/mnt/networkPath
netAddr, _ := splitNetPath(networkPath)
var netHost string
var err error
netHost, _, err = net.SplitHostPort(netAddr)
if err != nil {
netHost = netAddr
}
// Resolve host to address to check if the IP is loopback.
// If address resolution fails, assume it's a non-local host.
addrs, err := net.LookupHost(netHost)
if err != nil {
return false
}
for _, addr := range addrs {
if ip := net.ParseIP(addr); ip.IsLoopback() {
return true
}
}
iaddrs, err := net.InterfaceAddrs()
if err != nil {
return false
}
for _, addr := range addrs {
for _, iaddr := range iaddrs {
ip, _, err := net.ParseCIDR(iaddr.String())
if err != nil {
return false
}
if ip.String() == addr {
return true
}

}
}
return false
}
return true
}

// Depending on the disk type network or local, initialize storage API.
func newStorageAPI(disk string) (storage StorageAPI, err error) {
if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" {
// Initialize filesystem storage API.
if isLocalStorage(disk) {
if idx := strings.LastIndex(disk, ":"); idx != -1 {
return newPosix(disk[idx+1:])
}
return newPosix(disk)
}
// Initialize rpc client storage API.
return newRPCClient(disk)
}

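The local-versus-remote decision above hinges on the disk notation: a plain path such as /mnt/disk1 is served by the posix backend, while host:port:/mnt/disk1 names a disk exported by another node. Below is a small sketch of that split, using a hypothetical splitDiskArg helper rather than the diff's splitNetPath.

// Illustrative only: splitDiskArg is a hypothetical helper for this sketch.
package main

import (
    "fmt"
    "strings"
)

// splitDiskArg returns the network address (empty for a local path) and the
// filesystem path portion of a disk argument.
// Note: a Windows path like C:\data also contains ':'; the surrounding code
// has to special-case drive letters (the old newStorageAPI checked
// filepath.VolumeName for this).
func splitDiskArg(disk string) (netAddr, diskPath string) {
    idx := strings.LastIndex(disk, ":")
    if idx == -1 {
        return "", disk // plain local path
    }
    return disk[:idx], disk[idx+1:]
}

func main() {
    for _, disk := range []string{"/mnt/disk1", "10.0.0.1:9000:/mnt/disk1"} {
        addr, p := splitDiskArg(disk)
        fmt.Printf("disk=%q netAddr=%q path=%q\n", disk, addr, p)
    }
}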
@@ -102,7 +102,12 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
}
}
// Fetch object stat info.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info.")
apiErr := toAPIErrorCode(err)
@@ -161,7 +166,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
})

// Reads the object at startOffset and writes to mw.
if err := api.ObjectAPI.GetObject(bucket, object, startOffset, length, writer); err != nil {
if err := objectAPI.GetObject(bucket, object, startOffset, length, writer); err != nil {
errorIf(err, "Unable to write to client.")
if !dataWritten {
// Error response only if no data has been written to client yet. i.e if
@@ -208,7 +213,12 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
}
}

objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info.")
apiErr := toAPIErrorCode(err)
@@ -289,7 +299,12 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return
}

objInfo, err := api.ObjectAPI.GetObjectInfo(sourceBucket, sourceObject)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
objInfo, err := objectAPI.GetObjectInfo(sourceBucket, sourceObject)
if err != nil {
errorIf(err, "Unable to fetch object info.")
writeErrorResponse(w, r, toAPIErrorCode(err), objectSource)
@@ -311,7 +326,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
go func() {
startOffset := int64(0) // Read the whole file.
// Get the object.
gErr := api.ObjectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter)
gErr := objectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter)
if gErr != nil {
errorIf(gErr, "Unable to read an object.")
pipeWriter.CloseWithError(gErr)
@@ -332,7 +347,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// same md5sum as the source.

// Create the object.
md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, pipeReader, metadata)
md5Sum, err := objectAPI.PutObject(bucket, object, size, pipeReader, metadata)
if err != nil {
// Close the this end of the pipe upon error in PutObject.
pipeReader.CloseWithError(err)
@@ -343,7 +358,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Explicitly close the reader, before fetching object info.
pipeReader.Close()

objInfo, err = api.ObjectAPI.GetObjectInfo(bucket, object)
objInfo, err = objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -418,6 +433,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Make sure we hex encode md5sum here.
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
var md5Sum string
switch rAuthType {
default:
@@ -431,7 +451,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
// Create anonymous object.
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, metadata)
md5Sum, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata)
case authTypeStreamingSigned:
// Initialize stream signature verifier.
reader, s3Error := newSignV4ChunkedReader(r)
@@ -439,12 +459,12 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(w, r, s3Error, r.URL.Path)
return
}
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
md5Sum, err = objectAPI.PutObject(bucket, object, size, reader, metadata)
case authTypePresigned, authTypeSigned:
// Initialize signature verifier.
reader := newSignVerify(r)
// Create object.
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
md5Sum, err = objectAPI.PutObject(bucket, object, size, reader, metadata)
}
if err != nil {
errorIf(err, "Unable to create an object.")
@@ -458,7 +478,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req

if globalEventNotifier.IsBucketNotificationSet(bucket) {
// Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
@@ -506,7 +526,12 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
// Extract metadata that needs to be saved.
metadata := extractMetadataFromHeader(r.Header)

uploadID, err := api.ObjectAPI.NewMultipartUpload(bucket, object, metadata)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
uploadID, err := objectAPI.NewMultipartUpload(bucket, object, metadata)
if err != nil {
errorIf(err, "Unable to initiate new multipart upload id.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -574,6 +599,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return
}

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
var partMD5 string
incomingMD5 := hex.EncodeToString(md5Bytes)
switch rAuthType {
@@ -588,7 +618,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return
}
// No need to verify signature, anonymous request access is already allowed.
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5)
partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5)
case authTypeStreamingSigned:
// Initialize stream signature verifier.
reader, s3Error := newSignV4ChunkedReader(r)
@@ -596,11 +626,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
writeErrorResponse(w, r, s3Error, r.URL.Path)
return
}
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5)
partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5)
case authTypePresigned, authTypeSigned:
// Initialize signature verifier.
reader := newSignVerify(r)
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5)
partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5)
}
if err != nil {
errorIf(err, "Unable to create object part.")
@@ -638,8 +668,13 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
}
}

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
uploadID, _, _, _ := getObjectResources(r.URL.Query())
if err := api.ObjectAPI.AbortMultipartUpload(bucket, object, uploadID); err != nil {
if err := objectAPI.AbortMultipartUpload(bucket, object, uploadID); err != nil {
errorIf(err, "Unable to abort multipart upload.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
@@ -680,7 +715,12 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
writeErrorResponse(w, r, ErrInvalidMaxParts, r.URL.Path)
return
}
listPartsInfo, err := api.ObjectAPI.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
listPartsInfo, err := objectAPI.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
errorIf(err, "Unable to list uploaded parts.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@@ -761,10 +801,15 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
return
}

objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
doneCh := make(chan struct{})
// Signal that completeMultipartUpload is over via doneCh
go func(doneCh chan<- struct{}) {
md5Sum, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
md5Sum, err = objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
doneCh <- struct{}{}
}(doneCh)

@@ -799,7 +844,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite

if globalEventNotifier.IsBucketNotificationSet(bucket) {
// Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
@@ -842,10 +887,15 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
return
}
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return
}
/// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
/// Ignore delete object errors, since we are suppposed to reply
/// only 204.
if err := api.ObjectAPI.DeleteObject(bucket, object); err != nil {
if err := objectAPI.DeleteObject(bucket, object); err != nil {
writeSuccessNoContent(w)
return
}

@@ -32,6 +32,7 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
// Initialize FS object layer.
return newFSObjects(exportPath)
}
// TODO: use dsync to block other concurrently booting up nodes.
// Initialize XL object layer.
objAPI, err := newXLObjects(disks, ignoredDisks)
if err == errXLWriteQuorum {
@@ -40,65 +41,75 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
return objAPI, err
}

func newObjectLayerFactory(disks, ignoredDisks []string) func() ObjectLayer {
var objAPI ObjectLayer
// FIXME: This needs to be go-routine safe.
return func() ObjectLayer {
var err error
if objAPI != nil {
return objAPI
}
objAPI, err = newObjectLayer(disks, ignoredDisks)
if err != nil {
return nil
}
// Migrate bucket policy from configDir to .minio.sys/buckets/
err = migrateBucketPolicyConfig(objAPI)
fatalIf(err, "Unable to migrate bucket policy from config directory")

err = cleanupOldBucketPolicyConfigs()
fatalIf(err, "Unable to clean up bucket policy from config directory.")

// Initialize and monitor shutdown signals.
err = initGracefulShutdown(os.Exit)
fatalIf(err, "Unable to initialize graceful shutdown operation")

// Register the callback that should be called when the process shuts down.
globalShutdownCBs.AddObjectLayerCB(func() errCode {
if sErr := objAPI.Shutdown(); sErr != nil {
return exitFailure
}
return exitSuccess
})

// Initialize a new event notifier.
err = initEventNotifier(objAPI)
fatalIf(err, "Unable to initialize event notification queue")

// Initialize and load bucket policies.
err = initBucketPolicies(objAPI)
fatalIf(err, "Unable to load all bucket policies")

return objAPI
}
}

// configureServer handler returns final handler for the http server.
func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
// Initialize name space lock.
// Initialize Namespace locking.
initNSLock()

objAPI, err := newObjectLayer(srvCmdConfig.disks, srvCmdConfig.ignoredDisks)
fatalIf(err, "Unable to intialize object layer.")

// Migrate bucket policy from configDir to .minio.sys/buckets/
err = migrateBucketPolicyConfig(objAPI)
fatalIf(err, "Unable to migrate bucket policy from config directory")

err = cleanupOldBucketPolicyConfigs()
fatalIf(err, "Unable to clean up bucket policy from config directory.")

// Initialize storage rpc server.
storageRPC, err := newRPCServer(srvCmdConfig.disks[0]) // FIXME: should only have one path.
// Initialize storage rpc servers for every disk that is hosted on this node.
storageRPCs, err := newRPCServer(srvCmdConfig)
fatalIf(err, "Unable to initialize storage RPC server.")

newObjectLayerFn := newObjectLayerFactory(srvCmdConfig.disks, srvCmdConfig.ignoredDisks)
// Initialize API.
apiHandlers := objectAPIHandlers{
ObjectAPI: objAPI,
ObjectAPI: newObjectLayerFn,
}

// Initialize Web.
webHandlers := &webAPIHandlers{
ObjectAPI: objAPI,
ObjectAPI: newObjectLayerFn,
}

// Initialize Controller.
ctrlHandlers := &controllerAPIHandlers{
ObjectAPI: objAPI,
}

// Initialize and monitor shutdown signals.
err = initGracefulShutdown(os.Exit)
fatalIf(err, "Unable to initialize graceful shutdown operation")

// Register the callback that should be called when the process shuts down.
globalShutdownCBs.AddObjectLayerCB(func() errCode {
if sErr := objAPI.Shutdown(); sErr != nil {
return exitFailure
}
return exitSuccess
})

// Initialize a new event notifier.
err = initEventNotifier(objAPI)
fatalIf(err, "Unable to initialize event notification queue")

// Initialize a new bucket policies.
err = initBucketPolicies(objAPI)
fatalIf(err, "Unable to load all bucket policies")

// Initialize router.
mux := router.NewRouter()

// Register all routers.
registerStorageRPCRouter(mux, storageRPC)
registerStorageRPCRouters(mux, storageRPCs)
initDistributedNSLock(mux, srvCmdConfig)

// FIXME: till net/rpc auth is brought in "minio control" can be enabled only though
// this env variable.

@@ -19,6 +19,8 @@ package cmd
import (
"net/http"
"net/rpc"
"path"
"strconv"
"strings"
"time"
)
@@ -78,12 +80,6 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
// TODO validate netAddr and netPath.
netAddr, netPath := splitNetPath(networkPath)

// Dial minio rpc storage http path.
rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, storageRPCPath)
if err != nil {
return nil, err
}

// Initialize http client.
httpClient := &http.Client{
// Setting a sensible time out of 6minutes to wait for
@@ -93,6 +89,15 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
Transport: http.DefaultTransport,
}

// Dial minio rpc storage http path.
rpcPath := path.Join(storageRPCPath, netPath)
port := getPort(srvConfig.serverAddr)
rpcAddr := netAddr + ":" + strconv.Itoa(port)
rpcClient, err := rpc.DialHTTPPath("tcp", rpcAddr, rpcPath)
if err != nil {
return nil, err
}

// Initialize network storage.
ndisk := &networkStorage{
netScheme: "http", // TODO: fix for ssl rpc support.
@@ -108,6 +113,9 @@ func newRPCClient(networkPath string) (StorageAPI, error) {

// MakeVol - make a volume.
func (n networkStorage) MakeVol(volume string) error {
if n.rpcClient == nil {
return errVolumeBusy
}
reply := GenericReply{}
if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil {
return toStorageErr(err)
@@ -117,6 +125,9 @@ func (n networkStorage) MakeVol(volume string) error {

// ListVols - List all volumes.
func (n networkStorage) ListVols() (vols []VolInfo, err error) {
if n.rpcClient == nil {
return nil, errVolumeBusy
}
ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", "", &ListVols)
if err != nil {
@@ -127,6 +138,9 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) {

// StatVol - get current Stat volume info.
func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
if n.rpcClient == nil {
return VolInfo{}, errVolumeBusy
}
if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil {
return VolInfo{}, toStorageErr(err)
}
@@ -135,6 +149,9 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {

// DeleteVol - Delete a volume.
func (n networkStorage) DeleteVol(volume string) error {
if n.rpcClient == nil {
return errVolumeBusy
}
reply := GenericReply{}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil {
return toStorageErr(err)
@@ -146,6 +163,9 @@ func (n networkStorage) DeleteVol(volume string) error {

// CreateFile - create file.
func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) {
if n.rpcClient == nil {
return errVolumeBusy
}
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{
Vol: volume,
@@ -184,6 +204,9 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) {

// ReadFile - reads a file.
func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) {
if n.rpcClient == nil {
return 0, errVolumeBusy
}
if err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{
Vol: volume,
Path: path,
@@ -197,6 +220,9 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe

// ListDir - list all entries at prefix.
func (n networkStorage) ListDir(volume, path string) (entries []string, err error) {
if n.rpcClient == nil {
return nil, errVolumeBusy
}
if err = n.rpcClient.Call("Storage.ListDirHandler", ListDirArgs{
Vol: volume,
Path: path,
@@ -209,6 +235,9 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro

// DeleteFile - Delete a file at path.
func (n networkStorage) DeleteFile(volume, path string) (err error) {
if n.rpcClient == nil {
return errVolumeBusy
}
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{
Vol: volume,
@@ -221,6 +250,9 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) {

// RenameFile - Rename file.
func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
if n.rpcClient == nil {
return errVolumeBusy
}
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.RenameFileHandler", RenameFileArgs{
SrcVol: srcVolume,

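Two client-side changes run through the hunks above: the RPC path is now derived from the remote export (one path per disk), and every networkStorage method guards against a nil rpcClient so that a disk which has not connected yet surfaces errVolumeBusy instead of panicking. Below is a minimal sketch of both ideas, with simplified stand-in types and an assumed /minio/storage prefix rather than the diff's storageRPCPath constant.

// Illustrative only: simplified stand-ins, not the committed types.
package main

import (
    "errors"
    "fmt"
    "net/rpc"
    "path"
)

var errVolumeBusy = errors.New("volume is busy")

type networkStorage struct {
    rpcClient *rpc.Client // nil until the remote disk is reachable
}

// MakeVol mirrors the guard added to every RPC-backed method in the diff.
func (n *networkStorage) MakeVol(volume string) error {
    if n.rpcClient == nil {
        return errVolumeBusy
    }
    return n.rpcClient.Call("Storage.MakeVolHandler", volume, new(struct{}))
}

func main() {
    // One RPC route per exported disk, e.g. /minio/storage/mnt/disk1 (prefix assumed).
    rpcPath := path.Join("/minio/storage", "/mnt/disk1")
    fmt.Println("would dial:", rpcPath)

    // Not connected yet: the caller gets errVolumeBusy, not a crash.
    n := &networkStorage{}
    fmt.Println(n.MakeVol("images")) // volume is busy
}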
@@ -1,7 +1,25 @@
package cmd
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"net/rpc"
"path"
"strings"

router "github.com/gorilla/mux"
)
@@ -10,6 +28,7 @@ import (
// disk over a network.
type storageServer struct {
storage StorageAPI
path string
}

/// Volume operations handlers
@@ -103,22 +122,50 @@ func (s *storageServer) RenameFileHandler(arg *RenameFileArgs, reply *GenericRep
}

// Initialize new storage rpc.
func newRPCServer(exportPath string) (*storageServer, error) {
func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err error) {
// Initialize posix storage API.
storage, err := newPosix(exportPath)
if err != nil && err != errDiskNotFound {
return nil, err
exports := serverConfig.disks
ignoredExports := serverConfig.ignoredDisks

// Save ignored disks in a map
skipDisks := make(map[string]bool)
for _, ignoredExport := range ignoredExports {
skipDisks[ignoredExport] = true
}
return &storageServer{
storage: storage,
}, nil
for _, export := range exports {
if skipDisks[export] {
continue
}
// e.g server:/mnt/disk1
if isLocalStorage(export) {
if idx := strings.LastIndex(export, ":"); idx != -1 {
export = export[idx+1:]
}
var storage StorageAPI
storage, err = newPosix(export)
if err != nil && err != errDiskNotFound {
return nil, err
}
if idx := strings.LastIndex(export, ":"); idx != -1 {
export = export[idx+1:]
}
servers = append(servers, &storageServer{
storage: storage,
path: export,
})
}
}
return servers, err
}

// registerStorageRPCRouter - register storage rpc router.
func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) {
func registerStorageRPCRouters(mux *router.Router, stServers []*storageServer) {
storageRPCServer := rpc.NewServer()
storageRPCServer.RegisterName("Storage", stServer)
storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
// Add minio storage routes.
storageRouter.Path("/storage").Handler(storageRPCServer)
// Create a unique route for each disk exported from this node.
for _, stServer := range stServers {
storageRPCServer.RegisterName("Storage", stServer)
// Add minio storage routes.
storageRouter := mux.PathPrefix(reservedBucket).Subrouter()
storageRouter.Path(path.Join("/storage", stServer.path)).Handler(storageRPCServer)
}
}

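On the server side, registerStorageRPCRouters now creates one RPC route per disk that is local to the node, so each exported path gets its own endpoint under the reserved prefix. The sketch below mirrors that shape with gorilla/mux and net/rpc; the /minio prefix and the Storage service are simplified assumptions of this sketch, not the committed constants.

// Illustrative only: one RPC route per local disk.
package main

import (
    "fmt"
    "net/rpc"
    "path"

    router "github.com/gorilla/mux"
)

// Storage is a trivial RPC service standing in for storageServer.
type Storage struct{ Disk string }

func (s *Storage) Ping(_ *struct{}, reply *string) error {
    *reply = "ok from " + s.Disk
    return nil
}

func main() {
    mux := router.NewRouter()
    for _, disk := range []string{"/mnt/disk1", "/mnt/disk2"} {
        rpcServer := rpc.NewServer()
        if err := rpcServer.RegisterName("Storage", &Storage{Disk: disk}); err != nil {
            panic(err)
        }
        // A node exporting /mnt/disk1 serves it at /minio/storage/mnt/disk1 (prefix assumed).
        storageRouter := mux.PathPrefix("/minio").Subrouter()
        storageRouter.Path(path.Join("/storage", disk)).Handler(rpcServer)
        fmt.Println("registered", path.Join("/minio/storage", disk))
    }
    // Serving the mux with net/http is left to the caller; this sketch only registers routes.
}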
@@ -28,6 +28,7 @@ import (
"github.com/minio/cli"
)

var srvConfig serverCmdConfig
var serverCmd = cli.Command{
Name: "server",
Usage: "Start object storage server.",
@@ -245,11 +246,13 @@ func serverMain(c *cli.Context) {
disks := c.Args()

// Configure server.
handler := configureServerHandler(serverCmdConfig{
srvConfig = serverCmdConfig{
serverAddr: serverAddress,
disks: disks,
ignoredDisks: ignoredDisks,
})
}
// Configure server.
handler := configureServerHandler(srvConfig)

apiServer := NewServerMux(serverAddress, handler)

@@ -59,3 +59,6 @@ var errVolumeAccessDenied = errors.New("volume access denied")

// errVolumeAccessDenied - cannot access file, insufficient permissions.
var errFileAccessDenied = errors.New("file access denied")

// errVolumeBusy - remote disk is not connected to yet.
var errVolumeBusy = errors.New("volume is busy")

@@ -888,7 +888,7 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand
// All object storage operations are registered as HTTP handlers on `objectAPIHandlers`.
// When the handlers get a HTTP request they use the underlyting ObjectLayer to perform operations.
api := objectAPIHandlers{
ObjectAPI: objLayer,
ObjectAPI: func() ObjectLayer { return objLayer },
}
// API Router.
apiRouter := muxRouter.NewRoute().PathPrefix("/").Subrouter()

@@ -19,6 +19,7 @@ package cmd
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
@@ -124,7 +125,11 @@ func (web *webAPIHandlers) StorageInfo(r *http.Request, args *GenericArgs, reply
return &json2.Error{Message: "Unauthorized request"}
}
reply.UIVersion = miniobrowser.UIVersion
reply.StorageInfo = web.ObjectAPI.StorageInfo()
objectAPI := web.ObjectAPI()
if objectAPI == nil {
return &json2.Error{Message: "Volume not found"}
}
reply.StorageInfo = objectAPI.StorageInfo()
return nil
}

@@ -139,7 +144,11 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
return &json2.Error{Message: "Unauthorized request"}
}
reply.UIVersion = miniobrowser.UIVersion
if err := web.ObjectAPI.MakeBucket(args.BucketName); err != nil {
objectAPI := web.ObjectAPI()
if objectAPI == nil {
return &json2.Error{Message: "Volume not found"}
}
if err := objectAPI.MakeBucket(args.BucketName); err != nil {
return &json2.Error{Message: err.Error()}
}
return nil
@@ -164,7 +173,11 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re
if !isJWTReqAuthenticated(r) {
return &json2.Error{Message: "Unauthorized request"}
}
buckets, err := web.ObjectAPI.ListBuckets()
objectAPI := web.ObjectAPI()
if objectAPI == nil {
return &json2.Error{Message: "Volume not found"}
}
buckets, err := objectAPI.ListBuckets()
if err != nil {
return &json2.Error{Message: err.Error()}
}
@@ -212,7 +225,11 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r
return &json2.Error{Message: "Unauthorized request"}
}
for {
lo, err := web.ObjectAPI.ListObjects(args.BucketName, args.Prefix, marker, "/", 1000)
objectAPI := web.ObjectAPI()
if objectAPI == nil {
return &json2.Error{Message: "Volume not found"}
}
lo, err := objectAPI.ListObjects(args.BucketName, args.Prefix, marker, "/", 1000)
if err != nil {
return &json2.Error{Message: err.Error()}
}
@@ -250,7 +267,11 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
return &json2.Error{Message: "Unauthorized request"}
}
reply.UIVersion = miniobrowser.UIVersion
if err := web.ObjectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil {
objectAPI := web.ObjectAPI()
if objectAPI == nil {
return &json2.Error{Message: "Volume not found"}
}
if err := objectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil {
return &json2.Error{Message: err.Error()}
}
return nil
@@ -384,13 +405,18 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
// Extract incoming metadata if any.
metadata := extractMetadataFromHeader(r.Header)

if _, err := web.ObjectAPI.PutObject(bucket, object, -1, r.Body, metadata); err != nil {
objectAPI := web.ObjectAPI()
if objectAPI == nil {
writeWebErrorResponse(w, errors.New("Volume not found"))
return
}
if _, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata); err != nil {
writeWebErrorResponse(w, err)
return
}

// Fetch object info for notifications.
objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object)
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
@@ -435,13 +461,18 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
// Add content disposition.
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object)))

objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object)
objectAPI := web.ObjectAPI()
if objectAPI == nil {
writeWebErrorResponse(w, errors.New("Volume not found"))
return
}
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
writeWebErrorResponse(w, err)
return
}
offset := int64(0)
err = web.ObjectAPI.GetObject(bucket, object, offset, objInfo.Size, w)
err = objectAPI.GetObject(bucket, object, offset, objInfo.Size, w)
if err != nil {
/// No need to print error, response writer already written to.
return

@@ -30,7 +30,7 @@ import (

// webAPI container for Web API.
type webAPIHandlers struct {
ObjectAPI ObjectLayer
ObjectAPI func() ObjectLayer
}

// indexHandler - Handler to serve index.html

@@ -118,6 +118,10 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
// to handle these errors internally.
storageDisks[index], err = newStorageAPI(disk)
if err != nil && err != errDiskNotFound {
switch diskType := storageDisks[index].(type) {
case networkStorage:
diskType.rpcClient.Close()
}
return nil, err
}
}