Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.LinkedCaseInsensitiveMap;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -112,6 +113,58 @@ public void testProcessStartWithHeaders() {
);
}

@Test
@Deployment(resources = "org/flowable/engine/test/eventregistry/BpmnHeaderEventRegistryConsumerTest.testProcessStartWithHeaders.bpmn20.xml")
public void testProcessStartWithMissingHeaders() {
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().processDefinitionKey("process").singleResult();
assertThat(processDefinition).isNotNull();

EventSubscription eventSubscription = runtimeService.createEventSubscriptionQuery()
.processDefinitionId(processDefinition.getId())
.scopeType(ScopeTypes.BPMN)
.singleResult();
assertThat(eventSubscription).isNotNull();
assertThat(eventSubscription.getEventType()).isEqualTo("myEvent");

assertThat(runtimeService.createProcessInstanceQuery().list()).isEmpty();

inboundEventChannelAdapter.triggerTestEventWithHeaders("payloadStartCustomer", null, null);
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processDefinitionKey("process").singleResult();
assertThat(runtimeService.getVariables(processInstance.getId()))
.containsOnly(
entry("customerIdVar", "payloadStartCustomer"),
entry("payload1", "Hello World"),
entry("myHeaderValue1", null),
entry("myHeaderValue2", null)
);
}

@Test
@Deployment(resources = "org/flowable/engine/test/eventregistry/BpmnHeaderEventRegistryConsumerTest.testProcessStartWithHeaders.bpmn20.xml")
public void testProcessStartWithCaseInsensitiveEventHeaders() {
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().processDefinitionKey("process").singleResult();
assertThat(processDefinition).isNotNull();

EventSubscription eventSubscription = runtimeService.createEventSubscriptionQuery()
.processDefinitionId(processDefinition.getId())
.scopeType(ScopeTypes.BPMN)
.singleResult();
assertThat(eventSubscription).isNotNull();
assertThat(eventSubscription.getEventType()).isEqualTo("myEvent");

assertThat(runtimeService.createProcessInstanceQuery().list()).isEmpty();

inboundEventChannelAdapter.triggerTestEventWithCaseInsensitiveHeaders("payloadStartCustomer", "testHeader", 1234);
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processDefinitionKey("process").singleResult();
assertThat(runtimeService.getVariables(processInstance.getId()))
.containsOnly(
entry("customerIdVar", "payloadStartCustomer"),
entry("payload1", "Hello World"),
entry("myHeaderValue1", "testHeader"),
entry("myHeaderValue2", 1234)
);
}

private static class TestInboundEventChannelAdapter implements InboundEventChannelAdapter {

public InboundChannelModel inboundChannelModel;
Expand All @@ -135,8 +188,25 @@ public void setEventRegistry(EventRegistry eventRegistry) {
public void triggerTestEventWithHeaders(String customerId, String headerValue1, Integer headerValue2) {
ObjectNode eventNode = createTestEventNode(customerId, null);
Map<String, Object> headers = new HashMap<>();
headers.put("headerProperty1", headerValue1);
headers.put("headerProperty2", headerValue2);
if (headerValue1 != null) {
headers.put("headerProperty1", headerValue1);
}
if (headerValue2 != null) {
headers.put("headerProperty2", headerValue2);
}
try {
String event = objectMapper.writeValueAsString(eventNode);
eventRegistry.eventReceived(inboundChannelModel, new DefaultInboundEvent(event, headers));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public void triggerTestEventWithCaseInsensitiveHeaders(String customerId, String headerValue1, Integer headerValue2) {
ObjectNode eventNode = createTestEventNode(customerId, null);
Map<String, Object> headers = new LinkedCaseInsensitiveMap<>();
headers.put("HeaderProperty1", headerValue1);
headers.put("HeaderProperty2", headerValue2);
try {
String event = objectMapper.writeValueAsString(eventNode);
eventRegistry.eventReceived(inboundChannelModel, new DefaultInboundEvent(event, headers));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
package org.flowable.eventregistry.impl.payload;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.flowable.eventregistry.api.FlowableEventInfo;
import org.flowable.eventregistry.api.InboundEventInfoAwarePayloadExtractor;
Expand All @@ -35,52 +34,42 @@ public Collection<EventPayloadInstance> extractPayload(EventModel eventModel, Fl
if (headers.isEmpty()) {
return Collections.emptyList();
}
Map<String, Object> filteredHeaders = convertHeaderValues(event, eventModel);
return headers.stream()
.filter(headerDefinition -> filteredHeaders.containsKey(headerDefinition.getName()))
.map(headerDefinition -> new EventPayloadInstanceImpl(headerDefinition, filteredHeaders.get(headerDefinition.getName())))
.collect(Collectors.toList());
}

protected Map<String, Object> convertHeaderValues(FlowableEventInfo<T> eventInfo, EventModel eventModel) {
Map<String, Object> filteredHeaders = new HashMap<>();
if (eventInfo.getHeaders() != null) {
Map<String, Object> headers = eventInfo.getHeaders();
for (String headerName : headers.keySet()) {
EventPayload eventHeaderDef = eventModel.getPayload(headerName);
if (eventHeaderDef != null && eventHeaderDef.isHeader()) {
Object headerValueObject = headers.get(headerName);
if (headerValueObject instanceof byte[]) {
byte[] headerValue = (byte[]) headers.get(headerName);
convertBytesHeaderValue(headerName, headerValue, filteredHeaders, eventHeaderDef);
} else {
filteredHeaders.put(headerName, headerValueObject);
}
Map<String, Object> eventHeaders = event.getHeaders();
if (eventHeaders == null || eventHeaders.isEmpty()) {
return Collections.emptyList();
}
Collection<EventPayloadInstance> eventPayloadHeaderInstances = new ArrayList<>(headers.size());
for (EventPayload header : headers) {
if (eventHeaders.containsKey(header.getName())) {
Object headerValueObject = eventHeaders.get(header.getName());
Object headerValue = headerValueObject;
if (headerValueObject instanceof byte[] headerValueBytes) {
headerValue = convertBytesHeaderValue(headerValueBytes, header);
}
eventPayloadHeaderInstances.add(new EventPayloadInstanceImpl(header, headerValue));
}
}

return filteredHeaders;
return eventPayloadHeaderInstances;
}
protected void convertBytesHeaderValue(String headerName, byte[] headerValue, Map<String, Object> filteredHeaders, EventPayload eventHeaderDef) {

protected Object convertBytesHeaderValue(byte[] headerValue, EventPayload eventHeaderDef) {
if (EventPayloadTypes.STRING.equals(eventHeaderDef.getType())) {
filteredHeaders.put(headerName, convertBytesToString(headerValue));
return convertBytesToString(headerValue);

} else if (EventPayloadTypes.DOUBLE.equals(eventHeaderDef.getType())) {
filteredHeaders.put(headerName, Double.valueOf(convertBytesToString(headerValue)));
return Double.valueOf(convertBytesToString(headerValue));

} else if (EventPayloadTypes.INTEGER.equals(eventHeaderDef.getType())) {
filteredHeaders.put(headerName, Integer.valueOf(convertBytesToString(headerValue)));
return Integer.valueOf(convertBytesToString(headerValue));

} else if (EventPayloadTypes.LONG.equals(eventHeaderDef.getType())) {
filteredHeaders.put(headerName, Long.valueOf(convertBytesToString(headerValue)));
return Long.valueOf(convertBytesToString(headerValue));

} else if (EventPayloadTypes.BOOLEAN.equals(eventHeaderDef.getType())) {
filteredHeaders.put(headerName, Boolean.valueOf(convertBytesToString(headerValue)));
return Boolean.valueOf(convertBytesToString(headerValue));

} else {
filteredHeaders.put(headerName, headerValue);
return headerValue;
}
}

Expand Down