From 097f70a3c56fd6b48dbc2d2477ac29623bee5091 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 1 Jun 2015 20:17:01 -0700 Subject: [PATCH] Enable filesystem for API tests, this patch also implements ListMultipartUploads() --- pkg/api/api_test.go | 33 ++-- pkg/storage/drivers/fs/fs_bucket.go | 19 +- pkg/storage/drivers/fs/fs_multipart.go | 231 +++++++++++++++++++------ pkg/storage/drivers/fs/fs_object.go | 1 - 4 files changed, 210 insertions(+), 74 deletions(-) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index b0cd0a23d..b53dc2de6 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -35,6 +35,7 @@ import ( "github.com/minio/minio/pkg/featureflags" "github.com/minio/minio/pkg/storage/drivers" "github.com/minio/minio/pkg/storage/drivers/donut" + "github.com/minio/minio/pkg/storage/drivers/fs" "github.com/minio/minio/pkg/storage/drivers/memory" "github.com/minio/minio/pkg/storage/drivers/mocks" "github.com/stretchr/testify/mock" @@ -74,6 +75,14 @@ var _ = Suite(&MySuite{ }, }) +var _ = Suite(&MySuite{ + initDriver: func() (drivers.Driver, string) { + root, _ := ioutil.TempDir(os.TempDir(), "minio-fs-api") + _, _, driver := filesystem.Start(root) + return driver, root + }, +}) + func (s *MySuite) SetUpSuite(c *C) { driver, root := s.initDriver() if root != "" { @@ -1440,9 +1449,7 @@ func (s *MySuite) TestObjectMultipartAbort(c *C) { default: // Donut doesn't have multipart support yet { - if reflect.TypeOf(driver).String() == "*memory.memoryDriver" { - - } else { + if reflect.TypeOf(driver).String() == "*donut.donutDriver" { return } } @@ -1517,7 +1524,7 @@ func (s *MySuite) TestObjectMultipartAbort(c *C) { c.Assert(response3.StatusCode, Equals, http.StatusNoContent) } -func (s *MySuite) TestBuckeMultipartList(c *C) { +func (s *MySuite) TestBucketMultipartList(c *C) { switch driver := s.Driver.(type) { case *mocks.Driver: { @@ -1526,9 +1533,7 @@ func (s *MySuite) TestBuckeMultipartList(c *C) { default: // Donut doesn't have multipart support yet { - if reflect.TypeOf(driver).String() == "*memory.memoryDriver" { - - } else { + if reflect.TypeOf(driver).String() == "*donut.donutDriver" { return } } @@ -1601,6 +1606,12 @@ func (s *MySuite) TestBuckeMultipartList(c *C) { response3, err := client.Do(request) c.Assert(err, IsNil) c.Assert(response3.StatusCode, Equals, http.StatusOK) + + decoder = xml.NewDecoder(response3.Body) + newResponse3 := &ListMultipartUploadsResponse{} + err = decoder.Decode(newResponse3) + c.Assert(err, IsNil) + c.Assert(newResponse3.Bucket, Equals, "foo") } func (s *MySuite) TestObjectMultipartList(c *C) { @@ -1612,9 +1623,7 @@ func (s *MySuite) TestObjectMultipartList(c *C) { default: // Donut doesn't have multipart support yet { - if reflect.TypeOf(driver).String() == "*memory.memoryDriver" { - - } else { + if reflect.TypeOf(driver).String() == "*donut.donutDriver" { return } } @@ -1699,9 +1708,7 @@ func (s *MySuite) TestObjectMultipart(c *C) { default: // Donut doesn't have multipart support yet { - if reflect.TypeOf(driver).String() == "*memory.memoryDriver" { - - } else { + if reflect.TypeOf(driver).String() == "*donut.donutDriver" { return } } diff --git a/pkg/storage/drivers/fs/fs_bucket.go b/pkg/storage/drivers/fs/fs_bucket.go index 003b73b0a..25b1cf277 100644 --- a/pkg/storage/drivers/fs/fs_bucket.go +++ b/pkg/storage/drivers/fs/fs_bucket.go @@ -97,17 +97,20 @@ func (fs *fsDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, er // get bucket path bucketDir := path.Join(fs.root, bucket) bucketMetadata := drivers.BucketMetadata{} - fi, err := os.Stat(bucketDir) // check if bucket exists - if err == nil { - bucketMetadata.Name = fi.Name() - bucketMetadata.Created = fi.ModTime() - // TODO convert os.FileMode to meaningful ACL's - bucketMetadata.ACL = drivers.BucketACL("private") - return bucketMetadata, nil + if os.IsNotExist(err) { + return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } - return drivers.BucketMetadata{}, iodine.New(err, nil) + if err != nil { + return drivers.BucketMetadata{}, iodine.New(err, nil) + } + + bucketMetadata.Name = fi.Name() + bucketMetadata.Created = fi.ModTime() + // TODO convert os.FileMode to meaningful ACL's + bucketMetadata.ACL = drivers.BucketACL("private") + return bucketMetadata, nil } // aclToPerm - convert acl to filesystem mode diff --git a/pkg/storage/drivers/fs/fs_multipart.go b/pkg/storage/drivers/fs/fs_multipart.go index e73ddffef..ab048efa3 100644 --- a/pkg/storage/drivers/fs/fs_multipart.go +++ b/pkg/storage/drivers/fs/fs_multipart.go @@ -36,16 +36,96 @@ type Multiparts struct { ActiveSession map[string]*MultipartSession } +// byKey is a sortable interface for UploadMetadata slice +type byKey []*drivers.UploadMetadata + +func (a byKey) Len() int { return len(a) } +func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } + func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { fs.lock.Lock() defer fs.lock.Unlock() - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) + if !drivers.IsValidBucket(bucket) { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + + // check bucket exists + if os.IsNotExist(err) { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + if err != nil { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.InternalError{}, nil) + } + + if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) { + return drivers.BucketMultipartResourcesMetadata{}, nil + } + if err != nil { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) + } + activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_RDONLY, 0600) + if err != nil { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) + } + defer activeSessionFile.Close() + + var deserializedActiveSession map[string]*MultipartSession + decoder := gob.NewDecoder(activeSessionFile) + err = decoder.Decode(&deserializedActiveSession) + if err != nil { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) + } + var uploads []*drivers.UploadMetadata + for key, session := range deserializedActiveSession { + if strings.HasPrefix(key, resources.Prefix) { + if len(uploads) > resources.MaxUploads { + sort.Sort(byKey(uploads)) + resources.Upload = uploads + resources.NextKeyMarker = key + resources.NextUploadIDMarker = session.UploadID + resources.IsTruncated = true + return resources, nil + } + // uploadIDMarker is ignored if KeyMarker is empty + switch { + case resources.KeyMarker != "" && resources.UploadIDMarker == "": + if key > resources.KeyMarker { + upload := new(drivers.UploadMetadata) + upload.Key = key + upload.UploadID = session.UploadID + upload.Initiated = session.Initiated + uploads = append(uploads, upload) + } + case resources.KeyMarker != "" && resources.UploadIDMarker != "": + if session.UploadID > resources.UploadIDMarker { + if key >= resources.KeyMarker { + upload := new(drivers.UploadMetadata) + upload.Key = key + upload.UploadID = session.UploadID + upload.Initiated = session.Initiated + uploads = append(uploads, upload) + } + } + default: + upload := new(drivers.UploadMetadata) + upload.Key = key + upload.UploadID = session.UploadID + upload.Initiated = session.Initiated + uploads = append(uploads, upload) + } + } + } + sort.Sort(byKey(uploads)) + resources.Upload = uploads + return resources, nil } func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { fs.lock.Lock() defer fs.lock.Unlock() - if !drivers.IsValidBucket(bucket) { return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } @@ -55,7 +135,6 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, bucketPath := path.Join(fs.root, bucket) _, err := os.Stat(bucketPath) - // check bucket exists if os.IsNotExist(err) { return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) @@ -64,6 +143,20 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, return "", iodine.New(drivers.InternalError{}, nil) } + var activeSessionFile *os.File + if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) { + activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + } else { + activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + } + defer activeSessionFile.Close() + objectPath := path.Join(bucketPath, key) objectDir := path.Dir(objectPath) if _, err := os.Stat(objectDir); os.IsNotExist(err) { @@ -94,23 +187,34 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, mpartSession := new(MultipartSession) mpartSession.TotalParts = 0 mpartSession.UploadID = uploadID - mpartSession.Initiated = time.Now() + mpartSession.Initiated = time.Now().UTC() var parts []*drivers.PartMetadata mpartSession.Parts = parts - fs.multiparts.ActiveSession[uploadID] = mpartSession + fs.multiparts.ActiveSession[key] = mpartSession - // serialize metadata to gob encoder := gob.NewEncoder(file) err = encoder.Encode(mpartSession) if err != nil { return "", iodine.New(err, nil) } + encoder = gob.NewEncoder(activeSessionFile) + err = encoder.Encode(fs.multiparts.ActiveSession) + if err != nil { + return "", iodine.New(err, nil) + } + return uploadID, nil } -func (fs *fsDriver) isValidUploadID(uploadID string) bool { - _, ok := fs.multiparts.ActiveSession[uploadID] - return ok +func (fs *fsDriver) isValidUploadID(key, uploadID string) bool { + s, ok := fs.multiparts.ActiveSession[key] + if !ok { + return false + } + if uploadID == s.UploadID { + return true + } + return false } func (fs *fsDriver) writePart(objectPath string, partID int, size int64, data io.Reader) (drivers.PartMetadata, error) { @@ -166,7 +270,7 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c return "", iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) } - if !fs.isValidUploadID(uploadID) { + if !fs.isValidUploadID(key, uploadID) { return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) } @@ -243,6 +347,34 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c return partMetadata.ETag, nil } +func (fs *fsDriver) concatParts(parts map[int]string, objectPath string, mw io.Writer) error { + for i := 1; i <= len(parts); i++ { + recvMD5 := parts[i] + partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", i), os.O_RDONLY, 0600) + if err != nil { + return iodine.New(err, nil) + } + obj, err := ioutil.ReadAll(partFile) + if err != nil { + return iodine.New(err, nil) + } + calcMD5Bytes := md5.Sum(obj) + // complete multi part request header md5sum per part is hex encoded + recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) + if err != nil { + return iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) + } + if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { + return iodine.New(drivers.BadDigest{Md5: recvMD5}, nil) + } + _, err = io.Copy(mw, bytes.NewBuffer(obj)) + if err != nil { + return iodine.New(err, nil) + } + } + return nil +} + func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -257,7 +389,7 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts return "", iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) } - if !fs.isValidUploadID(uploadID) { + if !fs.isValidUploadID(key, uploadID) { return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) } @@ -287,53 +419,48 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts defer file.Close() h := md5.New() mw := io.MultiWriter(file, h) - - for i := 1; i <= len(parts); i++ { - recvMD5 := parts[i] - partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", i), os.O_RDONLY, 0600) - if err != nil { - return "", iodine.New(err, nil) - } - obj, err := ioutil.ReadAll(partFile) - if err != nil { - return "", iodine.New(err, nil) - } - calcMD5Bytes := md5.Sum(obj) - // complete multi part request header md5sum per part is hex encoded - recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) - if err != nil { - return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) - } - if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { - return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: key}, nil) - } - _, err = io.Copy(mw, bytes.NewBuffer(obj)) - if err != nil { - return "", iodine.New(err, nil) - } + err = fs.concatParts(parts, objectPath, mw) + if err != nil { + return "", iodine.New(err, nil) } md5sum := hex.EncodeToString(h.Sum(nil)) - multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) - if err != nil { - return "", iodine.New(err, nil) - } - var deserializedMultipartSession MultipartSession - decoder := gob.NewDecoder(multiPartfile) - err = decoder.Decode(&deserializedMultipartSession) - if err != nil { - return "", iodine.New(err, nil) - } - multiPartfile.Close() // close it right here, since we will delete it subsequently - delete(fs.multiparts.ActiveSession, uploadID) - for _, part := range deserializedMultipartSession.Parts { - err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) + for partNumber := range parts { + err = os.Remove(objectPath + fmt.Sprintf("$%d", partNumber)) if err != nil { return "", iodine.New(err, nil) } } - err = os.RemoveAll(objectPath + "$multiparts") + err = os.Remove(objectPath + "$multiparts") + if err != nil { + return "", iodine.New(err, nil) + } + + file, err = os.OpenFile(objectPath+"$metadata", os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + defer file.Close() + + metadata := &Metadata{ + ContentType: "application/octet-stream", + Md5sum: h.Sum(nil), + } + // serialize metadata to gob + encoder := gob.NewEncoder(file) + err = encoder.Encode(metadata) + if err != nil { + return "", iodine.New(err, nil) + } + + activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + defer activeSessionFile.Close() + encoder = gob.NewEncoder(activeSessionFile) + err = encoder.Encode(fs.multiparts.ActiveSession) if err != nil { return "", iodine.New(err, nil) } @@ -354,7 +481,7 @@ func (fs *fsDriver) ListObjectParts(bucket, key string, resources drivers.Object return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) } - if !fs.isValidUploadID(resources.UploadID) { + if !fs.isValidUploadID(key, resources.UploadID) { return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) } @@ -422,7 +549,7 @@ func (fs *fsDriver) AbortMultipartUpload(bucket, key, uploadID string) error { return iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) } - if !fs.isValidUploadID(uploadID) { + if !fs.isValidUploadID(key, uploadID) { return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) } diff --git a/pkg/storage/drivers/fs/fs_object.go b/pkg/storage/drivers/fs/fs_object.go index b599c33b2..e17ff5657 100644 --- a/pkg/storage/drivers/fs/fs_object.go +++ b/pkg/storage/drivers/fs/fs_object.go @@ -94,7 +94,6 @@ func (fs *fsDriver) GetObject(w io.Writer, bucket string, object string) (int64, if drivers.IsValidObjectName(object) == false { return 0, iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: object}, nil) } - objectPath := path.Join(fs.root, bucket, object) filestat, err := os.Stat(objectPath) switch err := err.(type) {