Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 4131c30

Browse files
committed
receiver/opencensus: multiplex application/grpc+proto for gRPC based receiver
The registered grpc-gateway HTTP/2 content-type header field was "application/grpc" but some clients in the wild send "application/grpc+proto" and as per the gRPC spec, it is a descriptive content length that should be handled by Proto (un)marshalers. This change removed the use of (*grpc.Server).Stop() which painfully took 20+s even without the "-race" go start option. Sure, that method adds some sort of internal shutdown mechanism but it is very painful to wait for 20+s for a server with zero connections to stop yet all we need to do is stop the actual net.Listener by (net.Listener).Stop() This change also adds a test which should work in a perfect world. However, we don't have control over writes on the wire with the gRPC connection hence our expectations for results to assert against are blissfully optimistic and will fail most of the time as those writes take sometime. The test has been included but skipped until we can figure out what's going on. In the meantime though, users should be able to use the OpenCensus receiver without trouble. Also internally in the receivers, we've got data bundling which is controlled separately despite options. Fixes #365 Fixes #366
1 parent f34bc81 commit 4131c30

2 files changed

Lines changed: 112 additions & 5 deletions

File tree

receiver/opencensus/opencensus.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,16 @@ func (ocr *Receiver) Stop() error {
189189

190190
var err = errAlreadyStopped
191191
ocr.stopOnce.Do(func() {
192-
// TODO: (@odeke-em) should we instead do (*grpc.Server).GracefulStop?
193-
ocr.serverGRPC.Stop()
194192
_ = ocr.serverHTTP.Close()
193+
195194
_ = ocr.ln.Close()
195+
196+
// TODO: @(odeke-em) investigate what utility invoking (*grpc.Server).Stop()
197+
// gives us yet we invoke (net.Listener).Close().
198+
// Sure (*grpc.Server).Stop() enables proper shutdown but imposes
199+
// a painful and artificial wait time that goes into 20+seconds yet most of our
200+
// tests and code should be reactive in less than even 1second.
201+
// ocr.serverGRPC.Stop()
196202
})
197203
return err
198204
}
@@ -231,7 +237,10 @@ func (ocr *Receiver) startServer() error {
231237

232238
// Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port.
233239
m := cmux.New(ocr.ln)
234-
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
240+
grpcL := m.Match(
241+
cmux.HTTP2HeaderField("content-type", "application/grpc"),
242+
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
243+
235244
httpL := m.Match(cmux.Any())
236245
go func() {
237246
errChan <- ocr.serverGRPC.Serve(grpcL)

receiver/opencensus/opencensus_test.go

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@ import (
2424
"testing"
2525
"time"
2626

27+
"google.golang.org/grpc"
28+
"google.golang.org/grpc/metadata"
29+
2730
"github.com/census-instrumentation/opencensus-service/receiver/opencensus"
31+
"github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace"
2832
"github.com/census-instrumentation/opencensus-service/receiver/testhelper"
2933

3034
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
3135
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3236
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3337
"github.com/census-instrumentation/opencensus-service/internal"
34-
"github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace"
3538
)
3639

3740
func TestGrpcGateway_endToEnd(t *testing.T) {
@@ -153,7 +156,7 @@ func TestGrpcGatewayCors_endToEnd(t *testing.T) {
153156
if err != nil {
154157
t.Fatalf("Failed to create trace receiver: %v", err)
155158
}
156-
defer ocr.StopTraceReception(context.Background())
159+
defer ocr.Stop()
157160

158161
sink := new(testhelper.ConcurrentSpanSink)
159162
go func() {
@@ -174,6 +177,101 @@ func TestGrpcGatewayCors_endToEnd(t *testing.T) {
174177
verifyCorsResp(t, url, "disallowed-origin.com", 200, false)
175178
}
176179

180+
// As per Issue https://github.com/census-instrumentation/opencensus-service/issues/366
181+
// the agent's mux should be able to accept all Proto affiliated content-types and not
182+
// redirect them to the web-grpc-gateway endpoint.
183+
func TestAcceptAllGRPCProtoAffiliatedContentTypes(t *testing.T) {
184+
t.Skip("Currently a flaky test as we need a way to flush all written traces")
185+
186+
addr := ":35991"
187+
ocr, err := opencensus.New(addr, opencensus.WithTraceReceiverOptions(octrace.WithSpanBufferCount(1)))
188+
if err != nil {
189+
t.Fatalf("Failed to create trace receiver: %v", err)
190+
}
191+
192+
cbts := new(testhelper.ConcurrentSpanSink)
193+
if err := ocr.StartTraceReception(context.Background(), cbts); err != nil {
194+
t.Fatalf("Failed to start the trace receiver: %v", err)
195+
}
196+
defer ocr.Stop()
197+
198+
// Now start the client with the various Proto affiliated gRPC Content-SubTypes as per:
199+
// https://godoc.org/google.golang.org/grpc#CallContentSubtype
200+
protoAffiliatedContentSubTypes := []string{"", "proto"}
201+
for _, subContentType := range protoAffiliatedContentSubTypes {
202+
if err := runContentTypeTests(addr, asSubContentType, subContentType); err != nil {
203+
t.Errorf("%q subContentType failed to send proto: %v", subContentType, err)
204+
}
205+
}
206+
207+
// Now start the client with the various Proto affiliated gRPC Content-Types,
208+
// as we encountered in https://github.com/census-instrumentation/opencensus-service/issues/366
209+
protoAffiliatedContentTypes := []string{"application/grpc", "application/grpc+proto"}
210+
for _, contentType := range protoAffiliatedContentTypes {
211+
if err := runContentTypeTests(addr, asContentType, contentType); err != nil {
212+
t.Errorf("%q Content-type failed to send proto: %v", contentType, err)
213+
}
214+
}
215+
216+
// Before we exit we have to verify that we got exactly 4 TraceService requests.
217+
wantLen := len(protoAffiliatedContentSubTypes) + len(protoAffiliatedContentTypes)
218+
gotReqs := cbts.AllTraces()
219+
if len(gotReqs) != wantLen {
220+
t.Errorf("Receiver ExportTraceServiceRequest length mismatch:: Got %d Want %d", len(gotReqs), wantLen)
221+
}
222+
}
223+
224+
const (
225+
asSubContentType = true
226+
asContentType = false
227+
)
228+
229+
func runContentTypeTests(addr string, contentTypeDesignation bool, contentType string) error {
230+
opts := []grpc.DialOption{
231+
grpc.WithInsecure(),
232+
grpc.WithBlock(),
233+
grpc.WithDisableRetry(),
234+
}
235+
236+
if contentTypeDesignation == asContentType {
237+
opts = append(opts, grpc.WithDefaultCallOptions(
238+
grpc.Header(&metadata.MD{"Content-Type": []string{contentType}})))
239+
} else {
240+
opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallContentSubtype(contentType)))
241+
}
242+
243+
cc, err := grpc.Dial(addr, opts...)
244+
if err != nil {
245+
return fmt.Errorf("Creating grpc.ClientConn: %v", err)
246+
}
247+
defer cc.Close()
248+
249+
// First step is to send the Node.
250+
acc := agenttracepb.NewTraceServiceClient(cc)
251+
252+
stream, err := acc.Export(context.Background())
253+
if err != nil {
254+
return fmt.Errorf("Initializing the export stream: %v", err)
255+
}
256+
257+
msg := &agenttracepb.ExportTraceServiceRequest{
258+
Node: &commonpb.Node{
259+
Attributes: map[string]string{
260+
"sub-type": contentType,
261+
},
262+
},
263+
Spans: []*tracepb.Span{
264+
{
265+
TraceId: []byte{
266+
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
267+
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0X10,
268+
},
269+
},
270+
},
271+
}
272+
return stream.Send(msg)
273+
}
274+
177275
func verifyCorsResp(t *testing.T, url string, origin string, wantStatus int, wantAllowed bool) {
178276
req, err := http.NewRequest("OPTIONS", url, nil)
179277
if err != nil {

0 commit comments

Comments
 (0)