Implement bucket caching for b2 gateway (#8820)

fixes #8739 #6806
This commit is contained in:
Forest Lovewood 2020-01-20 16:43:38 +00:00 committed by Nitish Tiwari
parent 7a400542ae
commit dd93eee1e3

View File

@ -127,6 +127,7 @@ type b2Objects struct {
b2Client *b2.B2
httpClient *http.Client
ctx context.Context
buckets []*b2.Bucket
}
// Convert B2 errors to minio object layer errors.
@ -257,11 +258,14 @@ func (l *b2Objects) listBuckets(ctx context.Context, err error) ([]*b2.Bucket, e
return nil, rerr
}
}
bktList, lerr := l.b2Client.ListBuckets(l.ctx)
if lerr != nil {
return l.listBuckets(ctx, lerr)
if len(l.buckets) == 0 {
bktList, lerr := l.b2Client.ListBuckets(l.ctx)
if lerr != nil {
return l.listBuckets(ctx, lerr)
}
l.buckets = bktList
}
return bktList, nil
return l.buckets, nil
}
// Bucket - is a helper which provides a *Bucket instance
@ -316,6 +320,20 @@ func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string) error {
return err
}
err = bkt.DeleteBucket(l.ctx)
if err != nil {
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
err = bkt.DeleteBucket(l.ctx)
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket)
}
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket)
}
@ -328,8 +346,18 @@ func (l *b2Objects) ListObjects(ctx context.Context, bucket string, prefix strin
}
files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
if lerr != nil {
logger.LogIf(ctx, lerr)
return loi, b2ToObjectError(lerr, bucket)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return loi, err
}
files, next, lerr = bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
if lerr != nil {
logger.LogIf(ctx, lerr)
return loi, b2ToObjectError(lerr, bucket)
}
}
loi.IsTruncated = next != ""
loi.NextMarker = next
@ -370,8 +398,18 @@ func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat
}
files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
if lerr != nil {
logger.LogIf(ctx, lerr)
return loi, b2ToObjectError(lerr, bucket)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return loi, err
}
files, next, lerr = bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
if lerr != nil {
logger.LogIf(ctx, lerr)
return loi, b2ToObjectError(lerr, bucket)
}
}
loi.IsTruncated = next != ""
loi.ContinuationToken = continuationToken
@ -433,8 +471,18 @@ func (l *b2Objects) GetObject(ctx context.Context, bucket string, object string,
}
reader, err := bkt.DownloadFileByName(l.ctx, object, startOffset, length)
if err != nil {
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket, object)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
reader, err = bkt.DownloadFileByName(l.ctx, object, startOffset, length)
if err != nil {
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket, object)
}
}
defer reader.Close()
_, err = io.Copy(writer, reader)
@ -451,8 +499,18 @@ func (l *b2Objects) GetObjectInfo(ctx context.Context, bucket string, object str
f, _, err := bkt.ListFileNames(l.ctx, 1, object, "", "")
if err != nil {
logger.LogIf(ctx, err)
return objInfo, b2ToObjectError(err, bucket, object)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return objInfo, err
}
f, _, err = bkt.ListFileNames(l.ctx, 1, object, "", "")
if err != nil {
logger.LogIf(ctx, err)
return objInfo, b2ToObjectError(err, bucket, object)
}
}
// B2's list will return the next item in the bucket if the object doesn't
@ -466,8 +524,18 @@ func (l *b2Objects) GetObjectInfo(ctx context.Context, bucket string, object str
fi, err := bkt.File(f[0].ID, object).GetFileInfo(l.ctx)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, b2ToObjectError(err, bucket, object)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return objInfo, err
}
fi, err = bkt.File(f[0].ID, object).GetFileInfo(l.ctx)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, b2ToObjectError(err, bucket, object)
}
}
return minio.ObjectInfo{
Bucket: bucket,
@ -557,8 +625,18 @@ func (l *b2Objects) PutObject(ctx context.Context, bucket string, object string,
var u *b2.URL
u, err = bkt.GetUploadURL(l.ctx)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, b2ToObjectError(err, bucket, object)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return objInfo, err
}
u, err = bkt.GetUploadURL(l.ctx)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, b2ToObjectError(err, bucket, object)
}
}
hr := newB2Reader(data, data.Size())
@ -597,6 +675,19 @@ func (l *b2Objects) DeleteObject(ctx context.Context, bucket string, object stri
// If we hide the file we'll conform to B2's versioning policy, it also
// saves an additional call to check if the file exists first
_, err = bkt.HideFile(l.ctx, object)
if err != nil {
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
_, err = bkt.HideFile(l.ctx, object)
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket, object)
}
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket, object)
}
@ -625,8 +716,18 @@ func (l *b2Objects) ListMultipartUploads(ctx context.Context, bucket string, pre
}
largeFiles, nextMarker, err := bkt.ListUnfinishedLargeFiles(l.ctx, maxUploads, uploadIDMarker)
if err != nil {
logger.LogIf(ctx, err)
return lmi, b2ToObjectError(err, bucket)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return lmi, err
}
largeFiles, nextMarker, err = bkt.ListUnfinishedLargeFiles(l.ctx, maxUploads, uploadIDMarker)
if err != nil {
logger.LogIf(ctx, err)
return lmi, b2ToObjectError(err, bucket)
}
}
lmi = minio.ListMultipartsInfo{
MaxUploads: maxUploads,
@ -661,8 +762,18 @@ func (l *b2Objects) NewMultipartUpload(ctx context.Context, bucket string, objec
delete(opts.UserDefined, "content-type")
lf, err := bkt.StartLargeFile(l.ctx, object, contentType, opts.UserDefined)
if err != nil {
logger.LogIf(ctx, err)
return uploadID, b2ToObjectError(err, bucket, object)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return uploadID, err
}
lf, err = bkt.StartLargeFile(l.ctx, object, contentType, opts.UserDefined)
if err != nil {
logger.LogIf(ctx, err)
return uploadID, b2ToObjectError(err, bucket, object)
}
}
return lf.ID, nil
@ -678,8 +789,18 @@ func (l *b2Objects) PutObjectPart(ctx context.Context, bucket string, object str
fc, err := bkt.File(uploadID, object).CompileParts(0, nil).GetUploadPartURL(l.ctx)
if err != nil {
logger.LogIf(ctx, err)
return pi, b2ToObjectError(err, bucket, object, uploadID)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return pi, err
}
fc, err = bkt.File(uploadID, object).CompileParts(0, nil).GetUploadPartURL(l.ctx)
if err != nil {
logger.LogIf(ctx, err)
return pi, b2ToObjectError(err, bucket, object, uploadID)
}
}
hr := newB2Reader(data, data.Size())
@ -714,8 +835,18 @@ func (l *b2Objects) ListObjectParts(ctx context.Context, bucket string, object s
partNumberMarker++
partsList, next, err := bkt.File(uploadID, object).ListParts(l.ctx, partNumberMarker, maxParts)
if err != nil {
logger.LogIf(ctx, err)
return lpi, b2ToObjectError(err, bucket, object, uploadID)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return lpi, err
}
partsList, next, err = bkt.File(uploadID, object).ListParts(l.ctx, partNumberMarker, maxParts)
if err != nil {
logger.LogIf(ctx, err)
return lpi, b2ToObjectError(err, bucket, object, uploadID)
}
}
if next != 0 {
lpi.IsTruncated = true
@ -738,6 +869,19 @@ func (l *b2Objects) AbortMultipartUpload(ctx context.Context, bucket string, obj
return err
}
err = bkt.File(uploadID, object).CompileParts(0, nil).CancelLargeFile(l.ctx)
if err != nil {
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
err = bkt.File(uploadID, object).CompileParts(0, nil).CancelLargeFile(l.ctx)
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket, object, uploadID)
}
logger.LogIf(ctx, err)
return b2ToObjectError(err, bucket, object, uploadID)
}
@ -762,8 +906,17 @@ func (l *b2Objects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
if _, err = bkt.File(uploadID, object).CompileParts(0, hashes).FinishLargeFile(l.ctx); err != nil {
logger.LogIf(ctx, err)
return oi, b2ToObjectError(err, bucket, object, uploadID)
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return oi, err
}
if _, err = bkt.File(uploadID, object).CompileParts(0, hashes).FinishLargeFile(l.ctx); err != nil {
logger.LogIf(ctx, err)
return oi, b2ToObjectError(err, bucket, object, uploadID)
}
}
return l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
@ -806,6 +959,20 @@ func (l *b2Objects) SetBucketPolicy(ctx context.Context, bucket string, bucketPo
}
bkt.Type = bucketTypeReadOnly
_, err = bkt.Update(l.ctx)
if err != nil {
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
bkt.Type = bucketTypeReadOnly
_, err = bkt.Update(l.ctx)
logger.LogIf(ctx, err)
return b2ToObjectError(err)
}
logger.LogIf(ctx, err)
return b2ToObjectError(err)
}
@ -854,6 +1021,20 @@ func (l *b2Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error
}
bkt.Type = bucketTypePrivate
_, err = bkt.Update(l.ctx)
if err != nil {
l.buckets = []*b2.Bucket{}
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
bkt.Type = bucketTypePrivate
_, err = bkt.Update(l.ctx)
logger.LogIf(ctx, err)
return b2ToObjectError(err)
}
logger.LogIf(ctx, err)
return b2ToObjectError(err)
}