From cc96faa5f1f2d7ddb50fc0f2ed1f3d6aa8a0eb84 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Mon, 9 Oct 2023 12:31:50 +0530 Subject: [PATCH 1/2] PD-220325: megabus logs for provisioning issue --- .../bazaarvoice/megabus/refproducer/MegabusRefProducer.java | 5 +++++ .../megabus/tableevents/TableEventProcessor.java | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java b/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java index 06eb9e2604..9392a77315 100644 --- a/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java +++ b/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java @@ -59,6 +59,8 @@ public class MegabusRefProducer extends AbstractScheduledService { private final Meter _eventMeter; private final Meter _errorMeter; + private static final Logger logger = LoggerFactory.getLogger(MegabusRefProducer.class); + public MegabusRefProducer(MegabusRefProducerConfiguration config, DatabusEventStore eventStore, RateLimitedLogFactory logFactory, MetricRegistry metricRegistry, Producer producer, ObjectMapper objectMapper, Topic topic, @@ -147,6 +149,9 @@ boolean peekAndAckEvents() { .map(ref -> new MegabusRef(ref.getTable(), ref.getKey(), ref.getChangeId(), _clock.instant(), MegabusRef.RefType.NORMAL)) .collect(Collectors.groupingBy(ref -> { String key = Coordinate.of(ref.getTable(), ref.getKey()).toString(); + if(ref.getTable().contains("apikey")){ + logger.info("debugging mega-bus delay while provisioning apikey 2: = {}",key); + } return Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions(); }, Collectors.toList())) .entrySet() diff --git a/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java b/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java index 178038ee3a..8526a28b9c 100644 --- a/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java +++ b/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java @@ -37,7 +37,7 @@ public class TableEventProcessor extends AbstractScheduledService { private static final int FUTURE_BATCH_SIZE = 10000; - private static final Logger _log = LoggerFactory.getLogger(TableEventProcessor.class); + private static final Logger logger = LoggerFactory.getLogger(TableEventProcessor.class); private final TableEventRegistry _tableEventRegistry; private final MetricRegistry _metricRegistry; @@ -80,11 +80,13 @@ protected void runOneIteration() throws Exception { } private void processTableEvent(String table, String uuid, MegabusRef.RefType refType) { - Iterator> futures = _tableEventTools.getIdsForStorage(table, uuid) .map(key -> new MegabusRef(table, key, TimeUUIDs.minimumUuid(), null, refType)) .map(ref -> { String key = Coordinate.of(ref.getTable(), ref.getKey()).toString(); + if(table.contains("apikey")){ + logger.info("debugging mega-bus delay while provisioning apikey 1: = {}",key); + } return new ProducerRecord(_topic.getName(), Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions(), TimeUUIDs.newUUID().toString(),_objectMapper.valueToTree(Collections.singletonList(ref))); From 852b5642a98ffad77fa2805b167f2881a1a3368b Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Mon, 9 Oct 2023 14:06:41 +0530 Subject: [PATCH 2/2] PD-220325: added class to log --- .../com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java | 2 +- .../bazaarvoice/megabus/tableevents/TableEventProcessor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java b/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java index 9392a77315..77c5b63a89 100644 --- a/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java +++ b/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java @@ -150,7 +150,7 @@ boolean peekAndAckEvents() { .collect(Collectors.groupingBy(ref -> { String key = Coordinate.of(ref.getTable(), ref.getKey()).toString(); if(ref.getTable().contains("apikey")){ - logger.info("debugging mega-bus delay while provisioning apikey 2: = {}",key); + logger.info("debugging mega-bus delay while provisioning apikey in MegabusRefProducer: = {}",key); } return Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions(); }, Collectors.toList())) diff --git a/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java b/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java index 8526a28b9c..6f713d26ee 100644 --- a/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java +++ b/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java @@ -85,7 +85,7 @@ private void processTableEvent(String table, String uuid, MegabusRef.RefType ref .map(ref -> { String key = Coordinate.of(ref.getTable(), ref.getKey()).toString(); if(table.contains("apikey")){ - logger.info("debugging mega-bus delay while provisioning apikey 1: = {}",key); + logger.info("debugging mega-bus delay while provisioning apikey in TableEventProcessor: = {}",key); } return new ProducerRecord(_topic.getName(), Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions(),