Merge pull request #319 from harshavardhana/pr_out_now_minioapi_handles_content_md5_set_during_putobject_

This commit is contained in:
Harshavardhana 2015-03-17 13:33:31 -07:00
commit c551757415
9 changed files with 118 additions and 33 deletions

View File

@ -61,7 +61,7 @@ func (s *MySuite) TestEmptyObject(c *C) {
buffer := bytes.NewBufferString("") buffer := bytes.NewBufferString("")
storage.CreateBucket("bucket") storage.CreateBucket("bucket")
storage.CreateObject("bucket", "object", "", buffer) storage.CreateObject("bucket", "object", "", "", buffer)
response, err := http.Get(testServer.URL + "/bucket/object") response, err := http.Get(testServer.URL + "/bucket/object")
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -86,7 +86,7 @@ func (s *MySuite) TestObject(c *C) {
buffer := bytes.NewBufferString("hello world") buffer := bytes.NewBufferString("hello world")
storage.CreateBucket("bucket") storage.CreateBucket("bucket")
storage.CreateObject("bucket", "object", "", buffer) storage.CreateObject("bucket", "object", "", "", buffer)
response, err := http.Get(testServer.URL + "/bucket/object") response, err := http.Get(testServer.URL + "/bucket/object")
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -112,9 +112,9 @@ func (s *MySuite) TestMultipleObjects(c *C) {
buffer3 := bytes.NewBufferString("hello three") buffer3 := bytes.NewBufferString("hello three")
storage.CreateBucket("bucket") storage.CreateBucket("bucket")
storage.CreateObject("bucket", "object1", "", buffer1) storage.CreateObject("bucket", "object1", "", "", buffer1)
storage.CreateObject("bucket", "object2", "", buffer2) storage.CreateObject("bucket", "object2", "", "", buffer2)
storage.CreateObject("bucket", "object3", "", buffer3) storage.CreateObject("bucket", "object3", "", "", buffer3)
// test non-existant object // test non-existant object
response, err := http.Get(testServer.URL + "/bucket/object") response, err := http.Get(testServer.URL + "/bucket/object")
@ -204,7 +204,7 @@ func (s *MySuite) TestHeader(c *C) {
buffer := bytes.NewBufferString("hello world") buffer := bytes.NewBufferString("hello world")
storage.CreateBucket("bucket") storage.CreateBucket("bucket")
storage.CreateObject("bucket", "object", "", buffer) storage.CreateObject("bucket", "object", "", "", buffer)
response, err = http.Get(testServer.URL + "/bucket/object") response, err = http.Get(testServer.URL + "/bucket/object")
c.Assert(err, IsNil) c.Assert(err, IsNil)

View File

@ -165,7 +165,9 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
return return
} }
err := server.storage.CreateObject(bucket, object, "", req.Body) // get Content-MD5 sent by client
md5 := req.Header.Get("Content-MD5")
err := server.storage.CreateObject(bucket, object, "", md5, req.Body)
switch err := err.(type) { switch err := err.(type) {
case nil: case nil:
w.Header().Set("Server", "Minio") w.Header().Set("Server", "Minio")
@ -200,6 +202,20 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
w.WriteHeader(error.HTTPStatusCode) w.WriteHeader(error.HTTPStatusCode)
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType)) w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
} }
case mstorage.BadDigest:
{
error := errorCodeError(BadDigest)
errorResponse := getErrorResponse(error, "/"+bucket+"/"+object)
w.WriteHeader(error.HTTPStatusCode)
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
}
case mstorage.InvalidDigest:
{
error := errorCodeError(InvalidDigest)
errorResponse := getErrorResponse(error, "/"+bucket+"/"+object)
w.WriteHeader(error.HTTPStatusCode)
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
}
} }
} }

View File

