From 0d3710cb9b2a280e503221b8ead865f07166e23a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 8 Jun 2015 12:08:01 -0700 Subject: [PATCH 1/2] Store in json file instead of gob for fs metadata --- pkg/storage/drivers/fs/fs_multipart.go | 22 +++++++++++----------- pkg/storage/drivers/fs/fs_object.go | 8 ++++---- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/storage/drivers/fs/fs_multipart.go b/pkg/storage/drivers/fs/fs_multipart.go index ab048efa3..b23c6a49d 100644 --- a/pkg/storage/drivers/fs/fs_multipart.go +++ b/pkg/storage/drivers/fs/fs_multipart.go @@ -5,8 +5,8 @@ import ( "crypto/md5" "crypto/sha512" "encoding/base64" - "encoding/gob" "encoding/hex" + "encoding/json" "errors" "fmt" "io" @@ -73,7 +73,7 @@ func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.Bucket defer activeSessionFile.Close() var deserializedActiveSession map[string]*MultipartSession - decoder := gob.NewDecoder(activeSessionFile) + decoder := json.NewDecoder(activeSessionFile) err = decoder.Decode(&deserializedActiveSession) if err != nil { return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) @@ -192,12 +192,12 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, mpartSession.Parts = parts fs.multiparts.ActiveSession[key] = mpartSession - encoder := gob.NewEncoder(file) + encoder := json.NewEncoder(file) err = encoder.Encode(mpartSession) if err != nil { return "", iodine.New(err, nil) } - encoder = gob.NewEncoder(activeSessionFile) + encoder = json.NewEncoder(activeSessionFile) err = encoder.Encode(fs.multiparts.ActiveSession) if err != nil { return "", iodine.New(err, nil) @@ -329,7 +329,7 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c defer multiPartfile.Close() var deserializedMultipartSession MultipartSession - decoder := gob.NewDecoder(multiPartfile) + decoder := json.NewDecoder(multiPartfile) err = decoder.Decode(&deserializedMultipartSession) if err != nil { return "", iodine.New(err, nil) @@ -339,7 +339,7 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c fs.multiparts.ActiveSession[uploadID] = &deserializedMultipartSession sort.Sort(partNumber(deserializedMultipartSession.Parts)) - encoder := gob.NewEncoder(multiPartfile) + encoder := json.NewEncoder(multiPartfile) err = encoder.Encode(&deserializedMultipartSession) if err != nil { return "", iodine.New(err, nil) @@ -447,8 +447,8 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts ContentType: "application/octet-stream", Md5sum: h.Sum(nil), } - // serialize metadata to gob - encoder := gob.NewEncoder(file) + // serialize metadata to json + encoder := json.NewEncoder(file) err = encoder.Encode(metadata) if err != nil { return "", iodine.New(err, nil) @@ -459,7 +459,7 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts return "", iodine.New(err, nil) } defer activeSessionFile.Close() - encoder = gob.NewEncoder(activeSessionFile) + encoder = json.NewEncoder(activeSessionFile) err = encoder.Encode(fs.multiparts.ActiveSession) if err != nil { return "", iodine.New(err, nil) @@ -514,7 +514,7 @@ func (fs *fsDriver) ListObjectParts(bucket, key string, resources drivers.Object defer multiPartfile.Close() var deserializedMultipartSession MultipartSession - decoder := gob.NewDecoder(multiPartfile) + decoder := json.NewDecoder(multiPartfile) err = decoder.Decode(&deserializedMultipartSession) if err != nil { return drivers.ObjectResourcesMetadata{}, iodine.New(err, nil) @@ -570,7 +570,7 @@ func (fs *fsDriver) AbortMultipartUpload(bucket, key, uploadID string) error { } var deserializedMultipartSession MultipartSession - decoder := gob.NewDecoder(multiPartfile) + decoder := json.NewDecoder(multiPartfile) err = decoder.Decode(&deserializedMultipartSession) if err != nil { return iodine.New(err, nil) diff --git a/pkg/storage/drivers/fs/fs_object.go b/pkg/storage/drivers/fs/fs_object.go index e17ff5657..5437e3dae 100644 --- a/pkg/storage/drivers/fs/fs_object.go +++ b/pkg/storage/drivers/fs/fs_object.go @@ -25,8 +25,8 @@ import ( "crypto/md5" "encoding/base64" - "encoding/gob" "encoding/hex" + "encoding/json" "errors" "github.com/minio/minio/pkg/iodine" @@ -154,7 +154,7 @@ func (fs *fsDriver) GetObjectMetadata(bucket, object string) (drivers.ObjectMeta } var deserializedMetadata Metadata - decoder := gob.NewDecoder(file) + decoder := json.NewDecoder(file) err = decoder.Decode(&deserializedMetadata) if err != nil { return drivers.ObjectMetadata{}, iodine.New(err, nil) @@ -280,8 +280,8 @@ func (fs *fsDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string ContentType: contentType, Md5sum: h.Sum(nil), } - // serialize metadata to gob - encoder := gob.NewEncoder(file) + // serialize metadata to json + encoder := json.NewEncoder(file) err = encoder.Encode(metadata) md5Sum := hex.EncodeToString(metadata.Md5sum) From e08d59be005d4f20c797d573963650f0f30dfd91 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 8 Jun 2015 12:25:49 -0700 Subject: [PATCH 2/2] Make sure to delete uploadid's from active session file properly --- pkg/storage/drivers/fs/fs_multipart.go | 38 +++++++++++++------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/storage/drivers/fs/fs_multipart.go b/pkg/storage/drivers/fs/fs_multipart.go index b23c6a49d..de4a3ac47 100644 --- a/pkg/storage/drivers/fs/fs_multipart.go +++ b/pkg/storage/drivers/fs/fs_multipart.go @@ -142,21 +142,6 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, if err != nil { return "", iodine.New(drivers.InternalError{}, nil) } - - var activeSessionFile *os.File - if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) { - activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return "", iodine.New(err, nil) - } - } else { - activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) - if err != nil { - return "", iodine.New(err, nil) - } - } - defer activeSessionFile.Close() - objectPath := path.Join(bucketPath, key) objectDir := path.Dir(objectPath) if _, err := os.Stat(objectDir); os.IsNotExist(err) { @@ -174,6 +159,20 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, }, nil) } + var activeSessionFile *os.File + if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) { + activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + } else { + activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + } + defer activeSessionFile.Close() + id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] @@ -336,7 +335,7 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c } deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata) deserializedMultipartSession.TotalParts++ - fs.multiparts.ActiveSession[uploadID] = &deserializedMultipartSession + fs.multiparts.ActiveSession[key] = &deserializedMultipartSession sort.Sort(partNumber(deserializedMultipartSession.Parts)) encoder := json.NewEncoder(multiPartfile) @@ -425,7 +424,7 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts } md5sum := hex.EncodeToString(h.Sum(nil)) - delete(fs.multiparts.ActiveSession, uploadID) + delete(fs.multiparts.ActiveSession, key) for partNumber := range parts { err = os.Remove(objectPath + fmt.Sprintf("$%d", partNumber)) if err != nil { @@ -454,11 +453,12 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts return "", iodine.New(err, nil) } - activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) + activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_TRUNC, 0600) if err != nil { return "", iodine.New(err, nil) } defer activeSessionFile.Close() + fmt.Println(fs.multiparts.ActiveSession) encoder = json.NewEncoder(activeSessionFile) err = encoder.Encode(fs.multiparts.ActiveSession) if err != nil { @@ -577,7 +577,7 @@ func (fs *fsDriver) AbortMultipartUpload(bucket, key, uploadID string) error { } multiPartfile.Close() // close it right here, since we will delete it subsequently - delete(fs.multiparts.ActiveSession, uploadID) + delete(fs.multiparts.ActiveSession, key) for _, part := range deserializedMultipartSession.Parts { err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) if err != nil {