@@ -11381,22 +11381,6 @@ func (l *fastProdLogger) Debugf(format string, args ...any) {
1138111381 }
1138211382}
1138311383
11384- type slowWriteConn struct {
11385- sync.RWMutex
11386- net.Conn
11387- delay bool
11388- }
11389-
11390- func (c * slowWriteConn ) Write (b []byte ) (int , error ) {
11391- c .RLock ()
11392- delay := c .delay
11393- c .RUnlock ()
11394- if delay {
11395- time .Sleep (100 * time .Millisecond )
11396- }
11397- return c .Conn .Write (b )
11398- }
11399-
1140011384func TestNoRaceNoFastProducerStall (t * testing.T ) {
1140111385 tmpl := `
1140211386 listen: "127.0.0.1:-1"
@@ -11409,173 +11393,86 @@ func TestNoRaceNoFastProducerStall(t *testing.T) {
1140911393 l := & fastProdLogger {gotIt : make (chan struct {}, 1 )}
1141011394 s .SetLogger (l , true , false )
1141111395
11412- ncSlow := natsConnect (t , s .ClientURL (), nats . ReconnectWait ( 10 * time . Millisecond ) )
11396+ ncSlow := natsConnect (t , s .ClientURL ())
1141311397 defer ncSlow .Close ()
1141411398 natsSub (t , ncSlow , "foo" , func (_ * nats.Msg ) {})
1141511399 natsFlush (t , ncSlow )
1141611400
11401+ nc := natsConnect (t , s .ClientURL ())
11402+ defer nc .Close ()
11403+ traceSub := natsSubSync (t , nc , "my.trace.subj" )
11404+ natsFlush (t , nc )
11405+
11406+ ncProd := natsConnect (t , s .ClientURL ())
11407+ defer ncProd .Close ()
11408+
11409+ payload := make ([]byte , 256 )
11410+
1141711411 cid , err := ncSlow .GetClientID ()
1141811412 require_NoError (t , err )
1141911413 c := s .GetClient (cid )
1142011414 require_True (t , c != nil )
1142111415
11422- c .mu .Lock ()
11423- swc := & slowWriteConn {Conn : c .nc , delay : true }
11424- c .nc = swc
11425- c .mu .Unlock ()
11416+ wg := sync.WaitGroup {}
11417+ pub := func () {
11418+ wg .Add (1 )
11419+ go func () {
11420+ defer wg .Done ()
1142611421
11427- defer func () {
11428- swc .Lock ()
11429- swc .delay = false
11430- swc .Unlock ()
11431- }()
11422+ msg := nats .NewMsg ("foo" )
11423+ msg .Header .Set (MsgTraceDest , traceSub .Subject )
11424+ msg .Data = payload
11425+ ncProd .PublishMsg (msg )
11426+ }()
11427+ }
1143211428
11433- // The producer could still overwhelm the "fast" consumer.
11434- // So we will send more than what it will use as a target
11435- // to consider the test done.
11436- total := 2_000_000
11437- target := total / 2
11438- ch := make ( chan struct {}, 1 )
11439- var count int
11440-
11441- ncFast := natsConnect (t , s . ClientURL (), nats . ReconnectWait ( 10 * time . Millisecond ) )
11442- defer ncFast . Close ()
11443- fastSub := natsSub (t , ncFast , "foo" , func ( _ * nats. Msg ) {
11444- if count ++ ; count == target {
11445- ch <- struct {}{}
11429+ checkTraceMsg := func ( err string ) {
11430+ t . Helper ()
11431+ var e MsgTraceEvent
11432+ traceMsg := natsNexMsg ( t , traceSub , time . Second )
11433+ json . Unmarshal ( traceMsg . Data , & e )
11434+ egresses := e . Egresses ( )
11435+ require_Equal ( t , len ( egresses ), 1 )
11436+ eg := egresses [ 0 ]
11437+ require_Equal (t , eg . CID , cid )
11438+ if err != _EMPTY_ {
11439+ require_Contains (t , eg . Error , err )
11440+ } else {
11441+ require_Equal ( t , eg . Error , _EMPTY_ )
1144611442 }
11447- })
11448- err = fastSub .SetPendingLimits (- 1 , - 1 )
11449- require_NoError (t , err )
11450- natsFlush (t , ncFast )
11451-
11452- ncProd := natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11453- defer ncProd .Close ()
11454-
11455- cid , err = ncProd .GetClientID ()
11456- require_NoError (t , err )
11457- pc := s .GetClient (cid )
11458- pc .mu .Lock ()
11459- pcnc := pc .nc
11460- pc .mu .Unlock ()
11443+ }
1146111444
11462- payload := make ([]byte , 256 )
11445+ // Artificially set a stall channel.
11446+ c .mu .Lock ()
11447+ c .out .stc = make (chan struct {})
11448+ c .mu .Unlock ()
1146311449
11464- for i := 0 ; i < total ; i ++ {
11465- natsPub (t , ncProd , "foo" , payload )
11466- }
11467- select {
11468- case <- ch :
11469- // OK
11470- case <- time .After (10 * time .Second ):
11471- t .Fatal ("Test timed-out" )
11472- }
11473- // Now wait a bit and make sure we did not get any fast producer debug statements.
11450+ // Publish a message, it should not stall the producer.
11451+ pub ()
11452+ // Now make sure we did not get any fast producer debug statements.
1147411453 select {
1147511454 case <- l .gotIt :
1147611455 t .Fatal ("Got debug logs about fast producer" )
11477- case <- time .After (time .Second ):
11478- // OK
11456+ case <- time .After (250 * time .Millisecond ):
11457+ // OK!
1147911458 }
11459+ wg .Wait ()
11460+
11461+ checkTraceMsg (errMsgTraceFastProdNoStall )
1148011462
11481- // We don't need that one anymore
11482- ncFast .Close ()
1148311463 // Now we will conf reload to enable fast producer stalling.
1148411464 reloadUpdateConfig (t , s , conf , fmt .Sprintf (tmpl , "false" ))
1148511465
11486- // Since the producer can block, we will publish from a different routine,
11487- // and check for the debug trace from the main.
11488- wg := sync.WaitGroup {}
11489- wg .Add (1 )
11490- doneCh := make (chan struct {})
11491- go func () {
11492- defer wg .Done ()
11493- for i := 0 ; ; i ++ {
11494- ncProd .Publish ("foo" , payload )
11495- select {
11496- case <- doneCh :
11497- return
11498- default :
11499- }
11500- if i % 1000 == 0 {
11501- time .Sleep (time .Millisecond )
11502- }
11503- }
11504- }()
11466+ // Publish, this time the prod should be stalled.
11467+ pub ()
1150511468 select {
1150611469 case <- l .gotIt :
11507- pcnc .Close ()
11508- close (doneCh )
11509- case <- time .After (20 * time .Second ):
11470+ // OK!
11471+ case <- time .After (500 * time .Millisecond ):
1151011472 t .Fatal ("Timed-out waiting for a warning" )
1151111473 }
1151211474 wg .Wait ()
11513- }
11514-
11515- func TestNoRaceNoFastProducerStallAndMsgTrace (t * testing.T ) {
11516- o := DefaultOptions ()
11517- o .NoFastProducerStall = true
11518- o .WriteDeadline = 2 * time .Second
11519- s := RunServer (o )
11520- defer s .Shutdown ()
11521-
11522- ncSlow := natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11523- defer ncSlow .Close ()
11524- natsSub (t , ncSlow , "foo" , func (_ * nats.Msg ) {})
11525- natsFlush (t , ncSlow )
1152611475
11527- cid , err := ncSlow .GetClientID ()
11528- require_NoError (t , err )
11529- c := s .GetClient (cid )
11530- require_True (t , c != nil )
11531-
11532- c .mu .Lock ()
11533- swc := & slowWriteConn {Conn : c .nc , delay : true }
11534- c .nc = swc
11535- c .mu .Unlock ()
11536-
11537- defer func () {
11538- swc .Lock ()
11539- swc .delay = false
11540- swc .Unlock ()
11541- }()
11542-
11543- ncProd := natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11544- defer ncProd .Close ()
11545-
11546- payload := make ([]byte , 256 )
11547-
11548- nc := natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11549- defer nc .Close ()
11550- doneCh := make (chan struct {})
11551- traceSub := natsSub (t , nc , "my.trace.subj" , func (traceMsg * nats.Msg ) {
11552- var e MsgTraceEvent
11553- json .Unmarshal (traceMsg .Data , & e )
11554- egresses := e .Egresses ()
11555- if len (egresses ) == 1 {
11556- if err := egresses [0 ].Error ; err == errMsgTraceFastProdNoStall {
11557- close (doneCh )
11558- traceMsg .Sub .Unsubscribe ()
11559- }
11560- }
11561- })
11562- err = traceSub .SetPendingLimits (- 1 , - 1 )
11563- require_NoError (t , err )
11564- natsFlush (t , nc )
11565-
11566- msg := nats .NewMsg ("foo" )
11567- msg .Header .Set (MsgTraceDest , traceSub .Subject )
11568- msg .Data = payload
11569- for i , done := 0 , false ; ! done ; i ++ {
11570- ncProd .PublishMsg (msg )
11571- select {
11572- case <- doneCh :
11573- // OK
11574- return
11575- default :
11576- }
11577- if i % 1000 == 0 {
11578- time .Sleep (time .Millisecond )
11579- }
11580- }
11476+ // Should have been delivered to the trace subscription.
11477+ checkTraceMsg (_EMPTY_ )
1158111478}
0 commit comments