12
12
import pytest
13
13
from common .mtl_manager .mtlManager import MtlManager
14
14
from common .nicctl import Nicctl
15
- from create_pcap_file . netsniff import NetsniffRecorder
16
- from create_pcap_file .tcpdump import TcpDumpRecorder
15
+ from compliance . pcap_compliance import PcapComplianceClient
16
+ from create_pcap_file .netsniff import NetsniffRecorder , calculate_packets_per_frame
17
17
from mfd_common_libs .custom_logger import add_logging_level
18
18
from mfd_common_libs .log_levels import TEST_FAIL , TEST_INFO , TEST_PASS
19
19
from mfd_connect .exceptions import ConnectionCalledProcessError
20
- from mtl_engine .const import LOG_FOLDER , TESTCMD_LVL
20
+ from mtl_engine .const import DOWNLOAD_REPORT_TRIES , FRAMES_CAPTURE , LOG_FOLDER , TESTCMD_LVL
21
21
from mtl_engine .csv_report import (
22
22
csv_add_test ,
23
23
csv_write_report ,
24
24
update_compliance_result ,
25
25
)
26
26
27
27
# FIXME: Perhaps, it could be set less statically
28
+ from mtl_engine .execute import log_fail
28
29
from mtl_engine .ffmpeg_app import ip_dict , ip_dict_rgb24_multiple
29
30
from mtl_engine .ramdisk import Ramdisk
30
31
from mtl_engine .stash import (
38
39
)
39
40
from pytest_mfd_logging .amber_log_formatter import AmberLogFormatter
40
41
42
+
41
43
logger = logging .getLogger (__name__ )
42
44
phase_report_key = pytest .StashKey [Dict [str , pytest .CollectReport ]]()
43
45
@@ -273,126 +275,90 @@ def log_session():
273
275
if os .path .exists ("pytest.log" ):
274
276
shutil .copy ("pytest.log" , f"{ LOG_FOLDER } /latest/pytest.log" )
275
277
else :
276
- logging .warning ("pytest.log not found, skipping copy" )
278
+ logger .warning ("pytest.log not found, skipping copy" )
277
279
csv_write_report (f"{ LOG_FOLDER } /latest/report.csv" )
278
280
279
281
280
- def read_ebu_creds (config_path ):
281
- # Load EBU IP and credentials from YAML config
282
- import yaml
283
-
284
- if config_path :
285
- with open (config_path , "r" ) as f :
286
- config = yaml .safe_load (f )
287
- instance = config ["instances" ]
288
- ebu_ip = instance .get ("name" , None )
289
- user = instance .get ("username" , None )
290
- password = instance .get ("password" , None )
291
- proxy = instance .get ("proxy" , None )
292
- return ebu_ip , user , password , proxy
293
- return None , None , None , None
294
-
295
-
296
- @pytest .fixture (scope = "function" , autouse = False )
297
- def pcap_capture (
298
- test_config , hosts , nic_port_list , video_format , output_format , mtl_path
282
+ @pytest .fixture (scope = "function" )
283
+ def pcap_capture (request , media_file , test_config , hosts , mtl_path
299
284
):
300
285
capture_cfg = test_config .get ("capture_cfg" , {})
301
- capture_cfg ["test_name" ] = (
302
- f"test_rx_ffmpeg_tx_ffmpeg_dual_{ video_format } _{ output_format } "
303
- )
304
-
305
- host = hosts ["client" ] if "client" in hosts else list (hosts .values ())[0 ]
306
- # FIXME: If possible, change this not to be hardcoded
307
- src_ip = ip_dict ["tx_interfaces" ]
308
- dst_ip = ip_dict_rgb24_multiple ["p_tx_ip_2" ]
309
-
310
286
capturer = None
311
287
if capture_cfg and capture_cfg .get ("enable" ):
312
- if capture_cfg .get ("tool" ) == "tcpdump" :
313
- capturer = TcpDumpRecorder (
314
- host = host ,
315
- test_name = capture_cfg .get ("test_name" , "capture" ),
316
- pcap_dir = capture_cfg .get ("pcap_dir" , "/tmp" ),
317
- interface = capture_cfg .get ("interface" ),
288
+ host = hosts ["client" ] if "client" in hosts else list (hosts .values ())[0 ]
289
+ media_file_info , _ = media_file
290
+ test_name = request .node .name
291
+ if "frames_number" not in capture_cfg and "capture_time" not in capture_cfg :
292
+ capture_cfg ["packets_number" ] = FRAMES_CAPTURE * calculate_packets_per_frame (
293
+ media_file_info ["width" ],
294
+ media_file_info ["height" ]
318
295
)
319
- elif capture_cfg .get ("tool" ) in ["netsniff" , "netsniff-ng" ]:
320
- # Filtering
321
- capture_filter = ""
322
- # TODO: Enable filtering back when checked
323
- if src_ip :
324
- capture_filter += f"src { src_ip } "
325
- if dst_ip and not capture_filter == "" :
326
- capture_filter += f" and dst { dst_ip } "
327
- elif dst_ip :
328
- capture_filter += f"dst { dst_ip } "
329
- # Class prep
330
- capturer = NetsniffRecorder (
331
- host = host ,
332
- test_name = capture_cfg .get ("test_name" , "capture" ),
333
- pcap_dir = capture_cfg .get ("pcap_dir" , "/tmp" ),
334
- interface = capture_cfg .get ("interface" , "eth0" ),
335
- capture_filter = (
336
- capture_filter if capture_filter != "" else None
337
- ), # Avoid forcing an empty filter
296
+ logger .info (f"Capture { capture_cfg ['packets_number' ]} packets for { FRAMES_CAPTURE } frames" )
297
+ elif "frames_number" in capture_cfg :
298
+ capture_cfg ["packets_number" ] = capture_cfg .pop ("frames_number" ) * calculate_packets_per_frame (
299
+ media_file_info ["width" ],
300
+ media_file_info ["height" ]
338
301
)
339
- else :
340
- logging . error ( f"Unknown capture tool { capture_cfg . get ( 'tool' ) } " )
341
- if capturer :
342
- if isinstance ( capturer , TcpDumpRecorder ):
343
- # TODO: Perhaps it would need to be changed to use .start() instead of .capture()
344
- capturer . capture ( capture_time = capture_cfg . get ( "time" , test_time ))
345
- elif isinstance ( capturer , NetsniffRecorder ):
346
- capturer . start ()
347
- else :
348
- logging . error ( f"Unknown capturer class { capturer . __class__ . __name__ } " )
302
+ logger . info ( f"Capture { capture_cfg [ 'packets_number' ] } packets for { capture_cfg [ 'frames_number' ] } frames" )
303
+ capturer = NetsniffRecorder (
304
+ host = host ,
305
+ test_name = test_name ,
306
+ pcap_dir = capture_cfg . get ( "pcap_dir" , "/tmp" ),
307
+ interface = host . network_interfaces [ 0 ]. name ,
308
+ silent = capture_cfg . get ( "silent" , True ),
309
+ packets_capture = capture_cfg . get ( "packets_number" , None ),
310
+ capture_time = capture_cfg . get ( "capture_time" , None )
311
+ )
349
312
yield capturer
350
313
if capturer :
351
- capturer .stop ()
352
- if capture_cfg and capture_cfg .get ("compliance" , False ):
353
- # FIXME: This is generally a bad practice to call it like that, but for now it is the easiest way
354
- ebu_ip , ebu_login , ebu_passwd , ebu_proxy = read_ebu_creds (
355
- config_path = capture_cfg .get ("ebu_yaml_path" , "configs/ebu_list.yaml" )
356
- ) # Reads from executor
357
- proxy_cmd = f" --proxy { ebu_proxy } " if ebu_proxy else ""
358
- compliance_upl = host .connection .execute_command (
359
- "python3 ./tests/validation/compliance/upload_pcap.py"
360
- f" --ip { ebu_ip } "
361
- f" --login { ebu_login } "
362
- f" --password { ebu_passwd } "
363
- f" --pcap_file_path { capturer .pcap_file } { proxy_cmd } " ,
364
- cwd = f"{ str (mtl_path )} " ,
365
- )
366
- if compliance_upl .return_code != 0 :
367
- logging .error (f"PCAP upload failed: { compliance_upl .stderr } " )
368
- else :
369
- uuid = (
370
- compliance_upl .stdout .split (">>>UUID>>>" )[1 ]
371
- .split ("<<<UUID<<<" )[0 ]
372
- .strip ()
373
- )
374
- logging .debug (
375
- f"PCAP successfully uploaded to EBU LIST with UUID: { uuid } "
376
- )
377
-
378
-
379
- @pytest .fixture (scope = "session" , autouse = True )
380
- def compliance_report (request , log_session , test_config ):
381
- """
382
- This function is used for compliance check and report.
383
- """
384
- # TODO: Implement compliance check logic. When tcpdump pcap is enabled, at the end of the test session all pcaps
385
- # shall be send into EBU list.
386
- # Pcaps shall be stored in the ramdisk, and then moved to the compliance
387
- # folder or send into EBU list after each test finished and remove it from the ramdisk.
388
- # Compliance report generation logic goes here after yield. Or in another class / function but triggered here.
389
- # AFAIK names of pcaps contains test name so it can be matched with result of each test like in code below.
390
- yield
391
- if test_config .get ("compliance" , False ):
392
- logging .info ("Compliance mode enabled, updating compliance results" )
393
- for item in request .session .items :
394
- test_case = item .nodeid
395
- update_compliance_result (test_case , "Fail" )
314
+ ebu_server = test_config .get ("ebu_server" , {})
315
+ if not ebu_server :
316
+ logger .error ("EBU server configuration not found in test_config.yaml" )
317
+ return
318
+ ebu_ip = ebu_server .get ("name" , None )
319
+ ebu_login = ebu_server .get ("username" , None )
320
+ ebu_passwd = ebu_server .get ("password" , None )
321
+ ebu_proxy = ebu_server .get ("proxy" , None )
322
+ proxy_cmd = f" --proxy { ebu_proxy } " if ebu_proxy else ""
323
+ compliance_upl = host .connection .execute_command (
324
+ "python3 ./tests/validation/compliance/upload_pcap.py"
325
+ f" --ip { ebu_ip } "
326
+ f" --login { ebu_login } "
327
+ f" --password { ebu_passwd } "
328
+ f" --pcap_file_path { capturer .pcap_file } { proxy_cmd } " ,
329
+ cwd = f"{ str (mtl_path )} " ,
330
+ )
331
+ if compliance_upl .return_code != 0 :
332
+ logger .error (f"PCAP upload failed: { compliance_upl .stderr } " )
333
+ return
334
+ uuid = (
335
+ compliance_upl .stdout .split (">>>UUID>>>" )[1 ]
336
+ .split ("<<<UUID<<<" )[0 ]
337
+ .strip ()
338
+ )
339
+ logger .debug (
340
+ f"PCAP successfully uploaded to EBU LIST with UUID: { uuid } "
341
+ )
342
+ uploader = PcapComplianceClient (config_path = "" )
343
+ uploader .ebu_ip = ebu_ip
344
+ uploader .user = ebu_login
345
+ uploader .password = ebu_passwd
346
+ uploader .pcap_id = uuid
347
+ uploader .proxies = {"http" : ebu_proxy , "https" : ebu_proxy }
348
+ uploader .authenticate () # Authenticate with the EBU server
349
+ report = {"analyzed" : False }
350
+ tries = DOWNLOAD_REPORT_TRIES
351
+ while (not report .get ("analyzed" , False )) and tries > 0 :
352
+ time .sleep (1 )
353
+ tries -= 1
354
+ report = uploader .download_report ()
355
+ if not report .get ("analyzed" , False ):
356
+ logger .error (f"Report is not ready after { DOWNLOAD_REPORT_TRIES } seconds, skipping compliance check" )
357
+ return
358
+ if report .get ("not_compliant_streams" , 1 ) == 0 :
359
+ logger .info ("PCAP compliance check passed" )
360
+ else :
361
+ log_fail ("PCAP compliance check failed" )
396
362
397
363
398
364
@pytest .fixture (scope = "function" , autouse = True )
@@ -415,19 +381,19 @@ def log_case(request, caplog: pytest.LogCaptureFixture):
415
381
yield
416
382
report = request .node .stash [phase_report_key ]
417
383
if report ["setup" ].failed :
418
- logging .log (level = TEST_FAIL , msg = f"Setup failed for { case_id } " )
384
+ logger .log (level = TEST_FAIL , msg = f"Setup failed for { case_id } " )
419
385
os .chmod (logfile , 0o4755 )
420
386
result = "Fail"
421
387
elif ("call" not in report ) or report ["call" ].failed :
422
- logging .log (level = TEST_FAIL , msg = f"Test failed for { case_id } " )
388
+ logger .log (level = TEST_FAIL , msg = f"Test failed for { case_id } " )
423
389
os .chmod (logfile , 0o4755 )
424
390
result = "Fail"
425
391
elif report ["call" ].passed :
426
- logging .log (level = TEST_PASS , msg = f"Test passed for { case_id } " )
392
+ logger .log (level = TEST_PASS , msg = f"Test passed for { case_id } " )
427
393
os .chmod (logfile , 0o755 )
428
394
result = "Pass"
429
395
else :
430
- logging .log (level = TEST_INFO , msg = f"Test skipped for { case_id } " )
396
+ logger .log (level = TEST_INFO , msg = f"Test skipped for { case_id } " )
431
397
result = "Skip"
432
398
433
399
logger .removeHandler (fh )
0 commit comments