@@ -28,9 +28,9 @@ import Control.Monad (forM_, forever)
28
28
import Control.Monad.Class.MonadFork (MonadFork (forkIO ))
29
29
import Control.Tracer (Tracer (Tracer ), traceWith )
30
30
import Data.Array (Array , listArray , (!) )
31
- import Data.Kind
32
31
import Data.Foldable (traverse_ )
33
32
import Data.Functor.Const (Const (Const ), getConst )
33
+ import Data.Kind
34
34
import STMCompat
35
35
36
36
class ConnectionBundle bundle where
@@ -48,8 +48,7 @@ class ConnectionBundle bundle where
48
48
-- 'toBundleMsg'. For example, a valid implementation would be:
49
49
--
50
50
-- > ToFromBundleMsg toDynamic (fromJust . fromDynamic)
51
- data ToFromBundleMsg mm a
52
- = ToFromBundleMsg
51
+ data ToFromBundleMsg mm a = ToFromBundleMsg
53
52
{ toBundleMsg :: a -> mm
54
53
, fromBundleMsg :: mm -> a
55
54
}
@@ -68,9 +67,10 @@ data BorneMsg a = BorneMsg !Int a
68
67
-- The mini protocols never see this, so this type is not exported. It does
69
68
-- occur in the argument types of some exported functions, but the caller
70
69
-- should be using parametric functions to generate those arguments.
71
- data BearerMsg a = BearerMsg ! Bytes [BorneMsg a ]
72
- -- ^ the cumulative size of the slices the borne messages whose /final/ slice
73
- -- is in this message
70
+ data BearerMsg a
71
+ = -- | the cumulative size of the slices the borne messages whose /final/ slice
72
+ -- is in this message
73
+ BearerMsg ! Bytes [BorneMsg a ]
74
74
75
75
instance MessageSize (BearerMsg a ) where
76
76
messageSizeBytes (BearerMsg sz _) = 1 + sz
@@ -140,7 +140,7 @@ newMuxChanSingle
140
140
takeMVar sendLock
141
141
atomically $
142
142
writeTQueue sendQueue $
143
- (sendLock, messageSizeBytes bundleMsg, muxmsg)
143
+ (sendLock, messageSizeBytes bundleMsg, muxmsg)
144
144
}
145
145
146
146
data RecvQueue m mm where
@@ -167,20 +167,20 @@ muxer ::
167
167
TQueue m (MVar m () , Bytes , BorneMsg (BundleMsg bundle )) ->
168
168
m ()
169
169
muxer bearer sendQueue =
170
- forever $ do
171
- x <- atomically (readTQueue sendQueue)
172
- (muxmsg, locks) <- go 0 [] [] x
173
- mapM_ (flip putMVar () ) locks
174
- writeChan bearer muxmsg
175
- where
176
- --- from ouroboros-network's @Network.Mux.Bearer.makeSocketBearer'@
177
- sliceBytes = 12288
178
- loafBytes = 131072
179
-
180
- go ! accBytes acc locks (lock, bytes, msg) = do
181
- let ! accBytes' = accBytes + min sliceBytes bytes
182
- (acc', locks') <-
183
- if bytes <= sliceBytes
170
+ forever $ do
171
+ x <- atomically (readTQueue sendQueue)
172
+ (muxmsg, locks) <- go 0 [] [] x
173
+ mapM_ (flip putMVar () ) locks
174
+ writeChan bearer muxmsg
175
+ where
176
+ --- from ouroboros-network's @Network.Mux.Bearer.makeSocketBearer'@
177
+ sliceBytes = 12288
178
+ loafBytes = 131072
179
+
180
+ go ! accBytes acc locks (lock, bytes, msg) = do
181
+ let ! accBytes' = accBytes + min sliceBytes bytes
182
+ (acc', locks') <-
183
+ if bytes <= sliceBytes
184
184
then do
185
185
-- We do not release the lock before finalizing the loaf because a
186
186
-- single loaf should include slices from at most one borne message
@@ -192,8 +192,10 @@ muxer bearer sendQueue =
192
192
atomically $ writeTQueue sendQueue (lock, bytes', msg)
193
193
pure (acc, locks)
194
194
195
- let result = (BearerMsg accBytes' acc', locks')
196
- if accBytes' >= loafBytes then pure result else do
195
+ let result = (BearerMsg accBytes' acc', locks')
196
+ if accBytes' >= loafBytes
197
+ then pure result
198
+ else do
197
199
atomically (tryReadTQueue sendQueue) >>= \ case
198
200
Nothing -> pure result
199
201
Just x -> go accBytes' acc' locks' x
0 commit comments