diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index b274cee9d0..1e4557fc88 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -37,6 +37,13 @@ import ( type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error +// maxConsensusMsgSize caps the wire size of an incoming QBFTConsensusMsg, well below +// the 128MB default p2p frame limit. A legitimate message carries at most a handful of +// small justification sub-messages (bounded in handle) plus its values, the largest of +// which is a single block proposal (a few MB on mainnet); 32MB leaves ample margin while +// bounding the receive/decode/allocation cost a malicious peer can inflict per message. +const maxConsensusMsgSize = 32 * 1024 * 1024 // 32 MB. + var supportedCompareDuties = []core.DutyType{core.DutyAttester} // newDefinition returns a qbft definition (this is constant across all consensus instances). @@ -365,7 +372,7 @@ func (c *Consensus) SubscribePriority(fn func(ctx context.Context, duty core.Dut func (c *Consensus) Start(ctx context.Context) { p2p.RegisterHandler("qbft", c.p2pNode, protocols.QBFTv2ProtocolID, func() proto.Message { return new(pbv1.QBFTConsensusMsg) }, - c.handle) + c.handle, p2p.WithReadLimit(maxConsensusMsgSize)) go func() { for { @@ -678,7 +685,18 @@ func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (p return nil, false, errors.New("invalid duty", z.Any("duty", duty)) } + if err := verifyMsgLimits(pbMsg, len(c.pubkeys)); err != nil { + return nil, false, err + } + for _, justification := range pbMsg.GetJustification() { + // Bail out as soon as the receive deadline fires rather than burning the full + // CPU budget on signature recovery for every justification in a large message. 
+ if ctx.Err() != nil { + return nil, false, errors.Wrap(ctx.Err(), "receive cancelled during justification verification", + z.Any("duty", duty), z.Any("after", time.Since(t0))) + } + if err := verifyMsg(justification, c.pubkeys); err != nil { return nil, false, errors.Wrap(err, "invalid justification") } @@ -776,6 +794,29 @@ func (c *Consensus) getPeerIdx() (int64, error) { return peerIdx, nil } +// verifyMsgLimits bounds the justification and value counts of a consensus message +// before any expensive per-element work (each justification requires an ECDSA +// signature recovery, each value a proto unmarshal + hash). Without it a single +// authenticated peer could pack one large message with many sub-messages to exhaust +// CPU/memory on every peer (amplification DoS). +func verifyMsgLimits(pbMsg *pbv1.QBFTConsensusMsg, nodes int) error { + // A legitimate justification set contains at most a quorum of ROUND-CHANGE plus a + // quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded above by 2*nodes. + maxJust := 2 * nodes + if n := len(pbMsg.GetJustification()); n > maxJust { + return errors.New("too many justifications", z.Int("count", n), z.Int("max", maxJust)) + } + + // Each message (the main message plus each justification) references at most two + // values (value and prepared value), so the values are bounded by 2*(justifications+1). 
+ maxValues := 2 * (len(pbMsg.GetJustification()) + 1) + if n := len(pbMsg.GetValues()); n > maxValues { + return errors.New("too many values", z.Int("count", n), z.Int("max", maxValues)) + } + + return nil +} + func verifyMsg(msg *pbv1.QBFTMsg, pubkeys map[int64]*k1.PublicKey) error { if msg == nil || msg.GetDuty() == nil { return errors.New("invalid consensus message") diff --git a/core/consensus/qbft/qbft_internal_test.go b/core/consensus/qbft/qbft_internal_test.go index 4d1d2f2344..599daaefa2 100644 --- a/core/consensus/qbft/qbft_internal_test.go +++ b/core/consensus/qbft/qbft_internal_test.go @@ -706,6 +706,129 @@ func TestQBFTConsensusHandle(t *testing.T) { } } +// TestQBFTConsensusHandleAmplificationLimits verifies that handle rejects messages +// carrying more justifications or values than a legitimate consensus message ever +// needs, before doing the expensive per-element signature recovery / unmarshalling. +// This caps the CPU/memory amplification a single authenticated peer can inflict. +func TestQBFTConsensusHandleAmplificationLimits(t *testing.T) { + // newConsensus returns a single-node consensus, so max justifications = 2*nodes = 2. + newConsensus := func(t *testing.T) (*Consensus, *k1.PrivateKey) { + t.Helper() + + var c Consensus + + deadliner := coremocks.NewDeadliner(t) + deadliner.On("Add", mock.Anything).Maybe().Return(core.DeadlineScheduled) + c.deadliner = deadliner + c.gaterFunc = func(core.Duty) bool { return true } + c.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) + + p2pKey := testutil.GenerateInsecureK1Key(t, 0) + c.pubkeys = make(map[int64]*k1.PublicKey) + c.pubkeys[0] = p2pKey.PubKey() + + return &c, p2pKey + } + + // signedBase returns a validly-signed main message so verification reaches the limit checks. 
+ signedBase := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTConsensusMsg { + t.Helper() + + base := &pbv1.QBFTConsensusMsg{Msg: newRandomQBFTMsg(t)} + base.Msg.PeerIdx = 0 + base.Msg.Round = 1 + base.Msg.Duty = &pbv1.Duty{Slot: 42, Type: 1} + + msgHash, err := hashProto(base.GetMsg()) + require.NoError(t, err) + + sign, err := k1util.Sign(p2pKey, msgHash[:]) + require.NoError(t, err) + + base.Msg.Signature = sign + + return base + } + + // signedJustification returns a validly-signed justification matching the base message's duty. + signedJustification := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTMsg { + t.Helper() + + j := newRandomQBFTMsg(t) + j.PeerIdx = 0 + j.Round = 1 // verifyMsg requires round > 0, don't rely on the random value. + j.Duty = &pbv1.Duty{Slot: 42, Type: 1} + + jHash, err := hashProto(j) + require.NoError(t, err) + + j.Signature, err = k1util.Sign(p2pKey, jHash[:]) + require.NoError(t, err) + + return j + } + + t.Run("too many justifications rejected", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // 3 justifications > 2*nodes (2). Content is irrelevant since the count + // check runs before any per-justification verification. + for range 3 { + base.Justification = append(base.Justification, &pbv1.QBFTMsg{}) + } + + _, _, err := c.handle(context.Background(), "peerID", base) + require.ErrorContains(t, err, "too many justifications") + }) + + t.Run("max justifications accepted", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // Exactly 2*nodes (2) justifications must not be rejected by the count check. 
+ for range 2 { + base.Justification = append(base.Justification, signedJustification(t, p2pKey)) + } + + _, _, err := c.handle(context.Background(), "peerID", base) + require.NoError(t, err) + }) + + t.Run("too many values rejected", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // 0 justifications => max values = 2*(0+1) = 2. Provide 3. + base.Values = []*anypb.Any{{}, {}, {}} + + _, _, err := c.handle(context.Background(), "peerID", base) + require.ErrorContains(t, err, "too many values") + }) + + t.Run("max values accepted", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // 2 justifications => max values = 2*(2+1) = 6. A message carrying exactly + // the maximum must pass the count check and the rest of handle, guarding + // against the bound being tightened below the legitimate maximum. + for range 2 { + base.Justification = append(base.Justification, signedJustification(t, p2pKey)) + } + + for i := range 6 { + value, err := anypb.New(&pbv1.Duty{Slot: uint64(i + 1)}) + require.NoError(t, err) + + base.Values = append(base.Values, value) + } + + _, _, err := c.handle(context.Background(), "peerID", base) + require.NoError(t, err) + }) +} + func TestInstanceIO_MaybeStart(t *testing.T) { t.Run("MaybeStart for new instance", func(t *testing.T) { inst1 := instance.NewIO[Msg]() diff --git a/core/qbft/qbft.go b/core/qbft/qbft.go index f1093f02a1..c9ff955b15 100644 --- a/core/qbft/qbft.go +++ b/core/qbft/qbft.go @@ -158,6 +158,21 @@ type dedupKey struct { Round int64 } +// maxDecidedResends bounds the number of MsgDecided rebroadcasts that +// post-decision ROUND-CHANGE messages from a single peer can trigger. 
+// A lagging peer re-sends ROUND-CHANGE with an increasing round on each +// timeout until it learns the decided value, so a handful of resends is +// ample for liveness, while the cap stops a malicious peer minting +// ever-higher rounds from extracting unlimited large rebroadcasts. +const maxDecidedResends = 16 + +// decidedResend tracks the MsgDecided rebroadcasts triggered by a peer's +// post-decision ROUND-CHANGE messages. +type decidedResend struct { + Round int64 // Highest round a rebroadcast was triggered for. + Count int // Total rebroadcasts triggered by the peer. +} + // errors var ( errCompare = errors.New("compare leader value with local value failed") @@ -211,6 +226,7 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], qCommit []Msg[I, V, C] buffer = make(map[int64][]Msg[I, V, C]) dedupRules = make(map[dedupKey]bool) + decidedResends = make(map[int64]decidedResend) // Bounds MsgDecided rebroadcasts by peer source. timerChan <-chan time.Time stopTimer func() ) @@ -273,6 +289,25 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], return true } + // allowDecidedResend reports whether a post-decision ROUND-CHANGE from source at + // round may trigger a MsgDecided rebroadcast, recording it when it does. It permits + // at most one rebroadcast per source per strictly-increasing round, capped at + // maxDecidedResends per source, so duplicate, replayed or maliciously + // round-incremented messages can't repeatedly trigger a large rebroadcast + // (amplification DoS), while a peer advancing to a genuinely new round still gets + // served. Sources are authenticated by the transport, so the tracked set stays + // naturally bounded by the cluster size. 
+ allowDecidedResend := func(incomingSource, incomingRound int64) bool { + resend := decidedResends[incomingSource] + if incomingRound <= resend.Round || resend.Count >= maxDecidedResends { + return false + } + + decidedResends[incomingSource] = decidedResend{Round: incomingRound, Count: resend.Count + 1} + + return true + } + // changeRound updates round and clears the rule dedup state. changeRound := func(newRound int64, rule UponRule) { if round == newRound { @@ -316,9 +351,12 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], inputValueCh = nil // Don't read from this channel again. case msg := <-t.Receive: - // Just send Qcommit if consensus already decided + // Just send Qcommit if consensus already decided. The resend is rate-limited + // (see allowDecidedResend) to bound amplification; note this runs before the + // isJustified check, so the ROUND-CHANGE need not even be justified. if len(qCommit) > 0 { - if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17 + if msg.Source() != process && msg.Type() == MsgRoundChange && // Algorithm 3:17 + allowDecidedResend(msg.Source(), msg.Round()) { err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) } diff --git a/core/qbft/qbft_internal_test.go b/core/qbft/qbft_internal_test.go index a078d6742b..d3dbefba7c 100644 --- a/core/qbft/qbft_internal_test.go +++ b/core/qbft/qbft_internal_test.go @@ -620,6 +620,118 @@ func (m msg) Justification() []Msg[int64, int64, int64] { return resp } +// TestDecidedRebroadcastLimits verifies that once consensus has decided, post-decision +// ROUND-CHANGE messages trigger at most one MsgDecided rebroadcast per source per +// (strictly increasing) round, capped at maxDecidedResends per source. This bounds +// amplification while still serving lagging peers that advance to new rounds. 
+func TestDecidedRebroadcastLimits(t *testing.T) { + const ( + n = 4 + process = 0 + value = 42 + ) + + // Build a justified MsgDecided: quorum (3) commits for round 1, value 42. + commits := []msg{ + {msgType: MsgCommit, peerIdx: 1, round: 1, value: value}, + {msgType: MsgCommit, peerIdx: 2, round: 1, value: value}, + {msgType: MsgCommit, peerIdx: 3, round: 1, value: value}, + } + decided := msg{msgType: MsgDecided, peerIdx: 1, round: 1, value: value, justify: commits} + + rc := func(source, round int64) msg { + return msg{msgType: MsgRoundChange, peerIdx: source, round: round} + } + + // runDecidedInstance starts a qbft instance, sends it the decided message and + // returns a synchronous send function plus the channel collecting MsgDecided + // broadcasts. The receive channel is unbuffered, so the instance only accepts + // a send once it has fully processed all earlier messages (the just-sent + // message may still be in flight, hence tests end with an inert flush send). + // This makes broadcast-count assertions deterministic. + runDecidedInstance := func(t *testing.T) (func(msg), chan MsgType) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + recv := make(chan Msg[int64, int64, int64]) + decidedBroadcasts := make(chan MsgType, 100) + + def := noopDef + def.Nodes = n + def.FIFOLimit = 100 + def.Decide = func(context.Context, int64, int64, []Msg[int64, int64, int64]) {} + + trans := Transport[int64, int64, int64]{ + Broadcast: func(_ context.Context, typ MsgType, _ int64, _ int64, _ int64, _ int64, + _ int64, _ int64, _ []Msg[int64, int64, int64], + ) error { + if typ == MsgDecided { + decidedBroadcasts <- typ + } + + return nil + }, + Receive: recv, + } + + // Never-delivering input channels (this process is not a leader and proposes nothing). 
+ go func() { + _ = Run(ctx, def, trans, 0, process, make(chan int64), make(chan int64)) + }() + + send := func(m msg) { + select { + case recv <- m: + case <-time.After(5 * time.Second): + require.Fail(t, "timeout sending message to qbft instance") + } + } + + send(decided) + + return send, decidedBroadcasts + } + + t.Run("dedup duplicates and stale rounds", func(t *testing.T) { + send, broadcasts := runDecidedInstance(t) + + for _, m := range []msg{ + rc(2, 2), // Rebroadcast #1. + rc(2, 2), // Duplicate, no rebroadcast. + rc(2, 2), // Duplicate, no rebroadcast. + rc(3, 2), // Rebroadcast #2 (other source). + rc(3, 2), // Duplicate, no rebroadcast. + rc(2, 1), // Stale round (already rebroadcast for round 2), no rebroadcast. + rc(2, 3), // Rebroadcast #3 (source advanced to a new round). + } { + send(m) + } + + // Flush with an inert message: once this send returns, all messages above + // have been fully processed, so the broadcast count is final. + send(rc(2, 1)) + + require.Len(t, broadcasts, 3) + }) + + t.Run("resend cap per source", func(t *testing.T) { + send, broadcasts := runDecidedInstance(t) + + // One peer keeps advancing rounds: only the first maxDecidedResends + // ROUND-CHANGE messages may trigger a rebroadcast. + for round := int64(2); round < 2+maxDecidedResends+5; round++ { + send(rc(2, round)) + } + + // Flush with an inert message (stale round, never rebroadcast). + send(rc(2, 1)) + + require.Len(t, broadcasts, maxDecidedResends) + }) +} + func TestIsJustifiedPrePrepare(t *testing.T) { const ( n = 4 diff --git a/p2p/sender.go b/p2p/sender.go index b23f07b79c..a36a69d15e 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -243,6 +243,21 @@ func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) { } } +// WithReadLimit returns an option that caps the maximum size in bytes of a single +// message read for the registered protocol(s), overriding the default maxMsgSize (128MB). 
+// Use a tighter limit for protocols whose legitimate messages are known to be much +// smaller, to bound the receive/decode/allocation cost of oversized (potentially +// malicious) messages before they ever reach the handler. +func WithReadLimit(limit int) func(*sendRecvOpts) { + return func(opts *sendRecvOpts) { + for _, pID := range opts.protocols { + opts.readersByProtocol[pID] = func(s network.Stream) pbio.Reader { + return pbio.NewDelimitedReader(s, limit) + } + } + } +} + // SetFuzzerDefaultsUnsafe sets default reader and writer functions to fuzzed versions of the same if p2p fuzz is enabled. // // The fuzzReaderWriter is responsible for creating a customized reader and writer for each network stream diff --git a/p2p/sender_test.go b/p2p/sender_test.go index c0eb4bad47..3c0c07ee28 100644 --- a/p2p/sender_test.go +++ b/p2p/sender_test.go @@ -4,6 +4,8 @@ package p2p_test import ( "context" + "math" + "sync/atomic" "testing" "time" @@ -42,6 +44,33 @@ func TestWithReceiveTimeout(t *testing.T) { } } +func TestWithReadLimit(t *testing.T) { + servers := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} + clients := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} + + for i := range len(servers) { + client, server := clients[i], servers[i] + + client.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Hour) + + protocolID := protocol.ID("testprotocol") + + var handled atomic.Bool + + p2p.RegisterHandler("test", server, protocolID, func() proto.Message { return new(pbv1.Duty) }, + func(context.Context, peer.ID, proto.Message) (proto.Message, bool, error) { + handled.Store(true) // Must never run: the message exceeds the read limit. + return nil, false, nil + }, p2p.WithReadLimit(4)) // Tiny limit so any real message trips it. + + // Slot serializes to 11 bytes (a 1 byte field tag plus a 10 byte varint), well over the 4 byte read limit. 
+ err := p2p.SendReceive(context.Background(), client, server.ID(), + &pbv1.Duty{Slot: math.MaxUint64}, new(pbv1.Duty), protocolID) + require.Error(t, err) + require.False(t, handled.Load(), "handler must not run when message exceeds read limit") + } +} + func TestWithSendTimeout(t *testing.T) { servers := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} clients := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))}