diff --git a/Cargo.lock b/Cargo.lock index 75b1941..4b6faf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,6 +88,12 @@ version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "assert_matches" version = "1.5.0" @@ -182,6 +188,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "branches" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f11502672c5570f77f6bdf573332483f8475bab6a7fda00f1fae8ddb5a6245c0" +dependencies = [ + "rustc_version", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -200,6 +215,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "cacheguard" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dbbe48daefc2575b7dfdc8227f4724582080ae48b0482191f6eb91e4cd2f405" + [[package]] name = "cassowary" version = "0.3.0" @@ -689,12 +710,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "875488b8711a968268c7cf5d139578713097ca4635a76044e8fe8eedf831d07e" -[[package]] -name = "futures-core" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" - [[package]] name = "generic-array" version = "0.14.7" @@ -891,7 +906,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574" dependencies = [ - "futures-core", "lock_api", ] @@ -1181,7 +1195,10 @@ name = "oryx-tui" version = "0.7.2" dependencies = [ "anyhow", + "arrayvec", "aya", + "branches", + "cacheguard", "chrono", "clap", "crossterm 0.29.0", @@ -1197,6 +1214,7 @@ dependencies = [ "oryx-common", "ratatui 0.29.0", "regex", + "rustc-hash", "serde", "serde_json", "strum 0.27.2", @@ -1544,6 +1562,21 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.44" @@ -1588,6 +1621,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.219" diff --git a/Cargo.toml b/Cargo.toml index 03e95c8..65f4584 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ homepage = "https://github.com/pythops/oryx" network-types = { git = "https://github.com/vadorovsky/network-types", rev = "b78424c" } [profile.release] -lto = "fat" -strip = true +opt-level = 3 +debug = false +lto = 'fat' +panic = 'abort' codegen-units = 1 +rpath = false diff --git a/oryx-ebpf/Cargo.lock b/oryx-ebpf/Cargo.lock index dfb1458..1d0f475 100644 --- a/oryx-ebpf/Cargo.lock +++ b/oryx-ebpf/Cargo.lock @@ -47,6 +47,15 @@ dependencies = [ "syn", ] +[[package]] +name = "branches" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f11502672c5570f77f6bdf573332483f8475bab6a7fda00f1fae8ddb5a6245c0" +dependencies = [ + "rustc_version", +] + [[package]] name = "heck" version = "0.5.0" @@ -83,6 +92,7 @@ name = "oryx-ebpf" version = "0.7.2" dependencies = [ "aya-ebpf", + "branches", "network-types", "oryx-common", ] @@ -128,12 +138,27 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "strum" version = "0.27.2" diff --git a/oryx-ebpf/Cargo.toml b/oryx-ebpf/Cargo.toml index 5af2ae5..d016506 100644 --- a/oryx-ebpf/Cargo.toml +++ b/oryx-ebpf/Cargo.toml @@ -11,6 +11,7 @@ homepage = "https://github.com/pythops/oryx" aya-ebpf = "0.1.1" oryx-common = { path = "../oryx-common" } network-types = { git = "https://github.com/vadorovsky/network-types", rev = "b78424c" } +branches = { version = "0.3", default-features = false } [[bin]] name = "oryx" diff --git a/oryx-ebpf/src/main.rs b/oryx-ebpf/src/main.rs index 86390f2..55cf662 100644 --- a/oryx-ebpf/src/main.rs +++ b/oryx-ebpf/src/main.rs @@ -8,6 +8,7 @@ use aya_ebpf::{ maps::{Array, HashMap, RingBuf}, programs::TcContext, }; +use branches::unlikely; use core::mem; use network_types::{ arp::ArpHdr, @@ -194,7 +195,7 @@ fn process(ctx: TcContext) -> Result { u16::from_be_bytes(unsafe { (*tcp_header).dest }) }; - if block_ipv4(addr, port) { + if unlikely(block_ipv4(addr, port)) { return Ok(TC_ACT_SHOT); //block packet } @@ -227,7 +228,7 @@ fn process(ctx: TcContext) -> Result { u16::from_be_bytes(unsafe { (*udp_header).dst }) }; - if block_ipv4(addr, port) { + if unlikely(block_ipv4(addr, port)) { return Ok(TC_ACT_SHOT); //block packet } @@ -260,7 +261,7 @@ fn process(ctx: TcContext) -> Result { u16::from_be_bytes(unsafe { (*sctp_header).dst }) }; - if block_ipv4(addr, port) { + if unlikely(block_ipv4(addr, port)) { return Ok(TC_ACT_SHOT); //block packet } @@ -389,7 +390,7 @@ fn process(ctx: TcContext) -> Result { } } IpProto::Sctp => { - let sctp_header: *const SctpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; + let sctp_header: *const SctpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv6Hdr::LEN)?; let port = if is_ingress() { u16::from_be_bytes(unsafe { (*sctp_header).src }) @@ -401,7 +402,7 @@ fn process(ctx: TcContext) -> Result { return Ok(TC_ACT_SHOT); //block packet } - if filter_packet(Protocol::Network(NetworkProtocol::Ipv4)) + if filter_packet(Protocol::Network(NetworkProtocol::Ipv6)) || filter_packet(Protocol::Transport(TransportProtocol::SCTP)) || filter_direction() { diff --git a/oryx-tui/Cargo.toml b/oryx-tui/Cargo.toml index 96febfd..4a438b6 100644 --- a/oryx-tui/Cargo.toml +++ b/oryx-tui/Cargo.toml @@ -19,7 +19,7 @@ oryx-common = { path = "../oryx-common" } mio = { version = "1", features = ["os-poll", "os-ext"] } itertools = "0.14" dirs = "6" -kanal = "0.1" +kanal = { version = "0.1", default-features = false } mimalloc = "0.1" clap = { version = "4", features = ["derive", "cargo"] } network-types = { workspace = true } @@ -32,6 +32,10 @@ regex = "1" chrono = "0.4" strum = { version = "0.27", features = ["derive"] } anyhow = "1" +branches = "0.3" +cacheguard = "0.1" +rustc-hash = "2.1.1" +arrayvec = "0.7" [[bin]] name = "oryx" diff --git a/oryx-tui/src/app.rs b/oryx-tui/src/app.rs index f016e13..a338c34 100644 --- a/oryx-tui/src/app.rs +++ b/oryx-tui/src/app.rs @@ -1,29 +1,16 @@ -use chrono::Utc; use clap::ArgMatches; use itertools::Itertools; -use oryx_common::{ - RawData, RawFrame, - protocols::{LinkProtocol, NetworkProtocol, TransportProtocol}, -}; +use oryx_common::protocols::{LinkProtocol, NetworkProtocol, TransportProtocol}; use ratatui::{ Frame, layout::{Constraint, Direction, Layout}, }; -use std::{ - error, - str::FromStr, - sync::{Arc, RwLock}, - thread, - time::Duration, -}; +use std::{error, str::FromStr, thread, time::Duration}; use crate::{ - filter::Filter, - help::Help, - packet::{EthFrame, direction::TrafficDirection}, + filter::Filter, filter::IoChannels, help::Help, notification::Notification, + packet::direction::TrafficDirection, packet_store::PacketStore, section::Section, }; -use crate::{filter::IoChannels, notification::Notification}; -use crate::{packet::AppPacket, section::Section}; pub type AppResult = std::result::Result>; @@ -40,7 +27,6 @@ pub enum ActivePopup { #[derive(Debug)] pub struct DataEventHandler { - pub sender: kanal::Sender<[u8; RawFrame::LEN]>, pub handler: thread::JoinHandle<()>, } @@ -50,10 +36,9 @@ pub struct App { pub help: Help, pub filter: Filter, pub start_sniffing: bool, - pub app_packets: Arc>>, + pub app_packets: PacketStore, pub notifications: Vec, pub section: Section, - pub data_channel_sender: kanal::Sender<([u8; RawData::LEN], TrafficDirection)>, pub is_editing: bool, pub active_popup: Option, pub start_from_cli: bool, @@ -61,35 +46,10 @@ pub struct App { impl App { pub fn new(cli_args: &ArgMatches) -> Self { - let app_packets = Arc::new(RwLock::new(Vec::with_capacity( - AppPacket::LEN * 1024 * 1024, - ))); - - let (sender, receiver) = kanal::unbounded(); + let app_packets = PacketStore::new(); let firewall_channels = IoChannels::new(); - thread::spawn({ - let app_packets = app_packets.clone(); - move || loop { - if let Ok((raw_data, direction)) = receiver.recv() { - let data = RawData::from(raw_data); - let frame = EthFrame::from(data.frame); - let pid = data.pid; - - let mut app_packets = app_packets.write().unwrap(); - - let app_packet = AppPacket { - frame, - direction, - pid, - timestamp: Utc::now(), - }; - app_packets.push(app_packet); - } - } - }); - let (interface_name, transport_protocols, network_protocols, link_protocols, direction) = { if let Some(interface) = cli_args.get_one::("interface") { let transport_protocols = { @@ -193,7 +153,6 @@ impl App { app_packets: app_packets.clone(), notifications: Vec::new(), section: Section::new(app_packets.clone(), firewall_channels.clone()), - data_channel_sender: sender, is_editing: false, active_popup: None, start_from_cli: interface_name.is_some(), diff --git a/oryx-tui/src/bandwidth.rs b/oryx-tui/src/bandwidth.rs index 78380e1..5e80e34 100644 --- a/oryx-tui/src/bandwidth.rs +++ b/oryx-tui/src/bandwidth.rs @@ -95,12 +95,12 @@ impl Bandwidth { } loop { + buffer.clear(); thread::sleep(Duration::from_secs(1)); if fd.seek(std::io::SeekFrom::Start(0)).is_err() { drop(fd); fd = File::open("/proc/net/dev").unwrap(); } - let mut buffer = String::new(); fd.read_to_string(&mut buffer).unwrap(); let mut lines = buffer.lines(); diff --git a/oryx-tui/src/ebpf/egress.rs b/oryx-tui/src/ebpf/egress.rs index 009364c..b291888 100644 --- a/oryx-tui/src/ebpf/egress.rs +++ b/oryx-tui/src/ebpf/egress.rs @@ -12,6 +12,7 @@ use aya::{ programs::{SchedClassifier, TcAttachType, tc}, util::KernelVersion, }; +use branches::{likely, unlikely}; use log::error; use oryx_common::{MAX_RULES_PORT, RawData, protocols::Protocol}; @@ -19,7 +20,8 @@ use crate::{ event::Event, filter::FilterChannelSignal, notification::{Notification, NotificationLevel}, - packet::direction::TrafficDirection, + packet::{AppPacket, direction::TrafficDirection}, + packet_store::PacketStore, section::firewall::FirewallSignal, }; use mio::{Events, Interest, Poll, Token, unix::SourceFd}; @@ -40,7 +42,7 @@ fn is_pid_helper_available() -> bool { pub fn load_egress( iface: String, notification_sender: kanal::Sender, - data_sender: kanal::Sender<([u8; RawData::LEN], TrafficDirection)>, + packet_store: PacketStore, filter_channel_receiver: kanal::Receiver, firewall_egress_receiver: kanal::Receiver, terminate: Arc, @@ -220,26 +222,37 @@ pub fn load_egress( ) .unwrap(); + let mut packet_buffer = Vec::with_capacity(64 * 1024); loop { poll.poll(&mut events, Some(Duration::from_millis(100))) .unwrap(); - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } for event in &events { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } - if event.token() == Token(0) && event.is_readable() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if likely(event.token() == Token(0) && event.is_readable()) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } while let Some(item) = ring_buf.next() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } let data: [u8; RawData::LEN] = item.to_owned().try_into().unwrap(); - data_sender.send((data, TrafficDirection::Egress)).ok(); + let raw = RawData::from(data); + packet_buffer.push(AppPacket { + frame: raw.frame.into(), + direction: TrafficDirection::Egress, + pid: raw.pid, + timestamp: chrono::Utc::now(), + }) + } + if likely(!packet_buffer.is_empty()) { + packet_store.write_many(&packet_buffer); + packet_buffer.clear(); } } } diff --git a/oryx-tui/src/ebpf/ingress.rs b/oryx-tui/src/ebpf/ingress.rs index 5bc8d33..2857edb 100644 --- a/oryx-tui/src/ebpf/ingress.rs +++ b/oryx-tui/src/ebpf/ingress.rs @@ -11,6 +11,7 @@ use aya::{ maps::{Array, HashMap}, programs::{SchedClassifier, TcAttachType, tc}, }; +use branches::{likely, unlikely}; use log::error; use oryx_common::{MAX_RULES_PORT, RawData, protocols::Protocol}; @@ -18,7 +19,8 @@ use crate::{ event::Event, filter::FilterChannelSignal, notification::{Notification, NotificationLevel}, - packet::direction::TrafficDirection, + packet::{AppPacket, direction::TrafficDirection}, + packet_store::PacketStore, section::firewall::FirewallSignal, }; use mio::{Events, Interest, Poll, Token, unix::SourceFd}; @@ -31,7 +33,7 @@ use super::{ pub fn load_ingress( iface: String, notification_sender: kanal::Sender, - data_sender: kanal::Sender<([u8; RawData::LEN], TrafficDirection)>, + packet_store: PacketStore, filter_channel_receiver: kanal::Receiver, firewall_ingress_receiver: kanal::Receiver, terminate: Arc, @@ -202,26 +204,37 @@ pub fn load_ingress( ) .unwrap(); + let mut packet_buffer = Vec::with_capacity(64 * 1024); loop { poll.poll(&mut events, Some(Duration::from_millis(100))) .unwrap(); - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } for event in &events { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } - if event.token() == Token(0) && event.is_readable() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if likely(event.token() == Token(0) && event.is_readable()) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } while let Some(item) = ring_buf.next() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { + if unlikely(terminate.load(std::sync::atomic::Ordering::Relaxed)) { break; } let data: [u8; RawData::LEN] = item.to_owned().try_into().unwrap(); - data_sender.send((data, TrafficDirection::Ingress)).ok(); + let raw = RawData::from(data); + packet_buffer.push(AppPacket { + frame: raw.frame.into(), + direction: TrafficDirection::Ingress, + pid: raw.pid, + timestamp: chrono::Utc::now(), + }) + } + if likely(!packet_buffer.is_empty()) { + packet_store.write_many(&packet_buffer); + packet_buffer.clear(); } } } diff --git a/oryx-tui/src/export.rs b/oryx-tui/src/export.rs index 7de9573..be0f77b 100644 --- a/oryx-tui/src/export.rs +++ b/oryx-tui/src/export.rs @@ -8,14 +8,17 @@ use std::{ use chrono::Local; -use crate::packet::{ - AppPacket, NetworkPacket, - network::{IpPacket, ip::IpProto}, +use crate::{ + packet::{ + NetworkPacket, + network::{IpPacket, ip::IpProto}, + }, + packet_store::PacketStore, }; use anyhow::{Result, bail}; -pub fn export(packets: &[AppPacket]) -> Result<()> { +pub fn export(packets: &PacketStore) -> Result<()> { let local_date = Local::now().format("%Y-%m-%d_%H-%M"); let user = match std::env::var("SUDO_USER") { @@ -62,7 +65,7 @@ pub fn export(packets: &[AppPacket]) -> Result<()> { "{:39} {:11} {:39} {:11} {:8} {:10} {:10}\n", headers.0, headers.1, headers.2, headers.3, headers.4, headers.5, headers.6 )?; - for app_packet in packets { + packets.for_each(|app_packet| { let pid = if let Some(pid) = app_packet.pid { pid.to_string() } else { @@ -184,7 +187,8 @@ pub fn export(packets: &[AppPacket]) -> Result<()> { }, }, } - } + Ok(()) + })?; Ok(()) } diff --git a/oryx-tui/src/filter.rs b/oryx-tui/src/filter.rs index d88a640..d3362a1 100644 --- a/oryx-tui/src/filter.rs +++ b/oryx-tui/src/filter.rs @@ -8,12 +8,9 @@ use crossterm::event::{KeyCode, KeyEvent}; use direction::TrafficDirectionFilter; use link::LinkFilter; use network::NetworkFilter; -use oryx_common::{ - RawData, - protocols::{ - LinkProtocol, NB_LINK_PROTOCOL, NB_NETWORK_PROTOCOL, NB_TRANSPORT_PROTOCOL, - NetworkProtocol, Protocol, TransportProtocol, - }, +use oryx_common::protocols::{ + LinkProtocol, NB_LINK_PROTOCOL, NB_NETWORK_PROTOCOL, NB_TRANSPORT_PROTOCOL, NetworkProtocol, + Protocol, TransportProtocol, }; use ratatui::{ Frame, @@ -31,6 +28,7 @@ use crate::{ event::Event, interface::Interface, packet::direction::TrafficDirection, + packet_store::PacketStore, section::firewall::FirewallSignal, }; @@ -159,7 +157,7 @@ impl Filter { pub fn start( &mut self, notification_sender: kanal::Sender, - data_sender: kanal::Sender<([u8; RawData::LEN], TrafficDirection)>, + packet_store: PacketStore, ) -> AppResult<()> { let iface = self.interface.selected_interface.name.clone(); @@ -168,7 +166,7 @@ impl Filter { load_ingress( iface.clone(), notification_sender.clone(), - data_sender.clone(), + packet_store.clone(), self.filter_chans.ingress.receiver.clone(), self.firewall_chans.ingress.receiver.clone(), self.traffic_direction.terminate_ingress.clone(), @@ -177,7 +175,7 @@ impl Filter { load_egress( iface, notification_sender, - data_sender, + packet_store, self.filter_chans.egress.receiver.clone(), self.firewall_chans.egress.receiver.clone(), self.traffic_direction.terminate_egress.clone(), diff --git a/oryx-tui/src/filter/fuzzy.rs b/oryx-tui/src/filter/fuzzy.rs index 5ba689a..b900a0f 100644 --- a/oryx-tui/src/filter/fuzzy.rs +++ b/oryx-tui/src/filter/fuzzy.rs @@ -1,5 +1,5 @@ use std::{ - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, thread, time::Duration, }; @@ -11,7 +11,7 @@ use ratatui::{ }; use tui_input::Input; -use crate::{app::TICK_RATE, packet::AppPacket}; +use crate::{app::TICK_RATE, packet::AppPacket, packet_store::PacketStore}; #[derive(Debug, Clone, Default)] pub struct Fuzzy { @@ -24,7 +24,7 @@ pub struct Fuzzy { } impl Fuzzy { - pub fn new(packets: Arc>>) -> Arc> { + pub fn new(packets: PacketStore) -> Arc> { let fuzzy = Arc::new(Mutex::new(Self::default())); thread::spawn({ @@ -38,15 +38,12 @@ impl Fuzzy { let mut fuzzy = fuzzy.lock().unwrap(); if fuzzy.is_enabled() && !fuzzy.filter.value().is_empty() { - let packets = packets.read().unwrap(); let current_pattern = fuzzy.filter.value().to_owned(); if current_pattern != pattern { - fuzzy.find(packets.as_slice()); + last_index += fuzzy.find(&packets); pattern = current_pattern; - last_index = packets.len(); } else { - fuzzy.append(&packets.as_slice()[last_index..]); - last_index = packets.len(); + last_index += fuzzy.append(&packets, last_index); } } } @@ -94,30 +91,35 @@ impl Fuzzy { self.scroll_state.select(Some(i)); } - pub fn find(&mut self, packets: &[AppPacket]) { - self.packets = packets - .iter() - .copied() - .filter(|p| { - p.frame.payload.to_string().contains(self.filter.value()) - | p.pid + // returns number of processed items + pub fn find(&mut self, packets: &PacketStore) -> usize { + self.packets = Vec::new(); + packets + .for_each(|p| { + if p.frame.payload.to_string().contains(self.filter.value()) + || p.pid .is_some_and(|v| v.to_string().contains(self.filter.value())) + { + self.packets.push(*p); + } + Ok(()) }) - .collect::>(); + .unwrap() } - pub fn append(&mut self, packets: &[AppPacket]) { - self.packets.append( - &mut packets - .iter() - .copied() - .filter(|p| { - p.frame.payload.to_string().contains(self.filter.value()) - | p.pid - .is_some_and(|v| v.to_string().contains(self.filter.value())) - }) - .collect::>(), - ); + // returns number of processed items + pub fn append(&mut self, packets: &PacketStore, last_index: usize) -> usize { + packets + .for_each_range(last_index.., |p| { + if p.frame.payload.to_string().contains(self.filter.value()) + | p.pid + .is_some_and(|v| v.to_string().contains(self.filter.value())) + { + self.packets.push(*p); + } + Ok(()) + }) + .unwrap() } pub fn enable(&mut self) { diff --git a/oryx-tui/src/handler.rs b/oryx-tui/src/handler.rs index 9584e84..2110e27 100644 --- a/oryx-tui/src/handler.rs +++ b/oryx-tui/src/handler.rs @@ -23,7 +23,7 @@ pub fn handle_key_events( if app.filter.focused_block == FocusedBlock::Apply { app.section.stats = Some(Stats::new(app.app_packets.clone())); app.filter - .start(event_sender.clone(), app.data_channel_sender.clone())?; + .start(event_sender.clone(), app.app_packets.clone())?; sleep(Duration::from_millis(10)); app.start_sniffing = true; diff --git a/oryx-tui/src/lib.rs b/oryx-tui/src/lib.rs index d9e796e..d2b5af9 100644 --- a/oryx-tui/src/lib.rs +++ b/oryx-tui/src/lib.rs @@ -29,3 +29,5 @@ pub mod section; pub mod dns; pub mod cli; + +pub mod packet_store; diff --git a/oryx-tui/src/packet/link.rs b/oryx-tui/src/packet/link.rs index 27d6892..9c4d664 100644 --- a/oryx-tui/src/packet/link.rs +++ b/oryx-tui/src/packet/link.rs @@ -129,7 +129,7 @@ pub struct MacAddr(pub [u8; 6]); impl Display for MacAddr { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { //FIX: workaround for the moment - if self.0.iter().all(|&x| x == 0) { + if self.0.iter().all(|&x| x == 0x00) { write!(f, "ff:ff:ff:ff:ff:ff",) } else { write!( diff --git a/oryx-tui/src/packet_store.rs b/oryx-tui/src/packet_store.rs new file mode 100644 index 0000000..5cf9761 --- /dev/null +++ b/oryx-tui/src/packet_store.rs @@ -0,0 +1,354 @@ +use crate::packet::AppPacket; +use anyhow::Result; +use arrayvec::ArrayVec; +use branches::{likely, unlikely}; +use cacheguard::CacheGuard; +use std::cell::RefCell; +use std::ops::{Deref, RangeBounds}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +// The double edged sword, Too high increases copy time and contention, Too low increases number of allocations +const BUFFER_SIZE: usize = 32 * 1024; +// Stack buffer size max should be as large as possible without causing stack overflow in all operation systems +// for fallback when read range is too large we switch to thread local buffer with BUFFER_SIZE capacity +const STACK_BUFFER_SIZE_MAX: usize = 4 * 1024; +const STACK_BUFFER_SIZE: usize = if STACK_BUFFER_SIZE_MAX < BUFFER_SIZE { + STACK_BUFFER_SIZE_MAX +} else { + BUFFER_SIZE +}; + +#[derive(Debug)] +pub struct PacketStoreInner { + // It is here so user would know if archive that it read is changed while reading latest + latest_token: CacheGuard, + // It is here so when new entry is geting added, user can spin over this instead of locking the RwLock + archives_token: CacheGuard, + // Total number of packets stored + length: CacheGuard, + // Recent packets stored here + latest: CacheGuard>>, + // Old packets stored here in chunks of BUFFER_SIZE + archives: CacheGuard>>>>, +} + +#[derive(Debug)] +pub struct PacketStore { + inner: Arc, +} + +impl Default for PacketStore { + fn default() -> Self { + Self::new() + } +} + +impl Deref for PacketStore { + type Target = PacketStoreInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Clone for PacketStore { + fn clone(&self) -> Self { + PacketStore { + inner: Arc::clone(&self.inner), + } + } +} + +thread_local! { + // THREAD_LOCAL reused buffer to avoid heap allocations + static THREAD_BUFFER: RefCell> = RefCell::new(Vec::with_capacity(BUFFER_SIZE)); +} + +impl PacketStore { + pub fn new() -> Self { + PacketStore { + inner: Arc::new(PacketStoreInner { + latest_token: CacheGuard::new(AtomicUsize::new(0)), + latest: CacheGuard::new(RwLock::new(Vec::with_capacity(BUFFER_SIZE))), + archives: CacheGuard::new(RwLock::new(Vec::new())), + archives_token: CacheGuard::new(AtomicUsize::new(0)), + length: CacheGuard::new(AtomicUsize::new(0)), + }), + } + } + + #[inline] + pub fn len(&self) -> usize { + self.length.load(Ordering::Relaxed) + } + + #[inline] + pub fn is_empty(&self) -> bool { + unlikely(self.len() == 0) + } + + #[inline] + pub fn discard_archive(&self, index: usize) { + let mut archives = self.archives.write().unwrap(); + if likely(index < archives.len()) { + archives[index] = Arc::new(Vec::new()); + } + } + + #[inline] + pub fn write(&self, packet: &AppPacket) { + let mut latest = self.latest.write().unwrap(); + latest.push(*packet); + if unlikely(latest.len() >= BUFFER_SIZE) { + assert!(latest.len() == BUFFER_SIZE); + let full_buffer = std::mem::replace(&mut *latest, Vec::with_capacity(BUFFER_SIZE)); + self.latest_token.fetch_add(1, Ordering::SeqCst); + drop(latest); + let mut archive = self.archives.write().unwrap(); + archive.push(Arc::new(full_buffer)); + self.archives_token.fetch_add(1, Ordering::SeqCst); + } + self.length.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn archive_at(&self, index: usize) -> (Option>>, usize) { + let archive = self.archives.read().unwrap(); + (archive.get(index).cloned(), archive.len()) + } + + #[inline] + pub fn write_many(&self, packets: &[AppPacket]) { + let mut i = 0; + while likely(i < packets.len()) { + let mut latest = self.latest.write().unwrap(); + let remaining_capacity = BUFFER_SIZE - latest.len(); + let to_copy = remaining_capacity.min(packets.len() - i); + latest.extend_from_slice(&packets[i..i + to_copy]); + self.length.fetch_add(to_copy, Ordering::Relaxed); + i += to_copy; + if unlikely(latest.len() >= BUFFER_SIZE) { + assert!(latest.len() == BUFFER_SIZE); + let latest_cloned = latest.clone(); + latest.clear(); + self.latest_token.fetch_add(1, Ordering::Release); + drop(latest); + let mut archive = self.archives.write().unwrap(); + archive.push(Arc::new(latest_cloned)); + self.archives_token.fetch_add(1, Ordering::Release); + } + } + } + + #[inline] + pub fn get(&self, i: usize) -> Option { + loop { + let current_archive_length = self.archives_token.load(Ordering::SeqCst); + let latest_token = self.latest_token.load(Ordering::SeqCst); + + if i >= self.len() { + return None; + } + + // Check if in archives + if likely(i < current_archive_length * BUFFER_SIZE) { + let archive_index = i / BUFFER_SIZE; + let index_in_archive = i % BUFFER_SIZE; + + if let (Some(archive), _) = self.archive_at(archive_index) { + return Some(archive[index_in_archive]); + } + // Discarded archive or out of bounds + return None; + } + + // Check in latest + let latest = self.latest.read().unwrap(); + if unlikely(latest_token != self.latest_token.load(Ordering::SeqCst)) { + drop(latest); + while self.archives_token.load(Ordering::Relaxed) == current_archive_length { + std::thread::yield_now(); + } + continue; // Retry, archive was updated + } + + let index_in_latest = i % BUFFER_SIZE; + if index_in_latest < latest.len() { + return Some(latest[index_in_latest]); + } + + return None; + } + } + + #[inline] + pub fn write_range_into(&self, range: R, output: &mut Vec) + where + R: RangeBounds, + { + let start = match range.start_bound() { + std::ops::Bound::Included(b) => *b, + std::ops::Bound::Excluded(b) => *b + 1, + std::ops::Bound::Unbounded => 0, + }; + let mut i = start; + let end = match range.end_bound() { + std::ops::Bound::Included(b) => *b + 1, + std::ops::Bound::Excluded(b) => *b, + std::ops::Bound::Unbounded => usize::MAX, + }; + + // reserve + if end != usize::MAX { + output.reserve(end - start); + } + + loop { + let current_archive_length = self.archives_token.load(Ordering::SeqCst); + let latest_token = self.latest_token.load(Ordering::SeqCst); + // Process archives + while likely(i < current_archive_length * BUFFER_SIZE && i < end) { + let archive_index = i / BUFFER_SIZE; + let start_in_archive = i % BUFFER_SIZE; + let remaining = (end - i).min(BUFFER_SIZE - start_in_archive); + + if let (Some(archive), _) = self.archive_at(archive_index) { + let end_in_archive = (start_in_archive + remaining).min(archive.len()); + output.extend_from_slice(&archive[start_in_archive..end_in_archive]); + i += end_in_archive - start_in_archive; + } else { + // Discarded archive, skip it + i += remaining; + } + } + + if i >= end { + assert!(i == end); + return; + } + + let latest = self.latest.read().unwrap(); + if unlikely(latest_token != self.latest_token.load(Ordering::SeqCst)) { + drop(latest); + while self.archives_token.load(Ordering::Relaxed) == current_archive_length { + std::thread::yield_now(); + } + continue; // Retry, archive was updated + } + + if unlikely(latest.is_empty()) { + return; + } + + let start_in_latest = i % BUFFER_SIZE; + let end_in_latest = (start_in_latest + (end - i)).min(latest.len()); + + if start_in_latest >= latest.len() { + return; + } + + output.extend_from_slice(&latest[start_in_latest..end_in_latest]); + return; + } + } + + #[inline] + pub fn for_each(&self, f: F) -> Result + where + F: FnMut(&AppPacket) -> Result<()>, + { + self.for_each_range(0.., f) + } + + // returns number of processed packets + #[inline] + pub fn for_each_range(&self, range: R, mut f: F) -> Result + where + R: RangeBounds, + F: FnMut(&AppPacket) -> Result<()>, + { + let start = match range.start_bound() { + std::ops::Bound::Included(b) => *b, + std::ops::Bound::Excluded(b) => *b + 1, + std::ops::Bound::Unbounded => 0, + }; + let mut i = start; + let end = match range.end_bound() { + std::ops::Bound::Included(b) => *b + 1, + std::ops::Bound::Excluded(b) => *b, + std::ops::Bound::Unbounded => usize::MAX, + }; + + loop { + let current_archive_length = self.archives_token.load(Ordering::SeqCst); + let latest_token = self.latest_token.load(Ordering::SeqCst); + // Process archives + while likely(i < current_archive_length * BUFFER_SIZE && i < end) { + let archive_index = i / BUFFER_SIZE; + let start_in_archive = i % BUFFER_SIZE; + let remaining = (end - i).min(BUFFER_SIZE - start_in_archive); + + if let (Some(archive), _) = self.archive_at(archive_index) { + let end_in_archive = (start_in_archive + remaining).min(archive.len()); + for packet in &archive[start_in_archive..end_in_archive] { + f(packet)?; + } + i += end_in_archive - start_in_archive; + } else { + // Discarded archive, skip it + i += remaining; + } + } + + if i >= end { + assert!(i == end); + return Ok(i - start); + } + + let latest = self.latest.read().unwrap(); + if unlikely(latest_token != self.latest_token.load(Ordering::SeqCst)) { + drop(latest); + while self.archives_token.load(Ordering::Relaxed) == current_archive_length { + std::thread::yield_now(); + } + continue; // Retry, archive was updated + } + + if unlikely(latest.is_empty()) { + return Ok(i - start); + } + + let start_in_latest = i % BUFFER_SIZE; + let end_in_latest = (start_in_latest + (end - i)).min(latest.len()); + + if start_in_latest >= latest.len() { + return Ok(i - start); + } + + if end_in_latest - start_in_latest <= STACK_BUFFER_SIZE { + let mut buffer = ArrayVec::::new(); + buffer + .try_extend_from_slice(&latest[start_in_latest..end_in_latest]) + .unwrap(); + drop(latest); + for packet in buffer.iter() { + f(packet)?; + } + } else { + THREAD_BUFFER.with(move |buffer| { + let mut buffer = buffer.borrow_mut(); + buffer.extend_from_slice(&latest[start_in_latest..end_in_latest]); + drop(latest); + for packet in buffer.iter() { + f(packet)?; + } + buffer.clear(); + Ok::<(), anyhow::Error>(()) + })?; + } + + i += end_in_latest - start_in_latest; + return Ok(i - start); + } + } +} diff --git a/oryx-tui/src/section.rs b/oryx-tui/src/section.rs index ca215c7..54b0bc7 100644 --- a/oryx-tui/src/section.rs +++ b/oryx-tui/src/section.rs @@ -4,8 +4,6 @@ pub mod inspection; pub mod metrics; pub mod stats; -use std::sync::{Arc, RwLock}; - use alert::Alert; use crossterm::event::{KeyCode, KeyEvent}; use firewall::{Firewall, FirewallSignal}; @@ -25,7 +23,7 @@ use crate::{ app::{ActivePopup, AppResult}, event::Event, filter::IoChannels, - packet::AppPacket, + packet_store::PacketStore, }; #[derive(Debug, PartialEq)] @@ -48,10 +46,7 @@ pub struct Section { } impl Section { - pub fn new( - packets: Arc>>, - firewall_chans: IoChannels, - ) -> Self { + pub fn new(packets: PacketStore, firewall_chans: IoChannels) -> Self { Self { focused_section: FocusedSection::Inspection, inspection: Inspection::new(packets.clone()), diff --git a/oryx-tui/src/section/alert.rs b/oryx-tui/src/section/alert.rs index 7798a8b..e76ecd9 100644 --- a/oryx-tui/src/section/alert.rs +++ b/oryx-tui/src/section/alert.rs @@ -8,7 +8,6 @@ use ratatui::{ widgets::WidgetRef, }; use std::{ - collections::HashMap, net::IpAddr, sync::{Arc, RwLock}, thread, @@ -17,13 +16,16 @@ use std::{ use crate::{ packet::{ - AppPacket, NetworkPacket, + NetworkPacket, direction::TrafficDirection, network::{IpPacket, ip::IpProto}, }, + packet_store::PacketStore, section::alert::threat::synflood::SynFlood, }; +use rustc_hash::FxHashMap as HashMap; + use std::fmt::Debug; pub trait Threat: Send + Sync + Debug + WidgetRef {} @@ -37,45 +39,40 @@ pub struct Alert { } impl Alert { - pub fn new(packets: Arc>>) -> Self { - let threats: Arc>>> = Arc::new(RwLock::new(Vec::new())); - - thread::spawn({ - let threats = threats.clone(); - move || loop { - let start_index = { - let packets = packets.read().unwrap(); - let count = packets - .iter() - .filter(|packet| packet.direction == TrafficDirection::Ingress) - .count(); - - count.saturating_sub(1) - }; - + pub fn new(packets: PacketStore) -> Self { + let ret_threats: Arc>>> = Arc::new(RwLock::new(Vec::new())); + let threats = ret_threats.clone(); + + thread::spawn(move || { + let mut last_index = 0; + let mut counting_index = 0; + let mut count = 0usize; + loop { thread::sleep(Duration::from_secs(5)); - - let mut syn_flood_map: HashMap = HashMap::new(); - - let app_packets = { - let packets = packets.read().unwrap(); - packets.clone() - }; - - let app_packets: Vec = app_packets - .into_iter() - .filter(|packet| packet.direction == TrafficDirection::Ingress) - .collect(); - - if app_packets.len() < WIN_SIZE { + // Phase 1: waiting for enough samples + counting_index += packets + .for_each_range(last_index + counting_index.., |packet| { + if packet.direction == TrafficDirection::Ingress { + count += 1; + } + Ok(()) + }) + .unwrap(); + if count < WIN_SIZE { + threats.write().unwrap().clear(); continue; } + // clear counters + count = 0; + counting_index = 0; + // Phase 2: start statistics let mut nb_syn_packets = 0; + let mut syn_flood_map: HashMap = HashMap::default(); + syn_flood_map.reserve(128); - app_packets[start_index..app_packets.len().saturating_sub(1)] - .iter() - .for_each(|app_packet| { + last_index += packets + .for_each_range(last_index.., |app_packet| { if let NetworkPacket::Ip(ip_packet) = app_packet.frame.payload { match ip_packet { IpPacket::V4(ipv4_packet) => { @@ -108,8 +105,9 @@ impl Alert { } } } - }); - let threats = threats.clone(); + Ok(()) + }) + .unwrap(); threats.write().unwrap().clear(); // 90% of incoming packets @@ -121,7 +119,7 @@ impl Alert { }); Self { - threats, + threats: ret_threats, flash_count: 1, } } diff --git a/oryx-tui/src/section/alert/threat/synflood.rs b/oryx-tui/src/section/alert/threat/synflood.rs index 20e0a53..2df4c6f 100644 --- a/oryx-tui/src/section/alert/threat/synflood.rs +++ b/oryx-tui/src/section/alert/threat/synflood.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::IpAddr}; +use std::net::IpAddr; use ratatui::{ buffer::Buffer, @@ -8,6 +8,8 @@ use ratatui::{ widgets::{Block, Borders, Row, Table}, }; +use rustc_hash::FxHashMap as HashMap; + use crate::section::alert::Threat; #[derive(Debug)] diff --git a/oryx-tui/src/section/inspection.rs b/oryx-tui/src/section/inspection.rs index 30cd686..8929e92 100644 --- a/oryx-tui/src/section/inspection.rs +++ b/oryx-tui/src/section/inspection.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use crossterm::event::{KeyCode, KeyEvent}; use ratatui::{ @@ -23,21 +23,23 @@ use crate::{ eth_frame::EthFrameHeader, network::{IpPacket, ip::IpProto}, }, + packet_store::PacketStore, }; #[derive(Debug)] pub struct Inspection { - pub packets: Arc>>, + pub packets: PacketStore, pub state: TableState, pub fuzzy: Arc>, pub manual_scroll: bool, pub packet_end_index: usize, pub packet_window_size: usize, pub packet_index: Option, + pub packets_display_buffer: Vec, } impl Inspection { - pub fn new(packets: Arc>>) -> Self { + pub fn new(packets: PacketStore) -> Self { Self { packets: packets.clone(), state: TableState::default(), @@ -46,17 +48,16 @@ impl Inspection { packet_end_index: 0, packet_window_size: 0, packet_index: None, + packets_display_buffer: Vec::with_capacity(128), } } pub fn can_show_popup(&mut self) -> bool { - let packets = self.packets.read().unwrap(); let fuzzy = self.fuzzy.lock().unwrap(); - if fuzzy.is_enabled() { !fuzzy.packets.is_empty() } else { - !packets.is_empty() + !self.packets.is_empty() } } @@ -135,15 +136,14 @@ impl Inspection { } KeyCode::Char('s') => { - let app_packets = self.packets.read().unwrap(); - if app_packets.is_empty() { + if self.packets.is_empty() { Notification::send( "There is no packets".to_string(), NotificationLevel::Info, event_sender, )?; } else { - match export::export(&app_packets) { + match export::export(&self.packets) { Ok(_) => { Notification::send( "Packets exported to ~/oryx directory".to_string(), @@ -169,11 +169,10 @@ impl Inspection { } pub fn scroll_up(&mut self) { - let app_packets = self.packets.read().unwrap(); if !self.manual_scroll { self.manual_scroll = true; // Record the last position. Useful for selecting the packets to display - self.packet_end_index = app_packets.len(); + self.packet_end_index = self.packets.len(); } let i = match self.state.selected() { Some(i) => { @@ -194,19 +193,16 @@ impl Inspection { } pub fn scroll_down(&mut self) { - let app_packets = self.packets.read().unwrap(); - + let packets_len = self.packets.len(); if !self.manual_scroll { self.manual_scroll = true; - self.packet_end_index = app_packets.len(); + self.packet_end_index = packets_len; } let i = match self.state.selected() { Some(i) => { if i < self.packet_window_size - 1 { i + 1 - } else if i == self.packet_window_size - 1 - && app_packets.len() > self.packet_end_index - { + } else if i == self.packet_window_size - 1 && packets_len > self.packet_end_index { // shift the window by one self.packet_end_index += 1; i + 1 @@ -214,14 +210,13 @@ impl Inspection { i } } - None => app_packets.len(), + None => packets_len, }; self.state.select(Some(i)); } pub fn render(&mut self, frame: &mut Frame, block: Rect) { - let app_packets = self.packets.read().unwrap(); let mut fuzzy = self.fuzzy.lock().unwrap(); let fuzzy_packets = fuzzy.clone().packets.clone(); @@ -277,7 +272,10 @@ impl Inspection { fuzzy.packet_end_index = window_size; } - let packets_to_display = match self.manual_scroll { + let packets_len = self.packets.len(); + let pdb = &mut self.packets_display_buffer; + pdb.clear(); + match self.manual_scroll { true => { if fuzzy.is_enabled() & !fuzzy.filter.value().is_empty() { if fuzzy_packets.len() > window_size { @@ -286,54 +284,60 @@ impl Inspection { fuzzy.packet_end_index.saturating_sub(window_size) + selected_index, ); } - &fuzzy_packets[fuzzy.packet_end_index.saturating_sub(window_size) - ..fuzzy.packet_end_index] + pdb.extend_from_slice( + &fuzzy_packets[fuzzy.packet_end_index.saturating_sub(window_size) + ..fuzzy.packet_end_index], + ); } else { if let Some(selected_index) = fuzzy.scroll_state.selected() { self.packet_index = Some(selected_index); } else { self.packet_index = None; } - &fuzzy_packets + pdb.extend_from_slice(&fuzzy_packets) } - } else if app_packets.len() > window_size { + } else if packets_len > window_size { if let Some(selected_index) = self.state.selected() { self.packet_index = Some( self.packet_end_index.saturating_sub(window_size) + selected_index, ); } - &app_packets - [self.packet_end_index.saturating_sub(window_size)..self.packet_end_index] + self.packets.write_range_into( + self.packet_end_index.saturating_sub(window_size)..self.packet_end_index, + pdb, + ); } else { if let Some(selected_index) = self.state.selected() { self.packet_index = Some(selected_index); } - &app_packets + self.packets.write_range_into(0..packets_len, pdb); } } false => { if fuzzy.is_enabled() & !fuzzy.filter.value().is_empty() { if fuzzy_packets.len() > window_size { self.packet_index = Some(fuzzy_packets.len().saturating_sub(1)); - &fuzzy_packets[fuzzy_packets.len().saturating_sub(window_size)..] + pdb.extend_from_slice( + &fuzzy_packets[fuzzy_packets.len().saturating_sub(window_size)..], + ) } else { self.packet_index = Some(fuzzy_packets.len().saturating_sub(1)); - &fuzzy_packets + pdb.extend_from_slice(&fuzzy_packets); } - } else if app_packets.len() > window_size { - self.packet_index = Some(app_packets.len().saturating_sub(1)); - &app_packets[app_packets.len().saturating_sub(window_size)..] + } else if packets_len > window_size { + let start_index = packets_len.saturating_sub(window_size); + self.packet_index = Some(packets_len.saturating_sub(1)); + self.packets.write_range_into(start_index..packets_len, pdb); } else { - self.packet_index = Some(app_packets.len().saturating_sub(1)); - &app_packets + self.packet_index = Some(packets_len.saturating_sub(1)); + self.packets.write_range_into(0..packets_len, pdb); } } }; // Style the packets let packets: Vec = if fuzzy.is_enabled() & !fuzzy.filter.value().is_empty() { - packets_to_display - .iter() + pdb.iter() .map(|app_packet| { let pid = match app_packet.pid { Some(pid) => fuzzy::highlight(pattern, pid.to_string()).blue(), @@ -439,8 +443,7 @@ impl Inspection { }) .collect() } else { - packets_to_display - .iter() + pdb.iter() .map(|app_packet| { let pid = match app_packet.pid { Some(pid) => Span::from(pid.to_string()).into_centered_line().cyan(), @@ -594,9 +597,9 @@ impl Inspection { // Always select the last packet if !self.manual_scroll { if fuzzy.is_enabled() { - fuzzy.scroll_state.select(Some(packets_to_display.len())); + fuzzy.scroll_state.select(Some(pdb.len())); } else { - self.state.select(Some(packets_to_display.len())); + self.state.select(Some(pdb.len())); } } @@ -638,6 +641,7 @@ impl Inspection { .begin_symbol(Some("↑")) .end_symbol(Some("↓")); + let app_packets_len = self.packets.len(); let mut scrollbar_state = if fuzzy.is_enabled() && fuzzy_packets.len() > window_size { ScrollbarState::new(fuzzy_packets.len()).position({ if self.manual_scroll { @@ -650,8 +654,8 @@ impl Inspection { fuzzy.packets.len() } }) - } else if !fuzzy.is_enabled() && app_packets.len() > window_size { - ScrollbarState::new(app_packets.len()).position({ + } else if !fuzzy.is_enabled() && app_packets_len > window_size { + ScrollbarState::new(app_packets_len).position({ if self.manual_scroll { if self.packet_end_index == window_size { 0 @@ -659,7 +663,7 @@ impl Inspection { self.packet_end_index } } else { - app_packets.len() + app_packets_len } }) } else { @@ -733,12 +737,11 @@ impl Inspection { .split(layout[1])[1]; let fuzzy = self.fuzzy.lock().unwrap(); - let packets = self.packets.read().unwrap(); let app_packet = if fuzzy.is_enabled() { fuzzy.packets[self.packet_index.unwrap()] } else { - packets[self.packet_index.unwrap()] + self.packets.get(self.packet_index.unwrap()).unwrap() }; frame.render_widget(Clear, block); diff --git a/oryx-tui/src/section/metrics.rs b/oryx-tui/src/section/metrics.rs index b554a68..e3943ee 100644 --- a/oryx-tui/src/section/metrics.rs +++ b/oryx-tui/src/section/metrics.rs @@ -1,7 +1,7 @@ use std::{ cmp, ops::Range, - sync::{Arc, Mutex, RwLock, atomic::AtomicBool}, + sync::{Arc, Mutex, atomic::AtomicBool}, thread, time::Duration, }; @@ -24,10 +24,11 @@ use ratatui::{ use crate::{ app::AppResult, packet::{ - AppPacket, NetworkPacket, + NetworkPacket, direction::TrafficDirection, network::{IpPacket, ip::IpProto}, }, + packet_store::PacketStore, }; #[derive(Debug, Default)] @@ -39,7 +40,7 @@ struct ListState { #[derive(Debug)] pub struct Metrics { user_input: UserInput, - app_packets: Arc>>, + app_packets: PacketStore, metrics: Vec>>, terminate: Arc, state: ListState, @@ -96,7 +97,7 @@ pub struct PortCountMetric { } impl Metrics { - pub fn new(packets: Arc>>) -> Self { + pub fn new(packets: PacketStore) -> Self { Self { user_input: UserInput::default(), app_packets: packets, @@ -323,57 +324,55 @@ impl Metrics { let packets = self.app_packets.clone(); move || { let mut last_index = 0; - 'main: loop { + loop { thread::sleep(Duration::from_millis(100)); - let app_packets = { packets.read().unwrap().clone() }; - - if app_packets.is_empty() { + if packets.is_empty() { continue; } + let mut metric = port_count_metric.lock().unwrap(); - for app_packet in app_packets[last_index..].iter() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break 'main; - } - if app_packet.direction == TrafficDirection::Ingress - && let NetworkPacket::Ip(packet) = app_packet.frame.payload - { - match packet { - IpPacket::V4(ipv4_packet) => match ipv4_packet.proto { - IpProto::Tcp(tcp_packet) => { - if port_range.contains(&tcp_packet.dst_port) { - metric.tcp_count += 1; + + last_index += packets + .for_each_range(last_index.., |app_packet| { + if app_packet.direction == TrafficDirection::Ingress + && let NetworkPacket::Ip(packet) = app_packet.frame.payload + { + match packet { + IpPacket::V4(ipv4_packet) => match ipv4_packet.proto { + IpProto::Tcp(tcp_packet) => { + if port_range.contains(&tcp_packet.dst_port) { + metric.tcp_count += 1; + } } - } - IpProto::Udp(udp_packet) => { - if port_range.contains(&udp_packet.dst_port) { - metric.udp_count += 1; + IpProto::Udp(udp_packet) => { + if port_range.contains(&udp_packet.dst_port) { + metric.udp_count += 1; + } } - } - _ => {} - }, - IpPacket::V6(ipv6_packet) => match ipv6_packet.proto { - IpProto::Tcp(tcp_packet) => { - if port_range.contains(&tcp_packet.dst_port) { - metric.tcp_count += 1; + _ => {} + }, + IpPacket::V6(ipv6_packet) => match ipv6_packet.proto { + IpProto::Tcp(tcp_packet) => { + if port_range.contains(&tcp_packet.dst_port) { + metric.tcp_count += 1; + } } - } - IpProto::Udp(udp_packet) => { - if port_range.contains(&udp_packet.dst_port) { - metric.udp_count += 1; + IpProto::Udp(udp_packet) => { + if port_range.contains(&udp_packet.dst_port) { + metric.udp_count += 1; + } } - } - _ => {} - }, + _ => {} + }, + } } - } - } - - last_index = app_packets.len(); + Ok(()) + }) + .unwrap(); if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break 'main; + break; } } } diff --git a/oryx-tui/src/section/stats.rs b/oryx-tui/src/section/stats.rs index 54e5680..30e476d 100644 --- a/oryx-tui/src/section/stats.rs +++ b/oryx-tui/src/section/stats.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, net::IpAddr, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, thread, time::Duration, }; @@ -18,10 +18,11 @@ use crate::{ bandwidth::Bandwidth, dns::get_hostname, packet::{ - AppPacket, NetworkPacket, + NetworkPacket, direction::TrafficDirection, network::{IpPacket, ip::IpProto}, }, + packet_store::PacketStore, }; #[derive(Debug, Default)] @@ -41,113 +42,111 @@ pub struct Stats { } impl Stats { - pub fn new(packets: Arc>>) -> Self { + pub fn new(packets: PacketStore) -> Self { let packet_stats: Arc> = Arc::new(Mutex::new(PacketStats::default())); thread::spawn({ let packet_stats = packet_stats.clone(); move || { - let mut last_index = 0; + let mut last_index: usize = 0; loop { thread::sleep(Duration::from_millis(500)); - let app_packets = { packets.read().unwrap().clone() }; - - if app_packets.is_empty() { + if packets.is_empty() { continue; } - let mut packet_stats = packet_stats.lock().unwrap(); + last_index += packets + .for_each_range(last_index.., |app_packet| { + match app_packet.frame.payload { + NetworkPacket::Arp(_) => { + packet_stats.link.arp += 1; + } + NetworkPacket::Ip(packet) => match packet { + IpPacket::V4(ipv4_packet) => { + packet_stats.network.ipv4 += 1; - for app_packet in app_packets[last_index..].iter() { - match app_packet.frame.payload { - NetworkPacket::Arp(_) => { - packet_stats.link.arp += 1; - } - NetworkPacket::Ip(packet) => match packet { - IpPacket::V4(ipv4_packet) => { - packet_stats.network.ipv4 += 1; - - if app_packet.direction == TrafficDirection::Egress { - if let Some((_, counts)) = packet_stats - .addresses - .get_mut(&IpAddr::V4(ipv4_packet.dst_ip)) - { - *counts += 1; - } else if let Ok(host) = - get_hostname(&IpAddr::V4(ipv4_packet.dst_ip)) - { - packet_stats.addresses.insert( - IpAddr::V4(ipv4_packet.dst_ip), - (Some(host), 1), - ); - } else { - packet_stats + if app_packet.direction == TrafficDirection::Egress { + if let Some((_, counts)) = packet_stats .addresses - .insert(IpAddr::V4(ipv4_packet.dst_ip), (None, 1)); + .get_mut(&IpAddr::V4(ipv4_packet.dst_ip)) + { + *counts += 1; + } else if let Ok(host) = + get_hostname(&IpAddr::V4(ipv4_packet.dst_ip)) + { + packet_stats.addresses.insert( + IpAddr::V4(ipv4_packet.dst_ip), + (Some(host), 1), + ); + } else { + packet_stats.addresses.insert( + IpAddr::V4(ipv4_packet.dst_ip), + (None, 1), + ); + } } - } - match ipv4_packet.proto { - IpProto::Tcp(_) => { - packet_stats.transport.tcp += 1; - } - IpProto::Udp(_) => { - packet_stats.transport.udp += 1; - } - IpProto::Sctp(_) => { - packet_stats.transport.sctp += 1; - } - IpProto::Icmp(_) => { - packet_stats.network.icmpv4 += 1; + match ipv4_packet.proto { + IpProto::Tcp(_) => { + packet_stats.transport.tcp += 1; + } + IpProto::Udp(_) => { + packet_stats.transport.udp += 1; + } + IpProto::Sctp(_) => { + packet_stats.transport.sctp += 1; + } + IpProto::Icmp(_) => { + packet_stats.network.icmpv4 += 1; + } } } - } - IpPacket::V6(ipv6_packet) => { - packet_stats.network.ipv6 += 1; + IpPacket::V6(ipv6_packet) => { + packet_stats.network.ipv6 += 1; - if app_packet.direction == TrafficDirection::Egress { - if let Some((_, counts)) = packet_stats - .addresses - .get_mut(&IpAddr::V6(ipv6_packet.dst_ip)) - { - *counts += 1; - } else if let Ok(host) = - get_hostname(&IpAddr::V6(ipv6_packet.dst_ip)) - { - packet_stats.addresses.insert( - IpAddr::V6(ipv6_packet.dst_ip), - (Some(host), 1), - ); - } else { - packet_stats + if app_packet.direction == TrafficDirection::Egress { + if let Some((_, counts)) = packet_stats .addresses - .insert(IpAddr::V6(ipv6_packet.dst_ip), (None, 1)); + .get_mut(&IpAddr::V6(ipv6_packet.dst_ip)) + { + *counts += 1; + } else if let Ok(host) = + get_hostname(&IpAddr::V6(ipv6_packet.dst_ip)) + { + packet_stats.addresses.insert( + IpAddr::V6(ipv6_packet.dst_ip), + (Some(host), 1), + ); + } else { + packet_stats.addresses.insert( + IpAddr::V6(ipv6_packet.dst_ip), + (None, 1), + ); + } } - } - match ipv6_packet.proto { - IpProto::Tcp(_) => { - packet_stats.transport.tcp += 1; - } - IpProto::Udp(_) => { - packet_stats.transport.udp += 1; - } - IpProto::Sctp(_) => { - packet_stats.transport.sctp += 1; - } - IpProto::Icmp(_) => { - packet_stats.network.icmpv6 += 1; + match ipv6_packet.proto { + IpProto::Tcp(_) => { + packet_stats.transport.tcp += 1; + } + IpProto::Udp(_) => { + packet_stats.transport.udp += 1; + } + IpProto::Sctp(_) => { + packet_stats.transport.sctp += 1; + } + IpProto::Icmp(_) => { + packet_stats.network.icmpv6 += 1; + } } } - } - }, - } - - packet_stats.total += 1; - } - - last_index = app_packets.len() - 1; + }, + } + packet_stats.total += 1; + Ok(()) + }) + .unwrap(); } } });