Allow for proper garbage collection pooling bytes.Buffer (#6266)

This commit is contained in:
Harshavardhana
2018-08-16 18:37:43 -07:00
committed by kannappanr
parent 1103ad2d08
commit 65de2d68c0
4 changed files with 89 additions and 35 deletions

View File

@@ -40,14 +40,10 @@ var errorType = reflect.TypeOf((*error)(nil)).Elem()
// reflect.Type of Authenticator interface.
var authenticatorType = reflect.TypeOf((*Authenticator)(nil)).Elem()
func gobEncode(e interface{}) ([]byte, error) {
var buf bytes.Buffer
var bufPool = NewPool()
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
return nil, err
}
return buf.Bytes(), nil
func gobEncodeBuf(e interface{}, buf *bytes.Buffer) error {
return gob.NewEncoder(buf).Encode(e)
}
func gobDecode(data []byte, e interface{}) error {
@@ -146,21 +142,21 @@ func (server *Server) RegisterName(name string, receiver interface{}) error {
}
// call - call service method in receiver.
func (server *Server) call(serviceMethod string, argBytes []byte) (replyBytes []byte, err error) {
func (server *Server) call(serviceMethod string, argBytes []byte, replyBytes *bytes.Buffer) (err error) {
tokens := strings.SplitN(serviceMethod, ".", 2)
if len(tokens) != 2 {
return nil, fmt.Errorf("invalid service/method request ill-formed %v", serviceMethod)
return fmt.Errorf("invalid service/method request ill-formed %v", serviceMethod)
}
serviceName := tokens[0]
if serviceName != server.serviceName {
return nil, fmt.Errorf("can't find service %v", serviceName)
return fmt.Errorf("can't find service %v", serviceName)
}
methodName := tokens[1]
method, found := server.methodMap[methodName]
if !found {
return nil, fmt.Errorf("can't find method %v", methodName)
return fmt.Errorf("can't find method %v", methodName)
}
var argv reflect.Value
@@ -175,7 +171,7 @@ func (server *Server) call(serviceMethod string, argBytes []byte) (replyBytes []
}
if err = gobDecode(argBytes, argv.Interface()); err != nil {
return nil, err
return err
}
if argIsValue {
@@ -193,7 +189,7 @@ func (server *Server) call(serviceMethod string, argBytes []byte) (replyBytes []
err = errInter.(error)
}
if err != nil {
return nil, err
return err
}
replyv := reflect.New(method.Type.In(2).Elem())
@@ -211,10 +207,10 @@ func (server *Server) call(serviceMethod string, argBytes []byte) (replyBytes []
err = errInter.(error)
}
if err != nil {
return nil, err
return err
}
return gobEncode(replyv.Interface())
return gobEncodeBuf(replyv.Interface(), replyBytes)
}
// CallRequest - RPC call request parameters.
@@ -242,20 +238,18 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
var callResponse CallResponse
var err error
callResponse.ReplyBytes, err = server.call(callRequest.Method, callRequest.ArgBytes)
if err != nil {
callResponse := CallResponse{}
buf := bufPool.Get()
defer bufPool.Put(buf)
if err := server.call(callRequest.Method, callRequest.ArgBytes, buf); err != nil {
callResponse.Error = err.Error()
}
callResponse.ReplyBytes = buf.Bytes()
data, err := gobEncode(callResponse)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
gob.NewEncoder(w).Encode(callResponse)
w.Write(data)
w.(http.Flusher).Flush()
}
// NewServer - returns new RPC server.