Upgrade dependencies
This commit is contained in:
parent
d6d795e286
commit
83e09acc9f
177 changed files with 59841 additions and 39358 deletions
397
vendor/google.golang.org/grpc/transport/transport.go
generated
vendored
397
vendor/google.golang.org/grpc/transport/transport.go
generated
vendored
|
@ -17,17 +17,19 @@
|
|||
*/
|
||||
|
||||
// Package transport defines and implements message oriented communication
|
||||
// channel to complete various transactions (e.g., an RPC).
|
||||
package transport // import "google.golang.org/grpc/transport"
|
||||
// channel to complete various transactions (e.g., an RPC). It is meant for
|
||||
// grpc-internal usage and is not intended to be imported directly by users.
|
||||
package transport // externally used as import "google.golang.org/grpc/transport"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
@ -56,6 +58,7 @@ type recvBuffer struct {
|
|||
c chan recvMsg
|
||||
mu sync.Mutex
|
||||
backlog []recvMsg
|
||||
err error
|
||||
}
|
||||
|
||||
func newRecvBuffer() *recvBuffer {
|
||||
|
@ -67,20 +70,27 @@ func newRecvBuffer() *recvBuffer {
|
|||
|
||||
func (b *recvBuffer) put(r recvMsg) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.err != nil {
|
||||
b.mu.Unlock()
|
||||
// An error had occurred earlier, don't accept more
|
||||
// data or errors.
|
||||
return
|
||||
}
|
||||
b.err = r.err
|
||||
if len(b.backlog) == 0 {
|
||||
select {
|
||||
case b.c <- r:
|
||||
b.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
b.backlog = append(b.backlog, r)
|
||||
b.mu.Unlock()
|
||||
}
|
||||
|
||||
func (b *recvBuffer) load() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.backlog) > 0 {
|
||||
select {
|
||||
case b.c <- b.backlog[0]:
|
||||
|
@ -89,6 +99,7 @@ func (b *recvBuffer) load() {
|
|||
default:
|
||||
}
|
||||
}
|
||||
b.mu.Unlock()
|
||||
}
|
||||
|
||||
// get returns the channel that receives a recvMsg in the buffer.
|
||||
|
@ -99,14 +110,15 @@ func (b *recvBuffer) get() <-chan recvMsg {
|
|||
return b.c
|
||||
}
|
||||
|
||||
//
|
||||
// recvBufferReader implements io.Reader interface to read the data from
|
||||
// recvBuffer.
|
||||
type recvBufferReader struct {
|
||||
ctx context.Context
|
||||
goAway chan struct{}
|
||||
recv *recvBuffer
|
||||
last []byte // Stores the remaining data in the previous calls.
|
||||
err error
|
||||
ctx context.Context
|
||||
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
|
||||
recv *recvBuffer
|
||||
last []byte // Stores the remaining data in the previous calls.
|
||||
err error
|
||||
}
|
||||
|
||||
// Read reads the next len(p) bytes from last. If last is drained, it tries to
|
||||
|
@ -116,7 +128,11 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
|
|||
if r.err != nil {
|
||||
return 0, r.err
|
||||
}
|
||||
defer func() { r.err = err }()
|
||||
n, r.err = r.read(p)
|
||||
return n, r.err
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) read(p []byte) (n int, err error) {
|
||||
if r.last != nil && len(r.last) > 0 {
|
||||
// Read remaining data left in last call.
|
||||
copied := copy(p, r.last)
|
||||
|
@ -124,10 +140,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
|
|||
return copied, nil
|
||||
}
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
case <-r.ctxDone:
|
||||
return 0, ContextErr(r.ctx.Err())
|
||||
case <-r.goAway:
|
||||
return 0, ErrStreamDrain
|
||||
case m := <-r.recv.get():
|
||||
r.recv.load()
|
||||
if m.err != nil {
|
||||
|
@ -139,60 +153,7 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
// All items in an out of a controlBuffer should be the same type.
|
||||
type item interface {
|
||||
item()
|
||||
}
|
||||
|
||||
// controlBuffer is an unbounded channel of item.
|
||||
type controlBuffer struct {
|
||||
c chan item
|
||||
mu sync.Mutex
|
||||
backlog []item
|
||||
}
|
||||
|
||||
func newControlBuffer() *controlBuffer {
|
||||
b := &controlBuffer{
|
||||
c: make(chan item, 1),
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *controlBuffer) put(r item) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.backlog) == 0 {
|
||||
select {
|
||||
case b.c <- r:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
b.backlog = append(b.backlog, r)
|
||||
}
|
||||
|
||||
func (b *controlBuffer) load() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.backlog) > 0 {
|
||||
select {
|
||||
case b.c <- b.backlog[0]:
|
||||
b.backlog[0] = nil
|
||||
b.backlog = b.backlog[1:]
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// get returns the channel that receives an item in the buffer.
|
||||
//
|
||||
// Upon receipt of an item, the caller should call load to send another
|
||||
// item onto the channel if there is any.
|
||||
func (b *controlBuffer) get() <-chan item {
|
||||
return b.c
|
||||
}
|
||||
|
||||
type streamState uint8
|
||||
type streamState uint32
|
||||
|
||||
const (
|
||||
streamActive streamState = iota
|
||||
|
@ -203,65 +164,75 @@ const (
|
|||
|
||||
// Stream represents an RPC in the transport layer.
|
||||
type Stream struct {
|
||||
id uint32
|
||||
// nil for client side Stream.
|
||||
st ServerTransport
|
||||
// ctx is the associated context of the stream.
|
||||
ctx context.Context
|
||||
// cancel is always nil for client side Stream.
|
||||
cancel context.CancelFunc
|
||||
// done is closed when the final status arrives.
|
||||
done chan struct{}
|
||||
// goAway is closed when the server sent GoAways signal before this stream was initiated.
|
||||
goAway chan struct{}
|
||||
// method records the associated RPC method of the stream.
|
||||
method string
|
||||
id uint32
|
||||
st ServerTransport // nil for client side Stream
|
||||
ctx context.Context // the associated context of the stream
|
||||
cancel context.CancelFunc // always nil for client side Stream
|
||||
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
|
||||
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
|
||||
method string // the associated RPC method of the stream
|
||||
recvCompress string
|
||||
sendCompress string
|
||||
buf *recvBuffer
|
||||
trReader io.Reader
|
||||
fc *inFlow
|
||||
recvQuota uint32
|
||||
|
||||
// TODO: Remote this unused variable.
|
||||
// The accumulated inbound quota pending for window update.
|
||||
updateQuota uint32
|
||||
wq *writeQuota
|
||||
|
||||
// Callback to state application's intentions to read data. This
|
||||
// is used to adjust flow control, if need be.
|
||||
// is used to adjust flow control, if needed.
|
||||
requestRead func(int)
|
||||
|
||||
sendQuotaPool *quotaPool
|
||||
// Close headerChan to indicate the end of reception of header metadata.
|
||||
headerChan chan struct{}
|
||||
// header caches the received header metadata.
|
||||
header metadata.MD
|
||||
// The key-value map of trailer metadata.
|
||||
trailer metadata.MD
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
header metadata.MD // the received header metadata.
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
|
||||
mu sync.RWMutex // guard the following
|
||||
// headerOK becomes true from the first header is about to send.
|
||||
headerOk bool
|
||||
headerOk bool // becomes true from the first header is about to send
|
||||
state streamState
|
||||
// true iff headerChan is closed. Used to avoid closing headerChan
|
||||
// multiple times.
|
||||
headerDone bool
|
||||
// the status error received from the server.
|
||||
status *status.Status
|
||||
// rstStream indicates whether a RST_STREAM frame needs to be sent
|
||||
// to the server to signify that this stream is closing.
|
||||
rstStream bool
|
||||
// rstError is the error that needs to be sent along with the RST_STREAM frame.
|
||||
rstError http2.ErrCode
|
||||
// bytesSent and bytesReceived indicates whether any bytes have been sent or
|
||||
// received on this stream.
|
||||
bytesSent bool
|
||||
bytesReceived bool
|
||||
|
||||
status *status.Status // the status error received from the server
|
||||
|
||||
bytesReceived uint32 // indicates whether any bytes have been received on this stream
|
||||
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
|
||||
|
||||
// contentSubtype is the content-subtype for requests.
|
||||
// this must be lowercase or the behavior is undefined.
|
||||
contentSubtype string
|
||||
}
|
||||
|
||||
func (s *Stream) swapState(st streamState) streamState {
|
||||
return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
|
||||
}
|
||||
|
||||
func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
|
||||
return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
|
||||
}
|
||||
|
||||
func (s *Stream) getState() streamState {
|
||||
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
|
||||
}
|
||||
|
||||
func (s *Stream) waitOnHeader() error {
|
||||
if s.headerChan == nil {
|
||||
// On the server headerChan is always nil since a stream originates
|
||||
// only after having received headers.
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return ContextErr(s.ctx.Err())
|
||||
case <-s.headerChan:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *Stream) RecvCompress() string {
|
||||
if err := s.waitOnHeader(); err != nil {
|
||||
return ""
|
||||
}
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
|
@ -276,28 +247,17 @@ func (s *Stream) Done() <-chan struct{} {
|
|||
return s.done
|
||||
}
|
||||
|
||||
// GoAway returns a channel which is closed when the server sent GoAways signal
|
||||
// before this stream was initiated.
|
||||
func (s *Stream) GoAway() <-chan struct{} {
|
||||
return s.goAway
|
||||
}
|
||||
|
||||
// Header acquires the key-value pairs of header metadata once it
|
||||
// is available. It blocks until i) the metadata is ready or ii) there is no
|
||||
// header metadata or iii) the stream is canceled/expired.
|
||||
func (s *Stream) Header() (metadata.MD, error) {
|
||||
var err error
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
err = ContextErr(s.ctx.Err())
|
||||
case <-s.goAway:
|
||||
err = ErrStreamDrain
|
||||
case <-s.headerChan:
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
err := s.waitOnHeader()
|
||||
// Even if the stream is closed, header is returned if available.
|
||||
select {
|
||||
case <-s.headerChan:
|
||||
if s.header == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return s.header.Copy(), nil
|
||||
default:
|
||||
}
|
||||
|
@ -307,10 +267,11 @@ func (s *Stream) Header() (metadata.MD, error) {
|
|||
// Trailer returns the cached trailer metedata. Note that if it is not called
|
||||
// after the entire stream is done, it could return an empty MD. Client
|
||||
// side only.
|
||||
// It can be safely read only after stream has ended that is either read
|
||||
// or write have returned io.EOF.
|
||||
func (s *Stream) Trailer() metadata.MD {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.trailer.Copy()
|
||||
c := s.trailer.Copy()
|
||||
return c
|
||||
}
|
||||
|
||||
// ServerTransport returns the underlying ServerTransport for the stream.
|
||||
|
@ -319,6 +280,15 @@ func (s *Stream) ServerTransport() ServerTransport {
|
|||
return s.st
|
||||
}
|
||||
|
||||
// ContentSubtype returns the content-subtype for a request. For example, a
|
||||
// content-subtype of "proto" will result in a content-type of
|
||||
// "application/grpc+proto". This will always be lowercase. See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
func (s *Stream) ContentSubtype() string {
|
||||
return s.contentSubtype
|
||||
}
|
||||
|
||||
// Context returns the context of the stream.
|
||||
func (s *Stream) Context() context.Context {
|
||||
return s.ctx
|
||||
|
@ -330,33 +300,41 @@ func (s *Stream) Method() string {
|
|||
}
|
||||
|
||||
// Status returns the status received from the server.
|
||||
// Status can be read safely only after the stream has ended,
|
||||
// that is, read or write has returned io.EOF.
|
||||
func (s *Stream) Status() *status.Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
// SetHeader sets the header metadata. This can be called multiple times.
|
||||
// Server side only.
|
||||
// This should not be called in parallel to other data writes.
|
||||
func (s *Stream) SetHeader(md metadata.MD) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.headerOk || s.state == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.header = metadata.Join(s.header, md)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendHeader sends the given header metadata. The given metadata is
|
||||
// combined with any metadata set by previous calls to SetHeader and
|
||||
// then written to the transport stream.
|
||||
func (s *Stream) SendHeader(md metadata.MD) error {
|
||||
t := s.ServerTransport()
|
||||
return t.WriteHeader(s, md)
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||
// by the server. This can be called multiple times. Server side only.
|
||||
// This should not be called parallel to other data writes.
|
||||
func (s *Stream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
return nil
|
||||
}
|
||||
|
@ -397,26 +375,15 @@ func (t *transportReader) Read(p []byte) (n int, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// finish sets the stream's state and status, and closes the done channel.
|
||||
// s.mu must be held by the caller. st must always be non-nil.
|
||||
func (s *Stream) finish(st *status.Status) {
|
||||
s.status = st
|
||||
s.state = streamDone
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
// BytesSent indicates whether any bytes have been sent on this stream.
|
||||
func (s *Stream) BytesSent() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.bytesSent
|
||||
}
|
||||
|
||||
// BytesReceived indicates whether any bytes have been received on this stream.
|
||||
func (s *Stream) BytesReceived() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.bytesReceived
|
||||
return atomic.LoadUint32(&s.bytesReceived) == 1
|
||||
}
|
||||
|
||||
// Unprocessed indicates whether the server did not process this stream --
|
||||
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
||||
func (s *Stream) Unprocessed() bool {
|
||||
return atomic.LoadUint32(&s.unprocessed) == 1
|
||||
}
|
||||
|
||||
// GoString is implemented by Stream so context.String() won't
|
||||
|
@ -425,27 +392,11 @@ func (s *Stream) GoString() string {
|
|||
return fmt.Sprintf("<stream: %p, %v>", s, s.method)
|
||||
}
|
||||
|
||||
// The key to save transport.Stream in the context.
|
||||
type streamKey struct{}
|
||||
|
||||
// newContextWithStream creates a new context from ctx and attaches stream
|
||||
// to it.
|
||||
func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
|
||||
return context.WithValue(ctx, streamKey{}, stream)
|
||||
}
|
||||
|
||||
// StreamFromContext returns the stream saved in ctx.
|
||||
func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
|
||||
s, ok = ctx.Value(streamKey{}).(*Stream)
|
||||
return
|
||||
}
|
||||
|
||||
// state of transport
|
||||
type transportState int
|
||||
|
||||
const (
|
||||
reachable transportState = iota
|
||||
unreachable
|
||||
closing
|
||||
draining
|
||||
)
|
||||
|
@ -460,6 +411,9 @@ type ServerConfig struct {
|
|||
KeepalivePolicy keepalive.EnforcementPolicy
|
||||
InitialWindowSize int32
|
||||
InitialConnWindowSize int32
|
||||
WriteBufferSize int
|
||||
ReadBufferSize int
|
||||
ChannelzParentID int64
|
||||
}
|
||||
|
||||
// NewServerTransport creates a ServerTransport with conn or non-nil error
|
||||
|
@ -487,22 +441,29 @@ type ConnectOptions struct {
|
|||
KeepaliveParams keepalive.ClientParameters
|
||||
// StatsHandler stores the handler for stats.
|
||||
StatsHandler stats.Handler
|
||||
// InitialWindowSize sets the intial window size for a stream.
|
||||
// InitialWindowSize sets the initial window size for a stream.
|
||||
InitialWindowSize int32
|
||||
// InitialConnWindowSize sets the intial window size for a connection.
|
||||
// InitialConnWindowSize sets the initial window size for a connection.
|
||||
InitialConnWindowSize int32
|
||||
// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
|
||||
WriteBufferSize int
|
||||
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
|
||||
ReadBufferSize int
|
||||
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
|
||||
ChannelzParentID int64
|
||||
}
|
||||
|
||||
// TargetInfo contains the information of the target such as network address and metadata.
|
||||
type TargetInfo struct {
|
||||
Addr string
|
||||
Metadata interface{}
|
||||
Addr string
|
||||
Metadata interface{}
|
||||
Authority string
|
||||
}
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
|
||||
return newHTTP2Client(ctx, target, opts)
|
||||
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
|
||||
}
|
||||
|
||||
// Options provides additional hints and information for message
|
||||
|
@ -514,7 +475,7 @@ type Options struct {
|
|||
|
||||
// Delay is a hint to the transport implementation for whether
|
||||
// the data could be buffered for a batching write. The
|
||||
// Transport implementation may ignore the hint.
|
||||
// transport implementation may ignore the hint.
|
||||
Delay bool
|
||||
}
|
||||
|
||||
|
@ -526,10 +487,6 @@ type CallHdr struct {
|
|||
// Method specifies the operation to perform.
|
||||
Method string
|
||||
|
||||
// RecvCompress specifies the compression algorithm applied on
|
||||
// inbound messages.
|
||||
RecvCompress string
|
||||
|
||||
// SendCompress specifies the compression algorithm applied on
|
||||
// outbound message.
|
||||
SendCompress string
|
||||
|
@ -544,6 +501,14 @@ type CallHdr struct {
|
|||
// for performance purposes.
|
||||
// If it's false, new stream will never be flushed.
|
||||
Flush bool
|
||||
|
||||
// ContentSubtype specifies the content-subtype for a request. For example, a
|
||||
// content-subtype of "proto" will result in a content-type of
|
||||
// "application/grpc+proto". The value of ContentSubtype must be all
|
||||
// lowercase, otherwise the behavior is undefined. See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
|
||||
// for more details.
|
||||
ContentSubtype string
|
||||
}
|
||||
|
||||
// ClientTransport is the common interface for all gRPC client-side transport
|
||||
|
@ -560,7 +525,7 @@ type ClientTransport interface {
|
|||
|
||||
// Write sends the data for the given stream. A nil stream indicates
|
||||
// the write is to be performed on the transport as a whole.
|
||||
Write(s *Stream, data []byte, opts *Options) error
|
||||
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
|
||||
|
||||
// NewStream creates a Stream for an RPC.
|
||||
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
|
||||
|
@ -585,6 +550,12 @@ type ClientTransport interface {
|
|||
|
||||
// GetGoAwayReason returns the reason why GoAway frame was received.
|
||||
GetGoAwayReason() GoAwayReason
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
// IncrMsgRecv increments the number of message received through this transport.
|
||||
IncrMsgRecv()
|
||||
}
|
||||
|
||||
// ServerTransport is the common interface for all gRPC server-side transport
|
||||
|
@ -602,7 +573,7 @@ type ServerTransport interface {
|
|||
|
||||
// Write sends the data for the given stream.
|
||||
// Write may not be called on all streams.
|
||||
Write(s *Stream, data []byte, opts *Options) error
|
||||
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
|
||||
|
||||
// WriteStatus sends the status of a stream to the client. WriteStatus is
|
||||
// the final call made on a stream and always occurs.
|
||||
|
@ -618,6 +589,12 @@ type ServerTransport interface {
|
|||
|
||||
// Drain notifies the client this ServerTransport stops accepting new RPCs.
|
||||
Drain()
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
// IncrMsgRecv increments the number of message received through this transport.
|
||||
IncrMsgRecv()
|
||||
}
|
||||
|
||||
// streamErrorf creates an StreamError with the specified error code and description.
|
||||
|
@ -667,9 +644,16 @@ func (e ConnectionError) Origin() error {
|
|||
var (
|
||||
// ErrConnClosing indicates that the transport is closing.
|
||||
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
|
||||
// ErrStreamDrain indicates that the stream is rejected by the server because
|
||||
// the server stops accepting new RPCs.
|
||||
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
|
||||
// errStreamDrain indicates that the stream is rejected because the
|
||||
// connection is draining. This could be caused by goaway or balancer
|
||||
// removing the address.
|
||||
errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
|
||||
// errStreamDone is returned from write at the client side to indiacte application
|
||||
// layer of an error.
|
||||
errStreamDone = errors.New("the stream is done")
|
||||
// StatusGoAway indicates that the server sent a GOAWAY that included this
|
||||
// stream's ID in unprocessed RPCs.
|
||||
statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
|
||||
)
|
||||
|
||||
// TODO: See if we can replace StreamError with status package errors.
|
||||
|
@ -684,43 +668,16 @@ func (e StreamError) Error() string {
|
|||
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
|
||||
}
|
||||
|
||||
// wait blocks until it can receive from ctx.Done, closing, or proceed.
|
||||
// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
|
||||
// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
|
||||
// it return the StreamError for ctx.Err.
|
||||
// If it receives from goAway, it returns 0, ErrStreamDrain.
|
||||
// If it receives from closing, it returns 0, ErrConnClosing.
|
||||
// If it receives from proceed, it returns the received integer, nil.
|
||||
func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ContextErr(ctx.Err())
|
||||
case <-done:
|
||||
// User cancellation has precedence.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ContextErr(ctx.Err())
|
||||
default:
|
||||
}
|
||||
return 0, io.EOF
|
||||
case <-goAway:
|
||||
return 0, ErrStreamDrain
|
||||
case <-closing:
|
||||
return 0, ErrConnClosing
|
||||
case i := <-proceed:
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
|
||||
// GoAwayReason contains the reason for the GoAway frame received.
|
||||
type GoAwayReason uint8
|
||||
|
||||
const (
|
||||
// Invalid indicates that no GoAway frame is received.
|
||||
Invalid GoAwayReason = 0
|
||||
// NoReason is the default value when GoAway frame is received.
|
||||
NoReason GoAwayReason = 1
|
||||
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
|
||||
// was recieved and that the debug data said "too_many_pings".
|
||||
TooManyPings GoAwayReason = 2
|
||||
// GoAwayInvalid indicates that no GoAway frame is received.
|
||||
GoAwayInvalid GoAwayReason = 0
|
||||
// GoAwayNoReason is the default value when GoAway frame is received.
|
||||
GoAwayNoReason GoAwayReason = 1
|
||||
// GoAwayTooManyPings indicates that a GoAway frame with
|
||||
// ErrCodeEnhanceYourCalm was received and that the debug data said
|
||||
// "too_many_pings".
|
||||
GoAwayTooManyPings GoAwayReason = 2
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue