Added support for Haystack tracing
This commit is contained in:
parent
681892148e
commit
9cf6827ccc
274 changed files with 38070 additions and 13436 deletions
462
vendor/google.golang.org/grpc/server.go
generated
vendored
462
vendor/google.golang.org/grpc/server.go
generated
vendored
|
@ -19,7 +19,7 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -30,27 +30,25 @@ import (
|
|||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"io/ioutil"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/trace"
|
||||
|
||||
"google.golang.org/grpc/channelz"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/encoding"
|
||||
"google.golang.org/grpc/encoding/proto"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/binarylog"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/tap"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -106,12 +104,8 @@ type Server struct {
|
|||
channelzRemoveOnce sync.Once
|
||||
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
|
||||
|
||||
channelzID int64 // channelz unique identification number
|
||||
czmu sync.RWMutex
|
||||
callsStarted int64
|
||||
callsFailed int64
|
||||
callsSucceeded int64
|
||||
lastCallStartedTime time.Time
|
||||
channelzID int64 // channelz unique identification number
|
||||
czData *channelzData
|
||||
}
|
||||
|
||||
type options struct {
|
||||
|
@ -126,7 +120,6 @@ type options struct {
|
|||
maxConcurrentStreams uint32
|
||||
maxReceiveMessageSize int
|
||||
maxSendMessageSize int
|
||||
useHandlerImpl bool // use http.Handler-based server
|
||||
unknownStreamDesc *StreamDesc
|
||||
keepaliveParams keepalive.ServerParameters
|
||||
keepalivePolicy keepalive.EnforcementPolicy
|
||||
|
@ -135,19 +128,25 @@ type options struct {
|
|||
writeBufferSize int
|
||||
readBufferSize int
|
||||
connectionTimeout time.Duration
|
||||
maxHeaderListSize *uint32
|
||||
}
|
||||
|
||||
var defaultServerOptions = options{
|
||||
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
|
||||
maxSendMessageSize: defaultServerMaxSendMessageSize,
|
||||
connectionTimeout: 120 * time.Second,
|
||||
writeBufferSize: defaultWriteBufSize,
|
||||
readBufferSize: defaultReadBufSize,
|
||||
}
|
||||
|
||||
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
|
||||
type ServerOption func(*options)
|
||||
|
||||
// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
|
||||
// before doing a write on the wire.
|
||||
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
|
||||
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
|
||||
// The default value for this buffer is 32KB.
|
||||
// Zero will disable the write buffer such that each write will be on underlying connection.
|
||||
// Note: A Send call may not directly translate to a write.
|
||||
func WriteBufferSize(s int) ServerOption {
|
||||
return func(o *options) {
|
||||
o.writeBufferSize = s
|
||||
|
@ -156,6 +155,9 @@ func WriteBufferSize(s int) ServerOption {
|
|||
|
||||
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
|
||||
// for one read syscall.
|
||||
// The default value for this buffer is 32KB.
|
||||
// Zero will disable read buffer for a connection so data framer can access the underlying
|
||||
// conn directly.
|
||||
func ReadBufferSize(s int) ServerOption {
|
||||
return func(o *options) {
|
||||
o.readBufferSize = s
|
||||
|
@ -180,6 +182,11 @@ func InitialConnWindowSize(s int32) ServerOption {
|
|||
|
||||
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
|
||||
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
|
||||
if kp.Time > 0 && kp.Time < time.Second {
|
||||
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
|
||||
kp.Time = time.Second
|
||||
}
|
||||
|
||||
return func(o *options) {
|
||||
o.keepaliveParams = kp
|
||||
}
|
||||
|
@ -242,7 +249,7 @@ func MaxRecvMsgSize(m int) ServerOption {
|
|||
}
|
||||
|
||||
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
|
||||
// If this is not set, gRPC uses the default 4MB.
|
||||
// If this is not set, gRPC uses the default `math.MaxInt32`.
|
||||
func MaxSendMsgSize(m int) ServerOption {
|
||||
return func(o *options) {
|
||||
o.maxSendMessageSize = m
|
||||
|
@ -335,6 +342,14 @@ func ConnectionTimeout(d time.Duration) ServerOption {
|
|||
}
|
||||
}
|
||||
|
||||
// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
|
||||
// of header list that the server is prepared to accept.
|
||||
func MaxHeaderListSize(s uint32) ServerOption {
|
||||
return func(o *options) {
|
||||
o.maxHeaderListSize = &s
|
||||
}
|
||||
}
|
||||
|
||||
// NewServer creates a gRPC server which has no service registered and has not
|
||||
// started to accept requests yet.
|
||||
func NewServer(opt ...ServerOption) *Server {
|
||||
|
@ -343,12 +358,13 @@ func NewServer(opt ...ServerOption) *Server {
|
|||
o(&opts)
|
||||
}
|
||||
s := &Server{
|
||||
lis: make(map[net.Listener]bool),
|
||||
opts: opts,
|
||||
conns: make(map[io.Closer]bool),
|
||||
m: make(map[string]*service),
|
||||
quit: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
lis: make(map[net.Listener]bool),
|
||||
opts: opts,
|
||||
conns: make(map[io.Closer]bool),
|
||||
m: make(map[string]*service),
|
||||
quit: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
czData: new(channelzData),
|
||||
}
|
||||
s.cv = sync.NewCond(&s.mu)
|
||||
if EnableTracing {
|
||||
|
@ -357,7 +373,7 @@ func NewServer(opt ...ServerOption) *Server {
|
|||
}
|
||||
|
||||
if channelz.IsOn() {
|
||||
s.channelzID = channelz.RegisterServer(s, "")
|
||||
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
@ -481,7 +497,8 @@ type listenSocket struct {
|
|||
|
||||
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
|
||||
return &channelz.SocketInternalMetric{
|
||||
LocalAddr: l.Listener.Addr(),
|
||||
SocketOptions: channelz.GetSocketOption(l.Listener),
|
||||
LocalAddr: l.Listener.Addr(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -525,7 +542,7 @@ func (s *Server) Serve(lis net.Listener) error {
|
|||
s.lis[ls] = true
|
||||
|
||||
if channelz.IsOn() {
|
||||
ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
|
||||
ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
|
@ -597,12 +614,13 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
|
|||
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
|
||||
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
|
||||
if err != nil {
|
||||
s.mu.Lock()
|
||||
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
|
||||
s.mu.Unlock()
|
||||
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
|
||||
// If serverHandshake returns ErrConnDispatched, keep rawConn open.
|
||||
// ErrConnDispatched means that the connection was dispatched away from
|
||||
// gRPC; those connections should be left open.
|
||||
if err != credentials.ErrConnDispatched {
|
||||
s.mu.Lock()
|
||||
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
|
||||
s.mu.Unlock()
|
||||
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
|
||||
rawConn.Close()
|
||||
}
|
||||
rawConn.SetDeadline(time.Time{})
|
||||
|
@ -617,27 +635,19 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
|
|||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
var serve func()
|
||||
c := conn.(io.Closer)
|
||||
if s.opts.useHandlerImpl {
|
||||
serve = func() { s.serveUsingHandler(conn) }
|
||||
} else {
|
||||
// Finish handshaking (HTTP2)
|
||||
st := s.newHTTP2Transport(conn, authInfo)
|
||||
if st == nil {
|
||||
return
|
||||
}
|
||||
c = st
|
||||
serve = func() { s.serveStreams(st) }
|
||||
// Finish handshaking (HTTP2)
|
||||
st := s.newHTTP2Transport(conn, authInfo)
|
||||
if st == nil {
|
||||
return
|
||||
}
|
||||
|
||||
rawConn.SetDeadline(time.Time{})
|
||||
if !s.addConn(c) {
|
||||
if !s.addConn(st) {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
serve()
|
||||
s.removeConn(c)
|
||||
s.serveStreams(st)
|
||||
s.removeConn(st)
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -656,6 +666,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
|
|||
WriteBufferSize: s.opts.writeBufferSize,
|
||||
ReadBufferSize: s.opts.readBufferSize,
|
||||
ChannelzParentID: s.channelzID,
|
||||
MaxHeaderListSize: s.opts.maxHeaderListSize,
|
||||
}
|
||||
st, err := transport.NewServerTransport("http2", c, config)
|
||||
if err != nil {
|
||||
|
@ -691,27 +702,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
|
|||
|
||||
var _ http.Handler = (*Server)(nil)
|
||||
|
||||
// serveUsingHandler is called from handleRawConn when s is configured
|
||||
// to handle requests via the http.Handler interface. It sets up a
|
||||
// net/http.Server to handle the just-accepted conn. The http.Server
|
||||
// is configured to route all incoming requests (all HTTP/2 streams)
|
||||
// to ServeHTTP, which creates a new ServerTransport for each stream.
|
||||
// serveUsingHandler blocks until conn closes.
|
||||
//
|
||||
// This codepath is only used when Server.TestingUseHandlerImpl has
|
||||
// been configured. This lets the end2end tests exercise the ServeHTTP
|
||||
// method as one of the environment types.
|
||||
//
|
||||
// conn is the *tls.Conn that's already been authenticated.
|
||||
func (s *Server) serveUsingHandler(conn net.Conn) {
|
||||
h2s := &http2.Server{
|
||||
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
|
||||
}
|
||||
h2s.ServeConn(conn, &http2.ServeConnOpts{
|
||||
Handler: s,
|
||||
})
|
||||
}
|
||||
|
||||
// ServeHTTP implements the Go standard library's http.Handler
|
||||
// interface by responding to the gRPC request r, by looking up
|
||||
// the requested gRPC method in the gRPC server s.
|
||||
|
@ -759,12 +749,13 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
|
|||
|
||||
trInfo = &traceInfo{
|
||||
tr: tr,
|
||||
firstLine: firstLine{
|
||||
client: false,
|
||||
remoteAddr: st.RemoteAddr(),
|
||||
},
|
||||
}
|
||||
trInfo.firstLine.client = false
|
||||
trInfo.firstLine.remoteAddr = st.RemoteAddr()
|
||||
|
||||
if dl, ok := stream.Context().Deadline(); ok {
|
||||
trInfo.firstLine.deadline = dl.Sub(time.Now())
|
||||
trInfo.firstLine.deadline = time.Until(dl)
|
||||
}
|
||||
return trInfo
|
||||
}
|
||||
|
@ -794,57 +785,47 @@ func (s *Server) removeConn(c io.Closer) {
|
|||
}
|
||||
}
|
||||
|
||||
// ChannelzMetric returns ServerInternalMetric of current server.
|
||||
// This is an EXPERIMENTAL API.
|
||||
func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
|
||||
s.czmu.RLock()
|
||||
defer s.czmu.RUnlock()
|
||||
func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
|
||||
return &channelz.ServerInternalMetric{
|
||||
CallsStarted: s.callsStarted,
|
||||
CallsSucceeded: s.callsSucceeded,
|
||||
CallsFailed: s.callsFailed,
|
||||
LastCallStartedTimestamp: s.lastCallStartedTime,
|
||||
CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
|
||||
CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
|
||||
CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
|
||||
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) incrCallsStarted() {
|
||||
s.czmu.Lock()
|
||||
s.callsStarted++
|
||||
s.lastCallStartedTime = time.Now()
|
||||
s.czmu.Unlock()
|
||||
atomic.AddInt64(&s.czData.callsStarted, 1)
|
||||
atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (s *Server) incrCallsSucceeded() {
|
||||
s.czmu.Lock()
|
||||
s.callsSucceeded++
|
||||
s.czmu.Unlock()
|
||||
atomic.AddInt64(&s.czData.callsSucceeded, 1)
|
||||
}
|
||||
|
||||
func (s *Server) incrCallsFailed() {
|
||||
s.czmu.Lock()
|
||||
s.callsFailed++
|
||||
s.czmu.Unlock()
|
||||
atomic.AddInt64(&s.czData.callsFailed, 1)
|
||||
}
|
||||
|
||||
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
|
||||
var (
|
||||
outPayload *stats.OutPayload
|
||||
)
|
||||
if s.opts.statsHandler != nil {
|
||||
outPayload = &stats.OutPayload{}
|
||||
}
|
||||
hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp)
|
||||
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
|
||||
if err != nil {
|
||||
grpclog.Errorln("grpc: server failed to encode response: ", err)
|
||||
return err
|
||||
}
|
||||
if len(data) > s.opts.maxSendMessageSize {
|
||||
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
|
||||
compData, err := compress(data, cp, comp)
|
||||
if err != nil {
|
||||
grpclog.Errorln("grpc: server failed to compress response: ", err)
|
||||
return err
|
||||
}
|
||||
err = t.Write(stream, hdr, data, opts)
|
||||
if err == nil && outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
|
||||
hdr, payload := msgHeader(data, compData)
|
||||
// TODO(dfawley): should we be checking len(data) instead?
|
||||
if len(payload) > s.opts.maxSendMessageSize {
|
||||
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
|
||||
}
|
||||
err = t.Write(stream, hdr, payload, opts)
|
||||
if err == nil && s.opts.statsHandler != nil {
|
||||
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -880,7 +861,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
}
|
||||
if trInfo != nil {
|
||||
defer trInfo.tr.Finish()
|
||||
trInfo.firstLine.client = false
|
||||
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
||||
defer func() {
|
||||
if err != nil && err != io.EOF {
|
||||
|
@ -890,6 +870,30 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
}()
|
||||
}
|
||||
|
||||
binlog := binarylog.GetMethodLogger(stream.Method())
|
||||
if binlog != nil {
|
||||
ctx := stream.Context()
|
||||
md, _ := metadata.FromIncomingContext(ctx)
|
||||
logEntry := &binarylog.ClientHeader{
|
||||
Header: md,
|
||||
MethodName: stream.Method(),
|
||||
PeerAddr: nil,
|
||||
}
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
logEntry.Timeout = time.Until(deadline)
|
||||
if logEntry.Timeout < 0 {
|
||||
logEntry.Timeout = 0
|
||||
}
|
||||
}
|
||||
if a := md[":authority"]; len(a) > 0 {
|
||||
logEntry.Authority = a[0]
|
||||
}
|
||||
if peer, ok := peer.FromContext(ctx); ok {
|
||||
logEntry.PeerAddr = peer.Addr
|
||||
}
|
||||
binlog.Log(logEntry)
|
||||
}
|
||||
|
||||
// comp and cp are used for compression. decomp and dc are used for
|
||||
// decompression. If comp and decomp are both set, they are the same;
|
||||
// however they are kept separate to ensure that at most one of the
|
||||
|
@ -926,81 +930,38 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
}
|
||||
}
|
||||
|
||||
p := &parser{r: stream}
|
||||
pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
|
||||
if err == io.EOF {
|
||||
// The entire stream is done (for unary RPC only).
|
||||
return err
|
||||
}
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
||||
var payInfo *payloadInfo
|
||||
if sh != nil || binlog != nil {
|
||||
payInfo = &payloadInfo{}
|
||||
}
|
||||
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
|
||||
if err != nil {
|
||||
if st, ok := status.FromError(err); ok {
|
||||
if e := t.WriteStatus(stream, st); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
} else {
|
||||
switch st := err.(type) {
|
||||
case transport.ConnectionError:
|
||||
// Nothing to do here.
|
||||
case transport.StreamError:
|
||||
if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
t.IncrMsgRecv()
|
||||
}
|
||||
if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
|
||||
if e := t.WriteStatus(stream, st); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
return st.Err()
|
||||
}
|
||||
var inPayload *stats.InPayload
|
||||
if sh != nil {
|
||||
inPayload = &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
}
|
||||
}
|
||||
df := func(v interface{}) error {
|
||||
if inPayload != nil {
|
||||
inPayload.WireLength = len(req)
|
||||
}
|
||||
if pf == compressionMade {
|
||||
var err error
|
||||
if dc != nil {
|
||||
req, err = dc.Do(bytes.NewReader(req))
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
} else {
|
||||
tmp, _ := decomp.Decompress(bytes.NewReader(req))
|
||||
req, err = ioutil.ReadAll(tmp)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(req) > s.opts.maxReceiveMessageSize {
|
||||
// TODO: Revisit the error code. Currently keep it consistent with
|
||||
// java implementation.
|
||||
return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
|
||||
}
|
||||
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
|
||||
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.Payload = v
|
||||
inPayload.Data = req
|
||||
inPayload.Length = len(req)
|
||||
sh.HandleRPC(stream.Context(), inPayload)
|
||||
if sh != nil {
|
||||
sh.HandleRPC(stream.Context(), &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
Payload: v,
|
||||
Data: d,
|
||||
Length: len(d),
|
||||
})
|
||||
}
|
||||
if binlog != nil {
|
||||
binlog.Log(&binarylog.ClientMessage{
|
||||
Message: d,
|
||||
})
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
|
||||
|
@ -1023,15 +984,25 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
if e := t.WriteStatus(stream, appStatus); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
|
||||
}
|
||||
if binlog != nil {
|
||||
if h, _ := stream.Header(); h.Len() > 0 {
|
||||
// Only log serverHeader if there was header. Otherwise it can
|
||||
// be trailer only.
|
||||
binlog.Log(&binarylog.ServerHeader{
|
||||
Header: h,
|
||||
})
|
||||
}
|
||||
binlog.Log(&binarylog.ServerTrailer{
|
||||
Trailer: stream.Trailer(),
|
||||
Err: appErr,
|
||||
})
|
||||
}
|
||||
return appErr
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(stringer("OK"), false)
|
||||
}
|
||||
opts := &transport.Options{
|
||||
Last: true,
|
||||
Delay: false,
|
||||
}
|
||||
opts := &transport.Options{Last: true}
|
||||
|
||||
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
|
||||
if err == io.EOF {
|
||||
|
@ -1046,16 +1017,31 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
switch st := err.(type) {
|
||||
case transport.ConnectionError:
|
||||
// Nothing to do here.
|
||||
case transport.StreamError:
|
||||
if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
|
||||
}
|
||||
}
|
||||
if binlog != nil {
|
||||
h, _ := stream.Header()
|
||||
binlog.Log(&binarylog.ServerHeader{
|
||||
Header: h,
|
||||
})
|
||||
binlog.Log(&binarylog.ServerTrailer{
|
||||
Trailer: stream.Trailer(),
|
||||
Err: appErr,
|
||||
})
|
||||
}
|
||||
return err
|
||||
}
|
||||
if binlog != nil {
|
||||
h, _ := stream.Header()
|
||||
binlog.Log(&binarylog.ServerHeader{
|
||||
Header: h,
|
||||
})
|
||||
binlog.Log(&binarylog.ServerMessage{
|
||||
Message: reply,
|
||||
})
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
t.IncrMsgSent()
|
||||
}
|
||||
|
@ -1065,7 +1051,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
// TODO: Should we be logging if writing status failed here, like above?
|
||||
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
|
||||
// error or allow the stats handler to see it?
|
||||
return t.WriteStatus(stream, status.New(codes.OK, ""))
|
||||
err = t.WriteStatus(stream, status.New(codes.OK, ""))
|
||||
if binlog != nil {
|
||||
binlog.Log(&binarylog.ServerTrailer{
|
||||
Trailer: stream.Trailer(),
|
||||
Err: appErr,
|
||||
})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
|
||||
|
@ -1099,17 +1092,40 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||
}
|
||||
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
|
||||
ss := &serverStream{
|
||||
ctx: ctx,
|
||||
t: t,
|
||||
s: stream,
|
||||
p: &parser{r: stream},
|
||||
codec: s.getCodec(stream.ContentSubtype()),
|
||||
ctx: ctx,
|
||||
t: t,
|
||||
s: stream,
|
||||
p: &parser{r: stream},
|
||||
codec: s.getCodec(stream.ContentSubtype()),
|
||||
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
|
||||
maxSendMessageSize: s.opts.maxSendMessageSize,
|
||||
trInfo: trInfo,
|
||||
statsHandler: sh,
|
||||
}
|
||||
|
||||
ss.binlog = binarylog.GetMethodLogger(stream.Method())
|
||||
if ss.binlog != nil {
|
||||
md, _ := metadata.FromIncomingContext(ctx)
|
||||
logEntry := &binarylog.ClientHeader{
|
||||
Header: md,
|
||||
MethodName: stream.Method(),
|
||||
PeerAddr: nil,
|
||||
}
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
logEntry.Timeout = time.Until(deadline)
|
||||
if logEntry.Timeout < 0 {
|
||||
logEntry.Timeout = 0
|
||||
}
|
||||
}
|
||||
if a := md[":authority"]; len(a) > 0 {
|
||||
logEntry.Authority = a[0]
|
||||
}
|
||||
if peer, ok := peer.FromContext(ss.Context()); ok {
|
||||
logEntry.PeerAddr = peer.Addr
|
||||
}
|
||||
ss.binlog.Log(logEntry)
|
||||
}
|
||||
|
||||
// If dc is set and matches the stream's compression, use it. Otherwise, try
|
||||
// to find a matching registered compressor for decomp.
|
||||
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
|
||||
|
@ -1169,12 +1185,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||
if appErr != nil {
|
||||
appStatus, ok := status.FromError(appErr)
|
||||
if !ok {
|
||||
switch err := appErr.(type) {
|
||||
case transport.StreamError:
|
||||
appStatus = status.New(err.Code, err.Desc)
|
||||
default:
|
||||
appStatus = status.New(codes.Unknown, appErr.Error())
|
||||
}
|
||||
appStatus = status.New(codes.Unknown, appErr.Error())
|
||||
appErr = appStatus.Err()
|
||||
}
|
||||
if trInfo != nil {
|
||||
|
@ -1184,6 +1195,12 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||
ss.mu.Unlock()
|
||||
}
|
||||
t.WriteStatus(ss.s, appStatus)
|
||||
if ss.binlog != nil {
|
||||
ss.binlog.Log(&binarylog.ServerTrailer{
|
||||
Trailer: ss.s.Trailer(),
|
||||
Err: appErr,
|
||||
})
|
||||
}
|
||||
// TODO: Should we log an error from WriteStatus here and below?
|
||||
return appErr
|
||||
}
|
||||
|
@ -1192,7 +1209,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||
ss.trInfo.tr.LazyLog(stringer("OK"), false)
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
return t.WriteStatus(ss.s, status.New(codes.OK, ""))
|
||||
err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
|
||||
if ss.binlog != nil {
|
||||
ss.binlog.Log(&binarylog.ServerTrailer{
|
||||
Trailer: ss.s.Trailer(),
|
||||
Err: appErr,
|
||||
})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
|
||||
|
@ -1221,47 +1245,33 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
|||
}
|
||||
service := sm[:pos]
|
||||
method := sm[pos+1:]
|
||||
srv, ok := s.m[service]
|
||||
if !ok {
|
||||
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
||||
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
||||
|
||||
srv, knownService := s.m[service]
|
||||
if knownService {
|
||||
if md, ok := srv.md[method]; ok {
|
||||
s.processUnaryRPC(t, stream, srv, md, trInfo)
|
||||
return
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
|
||||
trInfo.tr.SetError()
|
||||
if sd, ok := srv.sd[method]; ok {
|
||||
s.processStreamingRPC(t, stream, srv, sd, trInfo)
|
||||
return
|
||||
}
|
||||
errDesc := fmt.Sprintf("unknown service %v", service)
|
||||
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.Finish()
|
||||
}
|
||||
return
|
||||
}
|
||||
// Unary RPC or Streaming RPC?
|
||||
if md, ok := srv.md[method]; ok {
|
||||
s.processUnaryRPC(t, stream, srv, md, trInfo)
|
||||
return
|
||||
}
|
||||
if sd, ok := srv.sd[method]; ok {
|
||||
s.processStreamingRPC(t, stream, srv, sd, trInfo)
|
||||
return
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
// Unknown service, or known server unknown method.
|
||||
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
||||
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
||||
return
|
||||
}
|
||||
errDesc := fmt.Sprintf("unknown method %v", method)
|
||||
var errDesc string
|
||||
if !knownService {
|
||||
errDesc = fmt.Sprintf("unknown service %v", service)
|
||||
} else {
|
||||
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyPrintf("%s", errDesc)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
|
@ -1410,12 +1420,6 @@ func (s *Server) GracefulStop() {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func init() {
|
||||
internal.TestingUseHandlerImpl = func(arg interface{}) {
|
||||
arg.(*Server).opts.useHandlerImpl = true
|
||||
}
|
||||
}
|
||||
|
||||
// contentSubtype must be lowercase
|
||||
// cannot return nil
|
||||
func (s *Server) getCodec(contentSubtype string) baseCodec {
|
||||
|
@ -1484,3 +1488,11 @@ func Method(ctx context.Context) (string, bool) {
|
|||
}
|
||||
return s.Method(), true
|
||||
}
|
||||
|
||||
type channelzServer struct {
|
||||
s *Server
|
||||
}
|
||||
|
||||
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
|
||||
return c.s.channelzMetric()
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue