11use crate :: rules_manager;
22
33use chrono:: prelude:: * ;
4+ use crossbeam_utils:: thread as crossbeam_thread;
5+ use crossbeam:: queue:: SegQueue ;
46use git2:: { Oid , Repository , Delta } ;
5- use git2:: Error ;
67use parking_lot:: Mutex ;
7- use rayon :: prelude:: * ;
8+ use pyo3 :: prelude:: * ;
89use std:: collections:: HashMap ;
910use std:: path:: Path ;
1011use std:: sync:: Arc ;
12+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
13+ use std:: thread;
14+ use std:: time;
1115
1216fn scan_commit_oid (
17+ should_stop : & AtomicBool ,
1318 git_repo : & Repository ,
1419 oid : & Oid ,
1520 rules_manager : & rules_manager:: RulesManager ,
1621 output_matches : Arc < Mutex < Vec < HashMap < & str , String > > > > ,
17- ) -> Result < ( ) , Error > {
22+ ) -> Result < ( ) , git2 :: Error > {
1823 let commit = git_repo. find_commit ( * oid) ?;
1924
2025 let commit_parent_count = commit. parent_count ( ) ;
@@ -34,6 +39,10 @@ fn scan_commit_oid(
3439 } ;
3540
3641 for delta in commit_diff. deltas ( ) {
42+ if should_stop. load ( Ordering :: Relaxed ) {
43+ break ;
44+ }
45+
3746 match delta. status ( ) {
3847 Delta :: Added | Delta :: Modified => { } ,
3948 _ => continue ,
@@ -119,25 +128,22 @@ fn scan_commit_oid(
119128pub fn get_file_content (
120129 repository_path : & str ,
121130 file_oid : & str ,
122- ) -> Result < Vec < u8 > , Error > {
131+ ) -> Result < Vec < u8 > , git2 :: Error > {
123132 let git_repo = Repository :: open ( repository_path) ?;
124133 let oid = Oid :: from_str ( file_oid) ?;
125134 let blob = git_repo. find_blob ( oid) ?;
126135
127136 Ok ( blob. content ( ) . to_vec ( ) )
128137}
129138
130- pub fn scan_repository (
139+ fn get_oids (
131140 repository_path : & str ,
132141 branch_glob_pattern : & str ,
133142 from_timestamp : i64 ,
134- rules_manager : & rules_manager:: RulesManager ,
135- output_matches : Arc < Mutex < Vec < HashMap < & str , String > > > > ,
136- ) -> Result < ( ) , Error > {
143+ ) -> Result < Vec < Oid > , git2:: Error > {
137144 let git_repo = Repository :: open ( repository_path) ?;
138145
139146 let mut revwalk = git_repo. revwalk ( ) ?;
140-
141147 revwalk. push_head ( ) ?;
142148 revwalk. set_sorting ( git2:: Sort :: TIME ) ?;
143149 revwalk. push_glob ( branch_glob_pattern) ?;
@@ -153,22 +159,79 @@ pub fn scan_repository(
153159 }
154160 }
155161
156- let chunk_size = ( oids. len ( ) as f64 / ( num_cpus:: get ( ) * 5 ) as f64 ) . ceil ( ) as usize ;
157- if !oids. is_empty ( ) {
158- oids. par_chunks ( chunk_size) . for_each (
159- |oids| {
160- let git_repo = Repository :: open ( repository_path) . unwrap ( ) ;
161- for oid in oids {
162- scan_commit_oid (
163- & git_repo,
164- oid,
165- rules_manager,
166- output_matches. clone ( )
167- ) . unwrap_or ( ( ) ) ;
168- }
169- } ,
170- ) ;
162+ Ok ( oids)
163+ }
164+
165+ pub fn scan_repository (
166+ py : & Python ,
167+ repository_path : & str ,
168+ branch_glob_pattern : & str ,
169+ from_timestamp : i64 ,
170+ rules_manager : & rules_manager:: RulesManager ,
171+ output_matches : Arc < Mutex < Vec < HashMap < & str , String > > > > ,
172+ ) -> Result < ( ) , PyErr > {
173+ let oids_queue = Arc :: new ( SegQueue :: new ( ) ) ;
174+ match get_oids (
175+ repository_path,
176+ branch_glob_pattern,
177+ from_timestamp
178+ ) {
179+ Ok ( oids) => {
180+ for oid in oids {
181+ oids_queue. push ( oid) ;
182+ }
183+ } ,
184+ Err ( error) => {
185+ return Err ( pyo3:: exceptions:: PyRuntimeError :: new_err ( error. to_string ( ) ) )
186+ } ,
171187 }
188+ py. check_signals ( ) ?;
189+
190+ let mut py_signal_error: PyResult < ( ) > = Ok ( ( ) ) ;
191+
192+ crossbeam_thread:: scope (
193+ |scope| {
194+ let should_stop = Arc :: new ( AtomicBool :: new ( false ) ) ;
195+
196+ for _ in 0 ..num_cpus:: get ( ) {
197+ let output_matches = output_matches. clone ( ) ;
198+ let oids_queue = oids_queue. clone ( ) ;
199+ let should_stop = should_stop. clone ( ) ;
200+ scope. spawn (
201+ move |_| {
202+ if let Ok ( git_repo) = Repository :: open ( repository_path) {
203+ while !should_stop. load ( Ordering :: Relaxed ) {
204+ if let Some ( oid) = oids_queue. pop ( ) {
205+ scan_commit_oid (
206+ & should_stop,
207+ & git_repo,
208+ & oid,
209+ rules_manager,
210+ output_matches. clone ( ) ,
211+ ) . unwrap_or ( ( ) ) ;
212+ } else {
213+ break ;
214+ }
215+ }
216+ } ;
217+ }
218+ ) ;
219+ }
220+
221+ while !oids_queue. is_empty ( ) {
222+ py_signal_error = py. check_signals ( ) ;
223+ if py_signal_error. is_err ( ) {
224+ should_stop. store ( true , Ordering :: Relaxed ) ;
225+
226+ break ;
227+ }
228+
229+ thread:: sleep ( time:: Duration :: from_millis ( 100 ) ) ;
230+ }
231+ }
232+ ) . unwrap_or ( ( ) ) ;
233+
234+ py_signal_error?;
172235
173236 Ok ( ( ) )
174237}
0 commit comments