mirror of
https://github.com/minio/minio.git
synced 2025-01-26 06:03:17 -05:00
e29009d347
Commit 5c1376516867aeca06f114868e329e1d710f7148 removed the postgres registration as a side effect of the automatic gofmt command, but that was the only place where pg was registered. This commit fixes the behavior and adds unit tests to check whether postgres & sql are registered or not.
130 lines
2.6 KiB
Go
130 lines
2.6 KiB
Go
package pq
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
)
|
|
|
|
// Implement the "QueryerContext" interface
|
|
func (cn *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
|
|
list := make([]driver.Value, len(args))
|
|
for i, nv := range args {
|
|
list[i] = nv.Value
|
|
}
|
|
finish := cn.watchCancel(ctx)
|
|
r, err := cn.query(query, list)
|
|
if err != nil {
|
|
if finish != nil {
|
|
finish()
|
|
}
|
|
return nil, err
|
|
}
|
|
r.finish = finish
|
|
return r, nil
|
|
}
|
|
|
|
// Implement the "ExecerContext" interface
|
|
func (cn *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
|
|
list := make([]driver.Value, len(args))
|
|
for i, nv := range args {
|
|
list[i] = nv.Value
|
|
}
|
|
|
|
if finish := cn.watchCancel(ctx); finish != nil {
|
|
defer finish()
|
|
}
|
|
|
|
return cn.Exec(query, list)
|
|
}
|
|
|
|
// Implement the "ConnBeginTx" interface
|
|
func (cn *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
|
|
var mode string
|
|
|
|
switch sql.IsolationLevel(opts.Isolation) {
|
|
case sql.LevelDefault:
|
|
// Don't touch mode: use the server's default
|
|
case sql.LevelReadUncommitted:
|
|
mode = " ISOLATION LEVEL READ UNCOMMITTED"
|
|
case sql.LevelReadCommitted:
|
|
mode = " ISOLATION LEVEL READ COMMITTED"
|
|
case sql.LevelRepeatableRead:
|
|
mode = " ISOLATION LEVEL REPEATABLE READ"
|
|
case sql.LevelSerializable:
|
|
mode = " ISOLATION LEVEL SERIALIZABLE"
|
|
default:
|
|
return nil, fmt.Errorf("pq: isolation level not supported: %d", opts.Isolation)
|
|
}
|
|
|
|
if opts.ReadOnly {
|
|
mode += " READ ONLY"
|
|
} else {
|
|
mode += " READ WRITE"
|
|
}
|
|
|
|
tx, err := cn.begin(mode)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cn.txnFinish = cn.watchCancel(ctx)
|
|
return tx, nil
|
|
}
|
|
|
|
func (cn *conn) watchCancel(ctx context.Context) func() {
|
|
if done := ctx.Done(); done != nil {
|
|
finished := make(chan struct{})
|
|
go func() {
|
|
select {
|
|
case <-done:
|
|
_ = cn.cancel()
|
|
finished <- struct{}{}
|
|
case <-finished:
|
|
}
|
|
}()
|
|
return func() {
|
|
select {
|
|
case <-finished:
|
|
case finished <- struct{}{}:
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cn *conn) cancel() error {
|
|
c, err := dial(cn.dialer, cn.opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
|
|
{
|
|
can := conn{
|
|
c: c,
|
|
}
|
|
err = can.ssl(cn.opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
w := can.writeBuf(0)
|
|
w.int32(80877102) // cancel request code
|
|
w.int32(cn.processID)
|
|
w.int32(cn.secretKey)
|
|
|
|
if err := can.sendStartupPacket(w); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Read until EOF to ensure that the server received the cancel.
|
|
{
|
|
_, err := io.Copy(ioutil.Discard, c)
|
|
return err
|
|
}
|
|
}
|