Add e2e tests

This commit is contained in:
Manuel de Brito Fontes 2017-10-17 19:50:27 -03:00
parent 99a355f25d
commit 601fb7dacf
1163 changed files with 289217 additions and 14195 deletions

View file

@ -59,7 +59,7 @@ type bdpEstimator struct {
sample uint32
// bwMax is the maximum bandwidth noted so far (bytes/sec).
bwMax float64
// bool to keep track of the begining of a new measurement cycle.
// bool to keep track of the beginning of a new measurement cycle.
isSent bool
// Callback to update the window sizes.
updateFlowControl func(n uint32)
@ -70,7 +70,7 @@ type bdpEstimator struct {
}
// timesnap registers the time bdp ping was sent out so that
// network rtt can be calculated when its ack is recieved.
// network rtt can be calculated when its ack is received.
// It is called (by controller) when the bdpPing is
// being written on the wire.
func (b *bdpEstimator) timesnap(d [8]byte) {
@ -119,7 +119,7 @@ func (b *bdpEstimator) calculate(d [8]byte) {
b.rtt += (rttSample - b.rtt) * float64(alpha)
}
b.isSent = false
// The number of bytes accumalated so far in the sample is smaller
// The number of bytes accumulated so far in the sample is smaller
// than or equal to 1.5 times the real BDP on a saturated connection.
bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
if bwCurrent > b.bwMax {

View file

@ -22,9 +22,11 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const (
@ -44,15 +46,44 @@ const (
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
// max window limit set by HTTP2 Specs.
maxWindowSize = math.MaxInt32
// defaultLocalSendQuota sets is default value for number of data
// bytes that each stream can schedule before some of it being
// flushed out.
defaultLocalSendQuota = 64 * 1024
)
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
endStream bool
}
func (*headerFrame) item() {}
type continuationFrame struct {
streamID uint32
endHeaders bool
headerBlockFragment []byte
}
type dataFrame struct {
streamID uint32
endStream bool
d []byte
f func()
}
func (*dataFrame) item() {}
func (*continuationFrame) item() {}
type windowUpdate struct {
streamID uint32
increment uint32
flush bool
}
func (*windowUpdate) item() {}
@ -97,8 +128,9 @@ func (*ping) item() {}
type quotaPool struct {
c chan int
mu sync.Mutex
quota int
mu sync.Mutex
version uint32
quota int
}
// newQuotaPool creates a quotaPool which has quota q available to consume.
@ -119,6 +151,10 @@ func newQuotaPool(q int) *quotaPool {
func (qb *quotaPool) add(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.lockedAdd(v)
}
func (qb *quotaPool) lockedAdd(v int) {
select {
case n := <-qb.c:
qb.quota += n
@ -139,6 +175,35 @@ func (qb *quotaPool) add(v int) {
}
}
func (qb *quotaPool) addAndUpdate(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.lockedAdd(v)
// Update the version only after having added to the quota
// so that if acquireWithVesrion sees the new vesrion it is
// guaranteed to have seen the updated quota.
// Also, still keep this inside of the lock, so that when
// compareAndExecute is processing, this function doesn't
// get executed partially (quota gets updated but the version
// doesn't).
atomic.AddUint32(&(qb.version), 1)
}
func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
return qb.c, atomic.LoadUint32(&(qb.version))
}
func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
qb.mu.Lock()
defer qb.mu.Unlock()
if version == atomic.LoadUint32(&(qb.version)) {
success()
return true
}
failure()
return false
}
// acquire returns the channel on which available quota amounts are sent.
func (qb *quotaPool) acquire() <-chan int {
return qb.c

View file

@ -1,45 +0,0 @@
// +build go1.6,!go1.7
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"net"
"google.golang.org/grpc/codes"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
}
// ContextErr converts the error from context package into a StreamError.
func ContextErr(err error) StreamError {
switch err {
case context.DeadlineExceeded:
return streamErrorf(codes.DeadlineExceeded, "%v", err)
case context.Canceled:
return streamErrorf(codes.Canceled, "%v", err)
}
return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
}

View file

@ -1,46 +0,0 @@
// +build go1.7
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"context"
"net"
"google.golang.org/grpc/codes"
netctx "golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, address)
}
// ContextErr converts the error from context package into a StreamError.
func ContextErr(err error) StreamError {
switch err {
case context.DeadlineExceeded, netctx.DeadlineExceeded:
return streamErrorf(codes.DeadlineExceeded, "%v", err)
case context.Canceled, netctx.Canceled:
return streamErrorf(codes.Canceled, "%v", err)
}
return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
}

View file

@ -173,7 +173,6 @@ func (ht *serverHandlerTransport) do(fn func()) error {
case <-ht.closedCh:
return ErrConnClosing
}
}
}
@ -183,6 +182,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
ht.mu.Unlock()
return nil
}
ht.streamDone = true
ht.mu.Unlock()
err := ht.do(func() {
ht.writeCommonHeaders(s)
@ -223,9 +223,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
})
close(ht.writes)
ht.mu.Lock()
ht.streamDone = true
ht.mu.Unlock()
return err
}

View file

