Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/kafkaesque.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Kafkaesque do
use the library. See the documentation of the `Kafkaesque` module for more
information.
"""
@spec publish(Ecto.Repo.t(), String.t(), term(), String.t(), String.t()) ::
@spec publish(Ecto.Repo.t(), String.t(), integer(), String.t(), term()) ::
{:ok, Message.t()} | {:error, Ecto.Changeset.t()}
def publish(repo, topic, partition, key, payload) do
message = Message.new(topic, partition, key, payload)
Expand Down Expand Up @@ -118,7 +118,7 @@ defmodule Kafkaesque do
Kafkaesque.publish(unquote(repo), topic, partition, key, payload)
end

@spec encode(term()) :: String.t()
@spec encode(term()) :: term()
def encode(body) do
body
end
Expand Down
3 changes: 2 additions & 1 deletion lib/kafkaesque/clients/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule Kafkaesque.Clients.BrodClient do
@impl Kafkaesque.Client
def publish(%{brod_client_id: client_id, task_supervisor: task_supervisor}, messages) do
# We pre-process the message bodies to avoid copying unecessary data to the task
message_batches = Enum.group_by(messages, &{&1.partition, &1.topic}, &{&1.key, &1.body})
message_batches =
Enum.group_by(messages, &{&1.partition, &1.topic}, & &1.body)

task_results =
task_supervisor
Expand Down
18 changes: 15 additions & 3 deletions lib/kafkaesque/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@ defmodule Kafkaesque.Message do
use Ecto.Schema
import Ecto.Changeset

defmodule Data do
use Ecto.Type

def type, do: :binary

def cast(term), do: {:ok, term}

def load(data) when is_binary(data), do: {:ok, :erlang.binary_to_term(data)}

def dump(term), do: {:ok, :erlang.term_to_binary(term)}
end

@type state() ::
:failed
| :pending
Expand All @@ -16,7 +28,7 @@ defmodule Kafkaesque.Message do
state: state(),
topic: String.t(),
partition: integer(),
body: String.t(),
body: term(),
attempt: pos_integer(),
attempted_by: String.t() | nil,
attempted_at: NaiveDateTime.t() | nil,
Expand All @@ -35,7 +47,7 @@ defmodule Kafkaesque.Message do

field(:partition, :integer)
field(:key, :string, default: "")
field(:body, :string)
field(:body, __MODULE__.Data, default: %{})
field(:attempt, :integer, default: 0)
field(:attempted_by, :string)

Expand All @@ -45,7 +57,7 @@ defmodule Kafkaesque.Message do
timestamps()
end

@spec new(String.t(), String.t(), String.t(), String.t()) :: Ecto.Changeset.t()
@spec new(String.t(), integer(), String.t(), term()) :: Ecto.Changeset.t()
def new(topic, partition, key, body) do
%__MODULE__{}
|> cast(%{topic: topic, partition: partition, body: body, key: key}, [
Expand Down
2 changes: 1 addition & 1 deletion test/kafkaesque/message_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Kafkaesque.MessageTest do
key = 2
partition = "notanumber"

assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]} =
assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _]} =
Message.new(topic, partition, key, body)
end
end
Expand Down
23 changes: 19 additions & 4 deletions test/kafkaesque_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,37 @@ defmodule KafkaesqueTest do

describe "publish/4" do
test "inserts valid messages" do
{:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "", "content")
{:ok,
%Kafkaesque.Message{
key: "test_key",
body: "content",
state: :pending,
topic: "test_topic"
}} =
Kafkaesque.publish(Repo, "test_topic", 0, "test_key", "content")

{:ok,
%Kafkaesque.Message{
key: "test_key",
body: %{test: "content"},
state: :pending,
topic: "test_topic"
}} =
Kafkaesque.publish(Repo, "test_topic", 0, "test_key", %{test: "content"})
end

test "errors for invalid messages" do
invalid_topic = 1
invalid_body = {1, 2}
invalid_key = 2
invalid_partition = "notanumber"

assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]}} =
assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _]}} =
Kafkaesque.publish(
Repo,
invalid_topic,
invalid_partition,
invalid_key,
invalid_body
%{}
)
end
end
Expand Down