Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 10953d1

Browse files
author
Paulo Janotti
authored
Add zpages to collector (#541)
* Add zpages to collector Move zpages to its own type and share the usage of that type between agent and collector. * Paying coverage tax * Added very basic end-to-end coverage for collector * A new test for zpagesserver: port already in use * Improve collector test synchronization * Make TestApplication_Start resilient to port conflicts There are other tests in the code base that can suffer with port conflicts but I didn't wat to go overboard and chase them right now.
1 parent 98b8f9c commit 10953d1

File tree

7 files changed

+268
-44
lines changed

7 files changed

+268
-44
lines changed

cmd/ocagent/main.go

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@ import (
2020
"context"
2121
"fmt"
2222
"log"
23-
"net"
24-
"net/http"
2523
"os"
2624
"os/signal"
2725
"syscall"
2826

2927
"github.com/spf13/cobra"
3028
"github.com/spf13/viper"
3129
"go.opencensus.io/stats/view"
32-
"go.opencensus.io/zpages"
3330
"go.uber.org/zap"
3431
"go.uber.org/zap/zapcore"
3532

@@ -38,6 +35,7 @@ import (
3835
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
3936
"github.com/census-instrumentation/opencensus-service/internal/pprofserver"
4037
"github.com/census-instrumentation/opencensus-service/internal/version"
38+
"github.com/census-instrumentation/opencensus-service/internal/zpagesserver"
4139
"github.com/census-instrumentation/opencensus-service/observability"
4240
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
4341
"github.com/census-instrumentation/opencensus-service/receiver/jaegerreceiver"
@@ -132,7 +130,11 @@ func runOCAgent() {
132130
// If zPages are enabled, run them
133131
zPagesPort, zPagesEnabled := agentConfig.ZPagesPort()
134132
if zPagesEnabled {
135-
zCloseFn := runZPages(zPagesPort)
133+
zCloseFn, err := zpagesserver.Run(asyncErrorChan, zPagesPort)
134+
if err != nil {
135+
log.Fatal(err)
136+
}
137+
log.Printf("Running zPages on port %d", zPagesPort)
136138
closeFns = append(closeFns, zCloseFn)
137139
}
138140

@@ -202,28 +204,6 @@ func runOCAgent() {
202204
}
203205
}
204206

205-
func runZPages(port int) func() error {
206-
// And enable zPages too
207-
zPagesMux := http.NewServeMux()
208-
zpages.Handle(zPagesMux, "/debug")
209-
210-
addr := fmt.Sprintf(":%d", port)
211-
ln, err := net.Listen("tcp", addr)
212-
if err != nil {
213-
log.Fatalf("Failed to bind to run zPages on %q: %v", addr, err)
214-
}
215-
216-
srv := http.Server{Handler: zPagesMux}
217-
go func() {
218-
log.Printf("Running zPages at %q", addr)
219-
if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
220-
log.Fatalf("Failed to serve zPages: %v", err)
221-
}
222-
}()
223-
224-
return srv.Close
225-
}
226-
227207
func runOCReceiver(logger *zap.Logger, acfg *config.Config, tc consumer.TraceConsumer, mc consumer.MetricsConsumer, asyncErrorChan chan<- error) (doneFn func() error, err error) {
228208
tlsCredsOption, hasTLSCreds, err := acfg.OpenCensusReceiverTLSCredentialsServerOption()
229209
if err != nil {

cmd/occollector/app/collector/collector.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/census-instrumentation/opencensus-service/consumer"
3333
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
3434
"github.com/census-instrumentation/opencensus-service/internal/pprofserver"
35+
"github.com/census-instrumentation/opencensus-service/internal/zpagesserver"
3536
"github.com/census-instrumentation/opencensus-service/receiver"
3637
)
3738

@@ -47,11 +48,16 @@ type Application struct {
4748
healthCheck *healthcheck.HealthCheck
4849
processor consumer.TraceConsumer
4950
receivers []receiver.TraceReceiver
51+
// stopTestChan is used to terminate the application in end to end tests.
52+
stopTestChan chan struct{}
53+
// readyChan is used in tests to indicate that the application is ready.
54+
readyChan chan struct{}
5055
}
5156

5257
func newApp() *Application {
5358
return &Application{
54-
v: viper.New(),
59+
v: viper.New(),
60+
readyChan: make(chan struct{}),
5561
}
5662
}
5763

@@ -89,6 +95,20 @@ func (app *Application) execute() {
8995
var closeFns []func()
9096
app.processor, closeFns = startProcessor(app.v, app.logger)
9197

98+
zpagesPort := app.v.GetInt(zpagesserver.ZPagesHTTPPort)
99+
if zpagesPort > 0 {
100+
closeZPages, err := zpagesserver.Run(asyncErrorChannel, zpagesPort)
101+
if err != nil {
102+
app.logger.Error("Failed to run zPages", zap.Error(err))
103+
os.Exit(1)
104+
}
105+
app.logger.Info("Running zPages", zap.Int("port", zpagesPort))
106+
closeFn := func() {
107+
closeZPages()
108+
}
109+
closeFns = append(closeFns, closeFn)
110+
}
111+
92112
app.receivers = createReceivers(app.v, app.logger, app.processor, asyncErrorChannel)
93113

94114
err = initTelemetry(asyncErrorChannel, app.v, app.logger)
@@ -103,11 +123,18 @@ func (app *Application) execute() {
103123
// mark service as ready to receive traffic.
104124
app.healthCheck.Ready()
105125

126+
// set the channel to stop testing.
127+
app.stopTestChan = make(chan struct{})
128+
// notify tests that it is ready.
129+
close(app.readyChan)
130+
106131
select {
107132
case err = <-asyncErrorChannel:
108133
app.logger.Error("Asynchronous error received, terminating process", zap.Error(err))
109134
case s := <-signalsChannel:
110135
app.logger.Info("Received signal from OS", zap.String("signal", s.String()))
136+
case <-app.stopTestChan:
137+
app.logger.Info("Received stop test request")
111138
}
112139

113140
app.healthCheck.Set(healthcheck.Unavailable)
@@ -144,6 +171,7 @@ func (app *Application) Start() error {
144171
healthCheckFlags,
145172
loggerFlags,
146173
pprofserver.AddFlags,
174+
zpagesserver.AddFlags,
147175
)
148176

149177
return rootCmd.Execute()
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package collector handles the command-line, configuration, and runs the OC collector.
16+
package collector
17+
18+
import (
19+
"net"
20+
"net/http"
21+
"testing"
22+
23+
"github.com/census-instrumentation/opencensus-service/internal/zpagesserver"
24+
)
25+
26+
func TestApplication_Start(t *testing.T) {
27+
portArg := []string{
28+
healthCheckHTTPPort, // Keep it as first since its address is used later.
29+
zpagesserver.ZPagesHTTPPort,
30+
"metrics-port",
31+
"receivers.opencensus.port",
32+
}
33+
addresses := getMultipleAvailableLocalAddresses(t, uint(len(portArg)))
34+
for i, addr := range addresses {
35+
_, port, err := net.SplitHostPort(addr)
36+
if err != nil {
37+
t.Fatalf("failed to split host and port from %q: %v", addr, err)
38+
}
39+
App.v.Set(portArg[i], port)
40+
}
41+
42+
// Without exporters the collector will start and just shutdown, no error is expected.
43+
App.v.Set("logging-exporter", true)
44+
45+
appDone := make(chan struct{})
46+
go func() {
47+
defer close(appDone)
48+
if err := App.Start(); err != nil {
49+
t.Fatalf("App.Start() got %v, want nil", err)
50+
}
51+
}()
52+
53+
<-App.readyChan
54+
if !isAppAvailable(t, "http://"+addresses[0]) {
55+
t.Fatalf("App didn't reach ready state")
56+
}
57+
close(App.stopTestChan)
58+
<-appDone
59+
}
60+
61+
// isAppAvailable checks if the healthcheck server at the given endpoint is
62+
// returning `available`.
63+
func isAppAvailable(t *testing.T, healthCheckEndPoint string) bool {
64+
client := &http.Client{}
65+
resp, err := client.Get(healthCheckEndPoint)
66+
if err != nil {
67+
t.Fatalf("failed to get a response from health probe: %v", err)
68+
}
69+
defer resp.Body.Close()
70+
return resp.StatusCode == http.StatusNoContent
71+
}
72+
73+
func getMultipleAvailableLocalAddresses(t *testing.T, numAddresses uint) []string {
74+
addresses := make([]string, numAddresses, numAddresses)
75+
for i := uint(0); i < numAddresses; i++ {
76+
addresses[i] = getAvailableLocalAddress(t)
77+
}
78+
return addresses
79+
}
80+
81+
func getAvailableLocalAddress(t *testing.T) string {
82+
ln, err := net.Listen("tcp", ":0")
83+
if err != nil {
84+
t.Fatalf("failed to get a free local port: %v", err)
85+
}
86+
// There is a possible race if something else takes this same port before
87+
// the test uses it, however, that is unlikely in practice.
88+
defer ln.Close()
89+
return ln.Addr().String()
90+
}

cmd/occollector/app/collector/empty_test.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

demos/trace/docker-compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ services:
2222
- ./oc-collector-config.yaml:/etc/oc-collector-config.yaml
2323
ports:
2424
- "55678"
25+
- "55680:55679"
2526
- "1777:1777"
2627
- "8888:8888" # Prometheus metrics
2728
depends_on:
@@ -38,6 +39,7 @@ services:
3839
- "1888:1888"
3940
- "14268"
4041
- "55678"
42+
- "55679:55679"
4143
depends_on:
4244
- oc-collector
4345

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package zpagesserver
16+
17+
import (
18+
"flag"
19+
"fmt"
20+
"net"
21+
"net/http"
22+
23+
"go.opencensus.io/zpages"
24+
)
25+
26+
const (
27+
// ZPagesHTTPPort is the name of the flag used to specify the zpages port.
28+
ZPagesHTTPPort = "zpages-http-port"
29+
)
30+
31+
// AddFlags adds to the flag set a flag to configure the zpages server.
32+
func AddFlags(flags *flag.FlagSet) {
33+
flags.Uint(
34+
ZPagesHTTPPort,
35+
55679,
36+
"Port on which to run the zpages http server, use 0 to disable zpages.")
37+
}
38+
39+
// Run run a zPages HTTP endpoint on the given port.
40+
func Run(asyncErrorChannel chan<- error, port int) (closeFn func() error, err error) {
41+
zPagesMux := http.NewServeMux()
42+
zpages.Handle(zPagesMux, "/debug")
43+
44+
addr := fmt.Sprintf(":%d", port)
45+
ln, err := net.Listen("tcp", addr)
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to bind to run zPages on %q: %v", addr, err)
48+
}
49+
50+
srv := http.Server{Handler: zPagesMux}
51+
go func() {
52+
if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
53+
asyncErrorChannel <- fmt.Errorf("failed to server zPages: %v", err)
54+
}
55+
}()
56+
57+
return srv.Close, nil
58+
}

0 commit comments

Comments
 (0)