Simple and reliable Postgres Change Data Capture (CDC) in Elixir.
WalEx allows you to listen to change events on your Postgres tables then send them on to destinations or perform callback-like actions with the data via the DSL. For example:
- Stream database changes to an event service like EventRelay
- Send a user a welcome email after they create a new account
- Augment an existing Postgres-backed application with business logic
- Send events to third party services (analytics, CRM, webhooks)
- Update index / invalidate cache whenever a record is changed
You can learn more about CDC and what you can do with it here: Why capture changes?
This library borrows liberally from realtime from Supabase, which in turn draws heavily on cainophile.
If available in Hex, the package can be installed by adding walex to your list of dependencies in mix.exs:
def deps do
  [
    {:walex, "~> 3.8.0"}
  ]
end
WalEx only supports PostgreSQL. To get started, you first need to configure PostgreSQL for logical replication:
ALTER SYSTEM SET wal_level = 'logical';
Docker Compose:
command: [ "postgres", "-c", "wal_level=logical" ]
When you change the wal_level variable, you'll need to restart your PostgreSQL server. Once you've restarted, go ahead and create a publication for the tables you want to receive changes for:
All tables:
CREATE PUBLICATION events FOR ALL TABLES;
Or just specific tables (user is a reserved word in PostgreSQL, so it has to be double-quoted):
CREATE PUBLICATION events FOR TABLE "user", todo;
Filter based on row conditions (Postgres v15+ only):
CREATE PUBLICATION user_event FOR TABLE "user" WHERE (active IS TRUE);
WalEx supports all of the settings for REPLICA IDENTITY. Use FULL if you can: the old data is then sent alongside the new data, which makes tracking differences easier. You'll need to set this for each table.
Specific tables:
ALTER TABLE "user" REPLICA IDENTITY FULL;
ALTER TABLE todo REPLICA IDENTITY FULL;
Also, be mindful of replication gotchas.
Amazon (AWS) RDS Postgres allows you to configure logical replication.
- https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#setting-up-postgresql
- https://dev.to/vumdao/how-to-change-rds-postgresql-configurations-2kmk
When creating a new Postgres database on RDS, you'll need to set a Parameter Group with the following settings:
rds.logical_replication = 1
max_replication_slots = 5
max_slot_wal_keep_size = 2048
# config.exs
config :my_app, WalEx,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  port: "5432",
  database: "postgres",
  publication: "events",
  subscriptions: ["user", "todo"],
  # optional
  destinations: [
    # WalEx assumes your module names match this pattern: MyApp.Events.User, MyApp.Events.ToDo, etc
    # but you can also specify custom modules like so:
    # modules: [MyApp.CustomModule, MyApp.OtherCustomModule],
    webhooks: ["https://webhook.site/c2f32b47-33ef-425c-9ed2-f369529a0de8"],
    event_relay_topic: "todos"
  ],
  webhook_signing_secret: "9da89f5f8f4717099c698a17c0d3a1869ee227de06c27b18",
  event_relay: [
    host: "localhost",
    port: "50051",
    token:
      "cmpiNmpFSGhtNVhORFVubDFzUW9OR1JqTlFZOVFFcjRwZWMxS2VWRzJIOnY5NkFRQVFjSVp0TWVmc3hpRl8ydVZuaW9FTC0wX3JrZjhXcTE4MS1EbnVLU1p5VF9OZkpBZGs1SlFuQlNNdVg="
  ],
  name: MyApp
It is also possible to define just the database URL instead of the individual connection settings:
# config.exs
config :my_app, WalEx,
  url: "postgres://username:password@hostname:port/database",
  publication: "events",
  subscriptions: ["user", "todo"],
  name: MyApp
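To make the relationship between the URL form and the individual keys concrete, here is a minimal sketch that splits a Postgres URL into the same fields used in the longer config. `UrlConfigSketch` is a hypothetical helper written for illustration with the standard library's `URI.parse/1`; it is not how WalEx itself parses the URL, and the URL in the usage line is made up.

```elixir
defmodule UrlConfigSketch do
  # Hypothetical helper, not part of WalEx: maps a Postgres URL onto the
  # discrete connection keys shown in the longer config example.
  def parse(url) do
    uri = URI.parse(url)

    # userinfo is "username:password" (or nil when absent)
    {username, password} =
      case String.split(uri.userinfo || "", ":", parts: 2) do
        [user, pass] -> {user, pass}
        [user] -> {user, nil}
      end

    [
      hostname: uri.host,
      username: username,
      password: password,
      # URI.parse/1 returns the port as an integer, not a string
      port: uri.port,
      database: String.trim_leading(uri.path || "", "/")
    ]
  end
end

config = UrlConfigSketch.parse("postgres://postgres:secret@localhost:5432/todos")
IO.inspect(config)
```

Note that the port comes back as an integer here, whereas the config example above passes it as a string.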
You can also dynamically update the config at runtime:
WalEx.Configs.add_config(MyApp, :subscriptions, ["new_subscriptions_1", "new_subscriptions_2"])
WalEx.Configs.remove_config(MyApp, :subscriptions, "subscriptions")
WalEx.Configs.replace_config(MyApp, :password, "new_password")
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {WalEx.Supervisor, Application.get_env(:my_app, WalEx)}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Returned change data is a list of %Event{} structs with changes. Example UPDATE event (where the name field was changed):
[
  %WalEx.Event{
    name: :user,
    type: :update,
    source: %WalEx.Event.Source{
      name: "WalEx",
      version: "3.8.0",
      db: "todos",
      schema: "public",
      table: "user",
      columns: %{
        id: "integer",
        name: "varchar",
        created_at: "timestamptz"
      }
    },
    new_record: %{
      id: 1234,
      name: "Chase Pursley",
      created_at: #DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>
    },
    # we don't show old_record for update to reduce payload size
    # however, you can see any old values that changed under "changes"
    old_record: nil,
    changes: %{
      name: %{
        new_value: "Chase Pursley",
        old_value: "Chase"
      }
    },
    timestamp: ~U[2023-12-18 15:50:08.329504Z]
  }
]
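Since old_record is nil on updates, the changes map is where you look for what actually changed. As a rough sketch, the helpers below read that map. `ChangeSummarySketch` and its functions are hypothetical names, and the example uses a plain map with the same keys as the struct above so it runs without WalEx installed (the pattern matches work on structs as well).

```elixir
defmodule ChangeSummarySketch do
  # Hypothetical helpers, not part of WalEx. They rely only on the
  # :type and :changes keys shown in the event example.

  # Sorted list of fields that changed in an update event
  def changed_fields(%{type: :update, changes: changes}) when is_map(changes) do
    changes |> Map.keys() |> Enum.sort()
  end

  # Inserts and deletes (or events without a changes map) report no changed fields
  def changed_fields(_event), do: []

  # Previous value of a field, or nil when that field did not change
  def old_value(%{changes: changes}, field) when is_map(changes) do
    case changes do
      %{^field => %{old_value: old}} -> old
      _ -> nil
    end
  end

  def old_value(_event, _field), do: nil
end

event = %{
  name: :user,
  type: :update,
  old_record: nil,
  changes: %{name: %{new_value: "Chase Pursley", old_value: "Chase"}}
}

fields = ChangeSummarySketch.changed_fields(event)
previous_name = ChangeSummarySketch.old_value(event, :name)
IO.inspect({fields, previous_name})
```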
If your app is named MyApp and you have a subscription called :user (which represents a database table), WalEx assumes you have a module called MyApp.Events.User that uses WalEx.Event. You can also define any custom module; just be sure to add it to the modules config under destinations.
Note that the result of events is a list. This is because WalEx returns a list of transactions for a particular table when there's a change event. Often this will contain just one result, but it could be many (for example, if you use database triggers to update a column after an insert).
defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  # any subscribed event
  on_event(:all, fn events ->
    IO.inspect(events: events)
  end)

  # any user event
  on_event(:user, fn users ->
    IO.inspect(on_event: users)
    # do something with users data
  end)

  # any user insert event
  on_insert(:user, fn users ->
    IO.inspect(on_insert: users)
  end)

  # any user update event
  on_update(:user, fn users ->
    IO.inspect(on_update: users)
  end)

  # any user delete event
  on_delete(:user, fn users ->
    IO.inspect(on_delete: users)
  end)
end
A common scenario is wanting to "unsubscribe" from specific records (for example, temporarily during a migration or data fix). One way to accomplish this is to have a column with a value like event_subscribe: false. You can then ignore events for those records by passing the key and value to unwatched_records.
Another scenario is that you might not care when only certain fields change. For example, a database trigger might set updated_at after a record is updated, or one or more counters might change that you don't need to react to. In this case, you can ignore the change by adding those fields to unwatched_fields.
Additional filter helpers are available in the WalEx.TransactionFilter module.
defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  @filters %{
    unwatched_records: %{event_subscribe: false},
    unwatched_fields: ~w(event_id updated_at todos_count)a
  }

  on_insert(:user, @filters, fn users ->
    IO.inspect(on_insert: users)
    # resulting users data is filtered
  end)
end
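To make the intent of the two filters easier to picture, here is a standalone sketch of one plausible reading of them. It is not WalEx.TransactionFilter. `FilterSketch` is a hypothetical module, and the matching rules are assumptions: an event is dropped when its new_record matches every key/value pair in unwatched_records, or when every changed field is listed in unwatched_fields.

```elixir
defmodule FilterSketch do
  # Illustration only: assumed semantics, not WalEx's actual implementation.
  def apply_filters(events, filters) do
    unwatched_records = Map.get(filters, :unwatched_records, %{})
    unwatched_fields = Map.get(filters, :unwatched_fields, [])

    events
    |> Enum.reject(&unwatched_record?(&1, unwatched_records))
    |> Enum.reject(&only_unwatched_changes?(&1, unwatched_fields))
  end

  # True when the record matches every unwatched key/value pair
  defp unwatched_record?(%{new_record: record}, unwatched) when is_map(record) do
    map_size(unwatched) > 0 and
      Enum.all?(unwatched, fn {key, value} -> Map.get(record, key) == value end)
  end

  defp unwatched_record?(_event, _unwatched), do: false

  # True when something changed, but only in fields we chose not to watch
  defp only_unwatched_changes?(%{changes: changes}, fields)
       when is_map(changes) and map_size(changes) > 0 do
    Enum.all?(Map.keys(changes), &(&1 in fields))
  end

  defp only_unwatched_changes?(_event, _fields), do: false
end

filters = %{
  unwatched_records: %{event_subscribe: false},
  unwatched_fields: ~w(event_id updated_at todos_count)a
}

events = [
  # dropped: record has opted out via event_subscribe: false
  %{type: :insert, new_record: %{id: 1, event_subscribe: false}, changes: nil},
  # dropped: only an unwatched field changed
  %{type: :update, new_record: %{id: 2, event_subscribe: true},
    changes: %{updated_at: %{old_value: 1, new_value: 2}}},
  # kept: a watched field (name) changed as well
  %{type: :update, new_record: %{id: 3, event_subscribe: true},
    changes: %{name: %{old_value: "a", new_value: "b"},
               updated_at: %{old_value: 1, new_value: 2}}}
]

kept_ids = events |> FilterSketch.apply_filters(filters) |> Enum.map(& &1.new_record.id)
IO.inspect(kept_ids)
```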
You can also provide a list of functions (as atoms) to be applied to each Event (after optional filters are applied). Each function is run as an async Task on each event. The functions must be defined in the current module and take a single event argument. Use with caution!
defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  @filters %{unwatched_records: %{event_subscribe: false}}
  @functions ~w(send_welcome_email add_to_crm clear_cache)a

  on_insert(:user, @filters, @functions, fn users ->
    IO.inspect(on_insert: users)
    # resulting users data is first filtered then functions are applied
  end)

  def send_welcome_email(user) do
    # logic for sending welcome email to new user
  end

  def add_to_crm(user) do
    # logic for adding user to crm system
  end

  def clear_cache(user) do
    # logic for clearing user cache
  end
end
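The "one async Task per function per event" idea can be sketched with the standard `Task` module. This is an illustration of the general pattern, not WalEx's internals; `AsyncFunctionsSketch` and `DemoHandlers` are hypothetical modules, and the five-second timeout is an arbitrary choice.

```elixir
defmodule AsyncFunctionsSketch do
  # Illustration only: starts one Task per {event, function} pair and
  # waits for all of them. Function names are atoms resolved on `module`.
  def run(module, functions, events) do
    tasks =
      for event <- events, function <- functions do
        Task.async(fn -> {function, apply(module, function, [event])} end)
      end

    # Results come back in the same order the tasks were started
    Task.await_many(tasks, 5_000)
  end
end

defmodule DemoHandlers do
  # Stand-ins for real side effects such as sending email
  def send_welcome_email(user), do: {:emailed, user.id}
  def clear_cache(user), do: {:cleared, user.id}
end

results =
  AsyncFunctionsSketch.run(DemoHandlers, [:send_welcome_email, :clear_cache], [%{id: 1}])

IO.inspect(results)
```

In this sketch the tasks are linked to the caller, so a handler that raises takes the calling process down with it. That is one reason to keep such functions small and defensive.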
You can optionally configure WalEx to automatically send events to the following destinations without needing to know Elixir:
Send subscribed events to one or more webhooks. WalEx supports the Standard Webhooks spec via the webhoox library (which can also be used to receive webhooks in Elixir).
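On the receiving side, a Standard Webhooks signature can be checked with a few lines of Elixir. The sketch below assumes the scheme as the spec describes it: an HMAC-SHA256 over "id.timestamp.body", base64-encoded and prefixed with "v1,", with several space-separated signatures allowed in the header. `WebhookSignatureSketch` is a hypothetical module, and how the webhook_signing_secret from the config is turned into key bytes is an assumption left to the caller. In a real application the webhoox library mentioned above is the better choice.

```elixir
defmodule WebhookSignatureSketch do
  # Illustration only. `key` is the raw signing key as bytes; deriving it
  # from the configured secret is outside this sketch.
  def sign(key, id, timestamp, body) do
    mac = :crypto.mac(:hmac, :sha256, key, "#{id}.#{timestamp}.#{body}")
    "v1," <> Base.encode64(mac)
  end

  # The signature header may carry several space-separated signatures
  def valid?(key, id, timestamp, body, signature_header) do
    expected = sign(key, id, timestamp, body)

    signature_header
    |> String.split(" ", trim: true)
    |> Enum.any?(&secure_equal?(&1, expected))
  end

  # Compares every byte so the timing does not reveal where a mismatch occurs
  defp secure_equal?(a, b) when byte_size(a) == byte_size(b) do
    a
    |> :binary.bin_to_list()
    |> Enum.zip(:binary.bin_to_list(b))
    |> Enum.reduce(0, fn {x, y}, acc -> :erlang.bor(acc, :erlang.bxor(x, y)) end)
    |> Kernel.==(0)
  end

  defp secure_equal?(_a, _b), do: false
end

key = "example-key-bytes"
body = ~s({"type":"update","table":"user"})
header = WebhookSignatureSketch.sign(key, "msg_1", 1_700_000_000, body)

IO.inspect(WebhookSignatureSketch.valid?(key, "msg_1", 1_700_000_000, body, header))
```

A production receiver would typically also reject timestamps that are too old, to limit replay attacks.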
If you need something more durable and flexible than webhooks, check out EventRelay.
In EventRelay, you'll need to create a topic matching what's in the WalEx destinations config. So, if your event_relay_topic is called todos (usually this is the database name), then your topic name in EventRelay should be todos. Here's how to do it via grpcurl:
grpcurl -H "Authorization: Bearer {api_key_token}" -plaintext -proto event_relay.proto -d '{"name": "todos"}' localhost:50051 eventrelay.Topics.CreateTopic
More destinations coming. Pull requests welcome!
You'll need a local Postgres instance running to set up and run the tests:
MIX_ENV=test mix walex.setup
MIX_ENV=test mix test
I would love to have your help! I do ask that if you do find a bug, please add a test to your PR that shows the bug and how it was fixed.
Thanks!