mirror of
https://github.com/minio/minio.git
synced 2025-02-23 03:22:30 -05:00
fix: file consistency issue on SFTP upload (#18422)
* creating a byte buffer for SFTP file segments * Adding an error condition for when there are remaining segments in the queue * Simplification of the queue using a map
This commit is contained in:
parent
9569a85cee
commit
9afdb05bf4
@ -201,19 +201,57 @@ func (f *sftpDriver) Fileread(r *sftp.Request) (ra io.ReaderAt, err error) {
|
|||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type writerAt struct {
|
func (w *writerAt) Close() (err error) {
|
||||||
w *io.PipeWriter
|
if len(w.buffer) > 0 {
|
||||||
wg *sync.WaitGroup
|
err = w.w.CloseWithError(errors.New("some file segments were not flushed from the queue"))
|
||||||
}
|
for i := range w.buffer {
|
||||||
|
delete(w.buffer, i)
|
||||||
func (w *writerAt) Close() error {
|
}
|
||||||
err := w.w.Close()
|
} else {
|
||||||
|
err = w.w.Close()
|
||||||
|
}
|
||||||
w.wg.Wait()
|
w.wg.Wait()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type writerAt struct {
|
||||||
|
w *io.PipeWriter
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
buffer map[int64][]byte
|
||||||
|
|
||||||
|
nextOffset int64
|
||||||
|
m sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
func (w *writerAt) WriteAt(b []byte, offset int64) (n int, err error) {
|
func (w *writerAt) WriteAt(b []byte, offset int64) (n int, err error) {
|
||||||
return w.w.Write(b)
|
w.m.Lock()
|
||||||
|
defer w.m.Unlock()
|
||||||
|
|
||||||
|
if w.nextOffset == offset {
|
||||||
|
n, err = w.w.Write(b)
|
||||||
|
w.nextOffset += int64(n)
|
||||||
|
} else {
|
||||||
|
w.buffer[offset] = make([]byte, len(b))
|
||||||
|
copy(w.buffer[offset], b)
|
||||||
|
n = len(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
again:
|
||||||
|
nextOut, ok := w.buffer[w.nextOffset]
|
||||||
|
if ok {
|
||||||
|
n, err = w.w.Write(nextOut)
|
||||||
|
delete(w.buffer, w.nextOffset)
|
||||||
|
w.nextOffset += int64(n)
|
||||||
|
if n != len(nextOut) {
|
||||||
|
return 0, fmt.Errorf("expected write size %d but wrote %d bytes", len(nextOut), n)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(b), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
||||||
@ -238,7 +276,11 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
|||||||
|
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
wa := &writerAt{w: pw, wg: &sync.WaitGroup{}}
|
wa := &writerAt{
|
||||||
|
buffer: make(map[int64][]byte),
|
||||||
|
w: pw,
|
||||||
|
wg: &sync.WaitGroup{},
|
||||||
|
}
|
||||||
wa.wg.Add(1)
|
wa.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
_, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{SendContentMd5: true})
|
_, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{SendContentMd5: true})
|
||||||
|
Loading…
x
Reference in New Issue
Block a user