Adding error handling for network errors in the SFTP layer (#18442)

This commit is contained in:
Sveinn 2023-11-14 17:31:00 +00:00 committed by GitHub
parent a3c2f7b0e8
commit f3367a1b20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -201,23 +201,40 @@ func (f *sftpDriver) Fileread(r *sftp.Request) (ra io.ReaderAt, err error) {
return obj, nil return obj, nil
} }
// TransferError will catch network errors during transfer.
// When TransferError() is called Close() will also
// be called, so we do not need to Wait() here.
func (w *writerAt) TransferError(err error) {
_ = w.w.CloseWithError(err)
_ = w.r.CloseWithError(err)
w.err = err
}
func (w *writerAt) Close() (err error) { func (w *writerAt) Close() (err error) {
if len(w.buffer) > 0 { switch {
err = w.w.CloseWithError(errors.New("some file segments were not flushed from the queue")) case len(w.buffer) > 0:
for i := range w.buffer { err = errors.New("some file segments were not flushed from the queue")
delete(w.buffer, i) _ = w.w.CloseWithError(err)
} case w.err != nil:
} else { // No need to close here since both pipes were
// closing inside TransferError()
err = w.err
default:
err = w.w.Close() err = w.w.Close()
} }
for i := range w.buffer {
delete(w.buffer, i)
}
w.wg.Wait() w.wg.Wait()
return err return err
} }
type writerAt struct { type writerAt struct {
w *io.PipeWriter w *io.PipeWriter
r *io.PipeReader
wg *sync.WaitGroup wg *sync.WaitGroup
buffer map[int64][]byte buffer map[int64][]byte
err error
nextOffset int64 nextOffset int64
m sync.Mutex m sync.Mutex
@ -279,6 +296,7 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
wa := &writerAt{ wa := &writerAt{
buffer: make(map[int64][]byte), buffer: make(map[int64][]byte),
w: pw, w: pw,
r: pr,
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
} }
wa.wg.Add(1) wa.wg.Add(1)