From 71fbe9b2ab2d06f2a0a2327881bcace2be3025c6 Mon Sep 17 00:00:00 2001 From: Matt Sutkowski Date: Sun, 21 Jan 2024 08:18:01 -0800 Subject: [PATCH] more flexible body handling --- lib/kafkaesque.ex | 4 ++-- lib/kafkaesque/clients/brod_client.ex | 3 ++- lib/kafkaesque/message.ex | 18 +++++++++++++++--- test/kafkaesque/message_test.exs | 2 +- test/kafkaesque_test.exs | 23 +++++++++++++++++++---- 5 files changed, 39 insertions(+), 11 deletions(-) diff --git a/lib/kafkaesque.ex b/lib/kafkaesque.ex index abd1d06..1e3d306 100644 --- a/lib/kafkaesque.ex +++ b/lib/kafkaesque.ex @@ -50,7 +50,7 @@ defmodule Kafkaesque do use the library. See the documentation of the `Kafkaesque` module for more information. """ - @spec publish(Ecto.Repo.t(), String.t(), term(), String.t(), String.t()) :: + @spec publish(Ecto.Repo.t(), String.t(), integer(), String.t(), term()) :: {:ok, Message.t()} | {:error, Ecto.Changeset.t()} def publish(repo, topic, partition, key, payload) do message = Message.new(topic, partition, key, payload) @@ -118,7 +118,7 @@ defmodule Kafkaesque do Kafkaesque.publish(unquote(repo), topic, partition, key, payload) end - @spec encode(term()) :: String.t() + @spec encode(term()) :: term() def encode(body) do body end diff --git a/lib/kafkaesque/clients/brod_client.ex b/lib/kafkaesque/clients/brod_client.ex index bffbd3b..73e1eab 100644 --- a/lib/kafkaesque/clients/brod_client.ex +++ b/lib/kafkaesque/clients/brod_client.ex @@ -34,7 +34,8 @@ defmodule Kafkaesque.Clients.BrodClient do @impl Kafkaesque.Client def publish(%{brod_client_id: client_id, task_supervisor: task_supervisor}, messages) do # We pre-process the message bodies to avoid copying unecessary data to the task - message_batches = Enum.group_by(messages, &{&1.partition, &1.topic}, &{&1.key, &1.body}) + message_batches = + Enum.group_by(messages, &{&1.partition, &1.topic}, & &1.body) task_results = task_supervisor diff --git a/lib/kafkaesque/message.ex b/lib/kafkaesque/message.ex index 1f7df60..06ca2f9 100644 --- a/lib/kafkaesque/message.ex +++ b/lib/kafkaesque/message.ex @@ -6,6 +6,18 @@ defmodule Kafkaesque.Message do use Ecto.Schema import Ecto.Changeset + defmodule Data do + use Ecto.Type + + def type, do: :binary + + def cast(term), do: {:ok, term} + + def load(data) when is_binary(data), do: {:ok, :erlang.binary_to_term(data)} + + def dump(term), do: {:ok, :erlang.term_to_binary(term)} + end + @type state() :: :failed | :pending @@ -16,7 +28,7 @@ defmodule Kafkaesque.Message do state: state(), topic: String.t(), partition: integer(), - body: String.t(), + body: term(), attempt: pos_integer(), attempted_by: String.t() | nil, attempted_at: NaiveDateTime.t() | nil, @@ -35,7 +47,7 @@ defmodule Kafkaesque.Message do field(:partition, :integer) field(:key, :string, default: "") - field(:body, :string) + field(:body, __MODULE__.Data, default: %{}) field(:attempt, :integer, default: 0) field(:attempted_by, :string) @@ -45,7 +57,7 @@ defmodule Kafkaesque.Message do timestamps() end - @spec new(String.t(), String.t(), String.t(), String.t()) :: Ecto.Changeset.t() + @spec new(String.t(), integer(), String.t(), term()) :: Ecto.Changeset.t() def new(topic, partition, key, body) do %__MODULE__{} |> cast(%{topic: topic, partition: partition, body: body, key: key}, [ diff --git a/test/kafkaesque/message_test.exs b/test/kafkaesque/message_test.exs index 7b71fae..9d2be99 100644 --- a/test/kafkaesque/message_test.exs +++ b/test/kafkaesque/message_test.exs @@ -22,7 +22,7 @@ defmodule Kafkaesque.MessageTest do key = 2 partition = "notanumber" - assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]} = + assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _]} = Message.new(topic, partition, key, body) end end diff --git a/test/kafkaesque_test.exs b/test/kafkaesque_test.exs index 7c30d78..115209d 100644 --- a/test/kafkaesque_test.exs +++ b/test/kafkaesque_test.exs @@ -5,22 +5,37 @@ defmodule KafkaesqueTest do describe "publish/4" do test "inserts valid messages" do - {:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "", "content") + {:ok, + %Kafkaesque.Message{ + key: "test_key", + body: "content", + state: :pending, + topic: "test_topic" + }} = + Kafkaesque.publish(Repo, "test_topic", 0, "test_key", "content") + + {:ok, + %Kafkaesque.Message{ + key: "test_key", + body: %{test: "content"}, + state: :pending, + topic: "test_topic" + }} = + Kafkaesque.publish(Repo, "test_topic", 0, "test_key", %{test: "content"}) end test "errors for invalid messages" do invalid_topic = 1 - invalid_body = {1, 2} invalid_key = 2 invalid_partition = "notanumber" - assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]}} = + assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _]}} = Kafkaesque.publish( Repo, invalid_topic, invalid_partition, invalid_key, - invalid_body + %{} ) end end