postgresql-event-sourcing

An experiment in using PostgreSQL as a native event sourcing database.

Uses triggers and functions to manage projections transactionally.

The basic flow of action is:

event -> after insert trigger -> trigger function -> projection function -> projection

The advantage of this model is that triggers ensure the projections are always up to date, but we do not lose the ability to replay the event stream with the same logic.

Events

Event Sourcing ensures that all changes to application state are stored as a sequence of events.

Events are stored in an events table.

We assume that all objects/entities in the system have a globally unique identifier.

Column       Details
id           Primary key
uuid         Unique ID of the entity the event references
type         The event type, used when building projections
body         Event data as JSON
inserted_at  Timestamp of the event insert

The table definition:
CREATE TABLE "events" (
  "id" serial primary key not null,
  "uuid" uuid NOT NULL,
  "type" text NOT NULL,
  "body" jsonb NOT NULL,
  "inserted_at" timestamp(6) NOT NULL DEFAULT statement_timestamp()
);

An example event, tracking an update to the name of the user identified by a uuid:

insert into events (type, uuid, body)
values ('user_update', '11111111-1111-1111-1111-111111111111', '{"name": "blah"}');

Projection Triggers

Use after insert triggers on the events table to handle the incoming event actions.

In order to replay the events outside of the trigger mechanism, we wrap a general projection function inside the trigger. This will make more sense in a moment.

Below we create a trigger function and a trigger to execute. The trigger uses a conditional to only fire when the appropriate event type has been inserted.

create or replace function fn_trigger_user_create() returns trigger
  security definer
  language plpgsql
as $$
  begin
    perform fn_project_user_create(new.uuid, new.body);
    return new;
  end;
$$;

create trigger event_insert_user_create after insert on events
  for each row
  when (new.type = 'user_create')
  execute procedure fn_trigger_user_create();

Projection Functions

A projection function does the actual work of handling the event data and mapping to the appropriate projection.

Multiple triggers and multiple functions can be added to handle different aspects of the same event type if required.

Assuming a users table with a name and uuid, the following function inserts a new user record into the table based on the user_create event.

create or replace function fn_project_user_create(uuid uuid, body jsonb) returns integer
  security definer
  language plpgsql as $$
  declare result int;
  begin
    insert into users(uuid, name, inserted_at, updated_at)
      values(uuid, body->>'name', NOW(), NOW())
      returning id into result;
    return result;
  end;
$$;

JSON can be referenced using PostgreSQL's native JSON operators (the jsonb type is available from PostgreSQL 9.4 onward). body->>'name' extracts the value of the name field from the body JSON as text.

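Any constraints on the projection table will also be enforced, ensuring referential integrity. Because the trigger runs in the same transaction as the event insert, a constraint violation in the projection also rejects the event.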
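The users table itself is only described above as having a name and a uuid. A minimal sketch of a table that fits fn_project_user_create follows; the column types, the not null rules and the unique constraint are assumptions, not part of the original experiment.

```sql
-- Hypothetical projection table for user_create events.
-- Only the column names are implied by fn_project_user_create; everything else is assumed.
create table if not exists users (
  id          serial primary key,        -- value returned by fn_project_user_create
  uuid        uuid not null unique,      -- assumption: one projected row per entity
  name        text not null,             -- assumption: every user_create event carries a name
  inserted_at timestamp(6) not null,
  updated_at  timestamp(6) not null
);
```

With the unique constraint in place, a second user_create event for the same uuid fails inside the projection function instead of silently creating a duplicate row.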

Replay Event Stream

Using projection functions means that at any point the events can be replayed, simply by calling the function and passing the correct identifier and data.

The following code replays all user_create events in order:

do language plpgsql $$
  declare
    e record;
  begin
    for e in select uuid, body from events where type = 'user_create' order by inserted_at asc loop
      perform fn_project_user_create(e.uuid, e.body);
    end loop;
  end;
$$;

Any valid query can be used as the basis for the replay loop, and any combination of valid events.
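As one illustration, a full rebuild of the users projection could be wrapped in a function. This is a sketch only: the name fn_rebuild_users is hypothetical, it assumes the users table and fn_project_user_create described above, and it replays only user_create events. The extra ordering on id is an assumption to break ties, since statement_timestamp() is identical for all rows inserted by one statement.

```sql
-- Hypothetical helper: empty the projection, then replay every user_create event.
-- Returns the number of events replayed.
create or replace function fn_rebuild_users() returns integer
  language plpgsql as $$
  declare
    e record;
    replayed integer := 0;
  begin
    -- Start from an empty projection so replayed inserts do not collide with existing rows.
    delete from users;

    for e in
      select uuid, body
      from events
      where type = 'user_create'
      order by inserted_at asc, id asc  -- id breaks ties between equal timestamps
    loop
      perform fn_project_user_create(e.uuid, e.body);
      replayed := replayed + 1;
    end loop;

    return replayed;
  end;
$$;

-- Usage: select fn_rebuild_users();
```

The function calls the projection function directly and never inserts into events, so the insert triggers do not fire during the rebuild. Note that the serial id values in users are assigned afresh on every rebuild.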

The following code replays all events for the user identified by the specified uuid:

do language plpgsql $$
  declare
    e record;
  begin
    for e in select type, uuid, body from events where uuid = '11111111-1111-1111-1111-111111111111' order by inserted_at asc loop
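      case e.type
        when 'user_create' then
          perform fn_project_user_create(e.uuid, e.body);
        when 'user_update' then
          perform fn_project_user_update(e.uuid, e.body);
        else
          null; -- skip event types without a projection; with no else branch, plpgsql raises CASE_NOT_FOUND
      end case;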
    end loop;
  end;
$$;

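All of the projection calls in a replay block are executed in the same transaction, so a failure in any one of them rolls back the whole replay. This doesn't particularly matter in an event sourced system, since the events themselves are untouched and the replay can simply be run again, but it is good to know.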
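The per-entity replay loop above calls fn_project_user_update, which is not defined in this document. A minimal sketch of what it might look like follows. It assumes a users table with uuid, name and updated_at columns, and it assumes that an update event which omits the name should leave the current name unchanged. The parameters are prefixed with p_ here so that they cannot be confused with the uuid column in the where clause.

```sql
-- Hypothetical projection function for user_update events.
create or replace function fn_project_user_update(p_uuid uuid, p_body jsonb) returns void
  security definer
  language plpgsql as $$
  begin
    update users
      set name = coalesce(p_body->>'name', name),  -- keep the old name if the event has none
          updated_at = now()
      where uuid = p_uuid;
  end;
$$;
```

A trigger function and an after insert trigger with the condition new.type = 'user_update' can then be added in the same way as for user_create.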

PostgreSQL is not just limited to processing events iteratively.

Below is an example of using a materialized view to project the user data.

create materialized view users_view as
  with t as (
      select *, row_number() over(partition by uuid order by inserted_at desc) as row_number
      from events
      where type = 'user_update'
  )
  select uuid, body->>'name' as name, inserted_at from t where row_number = 1;

select * from users_view;

In this case we assume that the most recent update event contains the correct user data, so the query selects the most recent user_update event for each user.
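Unlike the trigger-based projections, a materialized view is a snapshot and does not change when new events are inserted. A sketch of bringing users_view up to date follows; the index name users_view_uuid_idx is hypothetical, and the concurrent variant is optional.

```sql
-- Re-run the view's query and replace its contents with the current state of events.
refresh materialized view users_view;

-- Optional: with a unique index on the view, the refresh can run without
-- blocking concurrent reads of the view (PostgreSQL 9.4 and later).
-- The view holds exactly one row per uuid, so uuid can carry the unique index.
create unique index users_view_uuid_idx on users_view (uuid);
refresh materialized view concurrently users_view;
```

How often to refresh is a design choice: each refresh re-runs the whole query over the events table, so this suits projections that can tolerate being slightly stale.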