@@ -4365,6 +4365,85 @@ details.

## quixstreams.sinks.community.file

+ <a id="quixstreams.sinks.community.mongodb"></a>
+
+ ## quixstreams.sinks.community.mongodb
+
+ <a id="quixstreams.sinks.community.mongodb.MongoDBSink"></a>
+
+ ### MongoDBSink
+
+ ```python
+ class MongoDBSink(BatchingSink)
+ ```
+
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/mongodb.py#L65)
+
+ <a id="quixstreams.sinks.community.mongodb.MongoDBSink.__init__"></a>
+
+ #### MongoDBSink.\_\_init\_\_
+
+ ```python
+ def __init__(
+     url: str,
+     db: str,
+     collection: str,
+     document_matcher: Callable[[SinkItem], MongoQueryFilter] = _default_document_matcher,
+     update_method: Literal["UpdateOne", "UpdateMany", "ReplaceOne"] = "UpdateOne",
+     upsert: bool = True,
+     add_message_metadata: bool = False,
+     add_topic_metadata: bool = False,
+     value_selector: Optional[Callable[[MongoValue], MongoValue]] = None
+ ) -> None
+ ```
+
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/mongodb.py#L66)
+
+ A connector to sink processed data to MongoDB in batches.
+
+ **Arguments**:
+
+ - `url`: MongoDB URL; most commonly `mongodb://username:password@host:port`
+ - `db`: MongoDB database name
+ - `collection`: MongoDB collection name
+ - `document_matcher`: How documents are selected for updating.
+   A callable that accepts a `SinkItem` and returns a MongoDB "query filter".
+   If there is no match and `upsert=True`, a new document is inserted; its `_id`
+   is the included value if specified, otherwise a random `ObjectId`.
+   - Default: matches on `_id`, with `_id` assumed to be the Kafka message key
+     (see the usage example below).
+ - `upsert`: Create a new document when `document_matcher` finds no match.
+ - `update_method`: How documents found with `document_matcher` are updated.
+   "Update*" options only update the fields included in the Kafka message.
+   "ReplaceOne" fully replaces the document with the contents of the Kafka message.
+   "UpdateOne": updates the first matching document (usually based on `_id`).
+   "UpdateMany": updates ALL matching documents (usually NOT based on `_id`).
+   "ReplaceOne": replaces the first matching document (usually based on `_id`).
+   Default: "UpdateOne".
+ - `add_message_metadata`: add the message key, timestamp, and headers as `__{field}` fields
+ - `add_topic_metadata`: add the topic, partition, and offset as `__{field}` fields
+ - `value_selector`: An optional callable that allows final editing of the
+   outgoing document (right before submitting it).
+   Largely used when a field is needed by `document_matcher`
+   but should not otherwise be stored in the document.
+   NOTE: metadata is added before this step, so be careful not to
+   exclude it here!
+
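+ Example Usage (a minimal sketch, not taken from the library docs; the topic,
+ database, and field names are placeholders, and the custom `document_matcher`
+ assumes the `SinkItem` it receives exposes the message value as `.value`):
+
+ ```
+ from quixstreams import Application
+ from quixstreams.sinks.community.mongodb import MongoDBSink
+
+ app = Application(broker_address="localhost:9092")
+ topic = app.topic("topic-name")
+
+ # records structured as (illustrative only):
+ # {"device_id": "sensor-42", "temperature": 21.5, "humidity": 0.4}
+
+ # Match on a field inside the message value instead of the default
+ # `_id` == Kafka key behavior (assumes `item.value` holds the message value).
+ def match_on_device_id(item):
+     return {"device_id": item.value["device_id"]}
+
+ mongodb_sink = MongoDBSink(
+     url="mongodb://username:password@localhost:27017",
+     db="iot",
+     collection="sensor_readings",
+     document_matcher=match_on_device_id,
+     update_method="UpdateOne",   # only fields present in the message are updated
+     upsert=True,                 # insert a new document when nothing matches
+     add_message_metadata=True,   # adds key/timestamp/headers as __-prefixed fields
+ )
+
+ sdf = app.dataframe(topic=topic)
+ sdf.sink(mongodb_sink)
+
+ if __name__ == "__main__":
+     app.run()
+ ```
+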
+ <a id="quixstreams.sinks.community.mongodb.MongoDBSink.write"></a>
+
+ #### MongoDBSink.write
+
+ ```python
+ def write(batch: SinkBatch) -> None
+ ```
+
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/mongodb.py#L138)
+
+ Write a batch of records to MongoDB.
+
+ Note: transactions could be an option here, but then each record would require a
+ network call, and transactions have size limits, so `bulk_write` is used instead;
+ the downside is that duplicate writes may occur if errors arise.
+
<a id="quixstreams.sinks.community.bigquery"></a>
## quixstreams.sinks.community.bigquery

@@ -4530,6 +4609,95 @@ because the batch size was less than 500. It waits for all futures to
complete, ensuring that all records are successfully sent to the Kinesis
stream.

+ <a id="quixstreams.sinks.community.neo4j"></a>
4613
+
4614
+ ## quixstreams.sinks.community.neo4j
4615
+
4616
+ <a id="quixstreams.sinks.community.neo4j.Neo4jSink"></a>
4617
+
4618
+ ### Neo4jSink
4619
+
4620
+ ```python
4621
+ class Neo4jSink(BatchingSink)
4622
+ ```
4623
+
4624
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/neo4j.py#L30)
4625
+
4626
+ <a id="quixstreams.sinks.community.neo4j.Neo4jSink.__init__"></a>
4627
+
4628
+ #### Neo4jSink.\_\_init\_\_
4629
+
4630
+ ```python
4631
+ def __init__(host: str,
4632
+ port: int,
4633
+ username: str,
4634
+ password: str,
4635
+ cypher_query: str,
4636
+ chunk_size: int = 10000,
4637
+ **kwargs) -> None
4638
+ ```
4639
+
4640
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/neo4j.py#L31)
4641
+
4642
+ A connector to sink processed data to Neo4j.
4643
+
4644
+ **Arguments**:
4645
+
+ - `host`: The Neo4j database hostname.
+ - `port`: The Neo4j database port.
+ - `username`: The Neo4j database username.
+ - `password`: The Neo4j database password.
+ - `cypher_query`: A Cypher query to execute on each record.
+   Behavior attempts to match other Neo4j connectors:
+   - Uses "dot traversal" for (nested) dict key access; ex: "col_x.col_y.col_z"
+   - The message value is bound to the alias "event"; ex: "event.field_a".
+   - The message key, value, headers, and timestamp are bound to "__{attr}"; ex: "__key".
+ - `chunk_size`: Adjusts the size of a Neo4j transactional chunk.
+   - This does NOT affect how many records can be written/flushed at once.
+   - The chunks are committed only if ALL of them succeed.
+   - Larger chunks are generally more efficient, but can encounter size issues.
+   - This only needs adjusting when messages are especially large.
+ - `kwargs`: Additional keyword arguments passed to `neo4j.GraphDatabase.driver`.
+
+ Example Usage:
+
+ ```
+ from quixstreams import Application
+ from quixstreams.sinks.community.neo4j import Neo4jSink
+
+ app = Application(broker_address="localhost:9092")
+ topic = app.topic("topic-name")
+
+ # records structured as:
+ # {"name": {"first": "John", "last": "Doe"}, "age": 28, "city": "Los Angeles"}
+
+ # This assumes the given City nodes exist.
+ # Notice the use of "event" to reference the message value.
+ # Could also do things like __key, or __value.name.first.
+ cypher_query = '''
+ MERGE (p:Person {first_name: event.name.first, last_name: event.name.last})
+ SET p.age = event.age
+ MERGE (c:City {name: event.city})
+ MERGE (p)-[:LIVES_IN]->(c)
+ '''
+
+ # Configure the sink
+ neo4j_sink = Neo4jSink(
+     host="localhost",
+     port=7687,
+     username="neo4j",
+     password="local_password",
+     cypher_query=cypher_query,
+ )
+
+ sdf = app.dataframe(topic=topic)
+ sdf.sink(neo4j_sink)
+
+ if __name__ == "__main__":
+     app.run()
+ ```
+
<a id="quixstreams.sinks.community"></a>
## quixstreams.sinks.community