@ -26,6 +26,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"sync"
"testing"
"time"
@ -390,6 +391,30 @@ func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
}
}
func TestHandlerTransport_HandleStreams_MultiWriteStatus(t *testing.T) {
st := newHandleStreamTest(t)
handleStream := func(s *Stream) {
if want := "/service/foo.bar"; s.method != want {
t.Errorf("stream method = %q; want %q", s.method, want)
}
st.bodyw.Close() // no body
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
st.ht.WriteStatus(s, status.New(codes.OK, ""))
}()
}
wg.Wait()
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
func(ctx context.Context, method string) context.Context { return ctx },
)
}
func TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
errDetails := []proto.Message{
&epb.RetryInfo{

View file

@ -43,6 +43,7 @@ import (
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
ctx context.Context
cancel context.CancelFunc
target string // server name/addr
userAgent string
md interface{}
@ -52,17 +53,6 @@ type http2Client struct {
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by sending a value on writableChan
// and releases it by receiving from writableChan.
writableChan chan int
// shutdownChan is closed when Close is called.
// Blocking operations should select on shutdownChan to avoid
// blocking forever after Close.
// TODO(zhaoq): Maybe have a channel context?
shutdownChan chan struct{}
// errorChan is closed to notify the I/O error to the caller.
errorChan chan struct{}
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
@ -119,7 +109,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
if fn != nil {
return fn(ctx, addr)
}
return dialContext(ctx, "tcp", addr)
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}
func isTemporary(err error) bool {
@ -153,9 +143,18 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) {
scheme := "http"
conn, err := dial(ctx, opts.Dialer, addr.Addr)
ctx, cancel := context.WithCancel(ctx)
connectCtx, connectCancel := context.WithTimeout(ctx, timeout)
defer func() {
connectCancel()
if err != nil {
cancel()
}
}()
conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
@ -174,7 +173,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
)
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Addr, conn)
if err != nil {
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
@ -198,8 +197,17 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
dynamicWindow = false
}
var buf bytes.Buffer
writeBufSize := defaultWriteBufSize
if opts.WriteBufferSize > 0 {
writeBufSize = opts.WriteBufferSize
}
readBufSize := defaultReadBufSize
if opts.ReadBufferSize > 0 {
readBufSize = opts.ReadBufferSize
}
t := &http2Client{
ctx: ctx,
cancel: cancel,
target: addr.Addr,
userAgent: opts.UserAgent,
md: addr.Metadata,
@ -209,14 +217,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
nextID: 1,
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
framer: newFramer(conn, writeBufSize, readBufSize),
controlBuf: newControlBuffer(),
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@ -270,12 +275,12 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
if t.initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{
err = t.framer.fr.WriteSettings(http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
} else {
err = t.framer.writeSettings(true)
err = t.framer.fr.WriteSettings()
}
if err != nil {
t.Close()
@ -283,31 +288,35 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
}
}
go t.controller()
t.framer.writer.Flush()
go func() {
loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
t.Close()
}()
if t.kp.Time != infinity {
go t.keepalive()
}
t.writableChan <- 0
return t, nil
}
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
id: t.nextID,
done: make(chan struct{}),
goAway: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
fc: &inFlow{limit: uint32(t.initialWindowSize)},
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
headerChan: make(chan struct{}),
id: t.nextID,
done: make(chan struct{}),
goAway: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
fc: &inFlow{limit: uint32(t.initialWindowSize)},
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
localSendQuota: newQuotaPool(defaultLocalSendQuota),
headerChan: make(chan struct{}),
}
t.nextID += 2
s.requestRead = func(n int) {
@ -368,13 +377,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
authData[k] = v
}
}
callAuthData := make(map[string]string)
callAuthData := map[string]string{}
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
if callCreds := callHdr.Creds; callCreds != nil {
if !t.isSecure && callCreds.RequireTransportSecurity() {
return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton")
return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
}
data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil {
@ -400,7 +409,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
t.mu.Unlock()
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
sq, err := wait(ctx, t.ctx, nil, nil, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
@ -408,19 +417,66 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok {
t.streamsQuota.add(1)
// TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
// Make the slice of certain predictable size to reduce allocations made by append.
hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
hfLen += len(authData) + len(callAuthData)
headerFields := make([]hpack.HeaderField, 0, hfLen)
headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
timeout := dl.Sub(time.Now())
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
for k, v := range authData {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
for k, v := range callAuthData {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
if b := stats.OutgoingTags(ctx); b != nil {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
}
if b := stats.OutgoingTrace(ctx); b != nil {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
return nil, err
}
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
t.streamsQuota.add(1)
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
}
if t.state != reachable {
@ -434,7 +490,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(false, false, [8]byte{})
t.controlBuf.put(&ping{data: [8]byte{}})
// Fill the awakenKeepalive channel again as this channel must be
// kept non-writable except at the point that the keepalive()
// goroutine is waiting either to be awaken or shutdown.
@ -442,102 +498,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
default:
}
}
t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
})
t.mu.Unlock()
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
// because hpack.Encoder is stateful.
t.hBuf.Reset()
t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
if callHdr.SendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
timeout := dl.Sub(time.Now())
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
for k, v := range authData {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
for k, v := range callAuthData {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
var (
endHeaders bool
)
if b := stats.OutgoingTags(ctx); b != nil {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
}
if b := stats.OutgoingTrace(ctx); b != nil {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
first := true
bufLen := t.hBuf.Len()
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
size := t.hBuf.Len()
if size > http2MaxFrameLen {
size = http2MaxFrameLen
} else {
endHeaders = true
}
var flush bool
if callHdr.Flush && endHeaders {
flush = true
}
if first {
// Sends a HeadersFrame to server to start a new stream.
p := http2.HeadersFrameParam{
StreamID: s.id,
BlockFragment: t.hBuf.Next(size),
EndStream: false,
EndHeaders: endHeaders,
}
// Do a force flush for the buffered frames iff it is the last headers frame
// and there is header metadata to be sent. Otherwise, there is flushing until
// the corresponding data frame is written.
err = t.framer.writeHeaders(flush, p)
first = false
} else {
// Sends Continuation frames for the leftover headers.
err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
}
if err != nil {
t.notifyError(err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
s.mu.Lock()
s.bytesSent = true
s.mu.Unlock()
@ -545,7 +512,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if t.statsHandler != nil {
outHeader := &stats.OutHeader{
Client: true,
WireLength: bufLen,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
@ -553,7 +519,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
t.writableChan <- 0
return s, nil
}
@ -623,12 +588,9 @@ func (t *http2Client) Close() (err error) {
t.mu.Unlock()
return
}
if t.state == reachable || t.state == draining {
close(t.errorChan)
}
t.state = closing
t.mu.Unlock()
close(t.shutdownChan)
t.cancel()
err = t.conn.Close()
t.mu.Lock()
streams := t.activeStreams
@ -650,23 +612,18 @@ func (t *http2Client) Close() (err error) {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return
return err
}
// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() error {
t.mu.Lock()
switch t.state {
case unreachable:
// The server may close the connection concurrently. t is not available for
// any streams. Close it now.
t.mu.Unlock()
t.Close()
return nil
case closing:
t.mu.Unlock()
return nil
}
if t.state == draining {
case closing, draining:
t.mu.Unlock()
return nil
}
@ -681,32 +638,38 @@ func (t *http2Client) GracefulClose() error {
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
// if it improves the performance.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
secondStart := http2MaxFrameLen - len(hdr)%http2MaxFrameLen
if len(data) < secondStart {
secondStart = len(data)
select {
case <-s.ctx.Done():
return ContextErr(s.ctx.Err())
case <-t.ctx.Done():
return ErrConnClosing
default:
}
hdr = append(hdr, data[:secondStart]...)
data = data[secondStart:]
isLastSlice := (len(data) == 0)
r := bytes.NewBuffer(hdr)
var (
p []byte
oqv uint32
)
for {
oqv = atomic.LoadUint32(&t.outQuotaVersion)
if r.Len() > 0 || p != nil {
if hdr == nil && data == nil && opts.Last {
// stream.CloseSend uses this to send an empty frame with endStream=True
t.controlBuf.put(&dataFrame{streamID: s.id, endStream: true, f: func() {}})
return nil
}
// Add data to header frame so that we can equally distribute data across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
for idx, r := range [][]byte{hdr, data} {
for len(r) > 0 {
size := http2MaxFrameLen
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion()
sq, err := wait(s.ctx, t.ctx, s.done, s.goAway, quotaChan)
if err != nil {
return err
}
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
tq, err := wait(s.ctx, t.ctx, s.done, s.goAway, t.sendQuotaPool.acquire())
if err != nil {
return err
}
@ -716,93 +679,51 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
if tq < size {
size = tq
}
if p == nil {
p = r.Next(size)
if size > len(r) {
size = len(r)
}
p := r[:size]
ps := len(p)
if ps < sq {
// Overbooked stream quota. Return it back.
s.sendQuotaPool.add(sq - ps)
}
if ps < tq {
// Overbooked transport quota. Return it back.
t.sendQuotaPool.add(tq - ps)
}
}
var (
endStream bool
forceFlush bool
)
// Indicate there is a writer who is about to write a data frame.
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the transport.
if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok || err == io.EOF {
// Return the connection quota back.
t.sendQuotaPool.add(len(p))
}
if t.framer.adjustNumWriters(-1) == 0 {
// This writer is the last one in this batch and has the
// responsibility to flush the buffered frames. It queues
// a flush request to controlBuf instead of flushing directly
// in order to avoid the race with other writing or flushing.
t.controlBuf.put(&flushIO{})
}
return err
}
select {
case <-s.ctx.Done():
t.sendQuotaPool.add(len(p))
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
}
t.writableChan <- 0
return ContextErr(s.ctx.Err())
default:
}
if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
// InitialWindowSize settings frame must have been received after we
// acquired send quota but before we got the writable channel.
// We must forsake this write.
t.sendQuotaPool.add(len(p))
s.sendQuotaPool.add(len(p))
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
}
t.writableChan <- 0
continue
}
if r.Len() == 0 {
if isLastSlice {
if opts.Last {
endStream = true
// Acquire local send quota to be able to write to the controlBuf.
ltq, err := wait(s.ctx, t.ctx, s.done, s.goAway, s.localSendQuota.acquire())
if err != nil {
if _, ok := err.(ConnectionError); !ok {
t.sendQuotaPool.add(ps)
}
if t.framer.adjustNumWriters(0) == 1 {
// Do a force flush iff this is last frame for the entire gRPC message
// and the caller is the only writer at this moment.
forceFlush = true
}
} else {
isLastSlice = true
if len(data) != 0 {
r = bytes.NewBuffer(data)
return err
}
s.localSendQuota.add(ltq - ps) // It's ok if we make it negative.
var endStream bool
// See if this is the last frame to be written.
if opts.Last {
if len(r)-size == 0 { // No more data in r after this iteration.
if idx == 0 { // We're writing data header.
if len(data) == 0 { // There's no data to follow.
endStream = true
}
} else { // We're writing data.
endStream = true
}
}
}
}
// If WriteData fails, all the pending streams will be handled
// by http2Client.Close(). No explicit CloseStream() needs to be
// invoked.
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
t.notifyError(err)
return connectionErrorf(true, err, "transport: %v", err)
}
p = nil
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
}
t.writableChan <- 0
if r.Len() == 0 {
break
success := func() {
t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { s.localSendQuota.add(ps) }})
if ps < sq {
s.sendQuotaPool.lockedAdd(sq - ps)
}
r = r[ps:]
}
failure := func() {
s.sendQuotaPool.lockedAdd(sq)
}
if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) {
t.sendQuotaPool.add(ps)
s.localSendQuota.add(ps)
}
}
}
if !opts.Last {
@ -833,11 +754,11 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) {
return
}
if w := s.fc.maybeAdjust(n); w > 0 {
// Piggyback conneciton's window update along.
// Piggyback connection's window update along.
if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
t.controlBuf.put(&windowUpdate{0, cw})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
@ -852,9 +773,9 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
}
if w := s.fc.onRead(n); w > 0 {
if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
t.controlBuf.put(&windowUpdate{0, cw})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
@ -868,7 +789,7 @@ func (t *http2Client) updateFlowControl(n uint32) {
}
t.initialWindowSize = int32(n)
t.mu.Unlock()
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
t.controlBuf.put(&settings{
ack: false,
ss: []http2.Setting{
@ -898,15 +819,17 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// Furthermore, if a bdpPing is being sent out we can piggyback
// connection's window update for the bytes we just received.
if sendBDPPing {
t.controlBuf.put(&windowUpdate{0, uint32(size), false})
if size != 0 { // Could've been an empty data frame.
t.controlBuf.put(&windowUpdate{0, uint32(size)})
}
t.controlBuf.put(bdpPing)
} else {
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
t.Close()
return
}
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w, true})
t.controlBuf.put(&windowUpdate{0, w})
}
}
// Select the right stream to dispatch.
@ -930,7 +853,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w, true})
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
s.mu.Unlock()
@ -1019,10 +942,10 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
t.mu.Unlock()
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
t.Close()
return
}
// A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
// A client can receive multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
// The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
// with the ID of the last stream the server will process.
// Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
@ -1033,7 +956,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
t.Close()
return
}
default:
@ -1177,22 +1100,22 @@ func handleMalformedHTTP2(s *Stream, err error) {
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.readFrame()
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.notifyError(err)
t.Close()
return
}
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
t.Close()
return
}
t.handleSettings(sf)
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
frame, err := t.framer.fr.ReadFrame()
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
// Abort an active stream if the http2.Framer returns a
@ -1204,12 +1127,12 @@ func (t *http2Client) reader() {
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()))
}
continue
} else {
// Transport error.
t.notifyError(err)
t.Close()
return
}
}
@ -1253,61 +1176,86 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
t.mu.Lock()
for _, stream := range t.activeStreams {
// Adjust the sending quota for each stream.
stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
atomic.AddUint32(&t.outQuotaVersion, 1)
}
}
}
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
for {
select {
case i := <-t.controlBuf.get():
t.controlBuf.load()
select {
case <-t.writableChan:
switch i := i.(type) {
case *windowUpdate:
t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
t.applySettings(i.ss)
} else {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
// If the server needs to be to intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
// the wire before the headers of the next stream waiting on
// streamQuota. We ensure this by adding to the streamsQuota pool
// only after having acquired the writableChan to send RST_STREAM.
t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
case *ping:
if !i.ack {
t.bdpEst.timesnap(i.data)
}
t.framer.writePing(true, i.ack, i.data)
default:
errorf("transport: http2Client.controller got unexpected item type %v\n", i)
}
t.writableChan <- 0
continue
case <-t.shutdownChan:
return
}
case <-t.shutdownChan:
return
// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
// is duplicated between the client and the server.
// The transport layer needs to be refactored to take care of this.
func (t *http2Client) itemHandler(i item) error {
var err error
switch i := i.(type) {
case *dataFrame:
err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d)
if err == nil {
i.f()
}
case *headerFrame:
t.hBuf.Reset()
for _, f := range i.hf {
t.hEnc.WriteField(f)
}
endHeaders := false
first := true
for !endHeaders {
size := t.hBuf.Len()
if size > http2MaxFrameLen {
size = http2MaxFrameLen
} else {
endHeaders = true
}
if first {
first = false
err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
StreamID: i.streamID,
BlockFragment: t.hBuf.Next(size),
EndStream: i.endStream,
EndHeaders: endHeaders,
})
} else {
err = t.framer.fr.WriteContinuation(
i.streamID,
endHeaders,
t.hBuf.Next(size),
)
}
if err != nil {
return err
}
}
case *windowUpdate:
err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
case *settings:
if i.ack {
t.applySettings(i.ss)
err = t.framer.fr.WriteSettingsAck()
} else {
err = t.framer.fr.WriteSettings(i.ss...)
}
case *resetStream:
// If the server needs to be to intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
// the wire before the headers of the next stream waiting on
// streamQuota. We ensure this by adding to the streamsQuota pool
// only after having acquired the writableChan to send RST_STREAM.
err = t.framer.fr.WriteRSTStream(i.streamID, i.code)
t.streamsQuota.add(1)
case *flushIO:
err = t.framer.writer.Flush()
case *ping:
if !i.ack {
t.bdpEst.timesnap(i.data)
}
err = t.framer.fr.WritePing(i.ack, i.data)
default:
errorf("transport: http2Client.controller got unexpected item type %v\n", i)
}
return err
}
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
@ -1331,7 +1279,7 @@ func (t *http2Client) keepalive() {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.shutdownChan:
case <-t.ctx.Done():
return
}
} else {
@ -1350,13 +1298,13 @@ func (t *http2Client) keepalive() {
}
t.Close()
return
case <-t.shutdownChan:
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
case <-t.shutdownChan:
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
@ -1366,25 +1314,9 @@ func (t *http2Client) keepalive() {
}
func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
return t.ctx.Done()
}
func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
// make sure t.errorChan is closed only once.
if t.state == draining {
t.mu.Unlock()
t.Close()
return
}
if t.state == reachable {
t.state = unreachable
close(t.errorChan)
infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
}
t.mu.Unlock()
}

