@@ -329,7 +329,8 @@ public void openBlock(final long blockNumber) {
329329 earliestBlockNumber .updateAndGet (
330330 current -> current == Long .MIN_VALUE ? blockNumber : Math .min (current , blockNumber ));
331331 lastProducedBlockNumber .updateAndGet (old -> Math .max (old , blockNumber ));
332- blockStreamMetrics .setProducingBlockNumber (blockNumber );
332+ blockStreamMetrics .recordLatestBlockOpened (blockNumber );
333+ blockStreamMetrics .recordBlockOpened ();
333334 blockNodeConnectionManager .openBlock (blockNumber );
334335 }
335336
@@ -367,6 +368,7 @@ public void closeBlock(final long blockNumber) {
367368 throw new IllegalStateException ("Block state not found for block " + blockNumber );
368369 }
369370
371+ blockStreamMetrics .recordBlockClosed ();
370372 blockState .closeBlock ();
371373 }
372374
@@ -377,7 +379,13 @@ public void closeBlock(final long blockNumber) {
377379 * @return the block state, or null if no block state exists for the given block number
378380 */
379381 public @ Nullable BlockState getBlockState (final long blockNumber ) {
380- return blockBuffer .get (blockNumber );
382+ final BlockState block = blockBuffer .get (blockNumber );
383+
384+ if (block == null ) {
385+ blockStreamMetrics .recordBlockMissing ();
386+ }
387+
388+ return block ;
381389 }
382390
383391 /**
@@ -402,7 +410,7 @@ public void setLatestAcknowledgedBlock(final long blockNumber) {
402410 }
403411
404412 final long highestBlock = highestAckedBlockNumber .updateAndGet (current -> Math .max (current , blockNumber ));
405- blockStreamMetrics .setLatestAcknowledgedBlockNumber (highestBlock );
413+ blockStreamMetrics .recordLatestBlockAcked (highestBlock );
406414 }
407415
408416 /**
@@ -562,7 +570,6 @@ block period (e.g. 2 seconds). This gives us an ideal number of blocks in the bu
562570 int numPruned = 0 ;
563571 int numChecked = 0 ;
564572 int numPendingAck = 0 ;
565- final AtomicReference <Instant > oldestUnackedTimestamp = new AtomicReference <>(Instant .MAX );
566573 long newEarliestBlock = Long .MAX_VALUE ;
567574 long newLatestBlock = Long .MIN_VALUE ;
568575
@@ -587,8 +594,6 @@ block period (e.g. 2 seconds). This gives us an ideal number of blocks in the bu
587594 // Track unacknowledged blocks
588595 if (block .blockNumber () > highestBlockAcked ) {
589596 ++numPendingAck ;
590- oldestUnackedTimestamp .updateAndGet (
591- current -> current .compareTo (closedTimestamp ) < 0 ? current : closedTimestamp );
592597 }
593598 // Keep track of earliest remaining block
594599 newEarliestBlock = Math .min (newEarliestBlock , blockNum );
@@ -609,18 +614,17 @@ block period (e.g. 2 seconds). This gives us an ideal number of blocks in the bu
609614 // keep track of earliest remaining block
610615 newEarliestBlock = Math .min (newEarliestBlock , blockNum );
611616 newLatestBlock = Math .max (newLatestBlock , blockNum );
612- oldestUnackedTimestamp .updateAndGet (
613- current -> current .compareTo (closedTimestamp ) < 0 ? current : closedTimestamp );
614617 }
615618 }
616619
617620 // update the earliest block number after pruning
618- earliestBlockNumber .set (newEarliestBlock == Long .MAX_VALUE ? -1 : newEarliestBlock );
621+ newEarliestBlock = newEarliestBlock == Long .MAX_VALUE ? -1 : newEarliestBlock ;
622+ newLatestBlock = newLatestBlock == Long .MIN_VALUE ? -1 : newLatestBlock ;
623+ earliestBlockNumber .set (newEarliestBlock );
619624
620- final long oldestUnackedMillis = Instant .MAX .equals (oldestUnackedTimestamp .get ())
621- ? -1 // sentinel value indicating no blocks are unacked
622- : oldestUnackedTimestamp .get ().toEpochMilli ();
623- blockStreamMetrics .setOldestUnacknowledgedBlockTime (oldestUnackedMillis );
625+ blockStreamMetrics .recordNumberOfBlocksPruned (numPruned );
626+ blockStreamMetrics .recordBufferOldestBlock (newEarliestBlock );
627+ blockStreamMetrics .recordBufferNewestBlock (newLatestBlock );
624628
625629 return new PruneResult (
626630 idealMaxBufferSize , numChecked , numPendingAck , numPruned , newEarliestBlock , newLatestBlock );
@@ -667,6 +671,17 @@ static class PruneResult {
667671 .doubleValue ();
668672 }
669673 }
674+
675+ @ Override
676+ public String toString () {
677+ return "PruneResult{" + "idealMaxBufferSize="
678+ + idealMaxBufferSize + ", numBlocksChecked="
679+ + numBlocksChecked + ", numBlocksPendingAck="
680+ + numBlocksPendingAck + ", numBlocksPruned="
681+ + numBlocksPruned + ", saturationPercent="
682+ + saturationPercent + ", isSaturated="
683+ + isSaturated + '}' ;
684+ }
670685 }
671686
672687 /**
@@ -690,11 +705,11 @@ private void checkBuffer() {
690705 pruningResult .numBlocksChecked ,
691706 pruningResult .numBlocksPruned ,
692707 pruningResult .numBlocksPendingAck ,
693- pruningResult .oldestBlockNumber == Long . MAX_VALUE ? "-" : pruningResult .oldestBlockNumber ,
694- pruningResult .newestBlockNumber == Long . MIN_VALUE ? "-" : pruningResult .newestBlockNumber ,
708+ pruningResult .oldestBlockNumber == - 1 ? "-" : pruningResult .oldestBlockNumber ,
709+ pruningResult .newestBlockNumber == - 1 ? "-" : pruningResult .newestBlockNumber ,
695710 pruningResult .saturationPercent );
696711
697- blockStreamMetrics .updateBlockBufferSaturation (pruningResult .saturationPercent );
712+ blockStreamMetrics .recordBufferSaturation (pruningResult .saturationPercent );
698713
699714 final double actionStageThreshold = actionStageThreshold ();
700715
@@ -705,23 +720,25 @@ private void checkBuffer() {
705720 The buffer has transitioned from zero/low saturation levels to fully saturated. We need to ensure back
706721 pressure is engaged and potentially change which Block Node we are connected to.
707722 */
723+ blockStreamMetrics .recordBackPressureActive ();
708724 enableBackPressure (pruningResult );
709725 switchBlockNodeIfPermitted (pruningResult );
710726 } else if (pruningResult .saturationPercent >= actionStageThreshold ) {
711-
712727 /*
713728 Zero -> Action Stage
714729 The buffer has transitioned from zero/low saturation levels to exceeding the action stage threshold. We
715730 don't need to engage back pressure, but we should take proactive measures and swap to a different
716731 Block Node.
717732 */
733+ blockStreamMetrics .recordBackPressureActionStage ();
718734 switchBlockNodeIfPermitted (pruningResult );
719735 } else {
720736 /*
721737 Zero -> Zero
722738 Before and after the pruning, the buffer saturation remained lower than the action stage threshold so
723739 there is no action we need to take.
724740 */
741+ blockStreamMetrics .recordBackPressureDisabled ();
725742 }
726743 } else if (!previousPruneResult .isSaturated && previousPruneResult .saturationPercent >= actionStageThreshold ) {
727744 if (pruningResult .isSaturated ) {
@@ -730,6 +747,7 @@ private void checkBuffer() {
730747 The buffer has transitioned from the action stage saturation level to being completely full/saturated.
731748 Back pressure needs to be applied and possibly switch to a different Block Node.
732749 */
750+ blockStreamMetrics .recordBackPressureActive ();
733751 enableBackPressure (pruningResult );
734752 switchBlockNodeIfPermitted (pruningResult );
735753 } else if (pruningResult .saturationPercent >= actionStageThreshold ) {
@@ -739,13 +757,15 @@ private void checkBuffer() {
739757 does not need to be enabled yet (though may eventually if recovery is slow/blocked) but we should maybe
740758 swap Block Node connections.
741759 */
760+ blockStreamMetrics .recordBackPressureActionStage ();
742761 switchBlockNodeIfPermitted (pruningResult );
743762 } else {
744763 /*
745764 Action Stage -> Zero
746765 The buffer has transitioned from an action stage to having a saturation that is below the action stage
747766 threshold. There is no further action to take since recovery has been achieved.
748767 */
768+ blockStreamMetrics .recordBackPressureDisabled ();
749769 }
750770 } else if (previousPruneResult .isSaturated ) {
751771 if (pruningResult .isSaturated ) {
@@ -754,8 +774,9 @@ private void checkBuffer() {
754774 Before and after pruning, the buffer remained fully saturated. Back pressure should be enabled - if not
755775 already - and we should maybe swap to a different Block Node.
756776 */
757- enableBackPressure ( pruningResult );
777+ blockStreamMetrics . recordBackPressureActive ( );
758778 switchBlockNodeIfPermitted (pruningResult );
779+ enableBackPressure (pruningResult );
759780 } else if (pruningResult .saturationPercent >= actionStageThreshold ) {
760781 /*
761782 Full -> Action Stage
@@ -765,6 +786,11 @@ private void checkBuffer() {
765786 connect to a different Block Node.
766787 */
767788 disableBackPressureIfRecovered (pruningResult );
789+ if (awaitingRecovery ) {
790+ blockStreamMetrics .recordBackPressureRecovering ();
791+ } else {
792+ blockStreamMetrics .recordBackPressureActionStage ();
793+ }
768794 } else {
769795 /*
770796 Full -> Zero
@@ -773,6 +799,11 @@ private void checkBuffer() {
773799 since the buffer fully recovered we should avoid trying to connect to a different Block Node.
774800 */
775801 disableBackPressureIfRecovered (pruningResult );
802+ if (awaitingRecovery ) {
803+ blockStreamMetrics .recordBackPressureRecovering ();
804+ } else {
805+ blockStreamMetrics .recordBackPressureDisabled ();
806+ }
776807 }
777808 }
778809
0 commit comments