Skip to content

Commit 0bad201

Browse files
authored
Split main.rs into its own module and add basic CI (#1)
* Apply cargo fmt * Apply clippy suggestions * Split main into some modules Moved most structures and enums into lib.rs. Added the worker module and moved functions related to it to be methods of the WorkerConfig structure, still needs some more work though. * Add basic CI * Fix syscall spawn loop and use references in worker methods
1 parent 022db9d commit 0bad201

File tree

4 files changed

+279
-203
lines changed

4 files changed

+279
-203
lines changed

.github/workflows/main.yml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
name: Main berserker CI
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
pull_request:
8+
9+
concurrency:
10+
group: ${{ github.head_ref || github.run_id }}
11+
cancel-in-progress: true
12+
13+
jobs:
14+
lint:
15+
runs-on: ubuntu-latest
16+
steps:
17+
- uses: actions/checkout@v3
18+
19+
- name: Check code formatting
20+
run: cargo fmt --check
21+
22+
- name: Run clippy
23+
run: cargo clippy
24+
25+
build:
26+
runs-on: ubuntu-latest
27+
steps:
28+
- uses: actions/checkout@v3
29+
- run: make

src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
pub mod worker;
2+
3+
#[derive(Debug, Copy, Clone)]
4+
pub enum Distribution {
5+
Zipfian,
6+
Uniform,
7+
}
8+
9+
#[derive(Debug, Copy, Clone)]
10+
pub enum Workload {
11+
Endpoints,
12+
Processes,
13+
Syscalls,
14+
}
15+
16+
#[derive(Debug, Copy, Clone)]
17+
pub struct WorkloadConfig {
18+
pub restart_interval: u64,
19+
pub endpoints_dist: Distribution,
20+
pub workload: Workload,
21+
pub zipf_exponent: f64,
22+
pub n_ports: u64,
23+
pub uniform_lower: u64,
24+
pub uniform_upper: u64,
25+
pub arrival_rate: f64,
26+
pub departure_rate: f64,
27+
pub random_process: bool,
28+
}

src/main.rs

Lines changed: 54 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -2,177 +2,26 @@
22
extern crate log;
33
extern crate core_affinity;
44

5+
use std::collections::HashMap;
6+
7+
use berserker::WorkloadConfig;
8+
use config::Config;
59
use fork::{fork, Fork};
6-
use std::{thread, time};
7-
use std::process::Command;
8-
use std::net::{TcpListener};
9-
use core_affinity::CoreId;
1010
use itertools::iproduct;
1111
use nix::sys::wait::waitpid;
1212
use nix::unistd::Pid;
13-
use config::Config;
14-
use std::collections::HashMap;
15-
use syscalls::{Sysno, syscall};
16-
1713
use rand::prelude::*;
18-
use rand::{distributions::Alphanumeric, Rng};
19-
use rand_distr::Zipf;
2014
use rand_distr::Uniform;
21-
use rand_distr::Exp;
22-
23-
#[derive(Debug, Copy, Clone)]
24-
enum Distribution {
25-
Zipfian,
26-
Uniform,
27-
}
28-
29-
#[derive(Debug, Copy, Clone)]
30-
enum Workload {
31-
Endpoints,
32-
Processes,
33-
Syscalls,
34-
}
35-
36-
#[derive(Debug, Copy, Clone)]
37-
struct WorkloadConfig {
38-
restart_interval: u64,
39-
endpoints_dist: Distribution,
40-
workload: Workload,
41-
zipf_exponent: f64,
42-
n_ports: u64,
43-
uniform_lower: u64,
44-
uniform_upper: u64,
45-
arrival_rate: f64,
46-
departure_rate: f64,
47-
random_process: bool,
48-
}
49-
50-
#[derive(Debug, Copy, Clone)]
51-
struct WorkerConfig {
52-
workload: WorkloadConfig,
53-
cpu: CoreId,
54-
process: usize,
55-
lower: usize,
56-
upper: usize,
57-
}
58-
59-
fn listen(port: usize, sleep: u64) -> std::io::Result<()> {
60-
let addr = format!("127.0.0.1:{}", port);
61-
let listener = TcpListener::bind(addr)?;
62-
63-
let _res = listener.incoming();
64-
65-
thread::sleep(time::Duration::from_secs(sleep));
66-
Ok(())
67-
}
68-
69-
fn spawn_process(config: WorkerConfig, lifetime: u64) -> std::io::Result<()> {
70-
if config.workload.random_process {
71-
let uniq_arg: String = rand::thread_rng()
72-
.sample_iter(&Alphanumeric)
73-
.take(7)
74-
.map(char::from)
75-
.collect();
76-
let _res = Command::new("stub").arg(uniq_arg).output().unwrap();
77-
//info!("Command output: {}", String::from_utf8(res.stdout).unwrap());
78-
Ok(())
79-
} else {
80-
match fork() {
81-
Ok(Fork::Parent(child)) => {
82-
info!("Parent: child {}", child);
83-
waitpid(Pid::from_raw(child), None);
84-
Ok(())
85-
},
86-
Ok(Fork::Child) => {
87-
info!("{}-{}: Child start, {}", config.cpu.id, config.process, lifetime);
88-
thread::sleep(time::Duration::from_millis(lifetime));
89-
info!("{}-{}: Child stop", config.cpu.id, config.process);
90-
Ok(())
91-
},
92-
Err(_) => {
93-
warn!("Failed");
94-
Ok(())
95-
},
96-
}
97-
}
98-
}
99-
100-
// Spawn processes with a specified rate
101-
fn process_payload(config: WorkerConfig) -> std::io::Result<()> {
102-
info!("Process {} from {}: {}-{}",
103-
config.process, config.cpu.id, config.lower, config.upper);
104-
105-
loop {
106-
let lifetime: f64 = thread_rng().sample(Exp::new(config.workload.departure_rate).unwrap());
107-
108-
thread::spawn(move || {
109-
spawn_process(config, (lifetime * 1000.0).round() as u64)
110-
});
111-
112-
let interval: f64 = thread_rng().sample(Exp::new(config.workload.arrival_rate).unwrap());
113-
info!("{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}",
114-
config.cpu.id, config.process,
115-
interval, (interval * 1000.0).round() as u64,
116-
lifetime, (lifetime * 1000.0).round() as u64);
117-
thread::sleep(time::Duration::from_millis((interval * 1000.0).round() as u64));
118-
info!("{}-{}: Continue", config.cpu.id, config.process);
119-
}
120-
}
121-
122-
fn listen_payload(config: WorkerConfig) -> std::io::Result<()> {
123-
info!("Process {} from {}: {}-{}",
124-
config.process, config.cpu.id, config.lower, config.upper);
125-
126-
let listeners: Vec<_> = (config.lower..config.upper).map(|port| {
127-
thread::spawn(move || {
128-
listen(port, config.workload.restart_interval)
129-
})
130-
}).collect();
131-
132-
for listener in listeners {
133-
let _res = listener.join().unwrap();
134-
}
135-
136-
Ok(())
137-
}
138-
139-
fn do_syscall(config: WorkerConfig) -> std::io::Result<()> {
140-
match unsafe { syscall!(Sysno::getpid) } {
141-
Ok(_) => {
142-
Ok(())
143-
}
144-
Err(err) => {
145-
warn!("Syscall failed: {}", err);
146-
Ok(())
147-
}
148-
}
149-
}
150-
151-
fn syscalls_payload(config: WorkerConfig) -> std::io::Result<()> {
152-
info!("Process {} from {}: {}-{}",
153-
config.process, config.cpu.id, config.lower, config.upper);
154-
155-
loop {
156-
thread::spawn(move || {
157-
do_syscall(config)
158-
});
15+
use rand_distr::Zipf;
15916

160-
let interval: f64 = thread_rng().sample(Exp::new(config.workload.arrival_rate).unwrap());
161-
info!("{}-{}: Interval {}, rounded {}",
162-
config.cpu.id, config.process,
163-
interval, (interval * 1000.0).round() as u64);
164-
thread::sleep(time::Duration::from_millis((interval * 1000.0).round() as u64));
165-
info!("{}-{}: Continue", config.cpu.id, config.process);
166-
}
167-
}
17+
use berserker::{worker::WorkerConfig, Distribution, Workload};
16818

16919
fn main() {
17020
// Retrieve the IDs of all active CPU cores.
17121
let core_ids = core_affinity::get_core_ids().unwrap();
17222
let settings = Config::builder()
17323
// Add in `./Settings.toml`
174-
.add_source(config::File::with_name("/etc/berserker/workload.toml")
175-
.required(false))
24+
.add_source(config::File::with_name("/etc/berserker/workload.toml").required(false))
17625
.add_source(config::File::with_name("workload.toml").required(false))
17726
// Add in settings from the environment (with a prefix of APP)
17827
// Eg.. `WORKLOAD_DEBUG=1 ./target/app` would set the `debug` key
@@ -191,19 +40,19 @@ fn main() {
19140
"endpoints" => Workload::Endpoints,
19241
"processes" => Workload::Processes,
19342
"syscalls" => Workload::Syscalls,
194-
_ => Workload::Endpoints,
43+
_ => Workload::Endpoints,
19544
};
19645

19746
let endpoints_dist = match settings["endpoints_distribution"].as_str() {
198-
"zipf" => Distribution::Zipfian,
199-
"uniform" => Distribution::Uniform,
200-
_ => Distribution::Zipfian,
47+
"zipf" => Distribution::Zipfian,
48+
"uniform" => Distribution::Uniform,
49+
_ => Distribution::Zipfian,
20150
};
20251

203-
let config: WorkloadConfig = WorkloadConfig{
52+
let config = WorkloadConfig {
20453
restart_interval: settings["restart_interval"].parse::<u64>().unwrap(),
205-
endpoints_dist: endpoints_dist,
206-
workload: workload,
54+
endpoints_dist,
55+
workload,
20756
zipf_exponent: settings["zipf_exponent"].parse::<f64>().unwrap(),
20857
n_ports: settings["n_ports"].parse::<u64>().unwrap(),
20958
arrival_rate: settings["arrival_rate"].parse::<f64>().unwrap(),
@@ -216,53 +65,55 @@ fn main() {
21665
// Create processes for each active CPU core.
21766
let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..9)
21867
.map(|(cpu, process)| {
68+
match config.endpoints_dist {
69+
Distribution::Zipfian => {
70+
let n_ports: f64 = thread_rng()
71+
.sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap());
21972

220-
match config.endpoints_dist {
221-
Distribution::Zipfian => {
222-
let n_ports: f64 = thread_rng().sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap());
223-
224-
lower = upper;
225-
upper += n_ports as usize;
226-
},
227-
Distribution::Uniform => {
228-
let n_ports = thread_rng().sample(Uniform::new(config.uniform_lower, config.uniform_upper));
73+
lower = upper;
74+
upper += n_ports as usize;
75+
}
76+
Distribution::Uniform => {
77+
let n_ports = thread_rng()
78+
.sample(Uniform::new(config.uniform_lower, config.uniform_upper));
22979

230-
lower = upper;
231-
upper += n_ports as usize;
80+
lower = upper;
81+
upper += n_ports as usize;
82+
}
23283
}
233-
}
234-
235-
match fork() {
236-
Ok(Fork::Parent(child)) => {info!("Child {}", child); Some(child)},
237-
Ok(Fork::Child) => {
238-
if core_affinity::set_for_current(cpu) {
239-
let worker_config: WorkerConfig = WorkerConfig{
240-
workload: config,
241-
cpu: cpu,
242-
process: process,
243-
lower: lower,
244-
upper: upper,
245-
};
24684

247-
loop {
248-
let _res = match config.workload {
249-
Workload::Endpoints => listen_payload(worker_config),
250-
Workload::Processes => process_payload(worker_config),
251-
Workload::Syscalls => syscalls_payload(worker_config),
252-
};
253-
}
85+
match fork() {
86+
Ok(Fork::Parent(child)) => {
87+
info!("Child {}", child);
88+
Some(child)
25489
}
90+
Ok(Fork::Child) => {
91+
if core_affinity::set_for_current(cpu) {
92+
let worker_config = WorkerConfig::new(config, cpu, process, lower, upper);
93+
94+
loop {
95+
let _res = match config.workload {
96+
Workload::Endpoints => worker_config.listen_payload(),
97+
Workload::Processes => worker_config.process_payload(),
98+
Workload::Syscalls => worker_config.syscalls_payload(),
99+
};
100+
}
101+
}
255102

256-
None
257-
},
258-
Err(_) => {warn!("Failed"); None},
259-
}
260-
}).collect();
103+
None
104+
}
105+
Err(_) => {
106+
warn!("Failed");
107+
None
108+
}
109+
}
110+
})
111+
.collect();
261112

262113
info!("In total: {}", upper);
263114

264-
for handle in handles.into_iter().filter_map(|pid| pid) {
115+
for handle in handles.into_iter().flatten() {
265116
info!("waitpid: {}", handle);
266-
waitpid(Pid::from_raw(handle), None);
117+
waitpid(Pid::from_raw(handle), None).unwrap();
267118
}
268119
}

0 commit comments

Comments
 (0)