@ -66,6 +66,13 @@ func (server *minioAPI) putBucketPolicyHandler(w http.ResponseWriter, req *http.
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType)) w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
} }
case mstorage.BackendCorrupted: case mstorage.BackendCorrupted:
{
log.Println(err)
error := errorCodeError(InternalError)
errorResponse := getErrorResponse(error, bucket)
w.WriteHeader(error.HTTPStatusCode)
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
}
case mstorage.ImplementationError: case mstorage.ImplementationError:
{ {
log.Println(err) log.Println(err)
@ -123,6 +130,13 @@ func (server *minioAPI) getBucketPolicyHandler(w http.ResponseWriter, req *http.
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType)) w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
} }
case mstorage.BackendCorrupted: case mstorage.BackendCorrupted:
{
log.Println(err)
error := errorCodeError(InternalError)
errorResponse := getErrorResponse(error, bucket)
w.WriteHeader(error.HTTPStatusCode)
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
}
case mstorage.ImplementationError: case mstorage.ImplementationError:
{ {
log.Println(err) log.Println(err)

View File

@ -18,8 +18,6 @@ package encoded
import ( import (
"bytes" "bytes"
"crypto/md5"
"encoding/hex"
"errors" "errors"
"io" "io"
"sort" "sort"
@ -27,6 +25,9 @@ import (
"strings" "strings"
"time" "time"
"crypto/md5"
"encoding/hex"
"github.com/minio-io/minio/pkg/donutbox" "github.com/minio-io/minio/pkg/donutbox"
"github.com/minio-io/minio/pkg/encoding/erasure" "github.com/minio-io/minio/pkg/encoding/erasure"
"github.com/minio-io/minio/pkg/storage" "github.com/minio-io/minio/pkg/storage"
@ -285,7 +286,7 @@ func beforeDelimiter(inputs []string, delim string) (results []string) {
} }
// CreateObject creates a new object // CreateObject creates a new object
func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string, contentType string, reader io.Reader) error { func (diskStorage StorageDriver) CreateObject(bucketKey, objectKey, contentType, md5sum string, reader io.Reader) error {
// set defaults // set defaults
if contentType == "" { if contentType == "" {
contentType = "application/octet-stream" contentType = "application/octet-stream"

View File

@ -17,12 +17,14 @@
package file package file
import ( import (
"bytes"
"io" "io"
"os" "os"
"path" "path"
"strings" "strings"
"crypto/md5" "crypto/md5"
"encoding/base64"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
@ -182,7 +184,7 @@ func (storage *Storage) GetObjectMetadata(bucket, object, prefix string) (mstora
} }
// CreateObject - PUT object // CreateObject - PUT object
func (storage *Storage) CreateObject(bucket, key, contentType string, data io.Reader) error { func (storage *Storage) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error {
// TODO Commits should stage then move instead of writing directly // TODO Commits should stage then move instead of writing directly
storage.lock.Lock() storage.lock.Lock()
defer storage.lock.Unlock() defer storage.lock.Unlock()
@ -248,15 +250,27 @@ func (storage *Storage) CreateObject(bucket, key, contentType string, data io.Re
return mstorage.EmbedError(bucket, key, err) return mstorage.EmbedError(bucket, key, err)
} }
// serialize metadata to gob metadata := &Metadata{
encoder := gob.NewEncoder(file)
err = encoder.Encode(&Metadata{
ContentType: contentType, ContentType: contentType,
Md5sum: h.Sum(nil), Md5sum: h.Sum(nil),
}) }
// serialize metadata to gob
encoder := gob.NewEncoder(file)
err = encoder.Encode(metadata)
if err != nil { if err != nil {
return mstorage.EmbedError(bucket, key, err) return mstorage.EmbedError(bucket, key, err)
} }
// Verify data received to be correct, Content-MD5 received
if md5sum != "" {
var data []byte
data, err = base64.StdEncoding.DecodeString(md5sum)
if err != nil {
return mstorage.InvalidDigest{Bucket: bucket, Key: key, Md5: md5sum}
}
if !bytes.Equal(metadata.Md5sum, data) {
return mstorage.BadDigest{Bucket: bucket, Key: key, Md5: md5sum}
}
}
return nil return nil
} }

View File

@ -93,7 +93,7 @@ func (storage *Storage) GetBucketPolicy(bucket string) (mstorage.BucketPolicy, e
} }
// CreateObject - PUT object to memory buffer // CreateObject - PUT object to memory buffer
func (storage *Storage) CreateObject(bucket, key, contentType string, data io.Reader) error { func (storage *Storage) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error {
storage.lock.Lock() storage.lock.Lock()
defer storage.lock.Unlock() defer storage.lock.Unlock()

View File

@ -36,7 +36,7 @@ type Storage interface {
GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error)
GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error) GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error)
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
CreateObject(bucket string, key string, contentType string, data io.Reader) error CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) error
} }
// BucketMetadata - name and create date // BucketMetadata - name and create date

View File

@ -39,10 +39,13 @@ func APITestSuite(c *check.C, create func() Storage) {
testNonExistantObjectInBucket(c, create) testNonExistantObjectInBucket(c, create)
testGetDirectoryReturnsObjectNotFound(c, create) testGetDirectoryReturnsObjectNotFound(c, create)
testDefaultContentType(c, create) testDefaultContentType(c, create)
//testContentMd5Set(c, create) TODO
} }
func testCreateBucket(c *check.C, create func() Storage) { func testCreateBucket(c *check.C, create func() Storage) {
// TODO storage := create()
err := storage.CreateBucket("bucket")
c.Assert(err, check.IsNil)
} }
func testMultipleObjectCreation(c *check.C, create func() Storage) { func testMultipleObjectCreation(c *check.C, create func() Storage) {
@ -58,7 +61,7 @@ func testMultipleObjectCreation(c *check.C, create func() Storage) {
} }
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
objects[key] = []byte(randomString) objects[key] = []byte(randomString)
err := storage.CreateObject("bucket", key, "", bytes.NewBufferString(randomString)) err := storage.CreateObject("bucket", key, "", "", bytes.NewBufferString(randomString))
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
} }
@ -91,7 +94,7 @@ func testPaging(c *check.C, create func() Storage) {
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
storage.CreateObject("bucket", key, "", bytes.NewBufferString(key)) storage.CreateObject("bucket", key, "", "", bytes.NewBufferString(key))
resources.Maxkeys = 5 resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources) objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), check.Equals, i+1) c.Assert(len(objects), check.Equals, i+1)
@ -101,7 +104,7 @@ func testPaging(c *check.C, create func() Storage) {
// check after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
storage.CreateObject("bucket", key, "", bytes.NewBufferString(key)) storage.CreateObject("bucket", key, "", "", bytes.NewBufferString(key))
resources.Maxkeys = 5 resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources) objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), check.Equals, 5) c.Assert(len(objects), check.Equals, 5)
@ -110,8 +113,8 @@ func testPaging(c *check.C, create func() Storage) {
} }
// check paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
storage.CreateObject("bucket", "newPrefix", "", bytes.NewBufferString("prefix1")) storage.CreateObject("bucket", "newPrefix", "", "", bytes.NewBufferString("prefix1"))
storage.CreateObject("bucket", "newPrefix2", "", bytes.NewBufferString("prefix2")) storage.CreateObject("bucket", "newPrefix2", "", "", bytes.NewBufferString("prefix2"))
resources.Prefix = "new" resources.Prefix = "new"
resources.Maxkeys = 5 resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources) objects, resources, err = storage.ListObjects("bucket", resources)
@ -132,8 +135,8 @@ func testPaging(c *check.C, create func() Storage) {
// check delimited results with delimiter and prefix // check delimited results with delimiter and prefix
{ {
storage.CreateObject("bucket", "this/is/delimited", "", bytes.NewBufferString("prefix1")) storage.CreateObject("bucket", "this/is/delimited", "", "", bytes.NewBufferString("prefix1"))
storage.CreateObject("bucket", "this/is/also/delimited", "", bytes.NewBufferString("prefix2")) storage.CreateObject("bucket", "this/is/also/delimited", "", "", bytes.NewBufferString("prefix2"))
var prefixes []string var prefixes []string
resources.CommonPrefixes = prefixes // allocate new everytime resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/" resources.Delimiter = "/"
@ -186,9 +189,9 @@ func testPaging(c *check.C, create func() Storage) {
func testObjectOverwriteFails(c *check.C, create func() Storage) { func testObjectOverwriteFails(c *check.C, create func() Storage) {
storage := create() storage := create()
storage.CreateBucket("bucket") storage.CreateBucket("bucket")
err := storage.CreateObject("bucket", "object", "", bytes.NewBufferString("one")) err := storage.CreateObject("bucket", "object", "", "", bytes.NewBufferString("one"))
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
err = storage.CreateObject("bucket", "object", "", bytes.NewBufferString("three")) err = storage.CreateObject("bucket", "object", "", "", bytes.NewBufferString("three"))
c.Assert(err, check.Not(check.IsNil)) c.Assert(err, check.Not(check.IsNil))
var bytesBuffer bytes.Buffer var bytesBuffer bytes.Buffer
length, err := storage.GetObject(&bytesBuffer, "bucket", "object") length, err := storage.GetObject(&bytesBuffer, "bucket", "object")
@ -199,7 +202,7 @@ func testObjectOverwriteFails(c *check.C, create func() Storage) {
func testNonExistantBucketOperations(c *check.C, create func() Storage) { func testNonExistantBucketOperations(c *check.C, create func() Storage) {
storage := create() storage := create()
err := storage.CreateObject("bucket", "object", "", bytes.NewBufferString("one")) err := storage.CreateObject("bucket", "object", "", "", bytes.NewBufferString("one"))
c.Assert(err, check.Not(check.IsNil)) c.Assert(err, check.Not(check.IsNil))
} }
@ -215,7 +218,7 @@ func testPutObjectInSubdir(c *check.C, create func() Storage) {
storage := create() storage := create()
err := storage.CreateBucket("bucket") err := storage.CreateBucket("bucket")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
err = storage.CreateObject("bucket", "dir1/dir2/object", "", bytes.NewBufferString("hello world")) err = storage.CreateObject("bucket", "dir1/dir2/object", "", "", bytes.NewBufferString("hello world"))
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var bytesBuffer bytes.Buffer var bytesBuffer bytes.Buffer
length, err := storage.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object") length, err := storage.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object")
@ -309,7 +312,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() Storage) {
err := storage.CreateBucket("bucket") err := storage.CreateBucket("bucket")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
err = storage.CreateObject("bucket", "dir1/dir2/object", "", bytes.NewBufferString("hello world")) err = storage.CreateObject("bucket", "dir1/dir2/object", "", "", bytes.NewBufferString("hello world"))
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer var byteBuffer bytes.Buffer
@ -353,20 +356,34 @@ func testDefaultContentType(c *check.C, create func() Storage) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
// test empty // test empty
err = storage.CreateObject("bucket", "one", "", bytes.NewBufferString("one")) err = storage.CreateObject("bucket", "one", "", "", bytes.NewBufferString("one"))
metadata, err := storage.GetObjectMetadata("bucket", "one", "") metadata, err := storage.GetObjectMetadata("bucket", "one", "")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/octet-stream") c.Assert(metadata.ContentType, check.Equals, "application/octet-stream")
// test custom // test custom
storage.CreateObject("bucket", "two", "application/text", bytes.NewBufferString("two")) storage.CreateObject("bucket", "two", "application/text", "", bytes.NewBufferString("two"))
metadata, err = storage.GetObjectMetadata("bucket", "two", "") metadata, err = storage.GetObjectMetadata("bucket", "two", "")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/text") c.Assert(metadata.ContentType, check.Equals, "application/text")
// test trim space // test trim space
storage.CreateObject("bucket", "three", "\tapplication/json ", bytes.NewBufferString("three")) storage.CreateObject("bucket", "three", "\tapplication/json ", "", bytes.NewBufferString("three"))
metadata, err = storage.GetObjectMetadata("bucket", "three", "") metadata, err = storage.GetObjectMetadata("bucket", "three", "")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/json") c.Assert(metadata.ContentType, check.Equals, "application/json")
} }
/*
func testContentMd5Set(c *check.C, create func() Storage) {
storage := create()
err := storage.CreateBucket("bucket")
c.Assert(err, check.IsNil)
// test md5 invalid
err = storage.CreateObject("bucket", "one", "", "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA", bytes.NewBufferString("one"))
c.Assert(err, check.Not(check.IsNil))
err = storage.CreateObject("bucket", "two", "", "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA=", bytes.NewBufferString("one"))
c.Assert(err, check.IsNil)
}
*/

View File

@ -47,6 +47,13 @@ type ImplementationError struct {
Err error Err error
} }
// DigestError - Generic Md5 error
type DigestError struct {
Bucket string
Key string
Md5 string
}
/// Bucket related errors /// Bucket related errors
// BucketPolicyNotFound - missing bucket policy // BucketPolicyNotFound - missing bucket policy
@ -72,6 +79,12 @@ type ObjectExists GenericObjectError
// ObjectNameInvalid - object name provided is invalid // ObjectNameInvalid - object name provided is invalid
type ObjectNameInvalid GenericObjectError type ObjectNameInvalid GenericObjectError
// BadDigest - md5 mismatch from data received
type BadDigest DigestError
// InvalidDigest - md5 in request header invalid
type InvalidDigest DigestError
// Return string an error formatted as the given text // Return string an error formatted as the given text
func (e ImplementationError) Error() string { func (e ImplementationError) Error() string {
error := "" error := ""
@ -138,3 +151,13 @@ func (e ObjectNameInvalid) Error() string {
func (e BackendCorrupted) Error() string { func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path return "Backend corrupted: " + e.Path
} }
// Return string an error formatted as the given text
func (e BadDigest) Error() string {
return "Md5 provided " + e.Md5 + " mismatches for: " + e.Bucket + "#" + e.Key
}
// Return string an error formatted as the given text
func (e InvalidDigest) Error() string {
return "Md5 provided " + e.Md5 + " is invalid"
}