View file

@ -21,6 +21,7 @@ package transport
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"math/rand"
@ -51,23 +52,16 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
cancel context.CancelFunc
conn net.Conn
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
writableChan chan int
// shutdownChan is closed when Close is called.
// Blocking operations should select on shutdownChan to avoid
// blocking forever after Close.
shutdownChan chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
hEnc *hpack.Encoder // HPACK encoder
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
hEnc *hpack.Encoder // HPACK encoder
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
@ -96,8 +90,6 @@ type http2Server struct {
initialWindowSize int32
bdpEst *bdpEstimator
outQuotaVersion uint32
mu sync.Mutex // guard the following
// drainChan is initialized when drain(...) is called the first time.
@ -112,7 +104,7 @@ type http2Server struct {
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
// idle is the time instant when the connection went idle.
// This is either the begining of the connection or when the number of
// This is either the beginning of the connection or when the number of
// RPCs go down to 0.
// When the connection is busy, this value is set to 0.
idle time.Time
@ -121,7 +113,15 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
writeBufSize := defaultWriteBufSize
if config.WriteBufferSize > 0 {
writeBufSize = config.WriteBufferSize
}
readBufSize := defaultReadBufSize
if config.ReadBufferSize > 0 {
readBufSize = config.ReadBufferSize
}
framer := newFramer(conn, writeBufSize, readBufSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
@ -151,12 +151,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
ID: http2.SettingInitialWindowSize,
Val: uint32(iwz)})
}
if err := framer.writeSettings(true, isettings...); err != nil {
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
@ -183,8 +183,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
kep.MinTime = defaultKeepalivePolicyMinTime
}
var buf bytes.Buffer
ctx, cancel := context.WithCancel(context.Background())
t := &http2Server{
ctx: context.Background(),
ctx: ctx,
cancel: cancel,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@ -198,8 +200,6 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
state: reachable,
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
@ -222,9 +222,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
t.framer.writer.Flush()
go func() {
loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
t.Close()
}()
go t.keepalive()
t.writableChan <- 0
return t, nil
}
@ -313,6 +316,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
t.maxStreamID = streamID
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
s.localSendQuota = newQuotaPool(defaultLocalSendQuota)
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
@ -366,7 +370,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
return
}
frame, err := t.framer.readFrame()
frame, err := t.framer.fr.ReadFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
@ -386,7 +390,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.handleSettings(sf)
for {
frame, err := t.framer.readFrame()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
@ -457,9 +461,9 @@ func (t *http2Server) adjustWindow(s *Stream, n uint32) {
}
if w := s.fc.maybeAdjust(n); w > 0 {
if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
t.controlBuf.put(&windowUpdate{0, cw})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
@ -474,9 +478,9 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
}
if w := s.fc.onRead(n); w > 0 {
if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
t.controlBuf.put(&windowUpdate{0, cw})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
@ -490,7 +494,7 @@ func (t *http2Server) updateFlowControl(n uint32) {
}
t.initialWindowSize = int32(n)
t.mu.Unlock()
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
t.controlBuf.put(&settings{
ack: false,
ss: []http2.Setting{
@ -521,7 +525,9 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// Furthermore, if a bdpPing is being sent out we can piggyback
// connection's window update for the bytes we just received.
if sendBDPPing {
t.controlBuf.put(&windowUpdate{0, uint32(size), false})
if size != 0 { // Could be an empty frame.
t.controlBuf.put(&windowUpdate{0, uint32(size)})
}
t.controlBuf.put(bdpPing)
} else {
if err := t.fc.onData(uint32(size)); err != nil {
@ -530,7 +536,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w, true})
t.controlBuf.put(&windowUpdate{0, w})
}
}
// Select the right stream to dispatch.
@ -552,7 +558,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w, true})
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
s.mu.Unlock()
@ -593,10 +599,23 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
ss = append(ss, s)
return nil
})
// The settings will be applied once the ack is sent.
t.controlBuf.put(&settings{ack: true, ss: ss})
}
func (t *http2Server) applySettings(ss []http2.Setting) {
for _, s := range ss {
if s.ID == http2.SettingInitialWindowSize {
t.mu.Lock()
for _, stream := range t.activeStreams {
stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
}
}
}
const (
maxPingStrikes = 2
defaultPingTimeout = 2 * time.Hour
@ -634,7 +653,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
t.mu.Unlock()
if ns < 1 && !t.kep.PermitWithoutStream {
// Keepalive shouldn't be active thus, this new ping should
// have come after atleast defaultPingTimeout.
// have come after at least defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
t.pingStrikes++
}
@ -647,6 +666,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
errorf("transport: Got to too many pings from the client, closing the connection.")
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
@ -663,47 +683,16 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
}
}
func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
first := true
endHeaders := false
var err error
defer func() {
if err == nil {
// Reset ping strikes when seding headers since that might cause the
// peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
// Sends the headers in a single batch.
for !endHeaders {
size := t.hBuf.Len()
if size > http2MaxFrameLen {
size = http2MaxFrameLen
} else {
endHeaders = true
}
if first {
p := http2.HeadersFrameParam{
StreamID: s.id,
BlockFragment: b.Next(size),
EndStream: endStream,
EndHeaders: endHeaders,
}
err = t.framer.writeHeaders(endHeaders, p)
first = false
} else {
err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
}
if err != nil {
t.Close()
return connectionErrorf(true, err, "transport: %v", err)
}
}
return nil
}
// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
select {
case <-s.ctx.Done():
return ContextErr(s.ctx.Err())
case <-t.ctx.Done():
return ErrConnClosing
default:
}
s.mu.Lock()
if s.headerOk || s.state == streamDone {
s.mu.Unlock()
@ -719,14 +708,13 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
md = s.header
s.mu.Unlock()
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
// TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
if s.sendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
for k, vv := range md {
if isReservedHeader(k) {
@ -734,20 +722,20 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
continue
}
for _, v := range vv {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
})
if t.stats != nil {
outHeader := &stats.OutHeader{
WireLength: bufLen,
//WireLength: // TODO(mmukhi): Revisit this later, if needed.
}
t.stats.HandleRPC(s.Context(), outHeader)
}
t.writableChan <- 0
return nil
}
@ -756,6 +744,12 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
select {
case <-t.ctx.Done():
return ErrConnClosing
default:
}
var headersSent, hasHeader bool
s.mu.Lock()
if s.state == streamDone {
@ -775,25 +769,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
headersSent = true
}
// Always write a status regardless of context cancellation unless the stream
// is terminated (e.g. by a RST_STREAM, GOAWAY, or transport error). The
// server's application code is already done so it is fine to ignore s.ctx.
select {
case <-t.shutdownChan:
return ErrConnClosing
case <-t.writableChan:
}
t.hBuf.Reset()
// TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !headersSent {
t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
}
t.hEnc.WriteField(
hpack.HeaderField{
Name: "grpc-status",
Value: strconv.Itoa(int(st.Code())),
})
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
if p := st.Proto(); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
@ -802,7 +786,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
panic(err)
}
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
// Attach the trailer metadata.
@ -812,36 +796,32 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
continue
}
for _, v := range vv {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
endStream: true,
})
if t.stats != nil {
outTrailer := &stats.OutTrailer{
WireLength: bufLen,
}
t.stats.HandleRPC(s.Context(), outTrailer)
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
t.closeStream(s)
t.writableChan <- 0
return nil
}
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
// TODO(zhaoq): Support multi-writers for a single stream.
secondStart := http2MaxFrameLen - len(hdr)%http2MaxFrameLen
if len(data) < secondStart {
secondStart = len(data)
select {
case <-s.ctx.Done():
return ContextErr(s.ctx.Err())
case <-t.ctx.Done():
return ErrConnClosing
default:
}
hdr = append(hdr, data[:secondStart]...)
data = data[secondStart:]
isLastSlice := (len(data) == 0)
var writeHeaderFrame bool
s.mu.Lock()
if s.state == streamDone {
@ -855,124 +835,74 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
r := bytes.NewBuffer(hdr)
var (
p []byte
oqv uint32
)
for {
if r.Len() == 0 && p == nil {
return nil
}
oqv = atomic.LoadUint32(&t.outQuotaVersion)
size := http2MaxFrameLen
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
return err
}
if sq < size {
size = sq
}
if tq < size {
size = tq
}
if p == nil {
p = r.Next(size)
}
ps := len(p)
if ps < sq {
// Overbooked stream quota. Return it back.
s.sendQuotaPool.add(sq - ps)
}
if ps < tq {
// Overbooked transport quota. Return it back.
t.sendQuotaPool.add(tq - ps)
}
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the
// transport.
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok {
// Return the connection quota back.
t.sendQuotaPool.add(ps)
// Add data to header frame so that we can equally distribute data across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
for _, r := range [][]byte{hdr, data} {
for len(r) > 0 {
size := http2MaxFrameLen
// Wait until the stream has some quota to send the data.
quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion()
sq, err := wait(s.ctx, t.ctx, nil, nil, quotaChan)
if err != nil {
return err
}
if t.framer.adjustNumWriters(-1) == 0 {
// This writer is the last one in this batch and has the
// responsibility to flush the buffered frames. It queues
// a flush request to controlBuf instead of flushing directly
// in order to avoid the race with other writing or flushing.
t.controlBuf.put(&flushIO{})
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, t.ctx, nil, nil, t.sendQuotaPool.acquire())
if err != nil {
return err
}
return err
}
select {
case <-s.ctx.Done():
t.sendQuotaPool.add(ps)
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
if sq < size {
size = sq
}
t.writableChan <- 0
return ContextErr(s.ctx.Err())
default:
}
if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
// InitialWindowSize settings frame must have been received after we
// acquired send quota but before we got the writable channel.
// We must forsake this write.
t.sendQuotaPool.add(ps)
s.sendQuotaPool.add(ps)
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
if tq < size {
size = tq
}
t.writableChan <- 0
continue
}
var forceFlush bool
if r.Len() == 0 {
if isLastSlice {
if t.framer.adjustNumWriters(0) == 1 && !opts.Last {
forceFlush = true
if size > len(r) {
size = len(r)
}
p := r[:size]
ps := len(p)
if ps < tq {
// Overbooked transport quota. Return it back.
t.sendQuotaPool.add(tq - ps)
}
// Acquire local send quota to be able to write to the controlBuf.
ltq, err := wait(s.ctx, t.ctx, nil, nil, s.localSendQuota.acquire())
if err != nil {
if _, ok := err.(ConnectionError); !ok {
t.sendQuotaPool.add(ps)
}
} else {
r = bytes.NewBuffer(data)
isLastSlice = true
return err
}
s.localSendQuota.add(ltq - ps) // It's ok we make this negative.
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
success := func() {
t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
s.localSendQuota.add(ps)
}})
if ps < sq {
// Overbooked stream quota. Return it back.
s.sendQuotaPool.lockedAdd(sq - ps)
}
r = r[ps:]
}
failure := func() {
s.sendQuotaPool.lockedAdd(sq)
}
if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) {
t.sendQuotaPool.add(ps)
s.localSendQuota.add(ps)
}
}
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
t.Close()
return connectionErrorf(true, err, "transport: %v", err)
}
p = nil
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
}
t.writableChan <- 0
}
}
func (t *http2Server) applySettings(ss []http2.Setting) {
for _, s := range ss {
if s.ID == http2.SettingInitialWindowSize {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
}
t.streamSendQuota = s.Val
atomic.AddUint32(&t.outQuotaVersion, 1)
}
}
return nil
}
// keepalive running in a separate goroutine does the following:
@ -988,7 +918,7 @@ func (t *http2Server) keepalive() {
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
keepalive := time.NewTimer(t.kp.Time)
// NOTE: All exit paths of this function should reset their
// respecitve timers. A failure to do so will cause the
// respective timers. A failure to do so will cause the
// following clean-up to deadlock and eventually leak.
defer func() {
if !maxIdle.Stop() {
@ -1031,7 +961,7 @@ func (t *http2Server) keepalive() {
t.Close()
// Reseting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.shutdownChan:
case <-t.ctx.Done():
}
return
case <-keepalive.C:
@ -1049,7 +979,7 @@ func (t *http2Server) keepalive() {
pingSent = true
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.shutdownChan:
case <-t.ctx.Done():
return
}
}
@ -1057,93 +987,129 @@ func (t *http2Server) keepalive() {
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
for {
select {
case i := <-t.controlBuf.get():
t.controlBuf.load()
// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
// is duplicated between the client and the server.
// The transport layer needs to be refactored to take care of this.
func (t *http2Server) itemHandler(i item) error {
switch i := i.(type) {
case *dataFrame:
if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil {
return err
}
i.f()
return nil
case *headerFrame:
t.hBuf.Reset()
for _, f := range i.hf {
t.hEnc.WriteField(f)
}
first := true
endHeaders := false
for !endHeaders {
size := t.hBuf.Len()
if size > http2MaxFrameLen {
size = http2MaxFrameLen
} else {
endHeaders = true
}
var err error
if first {
first = false
err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
StreamID: i.streamID,
BlockFragment: t.hBuf.Next(size),
EndStream: i.endStream,
EndHeaders: endHeaders,
})
} else {
err = t.framer.fr.WriteContinuation(
i.streamID,
endHeaders,
t.hBuf.Next(size),
)
}
if err != nil {
return err
}
}
atomic.StoreUint32(&t.resetPingStrikes, 1)
return nil
case *windowUpdate:
return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
case *settings:
if i.ack {
t.applySettings(i.ss)
return t.framer.fr.WriteSettingsAck()
}
return t.framer.fr.WriteSettings(i.ss...)
case *resetStream:
return t.framer.fr.WriteRSTStream(i.streamID, i.code)
case *goAway:
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
// The transport is closing.
return fmt.Errorf("transport: Connection closing")
}
sid := t.maxStreamID
if !i.headsUp {
// Stop accepting more streams now.
t.state = draining
t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, i.code, i.debugData); err != nil {
return err
}
if i.closeConn {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return fmt.Errorf("transport: Connection closing")
}
return nil
}
t.mu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
return err
}
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
return err
}
go func() {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.writableChan:
switch i := i.(type) {
case *windowUpdate:
t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
t.applySettings(i.ss)
} else {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
case *goAway:
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
// The transport is closing.
return
}
sid := t.maxStreamID
if !i.headsUp {
// Stop accepting more streams now.
t.state = draining
activeStreams := len(t.activeStreams)
t.mu.Unlock()
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.closeConn || activeStreams == 0 {
// Abruptly close the connection following the GoAway.
t.Close()
}
t.writableChan <- 0
continue
}
t.mu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
t.framer.writePing(true, false, goAwayPing.data)
go func() {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.drainChan:
case <-timer.C:
case <-t.shutdownChan:
return
}
t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
}()
case *flushIO:
t.framer.flushWrite()
case *ping:
if !i.ack {
t.bdpEst.timesnap(i.data)
}
t.framer.writePing(true, i.ack, i.data)
default:
errorf("transport: http2Server.controller got unexpected item type %v\n", i)
}
t.writableChan <- 0
continue
case <-t.shutdownChan:
case <-t.drainChan:
case <-timer.C:
case <-t.ctx.Done():
return
}
case <-t.shutdownChan:
return
t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
}()
return nil
case *flushIO:
return t.framer.writer.Flush()
case *ping:
if !i.ack {
t.bdpEst.timesnap(i.data)
}
return t.framer.fr.WritePing(i.ack, i.data)
default:
err := status.Errorf(codes.Internal, "transport: http2Server.controller got unexpected item type %t", i)
errorf("%v", err)
return err
}
}
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() (err error) {
func (t *http2Server) Close() error {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
@ -1153,8 +1119,8 @@ func (t *http2Server) Close() (err error) {
streams := t.activeStreams
t.activeStreams = nil
t.mu.Unlock()
close(t.shutdownChan)
err = t.conn.Close()
t.cancel()
err := t.conn.Close()
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@ -1163,7 +1129,7 @@ func (t *http2Server) Close() (err error) {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
}
return
return err
}
// closeStream clears the footprint of a stream when the stream is not needed

View file

@ -28,7 +28,6 @@ import (
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
@ -45,7 +44,8 @@ const (
// http://http2.github.io/http2-spec/#SettingValues
http2InitHeaderTableSize = 4096
// http2IOBufSize specifies the buffer size for sending frames.
http2IOBufSize = 32 * 1024
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)
var (
@ -475,10 +475,10 @@ type framer struct {
fr *http2.Framer
}
func newFramer(conn net.Conn) *framer {
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
f := &framer{
reader: bufio.NewReaderSize(conn, http2IOBufSize),
writer: bufio.NewWriterSize(conn, http2IOBufSize),
reader: bufio.NewReaderSize(conn, readBufferSize),
writer: bufio.NewWriterSize(conn, writeBufferSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
// Opt-in to Frame reuse API on framer to reduce garbage.
@ -487,132 +487,3 @@ func newFramer(conn net.Conn) *framer {
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
func (f *framer) adjustNumWriters(i int32) int32 {
return atomic.AddInt32(&f.numWriters, i)
}
// The following writeXXX functions can only be called when the caller gets
// unblocked from writableChan channel (i.e., owns the privilege to write).
func (f *framer) writeContinuation(forceFlush bool, streamID uint32, endHeaders bool, headerBlockFragment []byte) error {
if err := f.fr.WriteContinuation(streamID, endHeaders, headerBlockFragment); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeData(forceFlush bool, streamID uint32, endStream bool, data []byte) error {
if err := f.fr.WriteData(streamID, endStream, data); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeGoAway(forceFlush bool, maxStreamID uint32, code http2.ErrCode, debugData []byte) error {
if err := f.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeHeaders(forceFlush bool, p http2.HeadersFrameParam) error {
if err := f.fr.WriteHeaders(p); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writePing(forceFlush, ack bool, data [8]byte) error {
if err := f.fr.WritePing(ack, data); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writePriority(forceFlush bool, streamID uint32, p http2.PriorityParam) error {
if err := f.fr.WritePriority(streamID, p); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writePushPromise(forceFlush bool, p http2.PushPromiseParam) error {
if err := f.fr.WritePushPromise(p); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeRSTStream(forceFlush bool, streamID uint32, code http2.ErrCode) error {
if err := f.fr.WriteRSTStream(streamID, code); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeSettings(forceFlush bool, settings ...http2.Setting) error {
if err := f.fr.WriteSettings(settings...); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeSettingsAck(forceFlush bool) error {
if err := f.fr.WriteSettingsAck(); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error {
if err := f.fr.WriteWindowUpdate(streamID, incr); err != nil {
return err
}
if forceFlush {
return f.writer.Flush()
}
return nil
}
func (f *framer) flushWrite() error {
return f.writer.Flush()
}
func (f *framer) readFrame() (http2.Frame, error) {
return f.fr.ReadFrame()
}
func (f *framer) errorDetail() error {
return f.fr.ErrorDetail()
}

View file

@ -21,10 +21,12 @@
package transport // import "google.golang.org/grpc/transport"
import (
stdctx "context"
"fmt"
"io"
"net"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/http2"
@ -67,20 +69,20 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 {
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
b.mu.Unlock()
}
func (b *recvBuffer) load() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
@ -89,6 +91,7 @@ func (b *recvBuffer) load() {
default:
}
}
b.mu.Unlock()
}
// get returns the channel that receives a recvMsg in the buffer.
@ -164,20 +167,20 @@ func newControlBuffer() *controlBuffer {
func (b *controlBuffer) put(r item) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 {
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
b.mu.Unlock()
}
func (b *controlBuffer) load() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
@ -186,6 +189,7 @@ func (b *controlBuffer) load() {
default:
}
}
b.mu.Unlock()
}
// get returns the channel that receives an item in the buffer.
@ -235,7 +239,8 @@ type Stream struct {
// is used to adjust flow control, if need be.
requestRead func(int)
sendQuotaPool *quotaPool
sendQuotaPool *quotaPool
localSendQuota *quotaPool
// Close headerChan to indicate the end of reception of header metadata.
headerChan chan struct{}
// header caches the received header metadata.
@ -313,8 +318,9 @@ func (s *Stream) Header() (metadata.MD, error) {
// side only.
func (s *Stream) Trailer() metadata.MD {
s.mu.RLock()
defer s.mu.RUnlock()
return s.trailer.Copy()
c := s.trailer.Copy()
s.mu.RUnlock()
return c
}
// ServerTransport returns the underlying ServerTransport for the stream.
@ -342,14 +348,16 @@ func (s *Stream) Status() *status.Status {
// Server side only.
func (s *Stream) SetHeader(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.headerOk || s.state == streamDone {
s.mu.Unlock()
return ErrIllegalHeaderWrite
}
if md.Len() == 0 {
s.mu.Unlock()
return nil
}
s.header = metadata.Join(s.header, md)
s.mu.Unlock()
return nil
}
@ -360,8 +368,8 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
s.trailer = metadata.Join(s.trailer, md)
s.mu.Unlock()
return nil
}
@ -412,15 +420,17 @@ func (s *Stream) finish(st *status.Status) {
// BytesSent indicates whether any bytes have been sent on this stream.
func (s *Stream) BytesSent() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.bytesSent
bs := s.bytesSent
s.mu.Unlock()
return bs
}
// BytesReceived indicates whether any bytes have been received on this stream.
func (s *Stream) BytesReceived() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.bytesReceived
br := s.bytesReceived
s.mu.Unlock()
return br
}
// GoString is implemented by Stream so context.String() won't
@ -449,7 +459,6 @@ type transportState int
const (
reachable transportState = iota
unreachable
closing
draining
)
@ -464,6 +473,8 @@ type ServerConfig struct {
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@ -491,10 +502,14 @@ type ConnectOptions struct {
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// InitialWindowSize sets the intial window size for a stream.
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the intial window size for a connection.
// InitialConnWindowSize sets the initial window size for a connection.
InitialConnWindowSize int32
// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
}
// TargetInfo contains the information of the target such as network address and metadata.
@ -505,8 +520,8 @@ type TargetInfo struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts)
func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts, timeout)
}
// Options provides additional hints and information for message
@ -518,7 +533,7 @@ type Options struct {
// Delay is a hint to the transport implementation for whether
// the data could be buffered for a batching write. The
// Transport implementation may ignore the hint.
// transport implementation may ignore the hint.
Delay bool
}
@ -688,34 +703,33 @@ func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
// wait blocks until it can receive from ctx.Done, closing, or proceed.
// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
// it return the StreamError for ctx.Err.
// If it receives from goAway, it returns 0, ErrStreamDrain.
// If it receives from closing, it returns 0, ErrConnClosing.
// If it receives from proceed, it returns the received integer, nil.
func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
// wait blocks until it can receive from one of the provided contexts or channels
func wait(ctx, tctx context.Context, done, goAway <-chan struct{}, proceed <-chan int) (int, error) {
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
case <-done:
// User cancellation has precedence.
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
default:
}
return 0, io.EOF
case <-goAway:
return 0, ErrStreamDrain
case <-closing:
case <-tctx.Done():
return 0, ErrConnClosing
case i := <-proceed:
return i, nil
}
}
// ContextErr converts the error from context package into a StreamError.
func ContextErr(err error) StreamError {
switch err {
case context.DeadlineExceeded, stdctx.DeadlineExceeded:
return streamErrorf(codes.DeadlineExceeded, "%v", err)
case context.Canceled, stdctx.Canceled:
return streamErrorf(codes.Canceled, "%v", err)
}
return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
}
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8
@ -725,6 +739,39 @@ const (
// NoReason is the default value when GoAway frame is received.
NoReason GoAwayReason = 1
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
// was recieved and that the debug data said "too_many_pings".
// was received and that the debug data said "too_many_pings".
TooManyPings GoAwayReason = 2
)
// loopyWriter is run in a separate go routine. It is the single code path that will
// write data on wire.
func loopyWriter(ctx context.Context, cbuf *controlBuffer, handler func(item) error) {
for {
select {
case i := <-cbuf.get():
cbuf.load()
if err := handler(i); err != nil {
return
}
case <-ctx.Done():
return
}
hasData:
for {
select {
case i := <-cbuf.get():
cbuf.load()
if err := handler(i); err != nil {
return
}
case <-ctx.Done():
return
default:
if err := handler(&flushIO{}); err != nil {
return
}
break hasData
}
}
}
}

View file

@ -49,6 +49,7 @@ type server struct {
startedErr chan error // error (or nil) with server start value
mu sync.Mutex
conns map[ServerTransport]bool
h *testStreamHandler
}
var (
@ -60,7 +61,8 @@ var (
)
type testStreamHandler struct {
t *http2Server
t *http2Server
notify chan struct{}
}
type hType int
@ -68,6 +70,7 @@ type hType int
const (
normal hType = iota
suspended
notifyCall
misbehaved
encodingRequiredStatus
invalidHeaderField
@ -76,6 +79,19 @@ const (
pingpong
)
func (h *testStreamHandler) handleStreamAndNotify(s *Stream) {
if h.notify == nil {
return
}
go func() {
select {
case <-h.notify:
default:
close(h.notify)
}
}()
}
func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) {
req := expectedRequest
resp := expectedResponse
@ -92,7 +108,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) {
t.Fatalf("handleStream got %v, want %v", p, req)
}
// send a response back to the client.
h.t.Write(s, resp, nil, &Options{})
h.t.Write(s, nil, resp, &Options{})
// send the trailer to end the stream.
h.t.WriteStatus(s, status.New(codes.OK, ""))
}
@ -112,7 +128,7 @@ func (h *testStreamHandler) handleStreamPingPong(t *testing.T, s *Stream) {
buf[0] = byte(0)
binary.BigEndian.PutUint32(buf[1:], uint32(sz))
copy(buf[5:], msg)
h.t.Write(s, buf, nil, &Options{})
h.t.Write(s, nil, buf, &Options{})
}
}
@ -124,7 +140,6 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *Stream) {
var sent int
p := make([]byte, http2MaxFrameLen)
for sent < initialWindowSize {
<-conn.writableChan
n := initialWindowSize - sent
// The last message may be smaller than http2MaxFrameLen
if n <= http2MaxFrameLen {
@ -137,11 +152,7 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *Stream) {
p = make([]byte, n+1)
}
}
if err := conn.framer.writeData(true, s.id, false, p); err != nil {
conn.writableChan <- 0
break
}
conn.writableChan <- 0
conn.controlBuf.put(&dataFrame{s.id, false, p, func() {}})
sent += len(p)
}
}
@ -152,13 +163,13 @@ func (h *testStreamHandler) handleStreamEncodingRequiredStatus(t *testing.T, s *
}
func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stream) {
<-h.t.writableChan
h.t.hBuf.Reset()
h.t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
if err := h.t.writeHeaders(s, h.t.hBuf, false); err != nil {
t.Fatalf("Failed to write headers: %v", err)
}
h.t.writableChan <- 0
headerFields := []hpack.HeaderField{}
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
h.t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
})
}
func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
@ -183,7 +194,7 @@ func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
t.Fatalf("handleStream got %v, want %v", p, req)
}
// send a response back to the client.
h.t.Write(s, resp, nil, &Options{})
h.t.Write(s, nil, resp, &Options{})
// send the trailer to end the stream.
h.t.WriteStatus(s, status.New(codes.OK, ""))
}
@ -208,7 +219,7 @@ func (h *testStreamHandler) handleStreamDelayWrite(t *testing.T, s *Stream) {
// Wait before sending. Give time to client to start reading
// before server starts sending.
time.Sleep(2 * time.Second)
h.t.Write(s, resp, nil, &Options{})
h.t.Write(s, nil, resp, &Options{})
// send the trailer to end the stream.
h.t.WriteStatus(s, status.New(codes.OK, ""))
}
@ -249,9 +260,15 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT
return
}
s.conns[transport] = true
h := &testStreamHandler{t: transport.(*http2Server)}
s.h = h
s.mu.Unlock()
h := &testStreamHandler{transport.(*http2Server)}
switch ht {
case notifyCall:
go transport.HandleStreams(h.handleStreamAndNotify,
func(ctx context.Context, _ string) context.Context {
return ctx
})
case suspended:
go transport.HandleStreams(func(*Stream) {}, // Do nothing to handle the stream.
func(ctx context.Context, method string) context.Context {
@ -340,7 +357,7 @@ func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hTy
target := TargetInfo{
Addr: addr,
}
ct, connErr = NewClientTransport(context.Background(), target, copts)
ct, connErr = NewClientTransport(context.Background(), target, copts, 2*time.Second)
if connErr != nil {
t.Fatalf("failed to create transport: %v", connErr)
}
@ -363,7 +380,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
}
done <- conn
}()
tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts)
tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, 2*time.Second)
if err != nil {
// Server clean-up.
lis.Close()
@ -801,7 +818,7 @@ func TestClientSendAndReceive(t *testing.T) {
Last: true,
Delay: false,
}
if err := ct.Write(s1, expectedRequest, nil, &opts); err != nil && err != io.EOF {
if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("failed to send data: %v", err)
}
p := make([]byte, len(expectedResponse))
@ -838,7 +855,7 @@ func performOneRPC(ct ClientTransport) {
Last: true,
Delay: false,
}
if err := ct.Write(s, expectedRequest, nil, &opts); err == nil || err == io.EOF {
if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF {
time.Sleep(5 * time.Millisecond)
// The following s.Recv()'s could error out because the
// underlying transport is gone.
@ -882,7 +899,7 @@ func TestLargeMessage(t *testing.T) {
if err != nil {
t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
}
if err := ct.Write(s, expectedRequestLarge, nil, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponseLarge))
@ -914,7 +931,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
if err != nil {
t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
}
if err := ct.Write(s, expectedRequestLarge, nil, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponseLarge))
@ -952,7 +969,7 @@ func TestLargeMessageDelayWrite(t *testing.T) {
// Give time to server to start reading before client starts sending.
time.Sleep(2 * time.Second)
if err := ct.Write(s, expectedRequestLarge, nil, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponseLarge))
@ -998,7 +1015,7 @@ func TestGracefulClose(t *testing.T) {
Delay: false,
}
// The stream which was created before graceful close can still proceed.
if err := ct.Write(s, expectedRequest, nil, &opts); err != nil && err != io.EOF {
if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponse))
@ -1028,7 +1045,7 @@ func TestLargeMessageSuspension(t *testing.T) {
}
// Write should not be done successfully due to flow control.
msg := make([]byte, initialWindowSize*8)
err = ct.Write(s, msg, nil, &Options{Last: true, Delay: false})
err = ct.Write(s, nil, msg, &Options{Last: true, Delay: false})
expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
if err != expectedErr {
t.Fatalf("Write got %v, want %v", err, expectedErr)
@ -1150,12 +1167,7 @@ func TestServerContextCanceledOnClosedConnection(t *testing.T) {
if err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
// Make sure the headers frame is flushed out.
<-cc.writableChan
if err = cc.framer.writeData(true, s.id, false, make([]byte, http2MaxFrameLen)); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
cc.writableChan <- 0
cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, http2MaxFrameLen), func() {}})
// Loop until the server side stream is created.
var ss *Stream
for {
@ -1186,7 +1198,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client := setUpWithOptions(t, 0, &ServerConfig{}, suspended, connectOptions)
server, client := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
defer server.stop()
defer client.Close()
@ -1205,66 +1217,56 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
for k := range server.conns {
st = k.(*http2Server)
}
notifyChan := make(chan struct{})
server.h.notify = notifyChan
server.mu.Unlock()
cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create first stream. Err: %v", err)
}
<-notifyChan
var sstream1 *Stream
// Access stream on the server.
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()
if len(st.activeStreams) != 1 {
return true, fmt.Errorf("timed-out while waiting for server to have created a stream")
}
for _, v := range st.activeStreams {
st.mu.Lock()
for _, v := range st.activeStreams {
if v.id == cstream1.id {
sstream1 = v
}
return false, nil
})
}
st.mu.Unlock()
if sstream1 == nil {
t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream1.id)
}
// Exhaust client's connection window.
<-st.writableChan
if err := st.framer.writeData(true, sstream1.id, true, make([]byte, defaultWindowSize)); err != nil {
st.writableChan <- 0
if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
t.Fatalf("Server failed to write data. Err: %v", err)
}
st.writableChan <- 0
notifyChan = make(chan struct{})
server.mu.Lock()
server.h.notify = notifyChan
server.mu.Unlock()
// Create another stream on client.
cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create second stream. Err: %v", err)
}
<-notifyChan
var sstream2 *Stream
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()
if len(st.activeStreams) != 2 {
return true, fmt.Errorf("timed-out while waiting for server to have created the second stream")
st.mu.Lock()
for _, v := range st.activeStreams {
if v.id == cstream2.id {
sstream2 = v
}
for _, v := range st.activeStreams {
if v.id == cstream2.id {
sstream2 = v
}
}
if sstream2 == nil {
return true, fmt.Errorf("didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
}
return false, nil
})
}
st.mu.Unlock()
if sstream2 == nil {
t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
}
// Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
<-st.writableChan
if err := st.framer.writeData(true, sstream2.id, true, make([]byte, defaultWindowSize)); err != nil {
st.writableChan <- 0
if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
t.Fatalf("Server failed to write data. Err: %v", err)
}
st.writableChan <- 0
// Client should be able to read data on second stream.
if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil {
@ -1305,7 +1307,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
t.Fatalf("Failed to create 1st stream. Err: %v", err)
}
// Exhaust server's connection window.
if err := client.Write(cstream1, make([]byte, defaultWindowSize), nil, &Options{Last: true}); err != nil {
if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
//Client should be able to create another stream and send data on it.
@ -1313,7 +1315,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create 2nd stream. Err: %v", err)
}
if err := client.Write(cstream2, make([]byte, defaultWindowSize), nil, &Options{}); err != nil {
if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), &Options{}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
// Get the streams on server.
@ -1336,11 +1338,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
st.mu.Unlock()
// Trying to write more on a max-ed out stream should result in a RST_STREAM from the server.
ct := client.(*http2Client)
<-ct.writableChan
if err := ct.framer.writeData(true, cstream2.id, true, make([]byte, 1)); err != nil {
t.Fatalf("Client failed to write. Err: %v", err)
}
ct.writableChan <- 0
ct.controlBuf.put(&dataFrame{cstream2.id, true, make([]byte, 1), func() {}})
code := http2ErrConvTab[http2.ErrCodeFlowControl]
waitWhileTrue(t, func() (bool, error) {
cstream2.mu.Lock()
@ -1397,11 +1395,7 @@ func TestServerWithMisbehavedClient(t *testing.T) {
}
var sent int
// Drain the stream flow control window
<-cc.writableChan
if err = cc.framer.writeData(true, s.id, false, make([]byte, http2MaxFrameLen)); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
cc.writableChan <- 0
cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, http2MaxFrameLen), func() {}})
sent += http2MaxFrameLen
// Wait until the server creates the corresponding stream and receive some data.
var ss *Stream
@ -1426,11 +1420,7 @@ func TestServerWithMisbehavedClient(t *testing.T) {
}
// Keep sending until the server inbound window is drained for that stream.
for sent <= initialWindowSize {
<-cc.writableChan
if err = cc.framer.writeData(true, s.id, false, make([]byte, 1)); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
cc.writableChan <- 0
cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, 1), func() {}})
sent++
}
// Server sent a resetStream for s already.
@ -1468,7 +1458,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
t.Fatalf("Failed to open stream: %v", err)
}
d := make([]byte, 1)
if err := ct.Write(s, d, nil, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
if err := ct.Write(s, nil, d, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Fatalf("Failed to write: %v", err)
}
// Read without window update.
@ -1510,7 +1500,7 @@ func TestEncodingRequiredStatus(t *testing.T) {
Last: true,
Delay: false,
}
if err := ct.Write(s, expectedRequest, nil, &opts); err != nil && err != io.EOF {
if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("Failed to write the request: %v", err)
}
p := make([]byte, http2MaxFrameLen)
@ -1538,7 +1528,7 @@ func TestInvalidHeaderField(t *testing.T) {
Last: true,
Delay: false,
}
if err := ct.Write(s, expectedRequest, nil, &opts); err != nil && err != io.EOF {
if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("Failed to write the request: %v", err)
}
p := make([]byte, http2MaxFrameLen)
@ -1690,7 +1680,7 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire())
serverSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
if err != nil {
t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
}
@ -1712,7 +1702,7 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
t.Fatalf("Client transport flow control window size is %v, want %v", limit, connectOptions.InitialConnWindowSize)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire())
clientSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
if err != nil {
t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
}
@ -1781,7 +1771,7 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
opts := Options{}
header := make([]byte, 5)
for i := 1; i <= 10; i++ {
if err := ct.Write(cstream, buf, nil, &opts); err != nil {
if err := ct.Write(cstream, nil, buf, &opts); err != nil {
t.Fatalf("Error on client while writing message: %v", err)
}
if _, err := cstream.Read(header); err != nil {
@ -1848,7 +1838,7 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
// Check flow conrtrol window on client stream is equal to out flow on server stream.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
serverStreamSendQuota, err := wait(ctx, nil, nil, nil, sstream.sendQuotaPool.acquire())
serverStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, sstream.sendQuotaPool.acquire())
cancel()
if err != nil {
return true, fmt.Errorf("error while acquiring server stream send quota. Err: %v", err)
@ -1863,7 +1853,7 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
// Check flow control window on server stream is equal to out flow on client stream.
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
clientStreamSendQuota, err := wait(ctx, nil, nil, nil, cstream.sendQuotaPool.acquire())
clientStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, cstream.sendQuotaPool.acquire())
cancel()
if err != nil {
return true, fmt.Errorf("error while acquiring client stream send quota. Err: %v", err)
@ -1878,7 +1868,7 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
// Check flow control window on client transport is equal to out flow of server transport.
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
serverTrSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire())
serverTrSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
cancel()
if err != nil {
return true, fmt.Errorf("error while acquring server transport send quota. Err: %v", err)
@ -1893,7 +1883,7 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
// Check flow control window on server transport is equal to out flow of client transport.
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
clientTrSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire())
clientTrSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
cancel()
if err != nil {
return true, fmt.Errorf("error while acquiring client transport send quota. Err: %v", err)
@ -1943,15 +1933,12 @@ func writeOneHeader(framer *http2.Framer, sid uint32, httpStatus int) error {
var buf bytes.Buffer
henc := hpack.NewEncoder(&buf)
henc.WriteField(hpack.HeaderField{Name: ":status", Value: fmt.Sprint(httpStatus)})
if err := framer.WriteHeaders(http2.HeadersFrameParam{
return framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: sid,
BlockFragment: buf.Bytes(),
EndStream: true,
EndHeaders: true,
}); err != nil {
return err
}
return nil
})
}
func writeTwoHeaders(framer *http2.Framer, sid uint32, httpStatus int) error {
@ -1973,15 +1960,12 @@ func writeTwoHeaders(framer *http2.Framer, sid uint32, httpStatus int) error {
Name: ":status",
Value: fmt.Sprint(httpStatus),
})
if err := framer.WriteHeaders(http2.HeadersFrameParam{
return framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: sid,
BlockFragment: buf.Bytes(),
EndStream: true,
EndHeaders: true,
}); err != nil {
return err
}
return nil
})
}
type httpServer struct {
@ -2005,8 +1989,8 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
t.Errorf("Error at server-side while reading preface from cleint. Err: %v", err)
return
}
reader := bufio.NewReaderSize(s.conn, http2IOBufSize)
writer := bufio.NewWriterSize(s.conn, http2IOBufSize)
reader := bufio.NewReaderSize(s.conn, defaultWriteBufSize)
writer := bufio.NewWriterSize(s.conn, defaultReadBufSize)
framer := http2.NewFramer(writer, reader)
if err = framer.WriteSettingsAck(); err != nil {
t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
@ -2071,7 +2055,7 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream
wh: wh,
}
server.start(t, lis)
client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{})
client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, 2*time.Second)
if err != nil {
t.Fatalf("Error creating client. Err: %v", err)
}