Phoenix / RabbitMQ - Integration with Broadway

Sending and receiving messages with RabbitMQ

Introduction

RabbitMQ Setup

# AVK Service and Friends

1. Have GREM running (using Docker)
```bash
# starts up our entire ecosystem
docker compose up -d
# starts up GREM processes
bin/start
```

2. Enable avk-service within GREM

bin/rails c

# from .env.development
# of course update / change as needed
# export AVK_SERVICE_CLIENT_ID="aBjduMM1Eh"
# export AVK_SERVICE_CLIENT_SECRET="979z8oNieHu6Dg5D9Ps5yywNxpKsLW30"

client_id = 'aBjduMM1Eh'
encrypted_client_secret = User.new(password: '979z8oNieHu6Dg5D9Ps5yywNxpKsLW30').encrypted_password

name = 'avk_service'
scopes = ['accounting', 'avk_service', 'supplier']
puts encrypted_client_secret

service = ExternalService.create(name:, client_id:, scopes:, encrypted_client_secret:)
# #<ExternalService:0x000000010a5117c8
#  id: 10,
#  name: "avk_service",
#  client_id: "aBjduMM1Eh",
#  encrypted_client_secret: "$2a$11$QyWLpyKozLFoY9oGBSvrbeK5kI1GhIiwmvjFENUPYSnMg2nWiidki",
#  scopes: ["accounting", "avk_service", "supplier"],
#  created_at: 2024-07-23 16:00:17.054981 +0200,
#  updated_at: 2024-07-23 16:00:17.054981 +0200,
#  objektkategorie_cds: [],
#  notification_kategorie_cd: nil,
#  active: true>

# or directly in DB
bin/rails db
INSERT INTO external_services(name,client_id,encrypted_client_secret,scopes,created_at,updated_at) VALUES('avk_service', 'aBjduMM1Eh', '$2b$12$K.aDGjHcbJgoeQpXrKdHcuHgAnO/AMpYgN.eEkicvuSH93LlEu/Ja', '["accounting", "avk_service", "supplier"]', NOW(), NOW());
# INSERT 0 1
\q

# or UPDATE
# service = ExternalService.find_by(name:, client_id:)
# service.encrypted_client_secret = encrypted_client_secret
# service.save
3. Set up the graphql-backend (locally); it uses port 4000

Be sure to point to the same DB here (with the same values):

# config/dev_adapter_config.exs
config :garaio_rem_graphql, GaraioRemGraphql.Repo,
  adapter: Ecto.Adapters.Postgres,
  username: {:system, "DATABASE_USER", "rem2"},
  password: {:system, "DATABASE_PW", "xxxxx"},
  database: {:system, "DATABASE", "omit"},
  hostname: "localhost",
  port: 5432,
  pool_size: 10,
  plan_cache_mode: "force_custom_plan",
  types: GaraioRemGraphql.PostgresTypes

as configured here in GREM:

development: &development
  adapter: postgis
  username: rem2
  password: xxxx
  database: omit
  encoding: unicode
  host: localhost
  port: 5432
  gssencmode: disable # enable if Puma Server crashes with docker

Now start the GraphQL backend and test:

iex -S mix phx.server
http://localhost:4000/api/graphiql

# user Auth (deprecated)
query {
        authenticate(username: "admgar", password: "rem2InFuture!") {
          accessToken
        }
      }
# {
#   "data": {
#     "authenticate": {
#       "accessToken": "eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJHYXJhaW9SZW1HcmFwaHFsIiwiZXhwIjoxNzIxNzQ3NjA4LCJpYXQiOjE3MjE3NDQwMDgsImlzcyI6IkdhcmFpb1JlbUdyYXBocWwiLCJqdGkiOiI0YzM2MjlkYS1iODMzLTQzOWYtOTFlNi1iNDMwZTdkMDRjNTEiLCJuYmYiOjE3MjE3NDQwMDcsInN1YiI6IntcImlkXCI6NixcInR5cGVcIjpcInVzZXJcIn0iLCJ0eXAiOiJhY2Nlc3MifQ.Dpv_AkkAOkLg7jkljKJlVp1_zUSFf_eapQ9DjNjjJT0UZgPSDZ0g78e1t5JrtmRtZxHZMiVaRFKWXB6hnAN-ew"
#     }
#   }
# }

# deprecated
query {
       authenticateService(clientId: "aBjduMM1Eh", clientSecret: "979z8oNieHu6Dg5D9Ps5yywNxpKsLW30") {
          accessToken
        }
}
# response
{
  "data": {
    "authenticateService": {
      "accessToken": "eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJHYXJhaW9SZW1HcmFwaHFsIiwiZXhwIjoxNzI0MzU3ODg2LCJpYXQiOjE3MjE3NjU4ODYsImlzcyI6IkdhcmFpb1JlbUdyYXBocWwiLCJqdGkiOiI2MjNmNmE4MS1mMjI1LTQ1NTAtYmFlZC0xZmFjOWI0ZjE3NDciLCJuYmYiOjE3MjE3NjU4ODUsInN1YiI6IntcImlkXCI6MTEsXCJ0eXBlXCI6XCJzZXJ2aWNlXCJ9IiwidHlwIjoiYWNjZXNzIn0.T0Qq8bHRs5VsXSUI7Yj4GePuEWUu83QwvCw9JrgymudK2j_rE-hQl56Rhbqf1slIaRBJ393fq4NBrJfDlwrK7A"
    }
  }
}

# if you get this - check that you are using the same DB everywhere!
# {
#   "data": {
#     "authenticateService": null
#   },
#   "errors": [
#     {
#       "message": "Incorrect client id or client secret",
#       "path": [
#         "authenticateService"
#       ],
#       "extensions": {
#         "code": "invalid_grant"
#       },
#       "locations": [
#         {
#           "line": 2,
#           "column": 8
#         }
#       ]
#     }
#   ]
# }

# test that you can log in as the avk_service
curl --request POST \
  --url 'http://localhost:4000/api/graphql/authenticate/token' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --data grant_type=client_credentials \
  --data client_id="aBjduMM1Eh" \
  --data client_secret="979z8oNieHu6Dg5D9Ps5yywNxpKsLW30"

# response
{"access_token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJHYXJhaW9SZW1HcmFwaHFsIiwiZXhwIjoxNzI0MzU4MDI1LCJpYXQiOjE3MjE3NjYwMjUsImlzcyI6IkdhcmFpb1JlbUdyYXBocWwiLCJqdGkiOiI5MDEyZDc0MC03MWE3LTQyM2MtOTI3ZS0yMzk4YzVhMmIxOGIiLCJuYmYiOjE3MjE3NjYwMjQsInN1YiI6IntcImlkXCI6MTEsXCJ0eXBlXCI6XCJzZXJ2aWNlXCJ9IiwidHlwIjoiYWNjZXNzIn0.hm1Awy8jNXjyazd4j1w9HLNzmG9r2XiJhZxmcNzuycA24bq2no71nmSHUDlC7O0MIQhVj427V4YfMecAsKhT9Q"}%
# if you get this - check that you are using the same DB everywhere!
# {"error":"invalid_grant","error_description":"Incorrect client id or client secret"}%
4. Set up RabbitMQ
# open Rabbit MQ 'Overview' Tab
https://mbus.garaio-rem.ch/#/

# open 'Export definitions'
# click on 'Download broker definitions'

now open:

# open Rabbit MQ 'Overview' Tab
http://localhost:15672/#/

# open 'Import definitions'
# click on 'Choose File'
# click on 'Upload broker definitions'

NOTE: you will now need to log out and log back in with the credentials from the ‘official’ mbus server!
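
The export/import can also be done from the command line via the RabbitMQ management HTTP API, assuming it is reachable on the same hosts as the UI (credentials below are placeholders):

```bash
# download broker definitions from the 'official' mbus server
curl -u MBUS_USER:MBUS_PASSWORD \
  https://mbus.garaio-rem.ch/api/definitions -o broker_definitions.json

# upload them into the local RabbitMQ (default local admin credentials assumed)
curl -u admin:admin -H "content-type: application/json" \
  -X POST --data @broker_definitions.json \
  http://localhost:15672/api/definitions
```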

5. Set up avk-service (locally); change the port to 4040 so it does not conflict with the graphql-backend
export PHX_PORT=4040

iex -S mix phx.server

GREM RabbitMQ Setup

Now that RabbitMQ is set up, check/configure the GREM settings:

# config/rabbitmq.yml
development: &development
  enabled: true
  host: localhost
  vhost: /
  port: 5672
  ssl: false
  username: admin
  password: same_as_mbus_now (if mbus is imported)
  exchange_name: grem_publish
  verify_peer: false

Set up AVK-Service (user perspective)

Set the tenant (localhost) with:

name: localhost
graphql url: localhost:4000/api

URL: https://avk-service.fly.dev/invoices/5498f54a-678c-4507-8809-67f569068e0e.pdf

Send the message ‘Invoicing.Invoice.Created’

Response message ‘Invoicing.Invoice.Accepted’

# app/amqp/handlers/avk_invoice_message_handler.rb

    def success_result
      Result.success(
        business_event: BusinessEvent.new_for_message(avk_verantwortlicher, @command.class.name, @command.command_object)
      )
    end

    def send_invoice_accepted
      send_reply(
        'Invoicing.Invoice.Accepted',
        {
          externalReference: context_object.externe_rechnungs_nr,
          reference: context_object.referenz,
          orderReference: kreditorenauftrag&.referenz,
          externalOrderReference: kreditorenauftrag&.externe_auftrags_nr
        }
      )
    end
headers:
{
  "app_id": "grem_derham-test",
  "NewRelicID": "VgIGVV9WGwICVVBXBQQ=",
  "unit_category_code": "n/a",
  "NewRelicTransaction": "PxQABFEHAFYBV1EHAAkEVgADFB8EBw8RVU4aVV0PUApWVApRVFYLUVRSBUNKQQ9VBlwAW1EJFTs="
}

Message:
{
  "eventType": "Invoicing.Invoice.Accepted",
  "data": {
    "reference": null,
    "externalReference": "derham-test-025"
  }
}

{"externalReference": "301186-17544"}

Response message ‘Invoicing.Invoice.Rejected’

# app/amqp/handlers/avk_invoice_message_handler.rb
    def error_result
      ErrorResponse
        .new({})
        .tap { _1.add_handler_errors(errors) }
        .then { Result.error(_1.reasons_hash) }
    end

    def send_invoice_rejected
      send_reply(
        'Invoicing.Invoice.Rejected',
        {
          externalReference: @communication.request_field('externalReference'),
          reference: @communication.request_field('reference'),
          reasons: build_reject_reasons
        }
      )
    end
headers:
{
  "app_id": "grem_derham-test",
  "NewRelicID": "VgIGVV9WGwICVVBXBQQ=",
  "unit_category_code": "n/a",
  "NewRelicTransaction": "PxRVVFFTDgVWVlkBUlNVB1ZSFB8EBw8RVU4aAA0PBAQFAwtZUgRRAAUEVENKQQ9VBlwAW1EJFTs="
}

message:
{
  "eventType": "Invoicing.Invoice.Rejected",
  "data": {
    "reasons": [
      {"reason": "Auf das Konto 1111.465000 darf nicht ohne MWST-Code gebucht werden", "attribute": "taxCode", "lineNumber": 1},
      {"reason": "Dieses Konto muss mit einem Lebenslauf-Objekt bebucht werden", "attribute": "masterdataReference", "lineNumber": 1},
      {"reason": "Dieses Konto verlangt eine Objekt Referenz und nicht die einer LG oder eines Hauses", "attribute": "masterdataReference", "lineNumber": 1},
      {"reason": "Buchungstext obligatorisch", "attribute": "bookingText", "lineNumber": 1},
      {"reason": "Angabe von LG / Haus / Objekt ist bei diesem Konto obligatorisch", "attribute": null, "lineNumber": 1},
      {"reason": "ist ungültig", "attribute": "accountNumber", "lineNumber": 2},
      {"reason": "ist ungültig", "attribute": "accountNumber", "lineNumber": 3}],
    "externalReference": "derham-test-027"
  }
}

{
  "reasons": [{"reason": "est non valable", "attribute": "iban", "lineNumber": null}],
  "externalReference": "1709"
}

Base Config

```elixir
import Config

if System.get_env("PHX_SERVER") do
  config :avk_service, AvkServiceWeb.Endpoint, server: true
end

if config_env() != :test do
  # RabbitMQ Config
  config :avk_service, :rabbitmq,
    connection_params: [
      host:
        System.get_env("RABBITMQ_HOST") ||
          raise("Missing env variable: RABBITMQ_HOST"),
      virtual_host:
        System.get_env("RABBITMQ_VHOST") ||
          raise("Missing env variable: RABBITMQ_VHOST"),
      port:
        String.to_integer(System.get_env("RABBITMQ_PORT")) ||
          raise("Missing env variable: RABBITMQ_PORT"),
      username:
        System.get_env("RABBITMQ_USER") ||
          raise("Missing env variable: RABBITMQ_USER"),
      password:
        System.get_env("RABBITMQ_PASSWORD") ||
          raise("Missing env variable: RABBITMQ_PASSWORD")
    ]
end
```
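
For local development the environment variables referenced above might be exported like this; the values mirror the local defaults used elsewhere in this article (adjust as needed):

```bash
export RABBITMQ_HOST=localhost
export RABBITMQ_VHOST=/
export RABBITMQ_PORT=5672
export RABBITMQ_USER=admin
export RABBITMQ_PASSWORD=admin
```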

Send to a Queue

# Usage
# data = %{greeting: "Hello, Broadway!"}
# AvkService.Amqp.QueuePublisher.send_message(data, "grem_qs06")
defmodule AvkService.Amqp.QueuePublisher do
  @moduledoc """
  Handles publishing messages to RabbitMQ.
  """

  alias AMQP.{Basic, Channel, Connection}
  alias Jason

  def send_message(message, queue \\ "avk_service") do
    config = Application.fetch_env!(:avk_service, :rabbitmq)[:connection_params]
    json_message = Jason.encode!(message)

    with {:ok, connection} <- Connection.open(config),
         {:ok, channel} <- Channel.open(connection) do
      Basic.publish(channel, "", queue, json_message)
      IO.puts("Sent message: #{json_message}")

      Channel.close(channel)
      Connection.close(connection)
      :ok
    else
      {:error, reason} ->
        IO.puts("Failed to send message: #{inspect(reason)}")
    end
  end
end

Send to an Exchange (with Routing Key)

Implementation

defmodule AvkService.Boundary.AmqpPublisher do
  @moduledoc """
  Provides `send/5` to publish a message on RabbitMQ
  """
  use AMQP
  require Logger

  @behaviour AvkService.Boundary.AmqpPublisherBehaviour

  @impl true
  def send(data, tenant_name, message_id, event_type, app_id) do
    exchange = "#{app_id}_publish"
    routing_key = event_type
    message = Jason.encode!(%{eventType: event_type, data: data})
    recipient = "grem_#{tenant_name}"

    metadata = [
      app_id: app_id,
      message_id: message_id,
      headers: [app_id: app_id, recipient: recipient]
    ]

    case publish_message(exchange, routing_key, message, metadata) do
      :ok ->
        :ok

      {:error, reason} ->
        Logger.error("ERROR: #{inspect(reason)}")
        {:error, reason}

      _ ->
        Logger.error("ERROR fetching rabbitmq config")
        {:error, :internal}
    end
  end

  defp publish_message(exchange, routing_key, message, metadata) do
    full_metadata = default_metadata() |> Keyword.merge(metadata)

    data_package = %{
      exchange: exchange,
      routing_key: routing_key,
      message: message,
      metadata: full_metadata
    }

    log_msg =
      "RabbitMQ Message Initiation: #{inspect(data_package)}"
      |> String.trim_trailing("\n")

    Logger.info(log_msg)

    with {:ok, rabbit_conn, channel} <- open_channel(),
         :ok <- Confirm.select(channel),
         :ok <- try_publish(channel, message, metadata, exchange, routing_key),
         :ok <- close_channel(rabbit_conn, channel) do
      :ok
    else
      {:error, reason} ->
        Logger.error("ERROR: #{inspect(reason)}")
        {:error, "publish error: '#{inspect(reason)}'"}

      reason ->
        {:error, "publish error: '#{inspect(reason)}'"}
    end
  end

  defp open_channel do
    with rabbitmq_config <- Application.fetch_env!(:avk_service, :rabbitmq),
         {:ok, rabbit_conn} <- Connection.open(rabbitmq_config[:connection_params]),
         {:ok, channel} <- Channel.open(rabbit_conn) do
      {:ok, rabbit_conn, channel}
    else
      {:error, :econnrefused} ->
        {:error, "Cannot connect to MBus, check RabbitMQ connection ENV variables"}

      {:error, reason} ->
        {:error, "channel error: '#{inspect(reason)}'"}

      reason ->
        {:error, "channel error: '#{inspect(reason)}'"}
    end
  end

  # Publishing to RabbitMQ is asynchronous by design. To catch a channel error
  # when publishing to an exchange that doesn't exist, we need to use Confirm
  # and explicitly handle an :exit exception.
  defp try_publish(channel, message, metadata, exchange, routing_key) do
    with :ok <- Basic.publish(channel, exchange, routing_key, message, metadata),
         true <- Confirm.wait_for_confirms_or_die(channel) do
      data_package = %{
        channel: channel,
        exchange: exchange,
        routing_key: routing_key,
        message: message,
        metadata: metadata
      }

      log_msg =
        "RabbitMQ Message Confirmed: #{inspect(data_package)}"
        |> String.trim_trailing("\n")

      Logger.info(log_msg)

      :ok
    else
      {:error, err} -> {:error, err}
      _ -> {:error, :unknown}
    end
  rescue
    err -> {:error, err}
  end

  defp close_channel(rabbit_conn, channel) do
    with :ok <- Channel.close(channel),
         :ok <- Connection.close(rabbit_conn) do
      :ok
    else
      {:error, err} -> {:error, err}
    end
  end

  defp default_metadata do
    [
      content_type: "application/json",
      timestamp: DateTime.utc_now() |> DateTime.to_unix(),
      persistent: true
    ]
  end
end
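
A quick way to exercise the publisher from IEx; the argument values here are illustrative only (the payload is a partial example of the invoice fields shown earlier), but the argument order follows `send/5` above:

```elixir
data = %{
  externalReference: "derham-test-025",
  iban: "CH9300762011623852957",
  documentUrl: "https://avk-service.fly.dev/invoices/5498f54a-678c-4507-8809-67f569068e0e.pdf"
}

AvkService.Boundary.AmqpPublisher.send(
  data,
  "derham-test",               # tenant_name -> recipient header "grem_derham-test"
  "invoice-42",                # message_id, "invoice-<id>" format expected back as correlation_id
  "Invoicing.Invoice.Created", # event_type, also used as the routing key
  "avk_service"                # app_id -> exchange "avk_service_publish"
)
# => :ok (when RabbitMQ is reachable and the exchange exists)
```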

Behaviour

The behaviour allows us to mock out the RabbitMQ connection for testing.

defmodule AvkService.Boundary.AmqpPublisherBehaviour do
  @moduledoc """
  Defines the behaviour for publishing messages to RabbitMQ.
  Enables us to mock the publishing of messages in tests.
  """
  @callback send(
              message_data :: map(),
              tenant_name :: String.t(),
              message_id :: String.t(),
              event_type :: String.t(),
              app_id :: String.t()
            ) :: :ok | {:error, String.t()}
end
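
The mock is wired in through the `:boundaries` application config (see the test and runtime configs below). A worker can then resolve whichever publisher is configured; a hypothetical sketch:

```elixir
defmodule AvkService.Pipeline.Worker.PublishSketch do
  @moduledoc """
  Hypothetical sketch (not the real worker): resolve the publisher module
  from the :boundaries config - AmqpPublisherMock in tests,
  AmqpPublisher at runtime - and publish through it.
  """

  def publish(data, tenant_name, message_id, event_type, app_id) do
    publisher = amqp_publisher()
    publisher.send(data, tenant_name, message_id, event_type, app_id)
  end

  defp amqp_publisher do
    Application.get_env(:avk_service, :boundaries)[:amqp_publisher]
  end
end
```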

Test setup

# test/support/mocks.ex
Mox.defmock(AvkService.Boundary.StorageMock, for: AvkService.Boundary.StorageBehaviour)
Mox.defmock(AvkService.Boundary.GraphQLMock, for: AvkService.Boundary.GraphQLBehaviour)

Mox.defmock(AvkService.Boundary.ExtractionApiMock,
  for: AvkService.Boundary.ExtractionApiBehaviour
)

Mox.defmock(AvkService.Boundary.AmqpPublisherMock,
  for: AvkService.Boundary.AmqpPublisherBehaviour
)

The full test config (config/test.exs in a standard Phoenix app) is listed under "Test Config" below. The parts that matter for the publisher are the mocked boundary and the RabbitMQ connection params:

# config/test.exs (excerpt - see "Test Config" below for the full file)
config :avk_service, :boundaries, amqp_publisher: AvkService.Boundary.AmqpPublisherMock

# RabbitMQ Config
config :avk_service, :rabbitmq,
  connection_params: [
    host: "localhost",
    virtual_host: "/",
    port: 5672,
    username: "admin",
    password: "admin"
  ]

Test Implementation

defmodule AvkService.Pipeline.Worker.PublishInvoiceTest do
  use AvkService.DataCase, async: true
  import Mox
  import AvkService.Factory

  alias AvkService.Boundary.AmqpPublisherMock
  alias AvkService.Invoicing.Invoice
  alias AvkService.Pipeline.Worker.PublishInvoice
  alias AvkService.Repo

  describe "process/1" do
    setup do
      now = DateTime.utc_now() |> DateTime.to_date()
      due_date = Date.add(now, 30) |> Date.to_string()
      invoice_date = Date.add(now, -10) |> Date.to_string()

      storage_filepath = Ecto.UUID.generate() <> ".pdf"

      # purposely omitting external_order_reference - it should not be included in the message
      invoice =
        insert(:invoice,
          iban: "CH9300762011623852957",
          external_reference: "123456789",
          storage_filepath: storage_filepath,
          accounting_reference: "1000",
          creditor_reference: "2345",
          order_reference: "34567",
          invoice_date: invoice_date,
          due_date: due_date,
          stage: "account_proposal",
          state: "OK"
        )

      insert_list(2, :invoice_line_item, invoice: invoice)
      invoice = invoice |> Repo.preload([:tenant, :line_items])

      %{invoice: invoice}
    end

    test "send expected 'Invoicing.Invoice.Created' message", %{invoice: invoice} do
      total_gross_amount =
        invoice.line_items
        |> Enum.map(& &1.booking_amount)
        |> Enum.reduce(Decimal.new("0"), &Decimal.add/2)
        |> Decimal.to_string()

      AmqpPublisherMock
      |> expect(:send, fn message_data, tenant_name, message_id, event_type, app_id ->
        assert app_id == "avk_service"
        assert event_type == "Invoicing.Invoice.Created"
        assert message_id == "invoice-#{invoice.id}"
        assert tenant_name == invoice.tenant.name

        assert message_data[:accountingReference] == invoice.accounting_reference
        assert message_data[:creditorReference] == invoice.creditor_reference
        assert message_data[:orderReference] == invoice.order_reference
        assert message_data[:totalGrossAmount] == total_gross_amount
        assert message_data[:languageCode] == invoice.language_code
        assert message_data[:iban] == invoice.iban
        assert message_data[:esrReference] == invoice.qr_reference
        assert message_data[:dueDate] == invoice.due_date
        assert message_data[:invoiceDate] == invoice.invoice_date

        assert message_data[:documentUrl] ==
                 "https://avk-service.fly.dev/invoices/" <> invoice.storage_filepath

        # Check the line_items exist with values
        assert Map.has_key?(message_data, :invoiceItems)
        invoice_items = message_data[:invoiceItems]

        # verify line items have the required fields
        keys = [:lineNumber, :taxCode, :accountNumber, :bookingAmount, :costCenterNumber]

        Enum.each(keys, fn key ->
          assert Enum.all?(invoice_items, fn item ->
                   Map.has_key?(item, key) and not is_nil(item[key])
                 end)
        end)

        # Verify that each accountNumber matches the pattern "accounting_reference.account_number"
        accounting_reference = invoice.accounting_reference

        assert Enum.all?(invoice_items, fn item ->
                 case String.split(item[:accountNumber], ".") do
                   [^accounting_reference, _account_number] -> true
                   _ -> false
                 end
               end)

        :ok
      end)

      {:ok, _published_invoice} = PublishInvoice.process(%{invoice: invoice})
    end

    test "properly updates invoice", %{invoice: invoice} do
      AmqpPublisherMock
      |> expect(:send, fn _data, _tenant_name, _message_id, _event_type, _app_id ->
        :ok
      end)

      {:ok, published_invoice} = PublishInvoice.process(%{invoice: invoice})

      assert published_invoice.id == invoice.id
      assert published_invoice.stage == "publish_invoice"
      assert published_invoice.state == "PUBLISHED"
    end

    test "`totalGrossAmount` without line items", %{invoice: invoice} do
      hd(invoice.line_items)
      |> Ecto.Changeset.change(%{account_number: nil})
      |> Repo.update!()

      invoice =
        Repo.get!(Invoice, invoice.id)
        |> Repo.preload([:tenant, :line_items])

      total_gross_amount =
        invoice.line_items
        |> Enum.map(& &1.booking_amount)
        |> Enum.reduce(Decimal.new("0"), &Decimal.add/2)
        |> Decimal.to_string()

      AmqpPublisherMock
      |> expect(:send, fn message_data, _tenant_name, _message_id, _event_type, _api_id ->
        assert message_data[:totalGrossAmount] == total_gross_amount
        assert message_data[:invoiceItems] == []
        :ok
      end)

      {:ok, updated_invoice} = PublishInvoice.process(%{invoice: invoice})

      assert updated_invoice.stage == "publish_invoice"
      assert updated_invoice.state == "PUBLISHED"
    end

    test "absent optional data is not sent", %{invoice: invoice} do
      invoice =
        invoice
        |> Ecto.Changeset.change(%{due_date: nil})
        |> Repo.update!()

      AvkService.Boundary.AmqpPublisherMock
      |> expect(:send, fn message_data, _tenant_name, _message_id, _event_type, _app_id ->
        refute Map.has_key?(message_data, :due_date)
        :ok
      end)

      {:ok, updated_invoice} = PublishInvoice.process(%{invoice: invoice})

      assert updated_invoice.stage == "publish_invoice"
      assert updated_invoice.state == "PUBLISHED"
    end

    test "error with missing required line item data", %{invoice: invoice} do
      hd(invoice.line_items)
      |> Ecto.Changeset.change(%{booking_amount: nil})
      |> Repo.update!()

      AvkService.Boundary.AmqpPublisherMock
      |> expect(:send, fn _data, _tenant_name, _message_id, _event_type, _app_id ->
        {:error, "Missing required line item fields: : [:bookingAmount]"}
      end)

      assert PublishInvoice.process(%{invoice: invoice}) ==
               {:error, "Missing required line item fields: : [:bookingAmount]"}
    end

    test "error with missing invoice data fields" do
      invoice =
        insert(:invoice, accounting_reference: nil, creditor_reference: nil, language_code: nil)

      assert PublishInvoice.process(%{invoice: invoice}) ==
               {:error,
                "Missing required fields: [:externalReference, :creditorReference, :accountingReference, :iban, :languageCode, :invoiceDate]"}
    end

    test "log and returns {:error, reason}", %{invoice: invoice} do
      AvkService.Boundary.AmqpPublisherMock
      |> expect(:send, fn _data, _tenant_name, _message_id, _event_type, _app_id ->
        {:error, "some error"}
      end)

      assert PublishInvoice.process(%{invoice: invoice}) == {:error, "some error"}
    end
  end
end

Receive from a Queue

defmodule AvkService.Boundary.AmqpConsumer do
  @moduledoc """
  Broadway consumer for RabbitMQ messages.
  """
  use Broadway
  require Logger

  alias AvkService.Pipeline.Worker.InvoiceResponse
  alias Jason

  @queue "avk_service"

  def start_link(_opts \\ []) do
    config = Application.fetch_env!(:avk_service, :rabbitmq)

    producer =
      {BroadwayRabbitMQ.Producer,
       queue: @queue,
       connection: config[:connection_params],
       qos: [prefetch_count: 50],
       # metadata fields must be listed here to be readable on the message
       metadata: [:correlation_id],
       on_failure: :reject_and_requeue}

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: producer, concurrency: 1],
      processors: [default: [concurrency: 50]],
      batchers: [default: [batch_size: 10, batch_timeout: 1500, concurrency: 5]]
    )
  end

  @impl true
  # Broadway requires handle_batch/4 to return the list of messages
  def handle_batch(_batcher, messages, _batch_info, _context), do: messages

  @impl true
  def handle_message(_, message, _) do
    with {:ok, payload} when is_map(payload) <- Jason.decode(message.data),
         correlation_id when not is_nil(correlation_id) <- message.metadata.correlation_id,
         event_type when not is_nil(event_type) <- payload["eventType"],
         id when is_integer(id) <- invoice_id(correlation_id),
         data when not is_nil(data) <- payload["data"] do
      InvoiceResponse.update_invoice(event_type, id, data)
      # must return a message!
      message
    else
      _ ->
        Logger.error("Invoice response ignored: #{inspect(message)}")
        Broadway.Message.failed(message, "unprocessable")
    end
  end

  defp invoice_id("invoice-" <> str_id) do
    {id, _str} = Integer.parse(str_id)
    id
  end

  defp invoice_id(correlation_id) do
    Logger.error("Unexpected Correlation ID: #{inspect(correlation_id)}")
    nil
  end
end
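
To exercise the consumer locally without GREM, a reply can be pushed onto the queue by hand from IEx. This is a sketch assuming the local connection values used above (admin/admin, vhost "/"); the important detail is the `correlation_id`, which must match the `invoice-<id>` format parsed by `invoice_id/1`:

```elixir
{:ok, conn} =
  AMQP.Connection.open(
    host: "localhost",
    virtual_host: "/",
    username: "admin",
    password: "admin"
  )

{:ok, channel} = AMQP.Channel.open(conn)

payload =
  Jason.encode!(%{
    eventType: "Invoicing.Invoice.Accepted",
    data: %{reference: nil, externalReference: "derham-test-025"}
  })

# publish to the "avk_service" queue via the default exchange;
# replace 42 with a real invoice id from your database
AMQP.Basic.publish(channel, "", "avk_service", payload, correlation_id: "invoice-42")

AMQP.Channel.close(channel)
AMQP.Connection.close(conn)
```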

Test Config

import Config

# Configure your database
#
# The MIX_TEST_PARTITION environment variable can be used
# to provide built-in test partitioning in CI environment.
# Run `mix help test` for more information.
config :avk_service, AvkService.Repo,
  username: System.get_env("POSTGRES_USER", "rem2"),
  password: System.get_env("POSTGRES_PW", ""),
  hostname: System.get_env("POSTGRES_HOST", "localhost"),
  database: "avk_service_test#{System.get_env("MIX_TEST_PARTITION")}",
  pool: Ecto.Adapters.SQL.Sandbox,
  pool_size: System.schedulers_online() * 2

# We don't run a server during test. If one is required,
# you can enable the server option below.
config :avk_service, AvkServiceWeb.Endpoint,
  http: [ip: {127, 0, 0, 1}, port: 4002],
  secret_key_base: "p8wGTGKppAKj3N+r7iTdo9w0WfWRSeRMt7Bzv8wNzw0mvvHn1LtlolXPijRZJSdX",
  server: false

# DO NOT use the RabbitMQ consumer in tests (it's not available)
# AvkService.Boundary.AmqpConsumer,
config :avk_service, :application_children, [
  AvkServiceWeb.Telemetry,
  AvkService.Repo,
  {DNSCluster, query: Application.get_env(:avk_service, :dns_cluster_query) || :ignore},
  {Phoenix.PubSub, name: AvkService.PubSub},
  # Start the Finch HTTP client for sending emails
  {Finch, name: AvkService.Finch},
  # Start a worker by calling: AvkService.Worker.start_link(arg)
  # {AvkService.Worker, arg},
  # Start to serve requests, typically the last entry
  AvkServiceWeb.Endpoint,
  # PromEx should be started after the Endpoint, to avoid unnecessary error messages
  AvkService.PromEx,
  # invoice pdf processing pipeline
  AvkService.Pipeline.PdfFilesQueue,
  {AvkService.Pipeline.BroadwayPdfToInvoice, []}
]

config :avk_service, :parashift_api,
  token: "token",
  endpoint: "https://api.parashift.io/v2",
  classification_scope: "garaio-pp-invoice-ch"

config :avk_service, :dti_api,
  app_key: "123",
  endpoint: "https://cloudextract-studio.dti.ch/extraction/api/",
  plan_id: "123"

config :tesla, adapter: Tesla.Mock

# Stub Req requests in tests
config :avk_service, graphql_req_options: [plug: {Req.Test, AvkService.Boundary.GraphQL}]

config :avk_service,
  cloud_extract_dti_api: [plug: {Req.Test, AvkService.Boundary.CloudextractDtiApi}],
  cloud_extract_parashift_api: [plug: {Req.Test, AvkService.Boundary.ParashiftApi}]

# In test we don't send emails.
config :avk_service, AvkService.Mailer, adapter: Swoosh.Adapters.Test

# Disable swoosh api client as it is only required for production adapters.
config :swoosh, :api_client, false

# Print only errors during test
config :logger, level: :none

# Initialize plugs at runtime for faster test compilation
config :phoenix, :plug_init_mode, :runtime

config :phoenix_live_view,
  # Enable helpful, but potentially expensive runtime checks
  enable_expensive_runtime_checks: true

config :junit_formatter,
  report_file: "report_file_test.xml",
  report_dir: "./ci_reports",
  print_report_file: true,
  prepend_project_name?: true

config :avk_service, :admin_basic_auth,
  username: "testadmin",
  password: "testpassword"

config :avk_service, :graphql_api,
  client_id: "test_client_id",
  client_secret: "test_client_secret"

# Boundary Modules where we cannot mock the API
config :avk_service, :boundaries, storage: AvkService.Boundary.StorageMock
config :avk_service, :boundaries, graphql: AvkService.Boundary.GraphQLMock
config :avk_service, :boundaries, extraction_api: AvkService.Boundary.ExtractionApiMock
config :avk_service, :boundaries, amqp_publisher: AvkService.Boundary.AmqpPublisherMock

# RabbitMQ Config
config :avk_service, :rabbitmq,
  connection_params: [
    host: "localhost",
    virtual_host: "/",
    port: 5672,
    username: "admin",
    password: "admin"
  ]

Runtime Config

import Config

if System.get_env("PHX_SERVER") do
  config :avk_service, AvkServiceWeb.Endpoint, server: true
end

if config_env() != :test do
  config :avk_service, :application_children, [
    AvkServiceWeb.Telemetry,
    AvkService.Repo,
    {DNSCluster, query: Application.get_env(:avk_service, :dns_cluster_query) || :ignore},
    {Phoenix.PubSub, name: AvkService.PubSub},
    # Start the Finch HTTP client for sending emails
    {Finch, name: AvkService.Finch},
    # Start a worker by calling: AvkService.Worker.start_link(arg)
    # {AvkService.Worker, arg},
    # Start to serve requests, typically the last entry
    AvkServiceWeb.Endpoint,
    # PromEx should be started after the Endpoint, to avoid unnecessary error messages
    AvkService.PromEx,
    # retrieves (rabbitmq) messages returned from Invoicing.Invoice.Created events
    AvkService.Boundary.AmqpConsumer,
    # invoice pdf processing pipeline
    AvkService.Pipeline.PdfFilesQueue,
    {AvkService.Pipeline.BroadwayPdfToInvoice, []}
  ]

  config :avk_service, :boundaries, amqp_publisher: AvkService.Boundary.AmqpPublisher

  # RabbitMQ Config
  config :avk_service, :rabbitmq,
    connection_params: [
      host:
        System.get_env("RABBITMQ_HOST") ||
          raise("Missing env variable: RABBITMQ_HOST"),
      virtual_host:
        System.get_env("RABBITMQ_VHOST") ||
          raise("Missing env variable: RABBITMQ_VHOST"),
      port:
        String.to_integer(System.get_env("RABBITMQ_PORT")) ||
          raise("Missing env variable: RABBITMQ_PORT"),
      username:
        System.get_env("RABBITMQ_USER") ||
          raise("Missing env variable: RABBITMQ_USER"),
      password:
        System.get_env("RABBITMQ_PASSWORD") ||
          raise("Missing env variable: RABBITMQ_PASSWORD")
    ]
end
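
The invoice URLs in this article point at avk-service.fly.dev, so if the service runs on Fly.io the runtime variables above would presumably be set as secrets, roughly like this (all values are placeholders):

```bash
fly secrets set \
  RABBITMQ_HOST=your-rabbitmq-host \
  RABBITMQ_VHOST=/ \
  RABBITMQ_PORT=5672 \
  RABBITMQ_USER=avk_service \
  RABBITMQ_PASSWORD=change-me
```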

Testing

defmodule AvkService.Boundary.AmqpConsumerTest do
  use AvkService.DataCase, async: true
  import AvkService.Factory

  alias AvkService.Boundary.AmqpConsumer
  alias AvkService.Invoicing
  alias AvkService.Repo
  alias Broadway.Message

  describe "handle_message/3" do
    setup do
      now = DateTime.utc_now() |> DateTime.to_date()
      due_date = Date.add(now, 30) |> Date.to_string()
      invoice_date = Date.add(now, -10) |> Date.to_string()

      storage_filepath = Ecto.UUID.generate() <> ".pdf"

      invoice =
        insert(:invoice,
          iban: "CH9300762011623852957",
          external_reference: "123456789",
          storage_filepath: storage_filepath,
          accounting_reference: "1000",
          creditor_reference: "2345",
          order_reference: "34567",
          invoice_date: invoice_date,
          due_date: due_date,
          stage: "publish_invoice",
          state: "PUBLISHED"
        )

      insert_list(2, :invoice_line_item, invoice: invoice)
      invoice = invoice |> Repo.preload([:tenant, :line_items])

      accepted_data = %{key: "value"}
      rejected_data = %{reasons: [%{reason: "first reason"}, %{reason: "second reason"}]}

      accepted_message = %Message{
        data: Jason.encode!(%{eventType: "Invoicing.Invoice.Accepted", data: accepted_data}),
        metadata: %{correlation_id: "invoice-#{invoice.id}"},
        acknowledger: {AvkService.Boundary.AmqpConsumer, :ack_id, :ack_data}
      }

      rejected_message = %Message{
        data: Jason.encode!(%{eventType: "Invoicing.Invoice.Rejected", data: rejected_data}),
        metadata: %{correlation_id: "invoice-#{invoice.id}"},
        acknowledger: {AvkService.Boundary.AmqpConsumer, :ack_id, :ack_data}
      }

      %{invoice: invoice, accepted_message: accepted_message, rejected_message: rejected_message}
    end

    test "updates the invoice state when receiving an Invoicing.Invoice.Accepted message", %{
      invoice: invoice,
      accepted_message: accepted_message
    } do
      _processed_message = AmqpConsumer.handle_message(nil, accepted_message, nil)
      updated_invoice = Invoicing.get_invoice!(invoice.id)

      assert updated_invoice.stage == "invoice_response"
      assert updated_invoice.state == "OK"

      expected = Invoicing.get_invoice!(invoice.id)
      assert updated_invoice == expected
    end

    test "updates the invoice state when receiving an Invoicing.Invoice.Rejected", %{
      invoice: invoice,
      rejected_message: rejected_message
    } do
      _processed_message = AmqpConsumer.handle_message(nil, rejected_message, nil)
      updated_invoice = Invoicing.get_invoice!(invoice.id)

      assert updated_invoice.stage == "invoice_response"
      assert updated_invoice.state == "REJECTED"

      expected = Invoicing.get_invoice!(invoice.id)
      assert updated_invoice == expected
    end

    test "does not change the invoice state when receiving an unexpected event", %{
      invoice: invoice
    } do
      unexpected_message =
        %Message{
          data:
            Jason.encode!(%{eventType: "Invoicing.Invoice.Unexpected", data: %{key: "value"}}),
          metadata: %{correlation_id: "invoice-#{invoice.id}"},
          acknowledger: {AvkService.Boundary.AmqpConsumer, :ack_id, :ack_data}
        }

      _processed_message = AmqpConsumer.handle_message(nil, unexpected_message, nil)
      updated_invoice = Invoicing.get_invoice!(invoice.id)

      assert updated_invoice.stage == "publish_invoice"
      assert updated_invoice.state == "PUBLISHED"
    end

    test "does not change the invoice state when receiving an invalid correlation_id", %{
      invoice: invoice
    } do
      unexpected_message =
        %Message{
          data:
            Jason.encode!(%{eventType: "Invoicing.Invoice.Unexpected", data: %{key: "value"}}),
          metadata: %{correlation_id: "invoice-0"},
          acknowledger: {__MODULE__, :ack_id, :ack_data}
        }

      _processed_message = AmqpConsumer.handle_message(nil, unexpected_message, nil)
      updated_invoice = Invoicing.get_invoice!(invoice.id)

      assert updated_invoice.stage == "publish_invoice"
      assert updated_invoice.state == "PUBLISHED"
    end
  end
end
