Skip to content

Commit 55109c4

Browse files
[AUTO-CHERRYPICK] Upgrade moby-compose to version 2.17.3 to address multiple CVEs - branch main (#8091)
Co-authored-by: Sam Meluch <109628994+sameluch@users.noreply.github.com>
1 parent c596f71 commit 55109c4

7 files changed

Lines changed: 679 additions & 6 deletions
Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
From 4ef370839829840761e091b510c9e7e7b5fea022 Mon Sep 17 00:00:00 2001
2+
From: Sam Meluch <sammeluch@microsoft.com>
3+
Date: Thu, 22 Feb 2024 14:49:41 -0800
4+
Subject: [PATCH 2/3] Change server stream context handling
5+
6+
---
7+
.../grpc/internal/transport/handler_server.go | 2 +-
8+
.../grpc/internal/transport/http2_server.go | 7 +-
9+
.../grpc/internal/transport/transport.go | 2 +-
10+
vendor/google.golang.org/grpc/server.go | 116 ++++++++----------
11+
4 files changed, 56 insertions(+), 71 deletions(-)
12+
13+
diff --git a/google.golang.org/grpc/internal/transport/handler_server.go b/google.golang.org/grpc/internal/transport/handler_server.go
14+
index 0901209..3af9b4a 100644
15+
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
16+
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
17+
@@ -326,7 +326,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
18+
return err
19+
}
20+
21+
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
22+
+func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
23+
// With this transport type there will be exactly 1 stream: this HTTP request.
24+
25+
ctx := ht.req.Context()
26+
diff --git a/google.golang.org/grpc/internal/transport/http2_server.go b/google.golang.org/grpc/internal/transport/http2_server.go
27+
index 3dd1564..94d441c 100644
28+
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
29+
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
30+
@@ -345,7 +345,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
31+
}
32+
33+
// operateHeader takes action on the decoded headers.
34+
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
35+
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (fatal bool) {
36+
// Acquire max stream ID lock for entire duration
37+
t.maxStreamMu.Lock()
38+
defer t.maxStreamMu.Unlock()
39+
@@ -565,7 +565,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
40+
s.requestRead = func(n int) {
41+
t.adjustWindow(s, uint32(n))
42+
}
43+
- s.ctx = traceCtx(s.ctx, s.method)
44+
for _, sh := range t.stats {
45+
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
46+
inHeader := &stats.InHeader{
47+
@@ -603,7 +602,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
48+
// HandleStreams receives incoming streams using the given handler. This is
49+
// typically run in a separate goroutine.
50+
// traceCtx attaches trace to ctx and returns the new context.
51+
-func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
52+
+func (t *http2Server) HandleStreams(handle func(*Stream)) {
53+
defer close(t.readerDone)
54+
for {
55+
t.controlBuf.throttle()
56+
@@ -641,7 +640,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
57+
}
58+
switch frame := frame.(type) {
59+
case *http2.MetaHeadersFrame:
60+
- if t.operateHeaders(frame, handle, traceCtx) {
61+
+ if t.operateHeaders(frame, handle){
62+
t.Close()
63+
break
64+
}
65+
diff --git a/google.golang.org/grpc/internal/transport/transport.go b/google.golang.org/grpc/internal/transport/transport.go
66+
index 6c3ba85..992fc25 100644
67+
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
68+
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
69+
@@ -674,7 +674,7 @@ type ClientTransport interface {
70+
// Write methods for a given Stream will be called serially.
71+
type ServerTransport interface {
72+
// HandleStreams receives incoming streams using the given handler.
73+
- HandleStreams(func(*Stream), func(context.Context, string) context.Context)
74+
+ HandleStreams(func(*Stream))
75+
76+
// WriteHeader sends the header metadata for the given stream.
77+
// WriteHeader may not be called on all streams.
78+
diff --git a/google.golang.org/grpc/server.go b/google.golang.org/grpc/server.go
79+
index b8f9b5e..5bccd48 100644
80+
--- a/vendor/google.golang.org/grpc/server.go
81+
+++ b/vendor/google.golang.org/grpc/server.go
82+
@@ -577,7 +577,7 @@ func (s *Server) serverWorker() {
83+
84+
func (s *Server) handleSingleStream(data *serverWorkerData) {
85+
defer data.wg.Done()
86+
- s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
87+
+ s.handleStream(data.st, data.stream)
88+
}
89+
90+
// initServerWorkers creates worker goroutines and a channel to process incoming
91+
@@ -955,14 +955,8 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
92+
}
93+
go func() {
94+
defer wg.Done()
95+
- s.handleStream(st, stream, s.traceInfo(st, stream))
96+
+ s.handleStream(st, stream)
97+
}()
98+
- }, func(ctx context.Context, method string) context.Context {
99+
- if !EnableTracing {
100+
- return ctx
101+
- }
102+
- tr := trace.New("grpc.Recv."+methodFamily(method), method)
103+
- return trace.NewContext(ctx, tr)
104+
})
105+
wg.Wait()
106+
}
107+
@@ -1010,30 +1004,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
108+
s.serveStreams(st)
109+
}
110+
111+
-// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
112+
-// If tracing is not enabled, it returns nil.
113+
-func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
114+
- if !EnableTracing {
115+
- return nil
116+
- }
117+
- tr, ok := trace.FromContext(stream.Context())
118+
- if !ok {
119+
- return nil
120+
- }
121+
-
122+
- trInfo = &traceInfo{
123+
- tr: tr,
124+
- firstLine: firstLine{
125+
- client: false,
126+
- remoteAddr: st.RemoteAddr(),
127+
- },
128+
- }
129+
- if dl, ok := stream.Context().Deadline(); ok {
130+
- trInfo.firstLine.deadline = time.Until(dl)
131+
- }
132+
- return trInfo
133+
-}
134+
-
135+
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
136+
s.mu.Lock()
137+
defer s.mu.Unlock()
138+
@@ -1094,7 +1064,7 @@ func (s *Server) incrCallsFailed() {
139+
atomic.AddInt64(&s.czData.callsFailed, 1)
140+
}
141+
142+
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
143+
+func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
144+
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
145+
if err != nil {
146+
channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
147+
@@ -1113,7 +1083,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
148+
err = t.Write(stream, hdr, payload, opts)
149+
if err == nil {
150+
for _, sh := range s.opts.statsHandlers {
151+
- sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
152+
+ sh.HandleRPC(ctx, outPayload(false, msg, data, payload, time.Now()))
153+
}
154+
}
155+
return err
156+
@@ -1160,7 +1130,7 @@ func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerIn
157+
}
158+
}
159+
160+
-func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
161+
+func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
162+
shs := s.opts.statsHandlers
163+
if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
164+
if channelz.IsOn() {
165+
@@ -1174,7 +1144,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
166+
IsClientStream: false,
167+
IsServerStream: false,
168+
}
169+
- sh.HandleRPC(stream.Context(), statsBegin)
170+
+ sh.HandleRPC(ctx, statsBegin)
171+
}
172+
if trInfo != nil {
173+
trInfo.tr.LazyLog(&trInfo.firstLine, false)
174+
@@ -1206,7 +1176,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
175+
if err != nil && err != io.EOF {
176+
end.Error = toRPCErr(err)
177+
}
178+
- sh.HandleRPC(stream.Context(), end)
179+
+ sh.HandleRPC(ctx, end)
180+
}
181+
182+
if channelz.IsOn() {
183+
@@ -1228,7 +1198,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
184+
}
185+
}
186+
if len(binlogs) != 0 {
187+
- ctx := stream.Context()
188+
md, _ := metadata.FromIncomingContext(ctx)
189+
logEntry := &binarylog.ClientHeader{
190+
Header: md,
191+
@@ -1307,7 +1276,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
192+
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
193+
}
194+
for _, sh := range shs {
195+
- sh.HandleRPC(stream.Context(), &stats.InPayload{
196+
+ sh.HandleRPC(ctx, &stats.InPayload{
197+
RecvTime: time.Now(),
198+
Payload: v,
199+
WireLength: payInfo.wireLength + headerLen,
200+
@@ -1328,7 +1297,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
201+
}
202+
return nil
203+
}
204+
- ctx := NewContextWithServerTransportStream(stream.Context(), stream)
205+
+ ctx = NewContextWithServerTransportStream(ctx, stream)
206+
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
207+
if appErr != nil {
208+
appStatus, ok := status.FromError(appErr)
209+
@@ -1371,7 +1340,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
210+
}
211+
opts := &transport.Options{Last: true}
212+
213+
- if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
214+
+ if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil {
215+
if err == io.EOF {
216+
// The entire stream is done (for unary RPC only).
217+
return err
218+
@@ -1480,7 +1449,7 @@ func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServe
219+
}
220+
}
221+
222+
-func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
223+
+func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
224+
if channelz.IsOn() {
225+
s.incrCallsStarted()
226+
}
227+
@@ -1494,10 +1463,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
228+
IsServerStream: sd.ServerStreams,
229+
}
230+
for _, sh := range shs {
231+
- sh.HandleRPC(stream.Context(), statsBegin)
232+
+ sh.HandleRPC(ctx, statsBegin)
233+
}
234+
}
235+
- ctx := NewContextWithServerTransportStream(stream.Context(), stream)
236+
+ ctx = NewContextWithServerTransportStream(ctx, stream)
237+
ss := &serverStream{
238+
ctx: ctx,
239+
t: t,
240+
@@ -1533,7 +1502,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
241+
end.Error = toRPCErr(err)
242+
}
243+
for _, sh := range shs {
244+
- sh.HandleRPC(stream.Context(), end)
245+
+ sh.HandleRPC(ctx, end)
246+
}
247+
}
248+
249+
@@ -1672,27 +1641,44 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
250+
return err
251+
}
252+
253+
-func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
254+
+func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
255+
+ ctx := stream.Context()
256+
+ var ti *traceInfo
257+
+ if EnableTracing {
258+
+ tr := trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
259+
+ ctx = trace.NewContext(ctx, tr)
260+
+ ti = &traceInfo{
261+
+ tr: tr,
262+
+ firstLine: firstLine{
263+
+ client: false,
264+
+ remoteAddr: t.RemoteAddr(),
265+
+ },
266+
+ }
267+
+ if dl, ok := ctx.Deadline(); ok {
268+
+ ti.firstLine.deadline = time.Until(dl)
269+
+ }
270+
+ }
271+
+
272+
sm := stream.Method()
273+
if sm != "" && sm[0] == '/' {
274+
sm = sm[1:]
275+
}
276+
pos := strings.LastIndex(sm, "/")
277+
if pos == -1 {
278+
- if trInfo != nil {
279+
- trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
280+
- trInfo.tr.SetError()
281+
+ if ti != nil {
282+
+ ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
283+
+ ti.tr.SetError()
284+
}
285+
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
286+
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
287+
- if trInfo != nil {
288+
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
289+
- trInfo.tr.SetError()
290+
+ if ti != nil {
291+
+ ti.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
292+
+ ti.tr.SetError()
293+
}
294+
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
295+
}
296+
- if trInfo != nil {
297+
- trInfo.tr.Finish()
298+
+ if ti != nil {
299+
+ ti.tr.Finish()
300+
}
301+
return
302+
}
303+
@@ -1702,17 +1688,17 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
304+
srv, knownService := s.services[service]
305+
if knownService {
306+
if md, ok := srv.methods[method]; ok {
307+
- s.processUnaryRPC(t, stream, srv, md, trInfo)
308+
+ s.processUnaryRPC(ctx, t, stream, srv, md, ti)
309+
return
310+
}
311+
if sd, ok := srv.streams[method]; ok {
312+
- s.processStreamingRPC(t, stream, srv, sd, trInfo)
313+
+ s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
314+
return
315+
}
316+
}
317+
// Unknown service, or known server unknown method.
318+
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
319+
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
320+
+ s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
321+
return
322+
}
323+
var errDesc string
324+
@@ -1721,19 +1707,19 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
325+
} else {
326+
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
327+
}
328+
- if trInfo != nil {
329+
- trInfo.tr.LazyPrintf("%s", errDesc)
330+
- trInfo.tr.SetError()
331+
+ if ti != nil {
332+
+ ti.tr.LazyPrintf("%s", errDesc)
333+
+ ti.tr.SetError()
334+
}
335+
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
336+
- if trInfo != nil {
337+
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
338+
- trInfo.tr.SetError()
339+
+ if ti != nil {
340+
+ ti.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
341+
+ ti.tr.SetError()
342+
}
343+
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
344+
}
345+
- if trInfo != nil {
346+
- trInfo.tr.Finish()
347+
+ if ti != nil {
348+
+ ti.tr.Finish()
349+
}
350+
}
351+
352+
--
353+
2.34.1
354+

SPECS/moby-compose/generate_source_tarball.sh

100644100755
File mode changed.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"Signatures": {
3-
"moby-compose-2.17.2-govendor-v1.tar.gz": "439fded1938c7dfc8d18a4750e8d240a559763b17ef967734aa7c44570092993",
4-
"moby-compose-2.17.2.tar.gz": "d6e6de858ecdb0104991c86c66dde5dd4fb6a1160d707308d8ad3167450c8094"
3+
"moby-compose-2.17.3-govendor-v1.tar.gz": "8abc1f732e9ac9a0843c1c7edf2a0dcd23f7805b859a0e3059bc2f5d4edbe3c8",
4+
"moby-compose-2.17.3.tar.gz": "e5e9bdfc3a827240381b656da88f92b408ea2e203c3f8cfd9e0bbfe03f825f16"
55
}
66
}

0 commit comments

Comments
 (0)