diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index a9026392c..f99908b03 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -7,6 +7,10 @@ list(APPEND GENERAL_SOURCES tag.c port.c mixed_radix.c reactor_common.c lf_token if (DEFINED LF_TRACE) message(STATUS "Including sources specific to tracing.") list(APPEND GENERAL_SOURCES trace.c) + if (DEFINED LF_TRACE_SYSTEM) + message(STATUS "System Tracing is ENABLED") + add_compile_definitions(LF_TRACE_SYSTEM=1) + endif () endif() # Store all sources used to build the reactor-c lib in INFO_SOURCES @@ -114,6 +118,7 @@ define(FEDERATED_AUTHENTICATED) define(LF_REACTION_GRAPH_BREADTH) define(LF_THREADED) define(LF_TRACE) +define(LF_TRACE_SYSTEM) define(LF_UNTHREADED) define(LOG_LEVEL) define(MODAL_REACTORS) diff --git a/core/trace.c b/core/trace.c index 1e9703c09..2963c781a 100644 --- a/core/trace.c +++ b/core/trace.c @@ -106,6 +106,11 @@ int register_user_trace_event(void *self, char* description) { return _lf_register_trace_event(trace, description, NULL, trace_user, description); } +int register_user_stats_event(void *self, char *description) { + lf_assert(self, "Need a pointer to a self struct to register a user trace event"); + trace_t * trace = ((self_base_t *) self)->environment->trace; + return _lf_register_trace_event(trace, description, NULL, stats_user, description); +} /** * Write the trace header information. @@ -329,7 +334,9 @@ void tracepoint( * @param worker The thread number of the worker thread or 0 for unthreaded execution. */ void tracepoint_reaction_starts(trace_t* trace, reaction_t* reaction, int worker) { - tracepoint(trace, reaction_starts, reaction->self, NULL, worker, worker, reaction->number, NULL, NULL, 0, true); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, reaction_starts, reaction->self, NULL, worker, worker, reaction->number, NULL, NULL, 0, true); + } } /** @@ -338,7 +345,9 @@ void tracepoint_reaction_starts(trace_t* trace, reaction_t* reaction, int worker * @param worker The thread number of the worker thread or 0 for unthreaded execution. */ void tracepoint_reaction_ends(trace_t* trace, reaction_t* reaction, int worker) { - tracepoint(trace, reaction_ends, reaction->self, NULL, worker, worker, reaction->number, NULL, NULL, 0, false); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, reaction_ends, reaction->self, NULL, worker, worker, reaction->number, NULL, NULL, 0, false); + } } /** @@ -351,6 +360,9 @@ void tracepoint_schedule(trace_t* trace, trigger_t* trigger, interval_t extra_de // or timer. If there is such a reaction, find its reactor's self struct and // put that into the tracepoint. We only have to look at the first reaction. // If there is no reaction, insert NULL for the reactor. + if (!LF_ALLOW_SYSTEM_TRACES) { + return; + } void* reactor = NULL; if (trigger->number_of_reactions > 0 && trigger->reactions[0] != NULL) { @@ -416,12 +428,23 @@ void tracepoint_user_value(void* self, char* description, long long value) { lf_critical_section_exit(env); } + +void tracepoint_user_stats(void *self, char *description, long long value) { + environment_t *env = ((self_base_t *)self)->environment; + trace_t *trace = env->trace; + lf_critical_section_enter(env); + tracepoint(trace, user_stats, description, NULL, -1, -1, -1, NULL, NULL, value, false); + lf_critical_section_exit(env); +} + /** * Trace the start of a worker waiting for something to change on the event or reaction queue. * @param worker The thread number of the worker thread or 0 for unthreaded execution. */ void tracepoint_worker_wait_starts(trace_t* trace, int worker) { - tracepoint(trace, worker_wait_starts, NULL, NULL, worker, worker, -1, NULL, NULL, 0, true); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, worker_wait_starts, NULL, NULL, worker, worker, -1, NULL, NULL, 0, true); + } } /** @@ -429,7 +452,9 @@ void tracepoint_worker_wait_starts(trace_t* trace, int worker) { * @param worker The thread number of the worker thread or 0 for unthreaded execution. */ void tracepoint_worker_wait_ends(trace_t* trace, int worker) { - tracepoint(trace, worker_wait_ends, NULL, NULL, worker, worker, -1, NULL, NULL, 0, false); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, worker_wait_ends, NULL, NULL, worker, worker, -1, NULL, NULL, 0, false); + } } /** @@ -437,7 +462,9 @@ void tracepoint_worker_wait_ends(trace_t* trace, int worker) { * appear on the event queue. */ void tracepoint_scheduler_advancing_time_starts(trace_t* trace) { - tracepoint(trace, scheduler_advancing_time_starts, NULL, NULL, -1, -1, -1, NULL, NULL, 0, true); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, scheduler_advancing_time_starts, NULL, NULL, -1, -1, -1, NULL, NULL, 0, true); + } } /** @@ -445,7 +472,9 @@ void tracepoint_scheduler_advancing_time_starts(trace_t* trace) { * appear on the event queue. */ void tracepoint_scheduler_advancing_time_ends(trace_t* trace) { - tracepoint(trace, scheduler_advancing_time_ends, NULL, NULL, -1, -1, -1, NULL, NULL, 0, false); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, scheduler_advancing_time_ends, NULL, NULL, -1, -1, -1, NULL, NULL, 0, false); + } } /** @@ -454,7 +483,10 @@ void tracepoint_scheduler_advancing_time_ends(trace_t* trace) { * @param worker The thread number of the worker thread or 0 for unthreaded execution. */ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, int worker) { - tracepoint(trace, reaction_deadline_missed, reaction->self, NULL, worker, worker, reaction->number, NULL, NULL, 0, false); + if (LF_ALLOW_SYSTEM_TRACES) { + tracepoint(trace, reaction_deadline_missed, reaction->self, NULL, worker, worker, reaction->number, NULL, NULL, + 0, false); + } } void stop_trace(trace_t* trace) { diff --git a/include/core/trace.h b/include/core/trace.h index 72e2d466a..195a43e9c 100644 --- a/include/core/trace.h +++ b/include/core/trace.h @@ -59,6 +59,12 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "net_common.h" #endif // FEDERATED +#ifdef LF_TRACE_SYSTEM +#define LF_ALLOW_SYSTEM_TRACES true +#else +#define LF_ALLOW_SYSTEM_TRACES false +#endif + /** * Trace event types. If you update this, be sure to update the * string representation below. Also, create a tracepoint function @@ -72,6 +78,7 @@ typedef enum schedule_called, user_event, user_value, + user_stats, worker_wait_starts, worker_wait_ends, scheduler_advancing_time_starts, @@ -135,6 +142,7 @@ static const char *trace_event_names[] = { "Schedule called", "User-defined event", "User-defined valued event", + "User Stats", "Worker wait starts", "Worker wait ends", "Scheduler advancing time starts", @@ -212,7 +220,8 @@ typedef struct trace_record_t { typedef enum { trace_reactor, // Self struct. trace_trigger, // Timer or action (argument to schedule()). - trace_user // User-defined trace object. + trace_user, // User-defined trace object. + stats_user } _lf_trace_object_t; /** @@ -301,6 +310,8 @@ int _lf_register_trace_event(trace_t* trace, void* pointer1, void* pointer2, _lf */ int register_user_trace_event(void* self, char* description); +int register_user_stats_event(void *self, char *description); + /** * Open a trace file and start tracing. * @param filename The filename for the trace file. @@ -394,6 +405,8 @@ void tracepoint_user_event(void* self, char* description); */ void tracepoint_user_value(void* self, char* description, long long value); +void tracepoint_user_stats(void *self, char *description, long long value); + /** * Trace the start of a worker waiting for something to change on the reaction queue. * @param env The environment in which we are executing @@ -511,12 +524,14 @@ void tracepoint_rti_from_federate(trace_t* trace, trace_event_t event_type, int // empty definition in case we compile without tracing #define _lf_register_trace_event(...) #define register_user_trace_event(...) +#define register_user_stats_event(...) #define tracepoint(...) #define tracepoint_reaction_starts(...) #define tracepoint_reaction_ends(...) #define tracepoint_schedule(...) #define tracepoint_user_event(...) #define tracepoint_user_value(...) +#define tracepoint_user_stats(...) #define tracepoint_worker_wait_starts(...) #define tracepoint_worker_wait_ends(...) #define tracepoint_scheduler_advancing_time_starts(...); diff --git a/util/tracing/Makefile b/util/tracing/Makefile index 9700ac82f..d138fddb2 100644 --- a/util/tracing/Makefile +++ b/util/tracing/Makefile @@ -35,6 +35,8 @@ install: trace_to_csv trace_to_chrome trace_to_influxdb cp ./visualization/fedsd.py $(BIN_INSTALL_PATH) ln -f -s $(BIN_INSTALL_PATH)/fedsd.py $(BIN_INSTALL_PATH)/fedsd chmod +x $(BIN_INSTALL_PATH)/fedsd + +all: trace_to_csv trace_to_chrome trace_to_influxdb clean: rm -f *.o diff --git a/util/tracing/trace_to_csv.c b/util/tracing/trace_to_csv.c index 1abf08c08..ce6815f91 100644 --- a/util/tracing/trace_to_csv.c +++ b/util/tracing/trace_to_csv.c @@ -32,6 +32,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define LF_TRACE #include #include +#include +#include +#include +#include + #include "reactor.h" #include "trace.h" #include "trace_util.h" @@ -48,14 +53,77 @@ FILE* output_file = NULL; /** File for writing summary statistics. */ FILE* summary_file = NULL; +/** File for User Statistics */ +FILE* filtered_file = NULL; + /** Size of the stats table is object_table_size plus twice MAX_NUM_WORKERS. */ int table_size; +/** Structure for passed command line options */ +typedef struct { + char *r_name; // resource name + char *out_file; // Output Directory + trace_event_t filter; // Filter + bool is_dir; // Resource is Directory? + bool recursive; // Recursive? +} program_options_t; + +program_options_t opts = { + .filter = -1, + .is_dir = false, + .recursive = false +}; + +/** All Events of specified Types */ +enum filter_options_t { + reaction_events = NUM_EVENT_TYPES + 1, + user_events, + worker_wait_events, + scheduler_advancing_time_events, + send_events, + receive_events +}; + +/** Trace Processor Fptr */ +typedef void (*processor_f) (const char *fname); + +/** Available Filter Options */ +struct filter { + int value; + const char *str; +} filters[] = { + [user_event] = {.str = "user_event", .value = user_event }, + [user_value] = {.str = "user_value", .value = user_value }, + [user_stats] = {.str = "user_stats", .value = user_stats }, + [schedule_called] = {.str = "schedule_called", .value = schedule_called }, + [federated] = {.str = "federated", .value = federated }, + [NUM_EVENT_TYPES] = {.str = "none", .value = -1 }, + [reaction_events] = {.str = "reaction_events", .value = reaction_events }, + [user_events] = {.str = "user_events", .value = user_events}, + [worker_wait_events] = {.str = "worker_wait_events", .value = worker_wait_events}, + [scheduler_advancing_time_events] = {.str = "scheduler_advancing_time_events", .value = scheduler_advancing_time_events}, + [send_events] = {.str = "send_events", .value = send_events}, + [receive_events] = {.str = "receive_events", .value = receive_events} +}; + /** * Print a usage message. */ void usage() { - printf("\nUsage: trace_to_csv [options] trace_file_root (without .lft extension)\n\n"); + printf("\nBasic Usage (Single File Mode)\nUsage: trace_to_csv [options] trace_file_root (without .lft extension)\n\n"); + printf("Advanced Usage (Directory Mode)\nUsage: trace_to_csv [options] [args] path_to_root_directory\n\n"); + printf("Options:\n-r:\t Recursively find *.lft files on provided path\n"); + printf("-d:\t Root Directory Path\n"); + printf("-f:\t Filter events\n"); + printf("-o:\t Outfile Prefix\n"); + printf("\nFilters:\n["); + for (int i = 0; i < sizeof (filters) / sizeof (struct filter); ++i) { + if (filters[i].str != NULL && filters[i].value != -1) { + printf(" %s |", filters[i].str); + } + } + printf(" none ]\n"); + /* No options yet: printf("\nOptions: \n\n"); printf(" -f, --fast [true | false]\n"); @@ -96,6 +164,69 @@ summary_stats_t** summary_stats; /** Largest timestamp seen. */ instant_t latest_time = 0LL; +/** + * Checks if specified event ev is Filtered for this run + * @param ev Current Tracing Event + * @returns True if this event needs to be filtered or false otherwise + * */ +bool is_filtered_event(trace_event_t ev) { + if (ev == opts.filter) { + // Definitive Event + return true; + } else { + if (opts.filter == reaction_events) { + switch (ev) { + case reaction_starts: + case reaction_ends: + case reaction_deadline_missed: + return true; + default: + return false; + } + } else if (opts.filter == user_events) { + switch (ev) { + case user_value: + case user_event: + case user_stats: + return true; + default: + return false; + } + } else if (opts.filter == worker_wait_events) { + switch (ev) { + case worker_wait_starts: + case worker_wait_ends: + return true; + default: + return false; + } + } else if (opts.filter == scheduler_advancing_time_events) { + switch (ev) { + case scheduler_advancing_time_starts: + case scheduler_advancing_time_ends: + return true; + default: + return false; + } + } else if (opts.filter == send_events) { + switch (ev) { + case send_ACK ... send_ADR_QR: + return true; + default: + return false; + } + } else if (opts.filter == receive_events) { + switch (ev) { + case receive_ACK ... receive_UNIDENTIFIED: + return true; + default: + return false; + } + } + } + return false; +} + /** * Read a trace in the specified file and write it to the specified CSV file. * @return The number of records read or 0 upon seeing an EOF. @@ -116,6 +247,19 @@ size_t read_and_write_trace() { if (trigger_name == NULL) { trigger_name = "NO TRIGGER"; } + if (opts.filter != -1 && is_filtered_event(trace[i].event_type)) { + fprintf(filtered_file, "%s, %s, %d, %d, %lld, %d, %lld, %s, %lld\n", + trace_event_names[trace[i].event_type], + reactor_name, + trace[i].src_id, + trace[i].dst_id, + trace[i].logical_time - start_time, + trace[i].microstep, + trace[i].physical_time - start_time, + trigger_name, + trace[i].extra_delay + ); + } fprintf(output_file, "%s, %s, %d, %d, %lld, %d, %lld, %s, %lld\n", trace_event_names[trace[i].event_type], reactor_name, @@ -127,6 +271,7 @@ size_t read_and_write_trace() { trigger_name, trace[i].extra_delay ); + // Update summary statistics. if (trace[i].physical_time > latest_time) { latest_time = trace[i].physical_time; @@ -195,6 +340,7 @@ size_t read_and_write_trace() { stats = summary_stats[NUM_EVENT_TYPES + object_instance]; stats->description = reactor_name; break; + case user_stats: case user_value: // Although these are not exec times and not reactions, // commandeer the first entry in the reactions array to track values. @@ -394,17 +540,61 @@ void write_summary_file() { } } -int main(int argc, char* argv[]) { - if (argc != 2) { - usage(); - exit(0); +/** Filter Options Table */ +trace_event_t get_filtered_option(const char *opt) { + for (int i = 0; i < sizeof (filters) / sizeof (struct filter); ++i) { + if (filters[i].str != NULL && strcmp(opt, filters[i].str) == 0) { + return filters[i].value; + } } - // Open the trace file. - trace_file = open_file(argv[1], "r"); - if (trace_file == NULL) exit(1); + return -1; +} +/** Parse and set Program Options */ +void scan_program_options(int argc, char **argv) { + if (argc > 2) { + // scan for program options + int arg; + char cwd[MAXPATHLEN] = {0}; + getcwd(cwd, MAXPATHLEN); + while((arg = getopt(argc, argv, "rd:f:o:")) != -1) { + switch (arg) { + case 'r': + opts.recursive = true; + break; + case 'd': + opts.is_dir = true; + opts.r_name = optarg; + break; + case 'f': + opts.filter = get_filtered_option(optarg); + break; + case 'o': + opts.out_file = optarg; + break; + default: + break; + } + } + printf("------- options specified:\n\tDirectory Mode=%s\n" + "\tRecursive=%s\n" + "\tResource=%s\n" + "\tFilter=%s\n" + "\tOutfilePrefix=%s\n-------\n", + opts.is_dir ? "true" : "false", + opts.recursive ? "true" : "false", + opts.r_name, + filters[opts.filter].str, + opts.out_file); + } else { + opts.r_name = argv[1]; + opts.out_file = argv[1]; + } +} + +void open_output_files(const char *fname) { // Construct the name of the csv output file and open it. - char* root = root_name(argv[1]); + char* root = root_name(fname); char csv_filename[strlen(root) + 5]; strcpy(csv_filename, root); strcat(csv_filename, ".csv"); @@ -417,8 +607,29 @@ int main(int argc, char* argv[]) { strcat(summary_filename, "_summary.csv"); summary_file = open_file(summary_filename, "w"); if (summary_file == NULL) exit(1); + if (opts.filter != -1) { + char filtered_filename[strlen(root) + 13]; + strcpy(filtered_filename, root); + strcat(filtered_filename, "_"); + strcat(filtered_filename, filters[opts.filter].str); + strcat(filtered_filename, ".csv"); + filtered_file = open_file(filtered_filename, "w"); + if (filtered_file == NULL) exit(1); + } free(root); +} + +void close_output_files() { + fclose(output_file); + fclose(filtered_file); + fclose(summary_file); +} + +void trace_processor(const char *fname) { + // Open the trace file. + trace_file = open_file(fname, "r"); + if (trace_file == NULL) exit(1); if (read_header() >= 0) { // Allocate an array for summary statistics. @@ -427,10 +638,68 @@ int main(int argc, char* argv[]) { // Write a header line into the CSV file. fprintf(output_file, "Event, Reactor, Source, Destination, Elapsed Logical Time, Microstep, Elapsed Physical Time, Trigger, Extra Delay\n"); + if (opts.filter != -1) + fprintf(filtered_file, "Event, Reactor, Source, Destination, Elapsed Logical Time, Microstep, Elapsed Physical Time, Trigger, Extra Delay\n"); while (read_and_write_trace() != 0) {}; write_summary_file(); + } +} - // File closing is handled by termination function. +/** + * Invokes processor for each trace-file on current path + * @param trace_ext Extension of Trace File + * @param tmp_buffer temporary buffer space + * @param recursive Specifies if we need to examine sub-directories as well + * @param process Function Pointer to trace processor + * */ +int for_each_trace(char *trace_ext, char *tmp_buffer, bool recursive, processor_f process) { + DIR *d; + struct dirent *dir; + d = opendir("."); + if (d == NULL) { + return 1; + } + while ((dir = readdir(d))) { + if (strcmp(dir->d_name, ".") == 0 || + strcmp(dir->d_name, "..") == 0) { + continue; + } + if (dir->d_type == DT_DIR && recursive) { + chdir(dir->d_name); + for_each_trace(trace_ext, tmp_buffer, recursive, process); + chdir(".."); + } else { + if (strstr(dir->d_name, trace_ext) != NULL) { + size_t len; + getcwd(tmp_buffer, MAXPATHLEN); + len = strnlen(tmp_buffer, MAXPATHLEN); + snprintf(tmp_buffer + len, MAXPATHLEN - len, "/%s", dir->d_name); + printf("Found Trace file at: %s\n", tmp_buffer); + process(tmp_buffer); + } + } + } + closedir(d); + return *tmp_buffer == 0; +} + +int main(int argc, char* argv[]) { + if (argc < 2) { + usage(); + exit(0); + } + scan_program_options(argc, argv); + open_output_files(opts.out_file); + if (!opts.is_dir) { + trace_processor(opts.r_name); + } else { + char cwd[MAXPATHLEN] = {0}; + char tmp_buffer[MAXPATHLEN] = {0}; + getcwd(cwd, MAXPATHLEN); + chdir(opts.r_name); + for_each_trace(".lft", tmp_buffer, opts.recursive, trace_processor); + chdir(cwd); } + close_output_files(); }