@@ -147,11 +147,19 @@ const LINK_DELAY_DURATION: Duration = Duration::from_secs(5);
147
147
#[ cfg( test) ]
148
148
const LINK_DELAY_DURATION : Duration = Duration :: from_millis ( 333 ) ;
149
149
150
- #[ derive( PartialEq ) ]
151
- enum CoordinatorMessage {
150
+ #[ derive( Default , PartialEq ) ]
151
+ struct CoordinatorMessageUpdateType {
152
152
/// Update guest VF state based on current availability and the guest VF state tracked by the primary channel.
153
153
/// This includes adding the guest VF device and switching the data path.
154
- UpdateGuestVfState ,
154
+ guest_vf_state : bool ,
155
+ /// Update the receive filter for all channels.
156
+ filter_state : bool ,
157
+ }
158
+
159
+ #[ derive( PartialEq ) ]
160
+ enum CoordinatorMessage {
161
+ /// Update network state.
162
+ Update ( CoordinatorMessageUpdateType ) ,
155
163
/// Restart endpoints and resume processing. This will also attempt to set VF and data path state to match current
156
164
/// expectations.
157
165
Restart ,
@@ -382,6 +390,7 @@ struct NetChannel<T: RingMem> {
382
390
pending_send_size : usize ,
383
391
restart : Option < CoordinatorMessage > ,
384
392
can_use_ring_size_opt : bool ,
393
+ packet_filter : u32 ,
385
394
}
386
395
387
396
/// Buffers used during packet processing.
@@ -432,6 +441,7 @@ struct ActiveState {
432
441
pending_tx_packets : Vec < PendingTxPacket > ,
433
442
free_tx_packets : Vec < TxId > ,
434
443
pending_tx_completions : VecDeque < PendingTxCompletion > ,
444
+ pending_rx_packets : VecDeque < RxId > ,
435
445
436
446
rx_bufs : RxBuffers ,
437
447
@@ -442,6 +452,7 @@ struct ActiveState {
442
452
struct QueueStats {
443
453
tx_stalled : Counter ,
444
454
rx_dropped_ring_full : Counter ,
455
+ rx_dropped_filtered : Counter ,
445
456
spurious_wakes : Counter ,
446
457
rx_packets : Counter ,
447
458
tx_packets : Counter ,
@@ -908,6 +919,7 @@ impl ActiveState {
908
919
pending_tx_packets : vec ! [ Default :: default ( ) ; TX_PACKET_QUOTA ] ,
909
920
free_tx_packets : ( 0 ..TX_PACKET_QUOTA as u32 ) . rev ( ) . map ( TxId ) . collect ( ) ,
910
921
pending_tx_completions : VecDeque :: new ( ) ,
922
+ pending_rx_packets : VecDeque :: new ( ) ,
911
923
rx_bufs : RxBuffers :: new ( recv_buffer_count) ,
912
924
stats : Default :: default ( ) ,
913
925
}
@@ -1364,6 +1376,7 @@ impl Nic {
1364
1376
pending_send_size : 0 ,
1365
1377
restart : None ,
1366
1378
can_use_ring_size_opt,
1379
+ packet_filter : rndisprot:: NDIS_PACKET_TYPE_NONE ,
1367
1380
} ,
1368
1381
state,
1369
1382
coordinator_send : self . coordinator_send . clone ( ) . unwrap ( ) ,
@@ -1453,6 +1466,7 @@ impl Nic {
1453
1466
mut control : RestoreControl < ' _ > ,
1454
1467
state : saved_state:: SavedState ,
1455
1468
) -> Result < ( ) , NetRestoreError > {
1469
+ let mut saved_packet_filter = 0u32 ;
1456
1470
if let Some ( state) = state. open {
1457
1471
let open = match & state. primary {
1458
1472
saved_state:: Primary :: Version => vec ! [ true ] ,
@@ -1537,8 +1551,12 @@ impl Nic {
1537
1551
tx_spread_sent,
1538
1552
guest_link_down,
1539
1553
pending_link_action,
1554
+ packet_filter,
1540
1555
} = ready;
1541
1556
1557
+ // If saved state does not have a packet filter set, default to directed, multicast, and broadcast.
1558
+ saved_packet_filter = packet_filter. unwrap_or ( rndisprot:: NPROTO_PACKET_FILTER ) ;
1559
+
1542
1560
let version = check_version ( version)
1543
1561
. ok_or ( NetRestoreError :: UnsupportedVersion ( version) ) ?;
1544
1562
@@ -1621,6 +1639,11 @@ impl Nic {
1621
1639
self . insert_worker ( channel_idx as u16 , & request. unwrap ( ) , state, false ) ?;
1622
1640
}
1623
1641
}
1642
+ for worker in self . coordinator . state_mut ( ) . unwrap ( ) . workers . iter_mut ( ) {
1643
+ if let Some ( worker_state) = worker. state_mut ( ) {
1644
+ worker_state. channel . packet_filter = saved_packet_filter;
1645
+ }
1646
+ }
1624
1647
} else {
1625
1648
control
1626
1649
. restore ( & [ false ] )
@@ -1782,6 +1805,11 @@ impl Nic {
1782
1805
PrimaryChannelGuestVfState :: Restoring ( saved_state) => saved_state,
1783
1806
} ;
1784
1807
1808
+ let worker_0_packet_filter = coordinator. workers [ 0 ]
1809
+ . state ( )
1810
+ . unwrap ( )
1811
+ . channel
1812
+ . packet_filter ;
1785
1813
saved_state:: Primary :: Ready ( saved_state:: ReadyPrimary {
1786
1814
version : ready. buffers . version as u32 ,
1787
1815
receive_buffer : ready. buffers . recv_buffer . saved_state ( ) ,
@@ -1811,6 +1839,7 @@ impl Nic {
1811
1839
tx_spread_sent : primary. tx_spread_sent ,
1812
1840
guest_link_down : !primary. guest_link_up ,
1813
1841
pending_link_action,
1842
+ packet_filter : Some ( worker_0_packet_filter) ,
1814
1843
} )
1815
1844
}
1816
1845
} ;
@@ -2594,7 +2623,12 @@ impl<T: RingMem> NetChannel<T> {
2594
2623
if primary. rndis_state == RndisState :: Operational {
2595
2624
if self . guest_vf_is_available ( Some ( vfid) , buffers. version , buffers. ndis_config ) ? {
2596
2625
primary. guest_vf_state = PrimaryChannelGuestVfState :: AvailableAdvertised ;
2597
- return Ok ( Some ( CoordinatorMessage :: UpdateGuestVfState ) ) ;
2626
+ return Ok ( Some ( CoordinatorMessage :: Update (
2627
+ CoordinatorMessageUpdateType {
2628
+ guest_vf_state : true ,
2629
+ ..Default :: default ( )
2630
+ } ,
2631
+ ) ) ) ;
2598
2632
} else if let Some ( true ) = primary. is_data_path_switched {
2599
2633
tracing:: error!(
2600
2634
"Data path switched, but current guest negotiation does not support VTL0 VF"
@@ -2734,10 +2768,7 @@ impl<T: RingMem> NetChannel<T> {
2734
2768
// flag on inband packets and won't send a completion
2735
2769
// packet.
2736
2770
primary. guest_vf_state = PrimaryChannelGuestVfState :: AvailableAdvertised ;
2737
- // restart will also add the VF based on the guest_vf_state
2738
- if self . restart . is_none ( ) {
2739
- self . restart = Some ( CoordinatorMessage :: UpdateGuestVfState ) ;
2740
- }
2771
+ self . send_coordinator_update_vf ( ) ;
2741
2772
} else if let Some ( true ) = primary. is_data_path_switched {
2742
2773
tracing:: error!(
2743
2774
"Data path switched, but current guest negotiation does not support VTL0 VF"
@@ -2785,12 +2816,18 @@ impl<T: RingMem> NetChannel<T> {
2785
2816
tracing:: trace!( ?request, "handling control message MESSAGE_TYPE_SET_MSG" ) ;
2786
2817
2787
2818
let status = match self . adapter . handle_oid_set ( primary, request. oid , reader) {
2788
- Ok ( restart_endpoint) => {
2819
+ Ok ( ( restart_endpoint, packet_filter ) ) => {
2789
2820
// Restart the endpoint if the OID changed some critical
2790
2821
// endpoint property.
2791
2822
if restart_endpoint {
2792
2823
self . restart = Some ( CoordinatorMessage :: Restart ) ;
2793
2824
}
2825
+ if let Some ( filter) = packet_filter {
2826
+ if self . packet_filter != filter {
2827
+ self . packet_filter = filter;
2828
+ self . send_coordinator_update_filter ( ) ;
2829
+ }
2830
+ }
2794
2831
rndisprot:: STATUS_SUCCESS
2795
2832
}
2796
2833
Err ( err) => {
@@ -2974,6 +3011,31 @@ impl<T: RingMem> NetChannel<T> {
2974
3011
}
2975
3012
Ok ( ( ) )
2976
3013
}
3014
+
3015
+ fn send_coordinator_update_message ( & mut self , guest_vf : bool , packet_filter : bool ) {
3016
+ if self . restart . is_none ( ) {
3017
+ self . restart = Some ( CoordinatorMessage :: Update ( CoordinatorMessageUpdateType {
3018
+ guest_vf_state : guest_vf,
3019
+ filter_state : packet_filter,
3020
+ } ) ) ;
3021
+ } else if let Some ( CoordinatorMessage :: Restart ) = self . restart {
3022
+ // If a restart message is pending, do nothing.
3023
+ // A restart will try to switch the data path based on primary.guest_vf_state.
3024
+ // A restart will apply packet filter changes.
3025
+ } else if let Some ( CoordinatorMessage :: Update ( ref mut update) ) = self . restart {
3026
+ // Add the new update to the existing message.
3027
+ update. guest_vf_state |= guest_vf;
3028
+ update. filter_state |= packet_filter;
3029
+ }
3030
+ }
3031
+
3032
+ fn send_coordinator_update_vf ( & mut self ) {
3033
+ self . send_coordinator_update_message ( true , false ) ;
3034
+ }
3035
+
3036
+ fn send_coordinator_update_filter ( & mut self ) {
3037
+ self . send_coordinator_update_message ( false , true ) ;
3038
+ }
2977
3039
}
2978
3040
2979
3041
/// Writes an RNDIS message to `writer`.
@@ -3291,13 +3353,14 @@ impl Adapter {
3291
3353
primary : & mut PrimaryChannelState ,
3292
3354
oid : rndisprot:: Oid ,
3293
3355
reader : impl MemoryRead + Clone ,
3294
- ) -> Result < bool , OidError > {
3356
+ ) -> Result < ( bool , Option < u32 > ) , OidError > {
3295
3357
tracing:: debug!( ?oid, "oid set" ) ;
3296
3358
3297
3359
let mut restart_endpoint = false ;
3360
+ let mut packet_filter = None ;
3298
3361
match oid {
3299
3362
rndisprot:: Oid :: OID_GEN_CURRENT_PACKET_FILTER => {
3300
- // TODO
3363
+ packet_filter = self . oid_set_packet_filter ( reader ) ? ;
3301
3364
}
3302
3365
rndisprot:: Oid :: OID_TCP_OFFLOAD_PARAMETERS => {
3303
3366
self . oid_set_offload_parameters ( reader, primary) ?;
@@ -3324,7 +3387,7 @@ impl Adapter {
3324
3387
return Err ( OidError :: UnknownOid ) ;
3325
3388
}
3326
3389
}
3327
- Ok ( restart_endpoint)
3390
+ Ok ( ( restart_endpoint, packet_filter ) )
3328
3391
}
3329
3392
3330
3393
fn oid_set_rss_parameters (
@@ -3382,6 +3445,15 @@ impl Adapter {
3382
3445
Ok ( ( ) )
3383
3446
}
3384
3447
3448
+ fn oid_set_packet_filter (
3449
+ & self ,
3450
+ reader : impl MemoryRead + Clone ,
3451
+ ) -> Result < Option < u32 > , OidError > {
3452
+ let filter: rndisprot:: RndisPacketFilterOidValue = reader. clone ( ) . read_plain ( ) ?;
3453
+ tracing:: debug!( filter, "set packet filter" ) ;
3454
+ Ok ( Some ( filter) )
3455
+ }
3456
+
3385
3457
fn oid_set_offload_parameters (
3386
3458
& self ,
3387
3459
reader : impl MemoryRead + Clone ,
@@ -3872,8 +3944,26 @@ impl Coordinator {
3872
3944
}
3873
3945
sleep_duration = None ;
3874
3946
}
3875
- Message :: Internal ( CoordinatorMessage :: UpdateGuestVfState ) => {
3876
- self . update_guest_vf_state ( state) . await ;
3947
+ Message :: Internal ( CoordinatorMessage :: Update ( update_type) ) => {
3948
+ if update_type. filter_state {
3949
+ self . stop_workers ( ) . await ;
3950
+ let worker_0_packet_filter =
3951
+ self . workers [ 0 ] . state ( ) . unwrap ( ) . channel . packet_filter ;
3952
+ self . workers . iter_mut ( ) . skip ( 1 ) . for_each ( |worker| {
3953
+ if let Some ( state) = worker. state_mut ( ) {
3954
+ state. channel . packet_filter = worker_0_packet_filter;
3955
+ tracing:: debug!(
3956
+ packet_filter = ?worker_0_packet_filter,
3957
+ channel_idx = state. channel_idx,
3958
+ "update packet filter"
3959
+ ) ;
3960
+ }
3961
+ } ) ;
3962
+ }
3963
+
3964
+ if update_type. guest_vf_state {
3965
+ self . update_guest_vf_state ( state) . await ;
3966
+ }
3877
3967
}
3878
3968
Message :: UpdateFromEndpoint ( EndpointAction :: RestartRequired ) => self . restart = true ,
3879
3969
Message :: UpdateFromEndpoint ( EndpointAction :: LinkStatusNotify ( connect) ) => {
@@ -4306,13 +4396,22 @@ impl Coordinator {
4306
4396
self . num_queues = num_queues;
4307
4397
}
4308
4398
4399
+ let worker_0_packet_filter = self . workers [ 0 ] . state ( ) . unwrap ( ) . channel . packet_filter ;
4309
4400
// Provide the queue and receive buffer ranges for each worker.
4310
4401
for ( ( worker, queue) , rx_buffer) in self . workers . iter_mut ( ) . zip ( queues) . zip ( rx_buffers) {
4311
4402
worker. task_mut ( ) . queue_state = Some ( QueueState {
4312
4403
queue,
4313
4404
target_vp_set : false ,
4314
4405
rx_buffer_range : rx_buffer,
4315
4406
} ) ;
4407
+ // Update the receive packet filter for the subchannel worker.
4408
+ if let Some ( worker) = worker. state_mut ( ) {
4409
+ worker. channel . packet_filter = worker_0_packet_filter;
4410
+ // Clear any pending RxIds as buffers were redistributed.
4411
+ if let Some ( ready_state) = worker. state . ready_mut ( ) {
4412
+ ready_state. state . pending_rx_packets . clear ( ) ;
4413
+ }
4414
+ }
4316
4415
}
4317
4416
4318
4417
Ok ( ( ) )
@@ -4726,6 +4825,17 @@ impl<T: 'static + RingMem> NetChannel<T> {
4726
4825
send. capacity ( ) - limit
4727
4826
} ;
4728
4827
4828
+ // If the packet filter has changed to allow rx packets, add any pended RxIds.
4829
+ if !state. pending_rx_packets . is_empty ( )
4830
+ && self . packet_filter != rndisprot:: NDIS_PACKET_TYPE_NONE
4831
+ {
4832
+ let epqueue = queue_state. queue . as_mut ( ) ;
4833
+ let ( front, back) = state. pending_rx_packets . as_slices ( ) ;
4834
+ epqueue. rx_avail ( front) ;
4835
+ epqueue. rx_avail ( back) ;
4836
+ state. pending_rx_packets . clear ( ) ;
4837
+ }
4838
+
4729
4839
// Handle any guest state changes since last run.
4730
4840
if let Some ( primary) = state. primary . as_mut ( ) {
4731
4841
if primary. requested_num_queues > 1 && !primary. tx_spread_sent {
@@ -4927,6 +5037,22 @@ impl<T: 'static + RingMem> NetChannel<T> {
4927
5037
return Ok ( false ) ;
4928
5038
}
4929
5039
5040
+ state. stats . rx_packets_per_wake . add_sample ( n as u64 ) ;
5041
+
5042
+ if self . packet_filter == rndisprot:: NDIS_PACKET_TYPE_NONE {
5043
+ tracing:: trace!(
5044
+ packet_filter = self . packet_filter,
5045
+ "rx packets dropped due to packet filter"
5046
+ ) ;
5047
+ // Pend the newly available RxIds until the packet filter is updated.
5048
+ // Under high load this will eventually lead to no available RxIds,
5049
+ // which will cause the backend to drop the packets instead of
5050
+ // processing them here.
5051
+ state. pending_rx_packets . extend ( & data. rx_ready [ ..n] ) ;
5052
+ state. stats . rx_dropped_filtered . add ( n as u64 ) ;
5053
+ return Ok ( false ) ;
5054
+ }
5055
+
4930
5056
let transaction_id = data. rx_ready [ 0 ] . 0 . into ( ) ;
4931
5057
let ready_ids = data. rx_ready [ ..n] . iter ( ) . map ( |& RxId ( id) | id) ;
4932
5058
@@ -4951,15 +5077,15 @@ impl<T: 'static + RingMem> NetChannel<T> {
4951
5077
}
4952
5078
Some ( _) => {
4953
5079
// Ring buffer is full. Drop the packets and free the rx
4954
- // buffers.
5080
+ // buffers. When the ring has limited space, the main loop will
5081
+ // stop polling for receive packets.
4955
5082
state. stats . rx_dropped_ring_full . add ( n as u64 ) ;
4956
5083
4957
5084
state. rx_bufs . free ( data. rx_ready [ 0 ] . 0 ) ;
4958
5085
epqueue. rx_avail ( & data. rx_ready [ ..n] ) ;
4959
5086
}
4960
5087
}
4961
5088
4962
- state. stats . rx_packets_per_wake . add_sample ( n as u64 ) ;
4963
5089
Ok ( true )
4964
5090
}
4965
5091
@@ -5062,10 +5188,7 @@ impl<T: 'static + RingMem> NetChannel<T> {
5062
5188
_ => ( ) ,
5063
5189
} ;
5064
5190
if queue_switch_operation {
5065
- // A restart will also try to switch the data path based on primary.guest_vf_state.
5066
- if self . restart . is_none ( ) {
5067
- self . restart = Some ( CoordinatorMessage :: UpdateGuestVfState )
5068
- } ;
5191
+ self . send_coordinator_update_vf ( ) ;
5069
5192
} else {
5070
5193
self . send_completion ( transaction_id, & [ ] ) ?;
5071
5194
}
0 commit comments