diff --git a/cmd/sftp-server-driver.go b/cmd/sftp-server-driver.go index f87bdc217..dcd7e2c8f 100644 --- a/cmd/sftp-server-driver.go +++ b/cmd/sftp-server-driver.go @@ -201,19 +201,57 @@ func (f *sftpDriver) Fileread(r *sftp.Request) (ra io.ReaderAt, err error) { return obj, nil } -type writerAt struct { - w *io.PipeWriter - wg *sync.WaitGroup -} - -func (w *writerAt) Close() error { - err := w.w.Close() +func (w *writerAt) Close() (err error) { + if len(w.buffer) > 0 { + err = w.w.CloseWithError(errors.New("some file segments were not flushed from the queue")) + for i := range w.buffer { + delete(w.buffer, i) + } + } else { + err = w.w.Close() + } w.wg.Wait() return err } +type writerAt struct { + w *io.PipeWriter + wg *sync.WaitGroup + buffer map[int64][]byte + + nextOffset int64 + m sync.Mutex +} + func (w *writerAt) WriteAt(b []byte, offset int64) (n int, err error) { - return w.w.Write(b) + w.m.Lock() + defer w.m.Unlock() + + if w.nextOffset == offset { + n, err = w.w.Write(b) + w.nextOffset += int64(n) + } else { + w.buffer[offset] = make([]byte, len(b)) + copy(w.buffer[offset], b) + n = len(b) + } + +again: + nextOut, ok := w.buffer[w.nextOffset] + if ok { + n, err = w.w.Write(nextOut) + delete(w.buffer, w.nextOffset) + w.nextOffset += int64(n) + if n != len(nextOut) { + return 0, fmt.Errorf("expected write size %d but wrote %d bytes", len(nextOut), n) + } + if err != nil { + return 0, err + } + goto again + } + + return len(b), nil } func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) { @@ -238,7 +276,11 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) { pr, pw := io.Pipe() - wa := &writerAt{w: pw, wg: &sync.WaitGroup{}} + wa := &writerAt{ + buffer: make(map[int64][]byte), + w: pw, + wg: &sync.WaitGroup{}, + } wa.wg.Add(1) go func() { _, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{SendContentMd5: true})