gateway-gcs: remove files older than 2 weeks in minio.sys.temp (#4599).

Rename ##minio## to {minio}.
This commit is contained in:
Krishna Srinivas 2017-07-19 19:33:10 -07:00 committed by deekoder
parent 0a1501bc1b
commit eb787d8613
2 changed files with 85 additions and 31 deletions

View File

@ -28,6 +28,7 @@ import (
"math" "math"
"regexp" "regexp"
"strings" "strings"
"time"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
@ -41,29 +42,38 @@ import (
) )
const ( const (
// gcsMinioMeta is used for multiparts. We have "minio.sys.temp" prefix so that // gcsMinioSysTmp is used for multiparts. We have "minio.sys.tmp" prefix so that
// listing on the GCS lists this entry in the end. Also in the gateway // listing on the GCS lists this entry in the end. Also in the gateway
// ListObjects we filter out this entry. // ListObjects we filter out this entry.
gcsMinioPath = "minio.sys.temp/" gcsMinioSysTmp = "minio.sys.tmp/"
// Path where multipart objects are saved. // Path where multipart objects are saved.
// If we change the backend format we will use a different url path like /multipart/v2 // If we change the backend format we will use a different url path like /multipart/v2
// but we will not migrate old data. // but we will not migrate old data.
gcsMinioMultipartPathV1 = gcsMinioPath + "multipart/v1" gcsMinioMultipartPathV1 = gcsMinioSysTmp + "multipart/v1"
// Multipart meta file. // Multipart meta file.
gcsMinioMultipartMeta = "gcs.json" gcsMinioMultipartMeta = "gcs.json"
// gcs.json version number // gcs.json version number
gcsMinioMultipartMetaCurrentVersion = "1" gcsMinioMultipartMetaCurrentVersion = "1"
// token prefixed with GCS returned marker to differentiate // token prefixed with GCS returned marker to differentiate
// from user supplied marker. // from user supplied marker.
gcsTokenPrefix = "##minio" gcsTokenPrefix = "{minio}"
// maxComponents - maximum component object count to create a composite object. // Maximum component object count to create a composite object.
// Refer https://cloud.google.com/storage/docs/composite-objects // Refer https://cloud.google.com/storage/docs/composite-objects
maxComponents = 32 gcsMaxComponents = 32
// maxPartCount - maximum multipart parts GCS supports which is 32 x 32 = 1024. // gcsMaxPartCount - maximum multipart parts GCS supports which is 32 x 32 = 1024.
maxPartCount = 1024 gcsMaxPartCount = 1024
// Every 24 hours we scan minio.sys.tmp to delete expired multiparts in minio.sys.tmp
gcsCleanupInterval = time.Hour * 24
// The cleanup routine deletes files older than 2 weeks in minio.sys.tmp
gcsMultipartExpiry = time.Hour * 24 * 14
) )
// Stored in gcs.json - Contents of this file is not used anywhere. It can be // Stored in gcs.json - Contents of this file is not used anywhere. It can be
@ -268,12 +278,56 @@ func newGCSGateway(projectID string) (GatewayLayer, error) {
return nil, err return nil, err
} }
return &gcsGateway{ gateway := &gcsGateway{
client: client, client: client,
projectID: projectID, projectID: projectID,
ctx: ctx, ctx: ctx,
anonClient: anonClient, anonClient: anonClient,
}, nil }
// Start background process to cleanup old files in minio.sys.tmp
go gateway.CleanupGCSMinioSysTmp()
return gateway, nil
}
// Cleanup old files in minio.sys.tmp of the given bucket.
func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) {
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: gcsMinioSysTmp, Versions: false})
for {
attrs, err := it.Next()
if err != nil {
if err != iterator.Done {
errorIf(err, "Object listing error on bucket %s during purging of old files in minio.sys.tmp", bucket)
}
return
}
if time.Since(attrs.Updated) > gcsMultipartExpiry {
// Delete files older than 2 weeks.
err := l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx)
if err != nil {
errorIf(err, "Unable to delete %s/%s during purging of old files in minio.sys.tmp", bucket, attrs.Name)
return
}
}
}
}
// Cleanup old files in minio.sys.tmp of all buckets.
func (l *gcsGateway) CleanupGCSMinioSysTmp() {
for {
it := l.client.Buckets(l.ctx, l.projectID)
for {
attrs, err := it.Next()
if err != nil {
if err != iterator.Done {
errorIf(err, "Bucket listing error during purging of old files in minio.sys.tmp")
}
break
}
l.CleanupGCSMinioSysTmpBucket(attrs.Name)
}
// Run the cleanup loop every 1 day.
time.Sleep(gcsCleanupInterval)
}
} }
// Shutdown - save any gateway metadata to disk // Shutdown - save any gateway metadata to disk
@ -344,7 +398,7 @@ func (l *gcsGateway) ListBuckets() (buckets []BucketInfo, err error) {
func (l *gcsGateway) DeleteBucket(bucket string) error { func (l *gcsGateway) DeleteBucket(bucket string) error {
itObject := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: slashSeparator, Versions: false}) itObject := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: slashSeparator, Versions: false})
// We list the bucket and if we find any objects we return BucketNotEmpty error. If we // We list the bucket and if we find any objects we return BucketNotEmpty error. If we
// find only "minio.sys.temp/" then we remove it before deleting the bucket. // find only "minio.sys.tmp/" then we remove it before deleting the bucket.
gcsMinioPathFound := false gcsMinioPathFound := false
nonGCSMinioPathFound := false nonGCSMinioPathFound := false
for { for {
@ -355,7 +409,7 @@ func (l *gcsGateway) DeleteBucket(bucket string) error {
if err != nil { if err != nil {
return gcsToObjectError(traceError(err)) return gcsToObjectError(traceError(err))
} }
if objAttrs.Prefix == gcsMinioPath { if objAttrs.Prefix == gcsMinioSysTmp {
gcsMinioPathFound = true gcsMinioPathFound = true
continue continue
} }
@ -366,8 +420,8 @@ func (l *gcsGateway) DeleteBucket(bucket string) error {
return gcsToObjectError(traceError(BucketNotEmpty{})) return gcsToObjectError(traceError(BucketNotEmpty{}))
} }
if gcsMinioPathFound { if gcsMinioPathFound {
// Remove minio.sys.temp before deleting the bucket. // Remove minio.sys.tmp before deleting the bucket.
itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioPath}) itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioSysTmp})
for { for {
objAttrs, err := itObject.Next() objAttrs, err := itObject.Next()
if err == iterator.Done { if err == iterator.Done {
@ -452,7 +506,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
// metadata folder, then just break // metadata folder, then just break
// otherwise we've truncated the output // otherwise we've truncated the output
attrs, _ := it.Next() attrs, _ := it.Next()
if attrs != nil && attrs.Prefix == gcsMinioPath { if attrs != nil && attrs.Prefix == gcsMinioSysTmp {
break break
} }
@ -470,16 +524,16 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
nextMarker = toGCSPageToken(attrs.Name) nextMarker = toGCSPageToken(attrs.Name)
if attrs.Prefix == gcsMinioPath { if attrs.Prefix == gcsMinioSysTmp {
// We don't return our metadata prefix. // We don't return our metadata prefix.
continue continue
} }
if !strings.HasPrefix(prefix, gcsMinioPath) { if !strings.HasPrefix(prefix, gcsMinioSysTmp) {
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
// which will be helpful to observe the "directory structure" for debugging purposes. // which will be helpful to observe the "directory structure" for debugging purposes.
if strings.HasPrefix(attrs.Prefix, gcsMinioPath) || if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) ||
strings.HasPrefix(attrs.Name, gcsMinioPath) { strings.HasPrefix(attrs.Name, gcsMinioSysTmp) {
continue continue
} }
} }
@ -552,16 +606,16 @@ func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fet
return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix) return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix)
} }
if attrs.Prefix == gcsMinioPath { if attrs.Prefix == gcsMinioSysTmp {
// We don't return our metadata prefix. // We don't return our metadata prefix.
continue continue
} }
if !strings.HasPrefix(prefix, gcsMinioPath) { if !strings.HasPrefix(prefix, gcsMinioSysTmp) {
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
// which will be helpful to observe the "directory structure" for debugging purposes. // which will be helpful to observe the "directory structure" for debugging purposes.
if strings.HasPrefix(attrs.Prefix, gcsMinioPath) || if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) ||
strings.HasPrefix(attrs.Name, gcsMinioPath) { strings.HasPrefix(attrs.Name, gcsMinioSysTmp) {
continue continue
} }
} }
@ -792,7 +846,7 @@ func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBuck
return PartInfo{}, traceError(NotSupported{}) return PartInfo{}, traceError(NotSupported{})
} }
// Checks if minio.sys.temp/multipart/v1/<upload-id>/gcs.json exists, returns // Checks if minio.sys.tmp/multipart/v1/<upload-id>/gcs.json exists, returns
// an object layer compatible error upon any error. // an object layer compatible error upon any error.
func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID string) error { func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID string) error {
_, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx) _, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx)
@ -948,18 +1002,18 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
// Returns name of the composed object. // Returns name of the composed object.
gcsMultipartComposeName := func(uploadID string, composeNumber int) string { gcsMultipartComposeName := func(uploadID string, composeNumber int) string {
return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioPath, uploadID, composeNumber) return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioSysTmp, uploadID, composeNumber)
} }
composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents))) composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents)))
if composeCount > 1 { if composeCount > 1 {
// Create composes of every 32 parts. // Create composes of every 32 parts.
composeParts := make([]*storage.ObjectHandle, composeCount) composeParts := make([]*storage.ObjectHandle, composeCount)
for i := 0; i < composeCount; i++ { for i := 0; i < composeCount; i++ {
// Create 'composed-object-N' using next 32 parts. // Create 'composed-object-N' using next 32 parts.
composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartComposeName(uploadID, i)) composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartComposeName(uploadID, i))
start := i * maxComponents start := i * gcsMaxComponents
end := start + maxComponents end := start + gcsMaxComponents
if end > len(parts) { if end > len(parts) {
end = len(parts) end = len(parts)
} }

View File

@ -108,15 +108,15 @@ func TestIsGCSMarker(t *testing.T) {
expected bool expected bool
}{ }{
{ {
marker: "##miniogcs123", marker: "{minio}gcs123",
expected: true, expected: true,
}, },
{ {
marker: "##mini_notgcs123", marker: "{mini_no}tgcs123",
expected: false, expected: false,
}, },
{ {
marker: "#minioagainnotgcs123", marker: "{minioagainnotgcs123",
expected: false, expected: false,
}, },
{ {