diff --git a/config/datasource/assemblyline/d1-goods-delivery.yaml b/config/datasource/assemblyline/d1-goods-delivery.yaml
index 72f6148..314bda4 100644
--- a/config/datasource/assemblyline/d1-goods-delivery.yaml
+++ b/config/datasource/assemblyline/d1-goods-delivery.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "GoodsDelivery"
 spec:
   name: "GoodsDelivery"
-  group: "factory"
+  group: "GoodsDelivery"
   selection: driftingProbability
   startDistribution: [0.1, 0.7, 0.2]
   endDistribution: [0.1, 0.2, 0.7]
diff --git a/config/datasource/assemblyline/d2-material-preparation.yaml b/config/datasource/assemblyline/d2-material-preparation.yaml
index 8d855ff..431c58e 100644
--- a/config/datasource/assemblyline/d2-material-preparation.yaml
+++ b/config/datasource/assemblyline/d2-material-preparation.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "MaterialPreparation"
 spec:
   name: "MaterialPreparation"
-  group: "factory"
+  group: "MaterialPreparation"
   selection: genericProbability
   distribution: [ 0.25, 0.7, 0.05 ]
   eventData:
diff --git a/config/datasource/assemblyline/d3-assembly-line-setup.yaml b/config/datasource/assemblyline/d3-assembly-line-setup.yaml
index a74c939..d5283dd 100644
--- a/config/datasource/assemblyline/d3-assembly-line-setup.yaml
+++ b/config/datasource/assemblyline/d3-assembly-line-setup.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "AssemblyLineSetup"
 spec:
   name: "AssemblyLineSetup"
-  group: "factory"
+  group: "AssemblyLineSetup"
   selection: genericProbability
   distribution: [ 0.3, 0.4, 0.15, 0.05, 0.1 ]
   eventData:
diff --git a/config/datasource/assemblyline/d4-assembling.yaml b/config/datasource/assemblyline/d4-assembling.yaml
index eac32a2..e5403de 100644
--- a/config/datasource/assemblyline/d4-assembling.yaml
+++ b/config/datasource/assemblyline/d4-assembling.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "Assembling"
 spec:
   name: "Assembling"
-  group: "factory"
+  group: "Assembling"
   selection: genericProbability
   distribution: [ 0.8, 0.1, 0.08, 0.02 ]
   eventData:
diff --git a/config/datasource/assemblyline/d5-quality-control.yaml b/config/datasource/assemblyline/d5-quality-control.yaml
index c25d814..97585d6 100644
--- a/config/datasource/assemblyline/d5-quality-control.yaml
+++ b/config/datasource/assemblyline/d5-quality-control.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "QualityControl"
 spec:
   name: "QualityControl"
-  group: "factory"
+  group: "QualityControl"
   selection: genericProbability
   distribution: [ 0.3, 0.1, 0.6 ]
   eventData:
diff --git a/config/datasource/assemblyline/d6-packaging.yaml b/config/datasource/assemblyline/d6-packaging.yaml
index cd47264..95ee253 100644
--- a/config/datasource/assemblyline/d6-packaging.yaml
+++ b/config/datasource/assemblyline/d6-packaging.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "Packaging"
 spec:
   name: "Packaging"
-  group: "factory"
+  group: "Packaging"
   selection: genericProbability
   distribution: [ 1.0 ]
   eventData:
diff --git a/config/datasource/assemblyline/d7-shipping.yaml b/config/datasource/assemblyline/d7-shipping.yaml
index ea1232d..b96b29c 100644
--- a/config/datasource/assemblyline/d7-shipping.yaml
+++ b/config/datasource/assemblyline/d7-shipping.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "Shipping"
 spec:
   name: "Shipping"
-  group: "factory"
+  group: "Shipping"
   selection: genericProbability
   distribution: [ 0.8, 0.2 ]
   eventData:
diff --git a/config/datasource/concept-drift/cd-a.yaml b/config/datasource/concept-drift/cd-a.yaml
index 87d0190..06f1fa6 100644
--- a/config/datasource/concept-drift/cd-a.yaml
+++ b/config/datasource/concept-drift/cd-a.yaml
@@ -2,7 +2,8 @@ kind: datasource
 name: "A"
 spec:
   name: "A"
-  group: "concept-drift"
+  group: "A"
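+  # driftingProbability shifts the selection weights from startDistribution to endDistribution over the run, producing the concept drift this scenario simulates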
   selection: driftingProbability
   startDistribution: [ 0.0, 1.0 ]
   endDistribution: [ 1.0, 0.0 ]
diff --git a/config/datasource/concept-drift/cd-b.yaml b/config/datasource/concept-drift/cd-b.yaml
index fe28d01..7304f96 100644
--- a/config/datasource/concept-drift/cd-b.yaml
+++ b/config/datasource/concept-drift/cd-b.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "B"
 spec:
   name: "B"
-  group: "concept-drift"
+  group: "B"
   selection: genericProbability
   distribution: [1.0]
   eventData:
diff --git a/config/datasource/concept-drift/cd-c.yaml b/config/datasource/concept-drift/cd-c.yaml
index 9951c10..1d742d8 100644
--- a/config/datasource/concept-drift/cd-c.yaml
+++ b/config/datasource/concept-drift/cd-c.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "C"
 spec:
   name: "C"
-  group: "concept-drift"
+  group: "C"
   selection: genericProbability
   distribution: [1.0]
   eventData:
diff --git a/config/datasource/concept-drift/cd-d.yaml b/config/datasource/concept-drift/cd-d.yaml
index e870917..8321d38 100644
--- a/config/datasource/concept-drift/cd-d.yaml
+++ b/config/datasource/concept-drift/cd-d.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "D"
 spec:
   name: "D"
-  group: "concept-drift"
+  group: "D"
   selection: genericProbability
   distribution: [1.0]
   eventData:
diff --git a/config/datasource/concept-drift/cd-e.yaml b/config/datasource/concept-drift/cd-e.yaml
index 2ff77a8..6a1a9f8 100644
--- a/config/datasource/concept-drift/cd-e.yaml
+++ b/config/datasource/concept-drift/cd-e.yaml
@@ -2,7 +2,7 @@ kind: datasource
 name: "E"
 spec:
   name: "E"
-  group: "concept-drift"
+  group: "E"
   selection: genericProbability
   distribution: [1.0]
   eventData:
diff --git a/config/datasource/image/image-a.yaml b/config/datasource/image/image-a.yaml
new file mode 100644
index 0000000..bef5bd4
--- /dev/null
+++ b/config/datasource/image/image-a.yaml
@@ -0,0 +1,13 @@
+kind: datasource
+name: "A"
+spec:
+  name: "A"
+  group: "A"
+  selection: genericProbability
+  distribution: [1.0]
+  eventData:
+    - activity:
+        type: image
+        activity: "n02085620-Chihuahua/n02085620_7"
+      transition: ""
+      duration: 1
\ No newline at end of file
diff --git a/config/datasource/image/start.yaml b/config/datasource/image/start.yaml
new file mode 100644
index 0000000..6c21fb4
--- /dev/null
+++ b/config/datasource/image/start.yaml
@@ -0,0 +1,11 @@
+kind: datasource
+name: ""
+spec:
+  name: ""
+  group: "A"
+  selection: genericProbability
+  distribution: [1.0]
+  eventData:
+    - activity: ""
+      transition: "A"
+      duration: 0
\ No newline at end of file
diff --git a/config/simulation/stream.yaml b/config/simulation/stream.yaml
index 854b44d..a9d9dd6 100644
--- a/config/simulation/stream.yaml
+++ b/config/simulation/stream.yaml
@@ -6,4 +6,4 @@ spec:
   maxConcurrentCases: 5
   load:
     loadBehavior: constant
-    load: 10
\ No newline at end of file
+    load: 1
\ No newline at end of file
diff --git a/config/sink/group/http-sink-assembling.yaml b/config/sink/group/http-sink-assembling.yaml
new file mode 100644
index 0000000..65a0fb0
--- /dev/null
+++ b/config/sink/group/http-sink-assembling.yaml
@@ -0,0 +1,8 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8090/event"
+  dataSourceRefs:
+    - "Assembling"
diff --git a/config/sink/group/http-sink-assemblyline.yaml b/config/sink/group/http-sink-assemblyline.yaml
new file mode 100644
index 0000000..22bd7c1
--- /dev/null
+++ b/config/sink/group/http-sink-assemblyline.yaml
@@ -0,0 +1,9 @@
+kind: sink
+name: http-sink
+spec:
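+  # one HTTP sink per station: ports 8090-8096 each receive the events of a single datasource group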
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8091/event"
+  dataSourceRefs:
+    - "AssemblyLineSetup"
\ No newline at end of file
diff --git a/config/sink/group/http-sink-goodsdelivery.yaml b/config/sink/group/http-sink-goodsdelivery.yaml
new file mode 100644
index 0000000..9ba11bf
--- /dev/null
+++ b/config/sink/group/http-sink-goodsdelivery.yaml
@@ -0,0 +1,8 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8092/event"
+  dataSourceRefs:
+    - "GoodsDelivery"
diff --git a/config/sink/group/http-sink-materialpreparation.yaml b/config/sink/group/http-sink-materialpreparation.yaml
new file mode 100644
index 0000000..ba4e0f2
--- /dev/null
+++ b/config/sink/group/http-sink-materialpreparation.yaml
@@ -0,0 +1,8 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8093/event"
+  dataSourceRefs:
+    - "MaterialPreparation"
diff --git a/config/sink/group/http-sink-packaging.yaml b/config/sink/group/http-sink-packaging.yaml
new file mode 100644
index 0000000..f4bf4dd
--- /dev/null
+++ b/config/sink/group/http-sink-packaging.yaml
@@ -0,0 +1,8 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8094/event"
+  dataSourceRefs:
+    - "Packaging"
\ No newline at end of file
diff --git a/config/sink/group/http-sink-qualitycontrol.yaml b/config/sink/group/http-sink-qualitycontrol.yaml
new file mode 100644
index 0000000..e5a0161
--- /dev/null
+++ b/config/sink/group/http-sink-qualitycontrol.yaml
@@ -0,0 +1,8 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8095/event"
+  dataSourceRefs:
+    - "QualityControl"
\ No newline at end of file
diff --git a/config/sink/group/http-sink-shipping.yaml b/config/sink/group/http-sink-shipping.yaml
new file mode 100644
index 0000000..670fa82
--- /dev/null
+++ b/config/sink/group/http-sink-shipping.yaml
@@ -0,0 +1,8 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8096/event"
+  dataSourceRefs:
+    - "Shipping"
diff --git a/config/sink/http-sink.yaml b/config/sink/http-sink.yaml
new file mode 100644
index 0000000..a5942c0
--- /dev/null
+++ b/config/sink/http-sink.yaml
@@ -0,0 +1,15 @@
+kind: sink
+name: http-sink
+spec:
+  type: http
+  id: "Sensor"
+  url: "http://localhost:8082/event"
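+  # aggregate sink: events from all seven assembly-line stations are posted to a single endpoint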
+  dataSourceRefs:
+    - "GoodsDelivery"
+    - "MaterialPreparation"
+    - "AssemblyLineSetup"
+    - "Assembling"
+    - "QualityControl"
+    - "Packaging"
+    - "Shipping"
diff --git a/config/sink/traffic-fine.yaml b/config/sink/traffic-fine.yaml
new file mode 100644
index 0000000..79675bd
--- /dev/null
+++ b/config/sink/traffic-fine.yaml
@@ -0,0 +1,17 @@
+kind: sink
+name: console-sink
+spec:
+  type: console
+  id: "Sensor"
+  dataSourceRefs:
+    - "Create Fine"
+    - "Send Fine"
+    - "Appeal to Judge"
+    - "Insert Fine Notification"
+    - "Send Appeal to Prefecture"
+    - "Insert Date Appeal to Prefecture"
+    - "Receive Result Appeal from Prefecture"
+    - "Payment"
+    - "Notify Result Appeal to Offender"
+    - "Send for Credit Collection"
+    - "Add penalty"
\ No newline at end of file
diff --git a/distributed_event_factory/core/abstract_datasource.py b/distributed_event_factory/core/abstract_datasource.py
index b0d24da..26e2405 100644
--- a/distributed_event_factory/core/abstract_datasource.py
+++ b/distributed_event_factory/core/abstract_datasource.py
@@ -8,10 +8,6 @@ class DataSource(ABC):
 
-    @abstractmethod
-    def emit_event(self, case, activity_name, timestamp) -> None:
-        pass
-
     @abstractmethod
     def get_event_data(self) -> EventData:
         pass
 
diff --git a/distributed_event_factory/core/datasource.py b/distributed_event_factory/core/datasource.py
index 90e4493..e46aa1f 100644
--- a/distributed_event_factory/core/datasource.py
+++ b/distributed_event_factory/core/datasource.py
@@ -27,16 +27,5 @@ def get_event_provider(self):
     def get_event_data(self):
         return self.event_provider.get_event_data()
 
-    def emit_event(self, case, activity_name, timestamp) -> Event:
-        event = Event(
-            timestamp=timestamp.strftime("%Y-%m-%d %H:%M:%S"),
-            sensor_value=activity_name,
-            case_id=case,
-            sensor_name=self.sensor_id.get_name(),
-            group_id=self.group_id
-        )
-        self.event_log.append(event)
-        return event
-
     def get_event_log(self) -> List[Event]:
         return self.event_log
diff --git a/distributed_event_factory/core/datasource_id.py b/distributed_event_factory/core/datasource_id.py
index 58ab972..b98ddef 100644
--- a/distributed_event_factory/core/datasource_id.py
+++ b/distributed_event_factory/core/datasource_id.py
@@ -14,3 +14,4 @@ def get_name(self):
 
 START_SENSOR_ID = DataSourceId("")
 END_DATA_SOURCE_ID = DataSourceId("")
+END_DATA_SOURCE = ""
diff --git a/distributed_event_factory/event_factory.py b/distributed_event_factory/event_factory.py
index e232f1a..bd5c647 100644
--- a/distributed_event_factory/event_factory.py
+++ b/distributed_event_factory/event_factory.py
@@ -9,6 +9,7 @@
 from distributed_event_factory.parser.simulation.load.load_parser import LoadParser
 from distributed_event_factory.parser.sink.sink_parser import SinkParser
 from distributed_event_factory.provider.sink.sink_provider import Sink
+from distributed_event_factory.simulation.process_simulation import DefProcessSimulator
 
 
 class EventFactory:
@@ -16,6 +17,8 @@ def __init__(self):
         self.sinks = dict()
         self.simulations = dict()
         self.datasources = dict()
+        self.process_simulator = None
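+        # set programmatically via add_process_simulator() or parsed from a "processSimulator" config file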
         self.datasources[""] = EndDataSource()
         self.parser = ParserRegistry()
 
@@ -43,6 +46,10 @@ def add_selection_parser(self, key: str, parser: SinkParser):
         self.parser.probability_selection_parser.add_dependency(key, parser)
         return self
 
+    def add_process_simulator(self, process_simulator):
+        self.process_simulator = process_simulator
+        return self
+
     def get_datasource(self, datasource_key):
         return self.datasources[datasource_key]
 
@@ -79,8 +86,11 @@ def add_file(self, filename):
             self.add_datasource(name, parsed_object)
         elif kind == "sink":
             self.add_sink(name, parsed_object)
+        elif kind == "processSimulator":
+            self.add_process_simulator(parsed_object)
+
         return self
 
     def run(self, hook=lambda: None):
         for simulation in self.simulations:
-            self.simulations[simulation].run_simulation(self.datasources, self.sinks, hook)
\ No newline at end of file
+            self.simulations[simulation].run_simulation(self.process_simulator, self.datasources, self.sinks, hook)
\ No newline at end of file
diff --git a/distributed_event_factory/event_factory_main.py b/distributed_event_factory/event_factory_main.py
index 616e1ff..0ca7a4f 100644
--- a/distributed_event_factory/event_factory_main.py
+++ b/distributed_event_factory/event_factory_main.py
@@ -3,6 +3,13 @@
 from dotenv import load_dotenv
 
 from distributed_event_factory.event_factory import EventFactory
+from distributed_event_factory.provider.data.case_provider import CaseIdProvider
+from distributed_event_factory.provider.data.constant_count_provider import ConstantCountProvider
+from distributed_event_factory.provider.data.count_provider import CountProvider
+from distributed_event_factory.provider.data.increasing_case import IncreasingCaseIdProvider
+from distributed_event_factory.simulation.process_simulation import DefProcessSimulator
+from distributed_event_factory.simulation.xes_process_simulator import XesProcessSimulator
+from drift_conformance_checking_sink import DriftConformanceCheckingSink
 
 if __name__ == '__main__':
     if "local" in sys.argv:
@@ -22,5 +29,23 @@
     (event_factory
      .add_directory(f"{content_root}/config/datasource/{datasource}")
      .add_file(f"{content_root}/config/simulation/{simulation}")
-     .add_file(f"{content_root}/config/sink/{sink}")
-     ).run()
+     #.add_file(f"{content_root}/config/sink/{sink}")
+     .add_process_simulator(
+         DefProcessSimulator(
+             case_id_provider=IncreasingCaseIdProvider(),
+             data_sources=dict(),
+             max_concurrent_cases=ConstantCountProvider(1)
+         )
+     ).add_sink("Hi", DriftConformanceCheckingSink(["A", "B", "C", "D", "E"]))
+     .run()
+     # XesProcessSimulator(
+     #     "../config/Road_Traffic_Fine_Management_Process.xes"
+     # )
+     # DefProcessSimulator(
+     #     case_id_provider=IncreasingCaseIdProvider(),
+     #     data_sources=dict(),
+     #     max_concurrent_cases=ConstantCountProvider(1)
+     # )
+     )
+     #.add_sink("hello_algorithm", SayHelloAlgorithm(["GoodsDelivery"]))
+     #).run()
diff --git a/distributed_event_factory/parser/datasource/event/activity/activity_parser.py b/distributed_event_factory/parser/datasource/event/activity/activity_parser.py
index 9d555f9..e52f6be 100644
--- a/distributed_event_factory/parser/datasource/event/activity/activity_parser.py
+++ b/distributed_event_factory/parser/datasource/event/activity/activity_parser.py
@@ -12,6 +12,6 @@ def add_dependency(self, key: str, dependency):
         return self
 
     def parse(self, config):
-        if isinstance(config, str):
-            return ConstantActivityProvider(config)
-        return self.dependencies[config["type"]].parse(config)
\ No newline at end of file
+        if "type" in config:
+            return self.dependencies[config["type"]].parse(config)
+        return ConstantActivityProvider(config)
diff --git a/distributed_event_factory/parser/datasource/event/activity/constant_activitiy_parser.py b/distributed_event_factory/parser/datasource/event/activity/constant_activitiy_parser.py
new file mode 100644
index 0000000..63e5992
--- /dev/null
+++ b/distributed_event_factory/parser/datasource/event/activity/constant_activitiy_parser.py
@@ -0,0 +1,11 @@
+from distributed_event_factory.parser.parser import Parser
+from distributed_event_factory.provider.activity.activity_provider import ConstantActivityProvider
+
+
+class ConstantActivityParser(Parser):
+
+    def add_dependency(self, key: str, dependency):
+        pass
+
+    def parse(self, config):
+        return ConstantActivityProvider(config)
\ No newline at end of file
diff --git a/distributed_event_factory/parser/datasource/event/activity/image_activity_parser.py b/distributed_event_factory/parser/datasource/event/activity/image_activity_parser.py
new file mode 100644
index 0000000..688a11c
--- /dev/null
+++ b/distributed_event_factory/parser/datasource/event/activity/image_activity_parser.py
@@ -0,0 +1,12 @@
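+# Parses activity configs of the form {type: image, activity: "<image id>"} into an ImageActivityProvider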
+from distributed_event_factory.parser.parser import Parser
+from distributed_event_factory.provider.activity.image_activity_provider import ImageActivityProvider
+
+
+class ImageActivityParser(Parser):
+
+    def add_dependency(self, key: str, dependency):
+        pass
+
+    def parse(self, config):
+        return ImageActivityProvider(activity=config["activity"])
\ No newline at end of file
diff --git a/distributed_event_factory/parser/parser_registry.py b/distributed_event_factory/parser/parser_registry.py
index 6b4c13e..e848bf8 100644
--- a/distributed_event_factory/parser/parser_registry.py
+++ b/distributed_event_factory/parser/parser_registry.py
@@ -1,5 +1,7 @@
 from distributed_event_factory.parser.base.constant_count_parser import ConstantCountParser
 from distributed_event_factory.parser.datasource.event.activity.activity_parser import ActivityParser
+from distributed_event_factory.parser.datasource.event.activity.constant_activitiy_parser import ConstantActivityParser
+from distributed_event_factory.parser.datasource.event.activity.image_activity_parser import ImageActivityParser
 from distributed_event_factory.parser.datasource.event.duration.constant_duration_parser import \
     ConstantDurationParser
 from distributed_event_factory.parser.datasource.event.duration.duration_parser import DurationParser
@@ -27,6 +29,9 @@
 from distributed_event_factory.parser.simulation.load.load_parser import LoadParser
 from distributed_event_factory.parser.simulation.load.sinus_load_parser import SinusLoadParser
 from distributed_event_factory.parser.simulation.simulation_parser import SimulationParser
+from distributed_event_factory.parser.simulation.simulator.def_simulator_parser import DefSimulationParser
+from distributed_event_factory.parser.simulation.simulator.process_simulation_parser import ProcessSimulationParser
+from distributed_event_factory.parser.simulation.simulator.xes_simulator_parser import XesSimulationParser
 from distributed_event_factory.parser.simulation.variant.countbased_simulation_parser import CountBasedSimulationParser
 from distributed_event_factory.parser.simulation.variant.loadtest_simulation_parser import LoadTestSimulationParser
 from distributed_event_factory.parser.simulation.variant.stream_simulation_parser import StreamSimulationParser
@@ -35,10 +40,10 @@
 from distributed_event_factory.parser.sink.kafka.constant_partition_parser import ConstantPartitionParser
 from distributed_event_factory.parser.sink.kafka.kafka_sink_parser import KafkaSinkParser
 from distributed_event_factory.parser.sink.kafka.partition_parser import PartitionParser
+from distributed_event_factory.parser.sink.load_test_http_sink_parser import LoadTestHttpSinkParser
 from distributed_event_factory.parser.sink.print_console_sink_parser import PrintConsoleSinkParser
 from distributed_event_factory.parser.sink.sink_parser import SinkParser
 from distributed_event_factory.parser.sink.ui_sink_parser import UiSinkParser
-from distributed_event_factory.provider.data.constant_count_provider import ConstantCountProvider
 from distributed_event_factory.provider.data.increasing_case import IncreasingCaseIdProvider
 
 
@@ -59,15 +64,22 @@ def __init__(self):
         self.kafka_sink_parser = (KafkaSinkParser()).add_dependency("partition", self.partition_parser)
         self.console_sink_parser = (PrintConsoleSinkParser())
         self.ui_sink_parser = (UiSinkParser())
+        self.load_test_parser = (LoadTestHttpSinkParser())
         self.http_sink_parser = (HttpSinkParser())
         self.sink_parser = (SinkParser()
                             .add_dependency("console", self.console_sink_parser)
                             .add_dependency("ui", self.ui_sink_parser)
+                            .add_dependency("loadtest", self.load_test_parser)
                             .add_dependency("http", self.http_sink_parser)
                             .add_dependency("kafka", self.kafka_sink_parser))
         ##########
         # Activity
-        self.activity_parser = ActivityParser()
+        self.constant_activity_parser = ConstantActivityParser()
+        self.image_activity_parser = ImageActivityParser()
+        self.activity_parser = (ActivityParser()
+                                .add_dependency("constant", self.constant_activity_parser)
+                                .add_dependency("image", self.image_activity_parser))
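+        # plain-string activity configs skip this dispatch and fall back to ConstantActivityProvider (see ActivityParser.parse)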
 
         # Duration
         self.constant_duration_parser = ConstantDurationParser()
@@ -112,12 +124,21 @@ def __init__(self):
         self.datasource_parser = (DataSourceParser()
                                   .add_dependency("eventData", self.event_selection_parser))
 
+        ##########
         # Case
         self.increasing_case_id_parser = IncreasingCaseIdProvider()
         self.case_id_parser: CaseIdParser = (CaseIdParser()
                                              .add_dependency("increasing", self.increasing_case_id_parser))
 
+        # Process Simulation
+        self.xes_simulation_parser = XesSimulationParser()
+        self.def_simulation_parser = DefSimulationParser()
+        self.process_simulation_parser = (ProcessSimulationParser()
+                                          .add_dependency("xes", self.xes_simulation_parser)
+                                          .add_dependency("def", self.def_simulation_parser)
+                                          )
+
         # Load
         self.constant_load_parser = ConstantLoadParser()
         self.gradual_load_parser = GradualLoadParser()
@@ -155,4 +176,5 @@ def __init__(self):
         self.kind_parser: KindParser = (KindParser()
                                         .add_dependency("sink", self.sink_parser)
                                         .add_dependency("datasource", self.datasource_parser)
-                                        .add_dependency("simulation", self.simulation_parser))
+                                        .add_dependency("simulation", self.simulation_parser)
+                                        .add_dependency("processSimulator", self.process_simulation_parser))
diff --git a/distributed_event_factory/parser/simulation/simulator/__init__.py b/distributed_event_factory/parser/simulation/simulator/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/distributed_event_factory/parser/simulation/simulator/def_simulator_parser.py b/distributed_event_factory/parser/simulation/simulator/def_simulator_parser.py
new file mode 100644
index 0000000..8f38eaf
--- /dev/null
+++ b/distributed_event_factory/parser/simulation/simulator/def_simulator_parser.py
@@ -0,0 +1,18 @@
+from distributed_event_factory.parser.parser import Parser
+from distributed_event_factory.simulation.process_simulation import DefProcessSimulator
+
+class DefSimulationParser(Parser):
+
+    def __init__(self):
+        self.dependencies = dict()
+
+    def add_dependency(self, key: str, dependency):
+        self.dependencies[key] = dependency
+        return self
+
+    def parse(self, config):
+        return DefProcessSimulator(
+            case_id_provider=self.dependencies["caseId"].parse(config["caseId"]),
+            max_concurrent_cases=self.dependencies["maxConcurrentCases"].parse(config["maxConcurrentCases"]),
+            data_sources=self.dependencies["dataSource"].parse(config["dataSource"])
+        )
\ No newline at end of file
diff --git a/distributed_event_factory/parser/simulation/simulator/process_simulation_parser.py b/distributed_event_factory/parser/simulation/simulator/process_simulation_parser.py
new file mode 100644
index 0000000..fee70eb
--- /dev/null
+++ b/distributed_event_factory/parser/simulation/simulator/process_simulation_parser.py
@@ -0,0 +1,14 @@
+from distributed_event_factory.parser.parser import Parser
+
+
+class ProcessSimulationParser(Parser):
+
+    def __init__(self):
+        self.dependencies = dict()
+
+    def add_dependency(self, key: str, dependency):
+        self.dependencies[key] = dependency
+        return self
+
+    def parse(self, config):
+        return self.dependencies[config["type"]].parse(config)
diff --git a/distributed_event_factory/parser/simulation/simulator/xes_simulator_parser.py b/distributed_event_factory/parser/simulation/simulator/xes_simulator_parser.py
new file mode 100644
index 0000000..8d9507c
--- /dev/null
+++ b/distributed_event_factory/parser/simulation/simulator/xes_simulator_parser.py
@@ -0,0 +1,12 @@
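+# Parses configs of the form {type: xes, filePath: "<path to .xes log>"} into an XesProcessSimulator replaying that log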
+from distributed_event_factory.parser.parser import Parser
+from distributed_event_factory.simulation.xes_process_simulator import XesProcessSimulator
+
+
+class XesSimulationParser(Parser):
+
+    def add_dependency(self, key: str, dependency):
+        pass
+
+    def parse(self, config):
+        return XesProcessSimulator(config["filePath"])
diff --git a/distributed_event_factory/parser/simulation/variant/stream_simulation_parser.py b/distributed_event_factory/parser/simulation/variant/stream_simulation_parser.py
index a2aedaa..abd42fd 100644
--- a/distributed_event_factory/parser/simulation/variant/stream_simulation_parser.py
+++ b/distributed_event_factory/parser/simulation/variant/stream_simulation_parser.py
@@ -12,7 +12,7 @@ def add_dependency(self, key: str, dependency):
 
     def parse(self, config):
         return StreamSimulation(
-            case_id_provider=self.dependencies["caseId"].parse(config["caseId"]),
             load_provider=self.dependencies["load"].parse(config["load"]),
+            case_id_provider=self.dependencies["caseId"].parse(config["caseId"]),
             max_concurrent_cases=self.dependencies["maxConcurrentCases"].parse(config["maxConcurrentCases"])
         )
\ No newline at end of file
diff --git a/distributed_event_factory/parser/sink/http_sink_parser.py b/distributed_event_factory/parser/sink/http_sink_parser.py
index 1d31963..4e3a970 100644
--- a/distributed_event_factory/parser/sink/http_sink_parser.py
+++ b/distributed_event_factory/parser/sink/http_sink_parser.py
@@ -1,15 +1,12 @@
 from distributed_event_factory.parser.parser import Parser
-from distributed_event_factory.provider.sink.http.http_sink import LoadTestHttpSink
-from distributed_event_factory.provider.sink.ui.terminal_ui_sink import UiSink
-
+from distributed_event_factory.provider.sink.http.http_sink import HttpSink
 
 class HttpSinkParser(Parser):
     def add_dependency(self, key: str, dependency):
         pass
 
     def parse(self, config):
-        return LoadTestHttpSink(
+        return HttpSink(
             url=config["url"],
             data_source_ref=config["dataSourceRefs"],
-            frame_duration=config["timeframe"]
         )
\ No newline at end of file
diff --git a/distributed_event_factory/parser/sink/load_test_http_sink_parser.py b/distributed_event_factory/parser/sink/load_test_http_sink_parser.py
new file mode 100644
index 0000000..f833b51
--- /dev/null
+++ b/distributed_event_factory/parser/sink/load_test_http_sink_parser.py
@@ -0,0 +1,13 @@
+from distributed_event_factory.parser.parser import Parser
+from distributed_event_factory.provider.sink.loadtest.loadtest_sink import LoadTestHttpSink
+
+class LoadTestHttpSinkParser(Parser):
+    def add_dependency(self, key: str, dependency):
+        pass
+
+    def parse(self, config):
+        return LoadTestHttpSink(
+            url=config["url"],
+            data_source_ref=config["dataSourceRefs"],
+            frame_duration=config["timeframe"]
+        )
\ No newline at end of file
diff --git a/distributed_event_factory/provider/activity/image_activity_provider.py b/distributed_event_factory/provider/activity/image_activity_provider.py
new file mode 100644
index 0000000..ea692da
--- /dev/null
+++ b/distributed_event_factory/provider/activity/image_activity_provider.py
@@ -0,0 +1,20 @@
+import base64
+
+import cv2
+
+from distributed_event_factory.provider.activity.activity_provider import ActivityProvider
+
+
+class ImageActivityProvider(ActivityProvider):
+    def __init__(self, activity):
+        self.dependencies = dict()
+        self.activity = activity
+
+    def add_dependency(self, key: str, dependency):
+        self.dependencies[key] = dependency
+        return self
+
+    def get_activity(self):
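+        # loads the referenced image from a hardcoded local dataset path, JPEG-encodes it, and returns it base64-encoded, so the event carries an image payload instead of an activity name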
+        img = cv2.imread(f"/home/hre/Repo/scalablemine/EventFactoryConfigs/Images/images/{self.activity}.jpg")
+        _, buffer = cv2.imencode('.jpg', img)
+        return base64.b64encode(buffer)
\ No newline at end of file
diff --git a/distributed_event_factory/provider/event/event_data.py b/distributed_event_factory/provider/event/event_data.py
index f806895..129e767 100644
--- a/distributed_event_factory/provider/event/event_data.py
+++ b/distributed_event_factory/provider/event/event_data.py
@@ -5,10 +5,10 @@
 class EventData:
 
     def __init__(
-        self,
-        duration_provider: DurationProvider,
-        activity_provider: ActivityProvider,
-        transition_provider: TransitionProvider
+            self,
+            duration_provider: DurationProvider,
+            activity_provider: ActivityProvider,
+            transition_provider: TransitionProvider
     ):
         self.duration_provider = duration_provider
         self.activity_provider = activity_provider
diff --git a/distributed_event_factory/provider/sink/console/console_sink.py b/distributed_event_factory/provider/sink/console/console_sink.py
index eba87b2..d94bb02 100644
--- a/distributed_event_factory/provider/sink/console/console_sink.py
+++ b/distributed_event_factory/provider/sink/console/console_sink.py
@@ -10,7 +10,6 @@ def __init__(self, id, data_source_ref):
         self.id = id
 
     def send(self, event: Event) -> None:
-        #print(event.timestamp)
         print("Sensor " + event.node + ": " + str(event))
 
     def start_timeframe(self):
diff --git a/distributed_event_factory/provider/sink/http/http_sink.py b/distributed_event_factory/provider/sink/http/http_sink.py
index 2b24a42..2f40124 100644
--- a/distributed_event_factory/provider/sink/http/http_sink.py
+++ b/distributed_event_factory/provider/sink/http/http_sink.py
@@ -1,48 +1,17 @@
-import json
-
-from process_mining_core.datastructure.core.event import Event
-
-from distributed_event_factory.provider.sink.sink_provider import Sink
 import requests
 
-class TimeFrame:
-    def __init__(self, duration):
-        self.duration = duration
-        self.events = []
-
-    def add_event(self, event):
-        self.events.append(event)
+from distributed_event_factory.provider.sink.sink_provider import Sink
+from process_mining_core.datastructure.core.event import Event
 
-    def toJson(self):
-        return json.dumps(
-            self,
-            default=lambda o: o.__dict__,
-            sort_keys=True,
-            indent=4)
 
-class LoadTestHttpSink(Sink):
+class HttpSink(Sink):
 
-    def __init__(self, url, frame_duration, data_source_ref):
+    def __init__(self, url, data_source_ref):
         super().__init__(data_source_ref)
-        self.timeframe = None
         self.url = url
-        self.frame_duration = frame_duration
-
-    def start_timeframe(self):
-        self.timeframe = TimeFrame(duration=self.frame_duration*1000)  # conversion microseconds to milliseconds
-
-    def add_event(self, event):
-        self.timeframe.add_event(event)
-
-    def end_timeframe(self):
-        requests.post(
-            url=self.url + "/register",
-            data=self.timeframe.toJson(),
-            headers={'Content-Type': 'application/json'}
-        )
-
-    def start(self):
-        requests.post(url=self.url + "/start")
 
     def send(self, event: Event) -> None:
-        self.timeframe.add_event(event)
+        requests.post(url=self.url, json=event.__dict__)
+
+    def get_datasource_ref(self):
+        return super().get_datasource_ref()
diff --git a/distributed_event_factory/provider/sink/loadtest/__init__.py b/distributed_event_factory/provider/sink/loadtest/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/distributed_event_factory/provider/sink/loadtest/loadtest_sink.py b/distributed_event_factory/provider/sink/loadtest/loadtest_sink.py
new file mode 100644
index 0000000..92a7aca
--- /dev/null
+++ b/distributed_event_factory/provider/sink/loadtest/loadtest_sink.py
@@ -0,0 +1,50 @@
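+# Extracted from http/http_sink.py: buffers events into fixed-duration TimeFrames and POSTs each completed frame to <url>/register for load testing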
+import json
+
+from process_mining_core.datastructure.core.event import Event
+
+from distributed_event_factory.provider.sink.sink_provider import Sink
+import requests
+
+
+class TimeFrame:
+    def __init__(self, duration):
+        self.duration = duration
+        self.events = []
+
+    def add_event(self, event):
+        self.events.append(event)
+
+    def toJson(self):
+        return json.dumps(
+            self,
+            default=lambda o: o.__dict__,
+            sort_keys=True,
+            indent=4
+        )
+
+
+class LoadTestHttpSink(Sink):
+
+    def __init__(self, url, frame_duration, data_source_ref):
+        super().__init__(data_source_ref)
+        self.timeframe = None
+        self.url = url
+        self.frame_duration = frame_duration
+
+    def start_timeframe(self):
+        self.timeframe = TimeFrame(duration=self.frame_duration*1000)  # conversion microseconds to milliseconds
+
+    def add_event(self, event):
+        self.timeframe.add_event(event)
+
+    def end_timeframe(self):
+        requests.post(
+            url=self.url + "/register",
+            data=self.timeframe.toJson(),
+            headers={'Content-Type': 'application/json'}
+        )
+
+    def start(self):
+        requests.post(url=self.url + "/start")
+
+    def send(self, event: Event) -> None:
+        self.timeframe.add_event(event)
diff --git a/distributed_event_factory/provider/sink/newsink/newsink.py b/distributed_event_factory/provider/sink/newsink/newsink.py
index 9e08ccd..67a9b36 100644
--- a/distributed_event_factory/provider/sink/newsink/newsink.py
+++ b/distributed_event_factory/provider/sink/newsink/newsink.py
@@ -5,7 +5,7 @@
 from threading import Thread, Lock
 from scheduled_futures import ScheduledThreadPoolExecutor
 
-from distributed_event_factory.provider.sink.http.http_sink import TimeFrame
+from distributed_event_factory.provider.sink.loadtest.loadtest_sink import TimeFrame
 
 
 class NewSink:
diff --git a/distributed_event_factory/provider/sink/newsink/sinkqueue.py b/distributed_event_factory/provider/sink/newsink/sinkqueue.py
index 948c6d5..9225445 100644
--- a/distributed_event_factory/provider/sink/newsink/sinkqueue.py
+++ b/distributed_event_factory/provider/sink/newsink/sinkqueue.py
@@ -1,6 +1,6 @@
 from queue import Queue
 
-from distributed_event_factory.provider.sink.http.http_sink import TimeFrame
+from distributed_event_factory.provider.sink.loadtest.loadtest_sink import TimeFrame
 
 
 class SinkQueue:
diff --git a/distributed_event_factory/simulation/abstract_process_simulator.py b/distributed_event_factory/simulation/abstract_process_simulator.py
new file mode 100644
index 0000000..e8d77db
--- /dev/null
+++ b/distributed_event_factory/simulation/abstract_process_simulator.py
@@ -0,0 +1,12 @@
+from abc import ABC
+
+from process_mining_core.datastructure.core.event import Event
+
+
+class ProcessSimulator(ABC):
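+    # common contract for DefProcessSimulator and XesProcessSimulator: simulate() returns the next event, add_datasource() registers a datasource under a name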
Event: {event}") diff --git a/distributed_event_factory/simulation/countbased.py b/distributed_event_factory/simulation/countbased.py index 3880cff..9077816 100644 --- a/distributed_event_factory/simulation/countbased.py +++ b/distributed_event_factory/simulation/countbased.py @@ -1,8 +1,11 @@ +import sys +import time +from threading import Thread + from distributed_event_factory.provider.data.case_provider import CaseIdProvider from distributed_event_factory.provider.data.count_provider import CountProvider +from distributed_event_factory.simulation.abstract_process_simulator import ProcessSimulator from distributed_event_factory.simulation.abstract_simulation import Simulation -from distributed_event_factory.simulation.process_simulation import ProcessSimulator - class CountBasedSimulation(Simulation): @@ -13,15 +16,16 @@ def __init__(self, simulation_steps: int, case_id_provider: CaseIdProvider, max_ self.sinks = dict() self.max_concurrent_cases = max_concurrent_cases - def run_simulation(self, datasources, sinks, hook=lambda: None): + def run(self, process_simulator, steps, hook): + for i in range(steps): + self.send_event(process_simulator.simulate()) + #hook(i+1) + return + + def run_simulation(self, process_simulator: ProcessSimulator, data_sources, sinks, hook=lambda: None): self.setup_datasource_sink_mapping(sinks) - process_simulator = ProcessSimulator( - case_id_provider=self.case_id_provider, - data_sources=datasources, - max_concurrent_cases=self.max_concurrent_cases - ) + for data_source in data_sources: + process_simulator.add_datasource(name=data_source, data_source=data_sources[data_source]) + self.run(process_simulator, int(self.simulation_steps), hook) + - for i in range(self.simulation_steps): - event = process_simulator.simulate() - self.send_event(event) - hook() diff --git a/distributed_event_factory/simulation/loadtest.py b/distributed_event_factory/simulation/loadtest.py index 0609eea..24c2a59 100644 --- a/distributed_event_factory/simulation/loadtest.py +++ b/distributed_event_factory/simulation/loadtest.py @@ -3,9 +3,9 @@ from distributed_event_factory.provider.data.case_provider import CaseIdProvider from distributed_event_factory.provider.data.count_provider import CountProvider from distributed_event_factory.provider.load.load_provider import LoadProvider -from distributed_event_factory.provider.sink.http.http_sink import LoadTestHttpSink +from distributed_event_factory.provider.sink.loadtest.loadtest_sink import LoadTestHttpSink from distributed_event_factory.simulation.abstract_simulation import Simulation -from distributed_event_factory.simulation.process_simulation import ProcessSimulator +from distributed_event_factory.simulation.process_simulation import DefProcessSimulator class LoadTestSimulation(Simulation): @@ -41,7 +41,7 @@ def run_simulation(self, data_sources: Dict[str, DataSource], datasource_sink_ma for sink in datasource_sink_mapping: self.sink.append(datasource_sink_mapping[sink]) - process_simulator = ProcessSimulator( + process_simulator = DefProcessSimulator( case_id_provider=self.case_id_provider, data_sources=data_sources, max_concurrent_cases=self.max_concurrent_cases diff --git a/distributed_event_factory/simulation/process_simulation.py b/distributed_event_factory/simulation/process_simulation.py index 9e7e600..b07724f 100644 --- a/distributed_event_factory/simulation/process_simulation.py +++ b/distributed_event_factory/simulation/process_simulation.py @@ -4,26 +4,30 @@ from typing import Dict from 
+        for i in range(steps):
+            self.send_event(process_simulator.simulate())
+            #hook(i+1)
+        return
+
+    def run_simulation(self, process_simulator: ProcessSimulator, data_sources, sinks, hook=lambda: None):
         self.setup_datasource_sink_mapping(sinks)
-        process_simulator = ProcessSimulator(
-            case_id_provider=self.case_id_provider,
-            data_sources=datasources,
-            max_concurrent_cases=self.max_concurrent_cases
-        )
+        for data_source in data_sources:
+            process_simulator.add_datasource(name=data_source, data_source=data_sources[data_source])
+        self.run(process_simulator, int(self.simulation_steps), hook)
+
-        for i in range(self.simulation_steps):
-            event = process_simulator.simulate()
-            self.send_event(event)
-            hook()
diff --git a/distributed_event_factory/simulation/loadtest.py b/distributed_event_factory/simulation/loadtest.py
index 0609eea..24c2a59 100644
--- a/distributed_event_factory/simulation/loadtest.py
+++ b/distributed_event_factory/simulation/loadtest.py
@@ -3,9 +3,9 @@
 from distributed_event_factory.provider.data.case_provider import CaseIdProvider
 from distributed_event_factory.provider.data.count_provider import CountProvider
 from distributed_event_factory.provider.load.load_provider import LoadProvider
-from distributed_event_factory.provider.sink.http.http_sink import LoadTestHttpSink
+from distributed_event_factory.provider.sink.loadtest.loadtest_sink import LoadTestHttpSink
 from distributed_event_factory.simulation.abstract_simulation import Simulation
-from distributed_event_factory.simulation.process_simulation import ProcessSimulator
+from distributed_event_factory.simulation.process_simulation import DefProcessSimulator
 
 
@@ -41,7 +41,7 @@ def run_simulation(self, data_sources: Dict[str, DataSource], datasource_sink_ma
         for sink in datasource_sink_mapping:
             self.sink.append(datasource_sink_mapping[sink])
 
-        process_simulator = ProcessSimulator(
+        process_simulator = DefProcessSimulator(
             case_id_provider=self.case_id_provider,
             data_sources=data_sources,
             max_concurrent_cases=self.max_concurrent_cases
diff --git a/distributed_event_factory/simulation/process_simulation.py b/distributed_event_factory/simulation/process_simulation.py
index 9e7e600..b07724f 100644
--- a/distributed_event_factory/simulation/process_simulation.py
+++ b/distributed_event_factory/simulation/process_simulation.py
@@ -4,26 +4,31 @@
 from typing import Dict
 
 from distributed_event_factory.provider.data.count_provider import CountProvider
+from distributed_event_factory.simulation.abstract_process_simulator import ProcessSimulator
+from process_mining_core.datastructure.core.SEvent import SEvent
 from process_mining_core.datastructure.core.event import Event
 
 from distributed_event_factory.core.datasource import DataSource
 from distributed_event_factory.core.datasource_id import START_SENSOR_ID, END_DATA_SOURCE_ID, DataSourceId
 from distributed_event_factory.provider.data.case_provider import CaseIdProvider
 
 
-class ProcessSimulator:
+class DefProcessSimulator(ProcessSimulator):
     def __init__(
-        self,
-        data_sources: Dict[str, DataSource],
-        case_id_provider: CaseIdProvider,
-        max_concurrent_cases: CountProvider
+            self,
+            data_sources: Dict[str, DataSource],
+            case_id_provider: CaseIdProvider,
+            max_concurrent_cases: CountProvider
     ):
         self.max_concurrent_cases = max_concurrent_cases
         self.tokens: PriorityQueue[Token] = PriorityQueue(self.max_concurrent_cases.get())
         self.datasources: Dict[str, DataSource] = data_sources
         self.case_id_provider = case_id_provider
         self.last_timestamp = datetime.now()
 
+    def add_datasource(self, name, data_source):
+        self.datasources[name] = data_source
+
     def simulate(self) -> Event:
         emit_event = None
         while not emit_event:
@@ -58,12 +63,12 @@ def start_new_case(self):
 
     def _build_event(self, case, activity, timestamp, datasource):
         if hasattr(datasource, "sensor_id"):
-            return Event(
+            return SEvent(
                 timestamp=timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                 activity=activity,
-                case_id=case,
+                caseid=case,
                 node=datasource.sensor_id.get_name(),
-                group_id=datasource.group_id
+                group=datasource.group_id
             )
 
     def _get_sensor_with_id(self, data_source_id) -> DataSource:
diff --git a/distributed_event_factory/simulation/stream.py b/distributed_event_factory/simulation/stream.py
index f4cddcb..510f008 100644
--- a/distributed_event_factory/simulation/stream.py
+++ b/distributed_event_factory/simulation/stream.py
@@ -4,8 +4,8 @@
 from distributed_event_factory.provider.data.case_provider import CaseIdProvider
 from distributed_event_factory.provider.data.count_provider import CountProvider
 from distributed_event_factory.provider.load.load_provider import LoadProvider
+from distributed_event_factory.simulation.abstract_process_simulator import ProcessSimulator
 from distributed_event_factory.simulation.abstract_simulation import Simulation
-from distributed_event_factory.simulation.process_simulation import ProcessSimulator
 
 class StreamSimulation(Simulation):
@@ -20,13 +20,8 @@ def __init__(
         self.max_concurrent_cases=max_concurrent_cases
         self.sinks = dict()
 
-    def run_simulation(self, datasources, sinks, hook):
+    def run_simulation(self, process_simulator, datasources, sinks, hook):
         self.setup_datasource_sink_mapping(sinks)
-        process_simulator = ProcessSimulator(
-            case_id_provider=self.case_id_provider,
-            data_sources=datasources,
-            max_concurrent_cases=self.max_concurrent_cases
-        )
         while True:
             with ScheduledThreadPoolExecutor() as executor:
                 scheduler = executor.schedule(
diff --git a/distributed_event_factory/simulation/xes_process_simulator.py b/distributed_event_factory/simulation/xes_process_simulator.py
new file mode 100644
index 0000000..ec4e5b7
--- /dev/null
+++ b/distributed_event_factory/simulation/xes_process_simulator.py
@@ -0,0 +1,28 @@
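+# Replays a recorded XES log as a stream: the log is read once via pm4py and simulate() emits its rows one SEvent at a time, wrapping around at the end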
+import pm4py
+
+from distributed_event_factory.simulation.abstract_process_simulator import ProcessSimulator
+from process_mining_core.datastructure.core.SEvent import SEvent
+
+
+class XesProcessSimulator(ProcessSimulator):
+
+    def __init__(self, xes_file_path):
+        self.log = pm4py.read_xes(xes_file_path)
+        self.length = len(self.log)
+        self.index = 0
+        self.nodes = set()
+
+    def simulate(self) -> SEvent:
+        i = self.index % self.length
+        node = self.log["org:resource"][i]
+        event = SEvent(
+            activity=self.log["concept:name"][i],
+            caseid=self.log["case:concept:name"][i],
+            timestamp=self.log["time:timestamp"][i],
+            group=self.log["org:resource"][i],
+            node=self.log["org:resource"][i],
+        )
+        self.index = self.index + 1
+        self.nodes.add(node)
+        print(self.nodes)
+        return event
\ No newline at end of file
diff --git a/distributed_event_factory/xes_test.py b/distributed_event_factory/xes_test.py
new file mode 100644
index 0000000..f218558
--- /dev/null
+++ b/distributed_event_factory/xes_test.py
@@ -0,0 +1,18 @@
+import pm4py
+
+from process_mining_core.datastructure.core.event import Event
+
+log = pm4py.read_xes("../config/Road_Traffic_Fine_Management_Process.xes")
+activities = set()
+for i in range(len(log)):
+    event = Event(
+        activity=log["concept:name"][i],
+        case_id=log["case:concept:name"][i],
+        timestamp=log["time:timestamp"][i],
+        group_id=log["concept:name"][i],
+        node=log["concept:name"][i],
+    )
+    #print(event)
+    activities.add(log["concept:name"][i])
+print("Done")
+print(activities)
\ No newline at end of file