Skip to content

Commit 51c5f72

Browse files
Lower requirements for consumer state machine (#154)
* lower requirements for kafka consumer * add twin test for kafka producer
1 parent 3987647 commit 51c5f72

File tree

3 files changed

+98
-2
lines changed

3 files changed

+98
-2
lines changed

Sources/Kafka/KafkaConsumer.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,8 @@ public final class KafkaConsumer: Sendable, Service {
319319
)
320320
}
321321
try client.subscribe(topicPartitionList: subscription)
322+
case .consumerClosed:
323+
throw KafkaError.connectionClosed(reason: "Consumer deinitialized before setup")
322324
}
323325
}
324326

@@ -339,6 +341,8 @@ public final class KafkaConsumer: Sendable, Service {
339341
let assignment = RDKafkaTopicPartitionList()
340342
assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset.rawValue))
341343
try client.assign(topicPartitionList: assignment)
344+
case .consumerClosed:
345+
throw KafkaError.connectionClosed(reason: "Consumer deinitialized before setup")
342346
}
343347
}
344348

@@ -723,6 +727,8 @@ extension KafkaConsumer {
723727
/// Set up the connection through ``subscribe()`` or ``assign()``.
724728
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
725729
case setUpConnection(client: RDKafkaClient)
730+
/// The ``KafkaConsumer`` is closed.
731+
case consumerClosed
726732
}
727733

728734
/// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``.
@@ -737,8 +743,10 @@ extension KafkaConsumer {
737743
return .setUpConnection(client: client)
738744
case .running:
739745
fatalError("\(#function) should not be invoked more than once")
740-
case .finishing, .finished:
746+
case .finishing:
741747
fatalError("\(#function) should only be invoked when KafkaConsumer is running")
748+
case .finished:
749+
return .consumerClosed
742750
}
743751
}
744752

@@ -890,7 +898,7 @@ extension KafkaConsumer {
890898
case .uninitialized:
891899
fatalError("\(#function) invoked while still in state \(self.state)")
892900
case .initializing:
893-
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
901+
self.state = .finished
894902
case .running(let client, _):
895903
self.state = .running(client: client, messagePollLoopState: .finished)
896904
case .finishing, .finished:

Tests/KafkaTests/KafkaConsumerTests.swift

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,52 @@ final class KafkaConsumerTests: XCTestCase {
127127
let value = try metrics.expectGauge("operations").lastValue
128128
XCTAssertNotNil(value)
129129
}
130+
131+
func testConsumerConstructDeinit() async throws {
132+
let uniqueGroupID = UUID().uuidString
133+
let config = KafkaConsumerConfiguration(
134+
consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
135+
bootstrapBrokerAddresses: []
136+
)
137+
138+
_ = try KafkaConsumer(configuration: config, logger: .kafkaTest) // deinit called before run
139+
_ = try KafkaConsumer.makeConsumerWithEvents(configuration: config, logger: .kafkaTest)
140+
}
141+
142+
func testConsumerMessagesReadCancelledBeforeRun() async throws {
143+
let uniqueGroupID = UUID().uuidString
144+
let config = KafkaConsumerConfiguration(
145+
consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
146+
bootstrapBrokerAddresses: []
147+
)
148+
149+
let consumer = try KafkaConsumer(configuration: config, logger: .kafkaTest)
150+
151+
let svcGroupConfig = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest)
152+
let serviceGroup = ServiceGroup(configuration: svcGroupConfig)
153+
154+
// explicitly run and cancel message consuming task before serviceGroup.run()
155+
let consumingTask = Task {
156+
for try await record in consumer.messages {
157+
XCTFail("Unexpected record \(record))")
158+
}
159+
}
160+
161+
try await Task.sleep(for: .seconds(1))
162+
163+
// explicitly cancel message consuming task before serviceGroup.run()
164+
consumingTask.cancel()
165+
166+
try await withThrowingTaskGroup(of: Void.self) { group in
167+
// Run Task
168+
group.addTask {
169+
try await serviceGroup.run()
170+
}
171+
172+
try await Task.sleep(for: .seconds(1))
173+
174+
// Shutdown the serviceGroup
175+
await serviceGroup.triggerGracefulShutdown()
176+
}
177+
}
130178
}

Tests/KafkaTests/KafkaProducerTests.swift

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,4 +377,44 @@ final class KafkaProducerTests: XCTestCase {
377377
let value = try metrics.expectGauge("operations").lastValue
378378
XCTAssertNotNil(value)
379379
}
380+
381+
func testProducerConstructDeinit() async throws {
382+
let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
383+
384+
_ = try KafkaProducer(configuration: config, logger: .kafkaTest) // deinit called before run
385+
_ = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest) // deinit called before run
386+
}
387+
388+
func testProducerEventsReadCancelledBeforeRun() async throws {
389+
let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
390+
391+
let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest)
392+
393+
let svcGroupConfig = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest)
394+
let serviceGroup = ServiceGroup(configuration: svcGroupConfig)
395+
396+
// explicitly run and cancel message consuming task before serviceGroup.run()
397+
let producerEventsTask = Task {
398+
for try await event in events {
399+
XCTFail("Unexpected record \(event))")
400+
}
401+
}
402+
403+
try await Task.sleep(for: .seconds(1))
404+
405+
// explicitly cancel message consuming task before serviceGroup.run()
406+
producerEventsTask.cancel()
407+
408+
try await withThrowingTaskGroup(of: Void.self) { group in
409+
// Run Task
410+
group.addTask {
411+
try await serviceGroup.run()
412+
}
413+
414+
try await Task.sleep(for: .seconds(1))
415+
416+
// Shutdown the serviceGroup
417+
await serviceGroup.triggerGracefulShutdown()
418+
}
419+
}
380420
}

0 commit comments

Comments
 (0)