+- [TD-7883] Added "put_many" function to add multiple values to redis
""" @@ -143,20 +167,17 @@ defmodule TdCache.LinkCache do ## Private functions defp get_link("link:" <> id = key) do - {:ok, tags} = Redix.command(["SMEMBERS", "#{key}:tags"]) - {:ok, map} = Redix.read_map(key) - - case map do - nil -> - nil - - _ -> - link = - map - |> Map.put(:tags, tags) - |> Map.put(:id, id) - - struct(Link, link) + with {:ok, tags} <- Redix.command(["SMEMBERS", "#{key}:tags"]), + {:ok, map} when not is_nil(map) <- Redix.read_map(key) do + link = + map + |> Map.put(:tags, tags) + |> Map.put(:id, id) + + struct(Link, link) + else + {:ok, nil} -> nil + _ -> nil end end @@ -176,26 +197,37 @@ defmodule TdCache.LinkCache do defp put_link(%{updated_at: ts}, ts, _opts), do: {:ok, []} defp put_link( - %{ - id: id, - source_type: source_type, - source_id: source_id, - target_type: target_type, - target_id: target_id - } = link, + link, _last_updated, opts ) do commands = put_link_commands(link) - {:ok, results} = Redix.transaction_pipeline(commands) + {:ok, results} = transaction_pipeline(commands) source_add_count = Enum.at(results, 2) target_add_count = Enum.at(results, 3) unless opts[:publish] == false do - # Publish events if link count has incremented - [source_add_count, target_add_count] - |> Enum.zip(["#{source_type}:events", "#{target_type}:events"]) + publish_link_events(source_add_count, target_add_count, link) + end + + {:ok, results} + end + + defp publish_link_events(source_add_count, target_add_count, link) do + %{ + id: id, + source_type: source_type, + source_id: source_id, + target_type: target_type, + target_id: target_id + } = link + + events = + [ + {source_add_count, "#{source_type}:events"}, + {target_add_count, "#{target_type}:events"} + ] |> Enum.flat_map(fn {n, stream} -> conditional_events( n > 0, @@ -209,10 +241,8 @@ defmodule TdCache.LinkCache do ) end) |> Enum.uniq() - |> Publisher.publish() - end - {:ok, results} + Publisher.publish(events) end defp put_link_commands( @@ -247,6 +277,324 @@ defmodule TdCache.LinkCache do |> 
+ if commands != [] do
+ when is_list(commands) and commands != [] do
+ if events != [] do
+ case cmd do + ["SADD", key, "link:" <> _] -> + result_idx = start_idx + cmd_idx + result = Enum.at(all_results, result_idx, 0) + + cond do + key == "#{source_type}:#{source_id}:links" and command_successful?(result) -> + [ + create_add_link_event( + source_type, + source_type, + source_id, + target_type, + target_id, + id + ) + ] + + key == "#{target_type}:#{target_id}:links" and command_successful?(result) -> + [ + create_add_link_event( + target_type, + source_type, + source_id, + target_type, + target_id, + id + ) + ] + + true -> + [] + end + + _ -> + [] + end + end) + end + + defp calculate_command_indices_for_link(target_link_id, link_command_map) do + {start_idx, _found} = + Enum.reduce_while(Map.keys(link_command_map), {0, false}, fn current_link_id, + {idx, _found} -> + current_link_commands = + Map.get(link_command_map, current_link_id) |> Map.get(:commands, []) + + if current_link_id == target_link_id do + {:halt, {idx, true}} + else + {:cont, {idx + length(current_link_commands), false}} + end + end) + + target_link_commands = Map.get(link_command_map, target_link_id) |> Map.get(:commands, []) + end_idx = start_idx + length(target_link_commands) + {start_idx, end_idx} + end + + defp create_add_link_event(stream_type, source_type, source_id, target_type, target_id, link_id) do + %{ + stream: "#{stream_type}:events", + event: "add_link", + link: "link:#{link_id}", + source: "#{source_type}:#{source_id}", + target: "#{target_type}:#{target_id}" + } + end + defp validate_origin(%{origin: origin} = link) when is_binary(origin), do: link @@ -258,33 +606,37 @@ defmodule TdCache.LinkCache do defp maybe_link_tags_commands(commands, %{tags: []}), do: commands defp maybe_link_tags_commands(commands, %{id: id, tags: tags}) do - commands ++ - [["SADD", "link:#{id}:tags"] ++ tags] + commands ++ [["SADD", "link:#{id}:tags"] ++ tags] end defp maybe_link_tags_commands(commands, _), do: commands - defp maybe_origin_field([del_command, hset_command | tail_commands], 
%{origin: origin}) do + defp maybe_origin_field([del_command, hset_command | tail_commands], %{origin: origin}) + when is_binary(origin) do [del_command, hset_command ++ ["origin", origin] | tail_commands] end defp maybe_origin_field(commands, _), do: commands defp delete_link(id, opts) do - {:ok, keys} = Redix.command(["HMGET", "link:#{id}", "source", "target"]) - result = do_delete_link(id, keys, opts) - - unless opts[:publish] == false do - if did_delete?(result) do - Publisher.publish(%{ - stream: "link:commands", - event: "delete_link", - link_id: id - }) + with {:ok, keys} <- Redix.command(["HMGET", "link:#{id}", "source", "target"]), + result <- do_delete_link(id, keys, opts) do + unless opts[:publish] == false do + publish_delete_event_if_needed(result, id) end + + result end + end - result + defp publish_delete_event_if_needed(result, id) do + if did_delete?(result) do + Publisher.publish(%{ + stream: "link:commands", + event: "delete_link", + link_id: id + }) + end end def did_delete?({:ok, [count, _]}), do: count > 0 @@ -315,9 +667,34 @@ defmodule TdCache.LinkCache do [source_del_count, target_del_count, _, _, _, _] = results unless opts[:publish] == false do - # Publish events if link count has decremented - [source_del_count, target_del_count] - |> Enum.zip(["#{source_type}:events", "#{target_type}:events"]) + publish_delete_events( + source_del_count, + target_del_count, + source_type, + target_type, + source, + target, + id + ) + end + + {:ok, results} + end + + defp publish_delete_events( + source_del_count, + target_del_count, + source_type, + target_type, + source, + target, + id + ) do + events = + [ + {source_del_count, "#{source_type}:events"}, + {target_del_count, "#{target_type}:events"} + ] |> Enum.flat_map(fn {n, stream} -> conditional_events(n > 0, %{ stream: stream, @@ -328,10 +705,8 @@ defmodule TdCache.LinkCache do }) end) |> Enum.uniq() - |> Publisher.publish() - end - {:ok, results} + Publisher.publish(events) end defp 
do_delete_resource_links(source_type, source_id) do @@ -344,8 +719,6 @@ defmodule TdCache.LinkCache do ["RENAME", links_key, "_:#{links_key}"] ]) - # TODO: The "_:#{links_key}" key should be deleted after resource links have been removed - commands = links |> Enum.map(&["HMGET", &1, "source", "target"]) @@ -379,36 +752,32 @@ defmodule TdCache.LinkCache do defp publish_bulk_events(results_zip_commands, source_key) do {:ok, event_ids} = results_zip_commands - |> Enum.filter(fn {_, [c | [key | _]]} -> - c == "SREM" and String.ends_with?(key, ":links") - end) + |> Enum.filter(&srem_link_command?/1) |> Enum.reject(fn {count, _command} -> count == 0 end) |> Enum.map(fn {_, [_, target_links_key, link_key]} -> {target_links_key, link_key} end) - |> Enum.filter(fn {target_links_key, _link_key} -> - String.ends_with?(target_links_key, ":links") - end) - |> Enum.map(fn {target_links_key, link_key} -> - {extract_type(target_links_key), remove_suffix(target_links_key), link_key} - end) - |> Enum.map(&create_event(&1, "remove_link", source_key)) + |> Enum.filter(&valid_links_key?/1) + |> Enum.map(&create_remove_event(&1, source_key)) |> Publisher.publish() event_ids end - defp remove_suffix(key, sufix \\ ":links") do - String.replace_suffix(key, sufix, "") + defp srem_link_command?({_, ["SREM", key | _]}) do + String.ends_with?(key, ":links") end - defp extract_type(key) when is_binary(key) do - key - |> String.split(":", parts: 2) - |> hd() + defp srem_link_command?(_), do: false + + defp valid_links_key?({key, _}) do + String.ends_with?(key, ":links") end - defp create_event({target_type, target_key, link_key}, event, source_key) do + defp create_remove_event({target_links_key, link_key}, source_key) do + target_type = extract_type(target_links_key) + target_key = String.replace_suffix(target_links_key, ":links", "") + %{ - event: event, + event: "remove_link", link: link_key, source: source_key, target: target_key, @@ -416,6 +785,12 @@ defmodule TdCache.LinkCache do } end + 
defp extract_type(key) when is_binary(key) do + key + |> String.split(":", parts: 2) + |> hd() + end + defp conditional_events(false, _), do: [] defp conditional_events(_true, e), do: [e] @@ -461,31 +836,22 @@ defmodule TdCache.LinkCache do defp read_source({["business_concept", business_concept_id], tags, id, origin}, opts) do case ConceptCache.get(business_concept_id, opts) do - {:ok, nil} -> - nil - - {:ok, concept} -> - resource_with_tags(concept, :concept, tags, id, origin) + {:ok, nil} -> nil + {:ok, concept} -> resource_with_tags(concept, :concept, tags, id, origin) end end defp read_source({["data_structure", structure_id], tags, id, origin}, _opts) do case StructureCache.get(structure_id) do - {:ok, nil} -> - nil - - {:ok, structure} -> - resource_with_tags(structure, :data_structure, tags, id, origin) + {:ok, nil} -> nil + {:ok, structure} -> resource_with_tags(structure, :data_structure, tags, id, origin) end end defp read_source({["ingest", ingest_id], tags, id, origin}, _opts) do case IngestCache.get(ingest_id) do - {:ok, nil} -> - nil - - {:ok, ingest} -> - resource_with_tags(ingest, :ingest, tags, id, origin) + {:ok, nil} -> nil + {:ok, ingest} -> resource_with_tags(ingest, :ingest, tags, id, origin) end end diff --git a/test/support/mock_redix.ex b/test/support/mock_redix.ex new file mode 100644 index 0000000..91097cf --- /dev/null +++ b/test/support/mock_redix.ex @@ -0,0 +1,99 @@ +defmodule TdCache.MockFailingRedix do + @moduledoc """ + Mock module for testing LinkCache put_many function with failing Redis connections + """ + + def transaction_pipeline(_commands) do + {:error, :connection_error} + end +end + +defmodule TdCache.BatchFailingRedix do + @moduledoc """ + Mock module for testing LinkCache put_many function with batch failures + """ + + def transaction_pipeline(commands) do + current_count = Process.get(:batch_count, 0) + Process.put(:batch_count, current_count + 1) + + if current_count == 1 do + {:error, :batch_2_failure} + else + 
generate_mock_results(commands) + end + end + + defp generate_mock_results(commands) do + num_results = length(commands) + + mock_results = + Enum.map(1..num_results, fn _ -> + case :rand.uniform(3) do + 1 -> 1 + 2 -> "OK" + 3 -> {:ok, "result"} + end + end) + + {:ok, mock_results} + end +end + +defmodule TdCache.ConditionalFailingRedix do + @moduledoc """ + Mock module for testing LinkCache put_many function with conditional failures + """ + + def transaction_pipeline(_commands) do + current_count = Process.get(:batch_count, 0) + Process.put(:batch_count, current_count + 1) + + case current_count do + 1 -> + {:error, :conditional_failure} + + _ -> + {:ok, [1, 1, 1, 1, 1, 1]} + end + end +end + +defmodule TdCache.ResultProcessingErrorRedix do + @moduledoc """ + Mock module for testing LinkCache put_many function with result processing errors + """ + + def transaction_pipeline(commands) do + results = + Enum.map(1..length(commands), fn idx -> + if idx <= 7 do + 1 + else + 0 + end + end) + + if length(commands) > 7 do + {:ok, results} + else + {:ok, Enum.map(commands, fn _ -> 1 end)} + end + end +end + +defmodule TdCache.PostInsertValidationErrorRedix do + @moduledoc """ + Mock module for testing LinkCache put_many function with post-insert validation errors + """ + + def transaction_pipeline(commands) do + Enum.map(1..length(commands), fn _index -> + case :rand.uniform(5) do + 1 -> 0 + _ -> 1 + end + end) + |> then(&{:ok, &1}) + end +end \ No newline at end of file diff --git a/test/td_cache/link_cache_test.exs b/test/td_cache/link_cache_test.exs index 1872f8f..b44081f 100644 --- a/test/td_cache/link_cache_test.exs +++ b/test/td_cache/link_cache_test.exs @@ -128,7 +128,7 @@ defmodule TdCache.LinkCacheTest do assert {:ok, 1} == LinkCache.count(target_key, link.source_type) {:ok, _} = LinkCache.delete(link.id) assert {:ok, 0} == LinkCache.count(source_key, link.target_type) - assert {:ok, 0} == LinkCache.count(target_key, link.source_type) + assert {:ok, 0} == 
LinkCache.count(target_key, link.target_type) end test "returns the tags of the source and target type" do @@ -296,6 +296,506 @@ defmodule TdCache.LinkCacheTest do end end + describe "put_many/2" do + test "inserts multiple links in batch successfully" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + make_link(%{id: 2, source_id: 101, target_id: 201}), + make_link(%{id: 3, source_id: 102, target_id: 202}) + ] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links) + assert length(successful) == 3 + + Enum.each(links, fn link -> + {:ok, cached_link} = LinkCache.get(link.id) + assert cached_link.id == "#{link.id}" + assert cached_link.source == "#{link.source_type}:#{link.source_id}" + assert cached_link.target == "#{link.target_type}:#{link.target_id}" + end) + end + + test "handles empty list" do + assert {:ok, [], []} = LinkCache.put_many([]) + end + + test "respects batch_size option" do + links = + Enum.map(1..5, fn id -> + make_link(%{id: id, source_id: 100 + id, target_id: 200 + id}) + end) + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links, batch_size: 2) + assert length(successful) == 5 + + Enum.each(links, fn link -> + {:ok, cached_link} = LinkCache.get(link.id) + assert cached_link.id == "#{link.id}" + end) + end + + test "returns failed links when redis transaction fails" do + original_redix = Application.get_env(:td_cache, :redix_module) + + try do + Application.put_env(:td_cache, :redix_module, TdCache.MockFailingRedix) + + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + make_link(%{id: 2, source_id: 101, target_id: 201}) + ] + + assert {:ok, [], failed} = LinkCache.put_many(links, batch_size: 2) + assert length(failed) == 2 + + Enum.each(failed, fn link -> + assert Map.has_key?(link, :error_reason) + assert link.error_reason == 
:connection_error + end) + + Enum.each(links, fn link -> + Application.put_env(:td_cache, :redix_module, original_redix) + {:ok, cached_link} = LinkCache.get(link.id) + assert is_nil(cached_link) + Application.put_env(:td_cache, :redix_module, TdCache.MockFailingRedix) + end) + after + Application.put_env(:td_cache, :redix_module, original_redix) + end + end + + test "handles partial failures in batch" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + %{id: 2, updated_at: DateTime.utc_now(), invalid: "structure"}, + make_link(%{id: 3, source_id: 102, target_id: 202}) + ] + + valid_links = [Enum.at(links, 0), Enum.at(links, 2)] + + on_exit(fn -> + Enum.each(valid_links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, failed} = LinkCache.put_many(links) + + assert length(successful) == 2 + assert length(failed) == 1 + + Enum.each(valid_links, fn link -> + {:ok, cached_link} = LinkCache.get(link.id) + assert cached_link.id == "#{link.id}" + end) + + failed_link = hd(failed) + assert Map.get(failed_link, :error_reason) == :invalid_link_structure + end + + test "skips links that are already up-to-date" do + link1 = make_link(%{id: 1, source_id: 100, target_id: 200}) + {:ok, _} = LinkCache.put(link1) + + link1_dup = Map.put(link1, :id, 1) + + link2 = make_link(%{id: 2, source_id: 101, target_id: 201}) + + links = [link1_dup, link2] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links) + + assert length(successful) == 2 + + Enum.each(links, fn link -> + {:ok, cached_link} = LinkCache.get(link.id) + assert cached_link.id == "#{link.id}" + end) + end + + test "publishes events for successful batch inserts" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + make_link(%{id: 2, source_id: 101, target_id: 201}) + ] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + 
Redix.command(["DEL", "foo:events", "bar:events"]) + end) + + Stream.trim("foo:events", 0) + Stream.trim("bar:events", 0) + + assert {:ok, _, []} = LinkCache.put_many(links, publish: true) + + {:ok, events} = Stream.read(:redix, ["foo:events", "bar:events"], transform: true) + + assert length(events) == 4 + + assert Enum.all?(events, &(&1.event == "add_link")) + + Enum.each(links, fn link -> + link_key = "link:#{link.id}" + source_key = "#{link.source_type}:#{link.source_id}" + target_key = "#{link.target_type}:#{link.target_id}" + + assert Enum.any?(events, &(&1.link == link_key and &1.source == source_key)) + assert Enum.any?(events, &(&1.link == link_key and &1.target == target_key)) + end) + end + + test "does not publish events when publish: false" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + make_link(%{id: 2, source_id: 101, target_id: 201}) + ] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + Redix.command(["DEL", "foo:events", "bar:events"]) + end) + + Stream.trim("foo:events", 0) + Stream.trim("bar:events", 0) + + assert {:ok, _, []} = LinkCache.put_many(links, publish: false) + + {:ok, events} = Stream.read(:redix, ["foo:events", "bar:events"], transform: true) + assert events == [] + end + + test "handles links with tags in batch" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200, tags: ["tag1", "tag2"]}), + make_link(%{id: 2, source_id: 101, target_id: 201, tags: ["tag3"]}), + make_link(%{id: 3, source_id: 102, target_id: 202}) + ] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links) + assert length(successful) == 3 + + {:ok, link1} = LinkCache.get(1) + assert_lists_equal(link1.tags, ["tag1", "tag2"]) + + {:ok, link2} = LinkCache.get(2) + assert link2.tags == ["tag3"] + + {:ok, link3} = LinkCache.get(3) + assert link3.tags == [] + end + + test "handles links with origin in batch" do + 
links = [ + make_link(%{id: 1, source_id: 100, target_id: 200, origin: "origin1"}), + make_link(%{id: 2, source_id: 101, target_id: 201}), + make_link(%{id: 3, source_id: 102, target_id: 202}) + ] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links) + assert length(successful) == 3 + + {:ok, link1} = LinkCache.get(1) + assert link1.origin == "origin1" + + {:ok, link2} = LinkCache.get(2) + assert link2.origin == "some_origin" + + {:ok, link3} = LinkCache.get(3) + assert link3.origin == "some_origin" + end + + test "performance comparison with individual puts" do + num_links = 50 + + links = + Enum.map(1..num_links, fn id -> + make_link(%{id: id, source_id: 1000 + id, target_id: 2000 + id}) + end) + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + {_batch_time, {:ok, batch_successful, batch_failed}} = + :timer.tc(fn -> LinkCache.put_many(links, batch_size: 25) end) + + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + + :timer.tc(fn -> + Enum.each(links, fn link -> LinkCache.put(link, publish: false) end) + end) + + assert length(batch_successful) == num_links + assert batch_failed == [] + end + + test "handles very large batch" do + num_links = 150 + + links = + Enum.map(1..num_links, fn id -> + make_link(%{id: id, source_id: 5000 + id, target_id: 6000 + id}) + end) + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links) + assert length(successful) == num_links + + sample_links = Enum.take_random(links, 10) + + Enum.each(sample_links, fn link -> + {:ok, cached_link} = LinkCache.get(link.id) + assert cached_link.id == "#{link.id}" + end) + end + + test "processes links in correct batches according to batch_size" do + links = + Enum.map(1..7, fn id -> + make_link(%{id: id, source_id: 100 + id, target_id: 200 + id}) + end) + + on_exit(fn 
-> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + assert {:ok, successful, []} = LinkCache.put_many(links, batch_size: 3) + assert length(successful) == 7 + + Enum.each(links, fn link -> + {:ok, cached_link} = LinkCache.get(link.id) + assert cached_link.id == "#{link.id}" + end) + end + + test "handles multiple batches where one batch fails" do + links = + Enum.map(1..9, fn id -> + make_link(%{id: id, source_id: 100 + id, target_id: 200 + id}) + end) + + on_exit(fn -> + successful_ids = [1, 2, 3, 7, 8, 9] + Enum.each(successful_ids, fn id -> LinkCache.delete(id) end) + end) + + original_redix = Application.get_env(:td_cache, :redix_module) + + try do + Application.put_env(:td_cache, :redix_module, TdCache.BatchFailingRedix) + Process.put(:batch_count, 0) + + assert {:ok, successful, failed} = LinkCache.put_many(links, batch_size: 3) + + assert length(successful) >= 3 + assert length(failed) >= 3 + + successful_ids = Enum.map(successful, & &1.id) + failed_ids = Enum.map(failed, & &1.id) + + assert 1 in successful_ids + assert 2 in successful_ids + assert 3 in successful_ids + + assert 4 in failed_ids + assert 5 in failed_ids + assert 6 in failed_ids + + Enum.each(failed, fn link -> + assert Map.get(link, :error_reason) == :batch_2_failure + end) + after + Application.put_env(:td_cache, :redix_module, original_redix) + Process.delete(:batch_count) + end + end + + test "continues processing after batch failure when publish is false" do + links = + Enum.map(1..6, fn id -> + make_link(%{id: id, source_id: 100 + id, target_id: 200 + id}) + end) + + on_exit(fn -> + successful_ids = [1, 2, 3, 4, 5, 6] + Enum.each(successful_ids, fn id -> LinkCache.delete(id) end) + end) + + original_redix = Application.get_env(:td_cache, :redix_module) + + try do + Application.put_env(:td_cache, :redix_module, TdCache.ConditionalFailingRedix) + Process.put(:batch_count, 0) + + assert {:ok, successful, failed} = + LinkCache.put_many(links, batch_size: 2, publish: 
false) + + assert length(successful) == 4 + + assert length(failed) == 2 + + successful_ids = Enum.map(successful, & &1.id) + failed_ids = Enum.map(failed, & &1.id) + + assert 1 in successful_ids + assert 2 in successful_ids + assert 3 in failed_ids + assert 4 in failed_ids + assert 5 in successful_ids + assert 6 in successful_ids + after + Application.put_env(:td_cache, :redix_module, original_redix) + Process.delete(:batch_count) + end + end + + test "handles mix of valid, invalid, and already-cached links across batches" do + cached_link = make_link(%{id: 1, source_id: 101, target_id: 201}) + {:ok, _} = LinkCache.put(cached_link, publish: false) + + valid_links = [ + make_link(%{id: 2, source_id: 102, target_id: 202}), + make_link(%{id: 3, source_id: 103, target_id: 203}), + make_link(%{id: 5, source_id: 105, target_id: 205}), + make_link(%{id: 6, source_id: 106, target_id: 206}), + make_link(%{id: 8, source_id: 108, target_id: 208}), + make_link(%{id: 9, source_id: 109, target_id: 209}) + ] + + invalid_links = [ + %{id: 4, updated_at: DateTime.utc_now(), invalid: "structure"}, + %{id: 7, updated_at: DateTime.utc_now(), also_invalid: true} + ] + + all_links = [ + Enum.at(valid_links, 0), + Enum.at(valid_links, 1), + Enum.at(invalid_links, 0), + cached_link, + Enum.at(valid_links, 2), + Enum.at(valid_links, 3), + Enum.at(invalid_links, 1), + Enum.at(valid_links, 4), + Enum.at(valid_links, 5) + ] + + on_exit(fn -> + all_ids = 1..9 |> Enum.to_list() + Enum.each(all_ids, fn id -> LinkCache.delete(id) end) + end) + + assert {:ok, successful, failed} = LinkCache.put_many(all_links, batch_size: 3) + + assert length(successful) == 7 + assert length(failed) == 2 + + successful_ids = Enum.map(successful, & &1.id) + failed_ids = Enum.map(failed, & &1.id) + + assert 1 in successful_ids + assert 2 in successful_ids + assert 3 in successful_ids + assert 5 in successful_ids + assert 6 in successful_ids + assert 8 in successful_ids + assert 9 in successful_ids + + assert 4 in 
failed_ids + assert 7 in failed_ids + + Enum.each(failed, fn link -> + assert Map.get(link, :error_reason) == :invalid_link_structure + end) + + Enum.each([2, 3, 5, 6, 8, 9], fn id -> + {:ok, cached_link} = LinkCache.get(id) + assert cached_link.id == "#{id}" + end) + + {:ok, link1} = LinkCache.get(1) + assert link1.id == "1" + end + + test "handles errors that occur after Redis insert during result processing" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + make_link(%{id: 2, source_id: 101, target_id: 201}) + ] + + on_exit(fn -> + successful_ids = [1] + Enum.each(successful_ids, fn id -> LinkCache.delete(id) end) + end) + + original_redix = Application.get_env(:td_cache, :redix_module) + + try do + Application.put_env(:td_cache, :redix_module, TdCache.ResultProcessingErrorRedix) + + assert {:ok, successful, failed} = + LinkCache.put_many(links, batch_size: 2) + + assert length(successful) + length(failed) == 2 + + Enum.each(failed, fn link -> + assert Map.has_key?(link, :error_reason) + end) + after + Application.put_env(:td_cache, :redix_module, original_redix) + end + end + + test "handles post-insert validation errors in batch processing" do + links = [ + make_link(%{id: 1, source_id: 100, target_id: 200}), + make_link(%{id: 2, source_id: 101, target_id: 201}) + ] + + on_exit(fn -> + Enum.each(links, fn link -> LinkCache.delete(link.id) end) + end) + + original_redix = Application.get_env(:td_cache, :redix_module) + + try do + Application.put_env(:td_cache, :redix_module, TdCache.PostInsertValidationErrorRedix) + + assert {:ok, successful, failed} = LinkCache.put_many(links, batch_size: 2) + + assert length(successful) + length(failed) == 2 + + Enum.each(failed, fn link -> + assert Map.has_key?(link, :error_reason) + assert link.error_reason == :partial_failure + end) + after + Application.put_env(:td_cache, :redix_module, original_redix) + end + end + end + defp make_link(params \\ %{}) do %{ id: System.unique_integer([:positive]),