
Commit bc780a0

small fixes

1 parent eea56ae commit bc780a0

File tree

7 files changed: +97 -55 lines changed

GraphCreation/MyCode/pipeline/read_and_emit.py

Lines changed: 19 additions & 8 deletions
@@ -22,18 +22,29 @@ def read_and_emit(self):
         state = {}
         current_t = None
 
-        # Read in very small chunks using pyarrow iter_batches
         for batch in pq_file.iter_batches(batch_size=100):
-            batch_df = batch.to_pandas()
-            for _, row in batch_df.iterrows():
+            # Convert to Python objects column-wise without Pandas
+            timestamps = batch.column("timestamp")
+            nodes = batch.column("node")
+
+            # Convert once for the entire batch to Python scalars
+            # Avoid per-row overhead
+            ts_values = timestamps.to_pylist()
+            node_values = nodes.to_pylist()
+
+            # Precompute the row dicts once
+            # This avoids deepcopies of the same Arrow Row multiple times
+            all_rows = batch.to_pylist()
+
+            for i, ts in enumerate(ts_values):
                 if current_t is None:
-                    current_t = row["timestamp"]
-                elif current_t != row["timestamp"]:
+                    current_t = ts
+                elif ts != current_t:
                     self.buffer.put(deepcopy(state))
-                    state = {}
-                    current_t = row["timestamp"]
-                state[row["node"]] = deepcopy(row.to_dict())
+                    state.clear()
+                    current_t = ts
 
+                state[node_values[i]] = all_rows[i]
 
         self.buffer.put(state)
         self.buffer.put(None)
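For readability, here is the new loop assembled as a standalone function: a minimal runnable sketch, assuming a Parquet file with "timestamp" and "node" columns and a Queue-like buffer (the surrounding class and file handling are elided).

# Sketch of the committed pattern: one Arrow-to-Python conversion per batch
# instead of per-row Pandas iteration. Path and column names are illustrative.
from copy import deepcopy
from queue import Queue
import pyarrow.parquet as pq

def read_and_emit(path: str, buffer: Queue) -> None:
    pq_file = pq.ParquetFile(path)
    state, current_t = {}, None
    for batch in pq_file.iter_batches(batch_size=100):
        ts_values = batch.column("timestamp").to_pylist()
        node_values = batch.column("node").to_pylist()
        all_rows = batch.to_pylist()  # one plain dict per row, built once
        for i, ts in enumerate(ts_values):
            if current_t is None:
                current_t = ts
            elif ts != current_t:
                buffer.put(deepcopy(state))  # snapshot the completed timestamp
                state.clear()
                current_t = ts
            state[node_values[i]] = all_rows[i]
    buffer.put(state)  # flush the final timestamp
    buffer.put(None)   # sentinel for the consumer

The snapshot put on the queue is still deep-copied because state.clear() reuses the same dict for the next timestamp; the per-row deepcopy, however, is gone, since batch.to_pylist() already yields fresh dicts.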

GraphCreation/MyCode/run_pipeline.py

Lines changed: 2 additions & 1 deletion
@@ -3,6 +3,7 @@
 import threading
 import time
 from queue import Queue
+import cProfile
 
 sys.path.append(os.path.join(os.path.dirname(__file__), 'pipeline'))
 
@@ -48,4 +49,4 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
+    main()
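The commit only adds the cProfile import; no call site appears in the diff. A typical way to wire it up with the standard library (hypothetical, not part of this commit) would be:

import cProfile
import pstats

# Profile main() and print the 20 most expensive call sites by cumulative time.
cProfile.run('main()', 'pipeline.prof')
pstats.Stats('pipeline.prof').sort_stats('cumulative').print_stats(20)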

SignalProcessing/common/logger.py

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@
 import os
 
 class Logger:
-    def __init__(self, name=__name__, log_file='pipeline.log', log_dir='logs'):
+    def __init__(self, name=__name__, log_file='pipeline.log', log_dir='logs', rack='None'):
         self.logger_fake = logging.getLogger("idk")
 
         self.logger = logging.getLogger(name)
@@ -22,7 +22,7 @@ def __init__(self, name=__name__, log_file='pipeline.log', log_dir='logs'):
         self.log_dir = log_dir
 
         current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
-        log_filename = f"{os.path.splitext(os.path.basename(name if name != '__main__' else 'main'))[0]}.log"
+        log_filename = f"{os.path.splitext(os.path.basename(name if name != '__main__' else 'main'))[0]}_{rack}.log"
         log_path = os.path.join(os.getcwd(), self.log_dir, current_time)
         os.makedirs(log_path, exist_ok=True)
         log_file = os.path.join(log_path, log_filename)
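With the new rack parameter, concurrent per-rack runs write to distinct log files instead of clobbering each other. An illustrative call (the rack value is assumed; SignalProcessing/run_pipeline.py below passes sys.argv[1]):

# Hypothetical call site: rack '3' yields run_pipeline_3.log under the
# timestamped log directory.
logger = Logger(name='run_pipeline', log_dir='logs', rack='3').get_logger_real()
# -> logs/<YYYY-MM-DD_HH-MM>/run_pipeline_3.log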

SignalProcessing/pipeline/file_reading/node_manager.py

Lines changed: 1 addition & 8 deletions
@@ -249,7 +249,7 @@ def iterate_batches(self, limit_rows=None, stop_event=None, final_log_frequency=
                 break
             to_remove = []
 
-            logger.debug("reading data from nodes")
+            # logger.debug("reading data from nodes")
 
             for node_id in active_nodes:
                 if stop_event and stop_event.is_set():
@@ -296,13 +296,6 @@ def iterate_batches(self, limit_rows=None, stop_event=None, final_log_frequency=
                     break
 
         self.buffer.put(None)
-        # After processing, log expected vs actual rows per node
-        logger.info("NodeManager: Checking processed row counts per node...")
-        for node_id, man in self.node_managers.items():
-            if self.expected_rows[node_id] != self.processed_rows[node_id]:
-                logger.warning(f"Node {node_id}: Expected {self.expected_rows[node_id]} rows, processed {self.processed_rows[node_id]} rows!")
-            else:
-                logger.info(f"Node {node_id}: Processed all {self.processed_rows[node_id]} rows as expected.")
 
 def main():
     import argparse
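Commenting the call out removes the per-iteration cost entirely. A lighter-touch alternative (an aside, not what this commit does) is to gate hot-loop logging on the effective level, so the message is never formatted when DEBUG is off:

import logging

logger = logging.getLogger("node_manager")

# Standard-library pattern: the guard skips both string formatting and the
# logging call when DEBUG is disabled, without deleting the line.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("reading data from nodes")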

SignalProcessing/pipeline/persist.py

Lines changed: 3 additions & 3 deletions
@@ -24,7 +24,7 @@ class StatePersister:
     def __init__(self, input_queue, output_file='latest_state.json', batch_write_size=25):
         self.input_queue = input_queue
         self.output_file = output_file
-        self.batch_write_size = 25 # Reduced from 100 for more aggressive memory management
+        self.batch_write_size = batch_write_size
         self.state_buffer = [] # Buffer for batch writing
 
         self.writer = None
@@ -130,9 +130,9 @@ def run(self, timeout=0):
         batch_count = 0
         while True:
             if self.input_queue.empty():
-                logger.info("waiting, queue empty")
+                # logger.info("waiting, queue empty")
                 state_data = self.input_queue.get()
-                logger.info("continuing")
+                # logger.info("continuing")
             else:
                 state_data = self.input_queue.get()
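This fixes a genuine bug: the constructor previously discarded its batch_write_size argument and hardcoded 25. After the change the argument is honored; a quick check (import path assumed):

from queue import Queue
from pipeline.persist import StatePersister  # import path assumed

persister = StatePersister(Queue(), output_file='state.parquet', batch_write_size=100)
assert persister.batch_write_size == 100  # before the fix this was always 25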

SignalProcessing/pipeline/state_builder.py

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 
 from common.memory_utils import log_memory_usage, force_memory_cleanup
 from common.logger import Logger
-logger = Logger(name=__name__.split('.')[-1], log_dir='logs').get_logger_real()
+logger = Logger(name=__name__.split('.')[-1], log_dir='logs').get_logger()
 
 class StateBuilder:
     def __init__(self, input_queue, output_queue, batch_size=5, max_queue_size=50):

SignalProcessing/run_pipeline.py

Lines changed: 69 additions & 32 deletions
@@ -1,5 +1,6 @@
 import threading
 import queue
+import multiprocessing
 from pipeline.file_reading.node_manager import NodeManager
 from pipeline.changes.change_detector import ChangeLevelDetector
 from pipeline.state_builder import StateBuilder
@@ -10,15 +11,41 @@
 import sys
 
 from common.logger import Logger
-logger = Logger(name=__name__.split('.')[-1], log_dir='logs').get_logger_real()
+logger = Logger(name=__name__.split('.')[-1], log_dir='logs', rack=sys.argv[1]).get_logger_real()
+
+def node_manager_process(buffer_queue, stop_event, limit_nodes, limit_racks, temp_dir, rows_in_mem):
+    """NodeManager process function that can be pickled"""
+    node_manager = NodeManager(
+        buffer=buffer_queue,
+        limit_nodes=limit_nodes,
+        temp_dir=temp_dir,
+        rows_in_mem=rows_in_mem,
+        limit_racks=limit_racks
+    )
+    node_manager.iterate_batches(stop_event=stop_event, final_log_frequency=500)
+
+def change_detector_process(buffer_queue, change_queue, delta, clock):
+    """ChangeDetector process function that can be pickled"""
+    change_detector = ChangeLevelDetector(buffer_queue, change_queue, delta=delta, clock=clock)
+    change_detector.run()
+
+def state_builder_process(change_queue, state_queue):
+    """StateBuilder process function that can be pickled"""
+    state_builder = StateBuilder(change_queue, state_queue)
+    state_builder.run()
+
+def state_persister_process(state_queue, output_file):
+    """StatePersister process function that can be pickled"""
+    state_persister = StatePersister(state_queue, output_file=output_file)
+    state_persister.run()
 
 def run():
     limit_nodes = None
     limit_racks = int(sys.argv[1])
     delta=0.5
     clock=3
-    bq_max_size=300
-    rows_in_mem=300
+    rows_in_mem=1000
+    bq_max_size=2*rows_in_mem
     temp_dir_loc="E:/temp_parquet_files"
 
     vars_to_log = ['limit_nodes', 'limit_racks', 'delta', 'clock', 'bq_max_size', 'rows_in_mem']
@@ -30,48 +57,58 @@ def run():
     # Initialize memory monitor
     memory_monitor = MemoryMonitor(log_interval=50)
 
-    # Set up queues for each stage with size limits for backpressure
-    # Create queues with smaller sizes for more aggressive memory management
-    buffer_queue = queue.Queue(maxsize=bq_max_size) # NodeManager → ChangeLevelDetector (reduced from 200)
-    change_queue = queue.Queue(maxsize=500) # ChangeLevelDetector → StateBuilder (reduced from 100)
-    state_queue = queue.Queue(maxsize=500) # StateBuilder → StatePersister (reduced from 500)
+    # Set up queues for each stage with size limits for backpressure
+    # Use multiprocessing.Queue for inter-process communication
+    buffer_queue = multiprocessing.Queue(maxsize=bq_max_size) # NodeManager → ChangeLevelDetector
+    change_queue = multiprocessing.Queue(maxsize=bq_max_size) # ChangeLevelDetector → StateBuilder
+    state_queue = multiprocessing.Queue(maxsize=bq_max_size) # StateBuilder → StatePersister
 
-    output_file = f'./outputs/threaded_pipeline_state_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.parquet'
+    output_file = f'./outputs/threaded_pipeline_state_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_rack{limit_racks}.parquet'
 
     # Remove output file if it exists
     if os.path.exists(output_file):
         os.remove(output_file)
 
-    # Create the stop event
-    stop_event = threading.Event()
-
-    # Set up pipeline stages
-    node_manager = NodeManager(buffer=buffer_queue, limit_nodes=limit_nodes, temp_dir=temp_dir_loc, rows_in_mem=rows_in_mem, limit_racks=limit_racks)
-    change_detector = ChangeLevelDetector(buffer_queue, change_queue, delta=delta, clock=clock)
-    state_builder = StateBuilder(change_queue, state_queue)
-    state_persister = StatePersister(state_queue, output_file=output_file)
+    # Create the stop event (multiprocessing.Event)
+    stop_event = multiprocessing.Event()
 
-    # Create threads
-    threads = [
-        threading.Thread(target=lambda: node_manager.iterate_batches(stop_event=stop_event, final_log_frequency=5000), name="NodeManagerThread"),
-        threading.Thread(target=change_detector.run, name="ChangeLevelDetectorThread"),
-        threading.Thread(target=state_builder.run, name="StateBuilderThread"),
-        threading.Thread(target=state_persister.run, name="StatePersisterThread"),
+    # Create processes with function-based targets that can be pickled
+    processes = [
+        multiprocessing.Process(
+            target=node_manager_process,
+            args=(buffer_queue, stop_event, limit_nodes, limit_racks, temp_dir_loc, rows_in_mem),
+            name="NodeManagerProcess"
+        ),
+        multiprocessing.Process(
+            target=change_detector_process,
+            args=(buffer_queue, change_queue, delta, clock),
+            name="ChangeLevelDetectorProcess"
+        ),
+        multiprocessing.Process(
+            target=state_builder_process,
+            args=(change_queue, state_queue),
+            name="StateBuilderProcess"
+        ),
+        multiprocessing.Process(
+            target=state_persister_process,
+            args=(state_queue, output_file),
+            name="StatePersisterProcess"
+        ),
     ]
 
-    # Start threads
-    for t in threads:
-        t.start()
+    # Start processes
+    for p in processes:
+        p.start()
 
-    logger.info(f"Started all threads")
+    logger.info(f"Started all processes")
 
     try:
-        while any(t.is_alive() for t in threads):
+        while any(p.is_alive() for p in processes):
             # Monitor memory usage
             memory_monitor.check_memory("Pipeline-Main")
 
-            for t in threads:
-                t.join(timeout=0.5)
+            for p in processes:
+                p.join(timeout=0.5)
     except KeyboardInterrupt:
         logger.info("KeyboardInterrupt received! Setting stop event and sending sentinels.")
         stop_event.set()
@@ -80,8 +117,8 @@ def run():
         change_queue.put(None)
         state_queue.put(None)
        logger.info("Sentinels sent to all queues.")
-        for t in threads:
-            t.join(timeout=5)
+        for p in processes:
+            p.join(timeout=5)
         logger.info("Pipeline killed by user.")
 
 if __name__ == "__main__":
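The switch from threading.Thread to multiprocessing.Process is why the stage targets moved to module-level functions: multiprocessing pickles the target to hand it to the child process, and the previous lambda target cannot be pickled (notably under the spawn start method used on Windows, consistent with the E:/ temp path above). A minimal self-contained sketch of the pattern, with hypothetical stage names:

import multiprocessing

def stage(in_q, out_q):
    """Module-level function: picklable, unlike a lambda or closure."""
    while True:
        item = in_q.get()
        if item is None:       # sentinel: propagate downstream and stop
            out_q.put(None)
            break
        out_q.put(item)        # real pipeline work would happen here

if __name__ == "__main__":
    in_q = multiprocessing.Queue(maxsize=10)
    out_q = multiprocessing.Queue(maxsize=10)
    p = multiprocessing.Process(target=stage, args=(in_q, out_q), name="StageProcess")
    p.start()
    in_q.put("payload")
    in_q.put(None)
    print(out_q.get(), out_q.get())  # -> payload None
    p.join()

One caveat worth noting: under spawn, children re-import the main module and run its top-level code, and this file now builds its logger at import time from sys.argv[1], so the rack argument must be present whenever the module is imported.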
