This the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Features

Replicator features

Replicator offers the following features:

1 - Scavenge

Filter out events from deleted streams, expired by age or count, but not yet scavenged.

By default, Replicator would watch for events, which should not be present in the source cluster as they are considered as deleted, but not removed from the database due to lack of scavenge.

Example cases:

  • The database wasn’t scavenged for a while
  • A stream was deleted, but not scavenged yet
  • A stream was truncated either by max age or max count, but not scavenged yet

All those events will not be replicated.

This feature can be disabled by setting the replicator.scavenge option to false.

2 - Event filters

Filter out events by stream name or event type, using regular expressions. Positive and negative matches are supported.

You may want to prevent some events from being replicated. Those could be obsolete or “wrong” events, which your system doesn’t need.

We provide two filters (in addition to the special scavenge filter):

  • Event type filter
  • Stream name filter

Filter options:

Option Values Description
type eventType or streamName One of the available filters
include Regular expression Filter for allowing events to be replicated
exclude Regular expression Filter for preventing events from being replicated

For example:

replicator:
  filters:
  - type: eventType
    include: "."
    exclude: "((Bad|Wrong)\w+Event)"

You can configure zero or more filters. Scavenge filter is enabled by the scavenge setting and doesn’t need to be present in the filter list. You can specify either include or exclude regular expression, or both. When both include and exclude regular expressions are configured, the filter will check both, so the event must match the inclusion expression and not match the exclusion expression.

3 - Sinks

Supported event sinks

3.1 - EventStoreDB gRPC Sink

EventStoreDB sink with gRPC protocol, supported by latest versions (v20+).

When replicating events to Event Store Cloud, we recommend using the EventStoreDB gRPC sink.

You need to specify two configurations options for it:

  • replicator.sink.protocol - set to grpc
  • replicator.sink.connectionString - use the target cluster connection string, which you’d use for the gRPC client.

For example, for an Event Store Cloud cluster the connection string would look like:

esdb+discover://<username>:<password>@<cluster_id>.mesdb.eventstore.cloud.

Using gRPC gives you more predictable write operation time. For example, on a C4-size instance in Google Cloud Platform, one write would take 4-5 ms, and this number allows you to calculate the replication process throughput, as it doesn’t change much when the database size grows.

3.2 - EventStoreDB TCP Sink

EventStoreDB sink with TCP protocol, supported by older versions (v5 and earlier).

The TCP sink should only be used when migrating from one older version cluster to another older version cluster. As Event Store plans to phase out the TCP client and protocol, consider using the gRPC sink instead.

For the TCP sink, you need to specify two configurations options for it:

  • replicator.sink.protocol - set to tcp
  • replicator.sink.connectionString - use the target cluster connection string, which you’d use for the TCP client.

Check the connection string format and options in the TCP client documentation.

The risk of using the TCP sink is that you might get unstable write speed. The speed might go down when the database size grows, unlike gRPC sink write speed, which remains stable.

3.3 - Kafka Sink

Sink for Apache Kafka, using confirmed writes

The Kafka sink allows you to set up continuous replication from EventStoreDB to Apache Kafka. It might be useful, for example, to scale out subscriptions, as you can partition events in Kafka. Then, you can have a consumer group with concurrent consumers, which process individual partitions, instead of having a single partition on $all.

There’s no way to specify a custom partition, so the default (random) Kafka partitioner will be used.

The Kafka sink needs to be configured in the sink section of the Replicator configuration.

  • replicator.sink.protocol - set to kafka
  • replicator.sink.connectionString - Kafka connection string, which is a comma-separated list of connection options
  • replicator.sink.partitionCount - the number of Kafka partitions in the target topic
  • replicator.sink.router - optional JavaScript function to route events to topics and partitions

Example:

replicator:
  reader:
    connectionString: esdb+discover://admin:changeit@xyz.mesb.eventstore.cloud
    protocol: grpc
  sink:
    connectionString: bootstrap.servers=localhost:9092
    protocol: kafka
    partitionCount: 10
    router: ./config/route.js

Routing

Replicator needs to route events to Kafka. In particular, it needs to know the topic, where to write events to, and the partition key. By default, the topic is the stream “category” (similar to the category projection), which is part of the event stream before the dash. For example, an event from Customer-123 stream will be routed to the Customer topic. The stream name is used as the partition key to ensure events order within a stream.

It’s possible to customise both topic and partition key by using a routing function. You can supply a JavaScript code file, which will instruct Replicator about routing events to topics and partitions.

The code file must have a function called route, which accepts the following parameters:

  • stream - original stream name
  • eventType - original event type
  • data - event payload (data), only works with JSON
  • metadata - event metadata, only works with JSON

The function needs to return an object with two fields:

  • topic - target topic
  • partitionKey - partition key

For example:

function route(stream, eventType, data, meta) {
    return {
        topic: "myTopic",
        partitionKey: stream
    }
}

The example function will tell Replicator to produce all the events to the myTopic topic, using the stream name as partition key.

You need to specify the name of the while, which contains the route function, in the replicator.sink.router setting. Such a configuration is displayed in the sample configuration YAML snipped above.

3.4 - Sink Partitioning

Increase the speed of writes by enabling partitioned sinks.

Write modes

Replicator will read events from the source cluster using batched reads of 4096 (default) events per batch. As it reads from $all, one batch will contain events for different streams. Therefore, writing events requires a single write operation per event to ensure the correct order of events written to the target cluster.

If you don’t care much about events order in $all, you can configure Replicator to use concurrent writers, which will increase performance. The tool uses concurrent writers with a configurable concurrency limit. Writes are partitioned, and the order of written events within a partition is kept intact. Read more below about different available partitioning modes.

Partition by stream name

Writers can be partitioned by stream name. This guarantees that events in individual streams will be in the same order as in the source cluster, but the order of $all will be slightly off.

To enable concurrent writers partitioned by stream name, you need to change the replicator.sink.partitionCount setting. The default value is 1, so all the writes are sequential.

Custom partitions

You can also use a JavaScript function to use event data or metadata for partitioning writers. The function must be named partition, it accepts a single argument, which is an object with the following schema:

{
  "stream": "",
  "eventType": "",
  "data": {},
  "metadata": {}
}

The function must return a string, which is then used as a partition key.

For example, the following function will return the Tenant property of the event payload, to be used as the partition key:

function partition(event) {
    return event.data.Tenant;
}

There are two modes for custom partitions, described below.

Partitioning by hash

As with the stream name partitioning, the custom partition key is hashed, and the hash of the key is used to decide which partition will take the event. This method allows having less partitions than there are keys.

To use this mode you need to set the partition count using the replicator.sink.partitionCount setting, and also specify the file name of the partitioning function in the replicator.sink.partitioner setting. For example:

replicator:
  sink:
    partitionCount: 10
    partitioner: ./partitioner.js

Partition by value

In some cases, it’s better to assign a single partition for each partition key. Use this method only if the number of unique values for the partition key is upper bound. This strategy works well for partitioning by tenant, for example, if the number of tenants doesn’t exceed a hundred. You can also decide to go beyond this limit, but each partition uses some memory, so you need to allocate enough memory space for a high partition count. In addition, be aware of the performance concerns described in the next section. Those concerns might be less relevant though as not all the partitions will be active simultaneously if a single page doesn’t contain events for all tenants at once.

To use value-based partitioning, use the same partitioning function signature. The difference is that for each returned partition key there will be a separate partition. For example, if the function deterministically return 10 different values, there will be 10 partitions. You don’t need to configure the partition count, partitions will be dynamically created based on the number of unique keys.

The settings file, therefore, only needs the replicator.sink.partitioner setting configured.

Partition count considerations

Do not set this setting to a very high value, as it might lead to thread starvation, or the target database overload. For example, using six to ten partitions is reasonable for a C4 Event Store Cloud managed database, but higher value might cause degraded performance.

4 - Additional metadata

Additional metadata fields for the target, which include the original event number, position, and creation date.

In addition to copying the events, Replicator will also copy streams metadata. Therefore, changes in ACLs, truncations, stream deletions, setting max count and max age will all be propagated properly to the target cluster.

However, the max age won’t be working properly for events, which are going to exceed the age in the source cluster. That’s because in the target cluster all the events will appear as recently added.

To mitigate the issue, Replication will add the following metadata to all the copied events:

  • $originalCreatedDate
  • $originalEventNumber
  • $originalEventPosition

5 - Event transformations

Transform the event schema, add or remove the data, enrich, or filter out events using complex rules.

During the replication, you might want to transform events using some complex rules. For example, some fields need to be removed, the JSON schema should change, or some data need to be merged, split, or even enriched with external data.

Transformations allow you:

  • Move events to another stream
  • Change the event type
  • Manipulate the event data, like changing field names and values, or even the structure
  • Same, but for metadata
  • [WIP] Slit one event into multiple events

For this purpose, you can use the transformation function. Find out more about available transformation options on pages listed below.

5.1 - JavaScript transformation

Transform and filter events using in-proc JavaScript function

Introduction

When you need to perform simple changes in the event schema, change the stream name or event type based on the existing event details and data, you can use a JavaScript transform.

For this transform, you need to supply a code snippet, written in JavaScript, which does the transformation. The code snippet must be placed in a separate file, and it cannot have any external dependencies. There’s no limitation on how complex the code is. Replicator uses the V8 engine to execute JavaScript code. Therefore, this transform normally doesn’t create a lot of overhead for the replication.

Guidelines

You can configure Replicator to use a JavaScript transformation function using the following parameters:

  • replicator.transform.type - must be set to js
  • replicator.transform.config - name of the file, which contains the transformation function

For example:

replicator:
  transform:
    type: js
    config: ./transform.js

The function itself must be named transform. Replicator will call it with the following arguments:

  • Stream - original stream name
  • EventType - original event type
  • Data - event payload as an object
  • Metadata - event metadata as an object

The function must return an object, which contains Stream, EventType, Data and Metadata fields. Both Data and Metadata must be valid objects, the Metadata field can be undefined. If you haven’t changed the event data, you can pass Data and Metadata arguments, which the function receives as arguments.

Logging

You can log from JavaScript code directly to Replicator logs. Use the log object with debug, info, warn and error. You can use string interpolation as usual, or pass templated strings in Serilog format. The first parameter is the template string, plus you can pass up to five additional values, which could be values or objects.

For example:

log.info(
    "Transforming event {@Data} of type {Type}", 
    original.data, original.eventType
);

Remember that the default log level is Information, so debug logs won’t be shown. Enable debug-level logging by setting the REPLICATOR_DEBUG environment variable to true.

Example

Here is an example of a transformation function, which changes the event data, stream name, and event type:

function transform(original) {
    // Log the transform calls
    log.info(
        "Transforming event {Type} from {Stream}", 
        original.EventType, original.Stream
    );

    // Ignore some events
    if (original.Stream.length > 7) return undefined;

    // Create a new event version
    const newEvent = {
        // Copy original data
        ...original.Data,
        // Change an existing property value 
        Data1: `new${original.Data.Data1}`,
        // Add a new property
        NewProp: `${original.Data.Id} - ${original.Data.Data2}`
    };
    
    // Return the new proposed event with modified stream and type
    return {
        Stream: `transformed${original.Stream}`,
        EventType: `V2.${original.EventType}`,
        Data: newEvent,
        Meta: original.Meta
    }
}

If the function returns undefined, the event will not be replicated, so the JavaScript transform can also be used as an advanced filter. The same happens if the transform function returns an event, but either the stream name or event type is empty or undefined.

5.2 - HTTP transformation

Transform and filter events using out-of-process HTTP server

Introduction

An HTTP transformation can be used for more complex changes in event schema or data, which is done in an external process. It allows you to use any language and stack, and also call external infrastructure to enrich events with additional data.

When configured accordingly, Replicator will call an external HTTP endpoint for each event it processes, and expect a converted event back. As event data is delivered as-is (as bytes) to the transformer, there’s no limitation of the event content type and serialisation format.

Guidelines

Before using the HTTP transformation, you need to build and deploy the transformation function, which is accessible using an HTTP(S) endpoint. The endpoint is built and controlled by you. It must return a response with a transformed event with 200 status code, or 204 status code with no payload. When the Replicator receives a 204 back, it will not propagate the event, so it also works as an advanced filter.

The transformation configuration has two parameters:

  • replicator.transform.type - should be http for HTTP transform
  • replicator.transform.config - the HTTP(S) endpoint URL

For example:

replicator:
  transform:
    type: http
    config: http://transform-func.myapp.svc.cluster.local

Replicator doesn’t support any authentication, so the endpoint must be open and accessible. You can host it at the same place as Replicator itself to avoid the network latency, or elsewhere. For example, your transformation service can be running in the same Kubernetes cluster, so you can provide the internal DNS name to its service. Alternatively, you can use a serverless function.

Replicator will call your endpoint using a POST request with JSON body.

The request and response formats are the same:

{
    "eventType": "string",
    "streamName": "string",
    "metadata": "string",
    "payload": "string"
}

Your HTTP transformation can modify any of these four properties.

Both metadata and payload are UTF-8 encoded bytes as a string.

If you store your events in JSON format, payload will be a JSON string that you can deserialize.

metadata is always a JSON string.

If your endpoint returns HTTP status code 204, the event will be ignored and wont be replicated to the sink.

Example

Here is an example of a serverless function in GCP, which transforms each part of the original event:

using System.Text.Json;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System.Threading.Tasks;

namespace HelloWorld {
    public class Function : IHttpFunction {
        public async Task HandleAsync(HttpContext context) {
            var original = await JsonSerializer
                .DeserializeAsync<HttpEvent>(context.Request.Body);

            var payload = JsonSerializer
                .Deserialize<TestEvent>(original.Payload);
            payload.EventProperty1 = $"Transformed {payload.EventProperty1}";

            var metadata = JsonSerializer
                .Deserialize<Metadata>(original.Metadata);
            metadata.MetadataProperty1 = $"Transformed {metadata.MetadataProperty1}";

            var proposed = new HttpEvent {
                StreamName = $"transformed-{original.StreamName}",
                EventType  = $"V2.{original.EventType}",
                Metadata   = JsonSerializer.Serialize(metadata),
                Payload    = JsonSerializer.Serialize(payload)
            };

            await context.Response
                .WriteAsync(JsonSerializer.Serialize(proposed));
        }

        class HttpEvent {
            public string EventType  { get; set; }
            public string StreamName { get; set; }
            public string Metadata   { get; set; }
            public string Payload    { get; set; }
        }

        class Metadata {
            public string MetadataProperty1 { get; set; }
            public string MetadataProperty2 { get; set; }
        }

        class TestEvent {
            public string EventProperty1 { get; set; }
            public string EventProperty2 { get; set; }
        }
    }
}

The TestEvent is the original event contract, which is kept the same. However, you are free to change the event schema too, if necessary.

6 - Checkpoints

Replication checkpoints

Replicator stores a checkpoint that represents the current position of replicated events.

This allows Replicator to resume processing after a restart, instead of starting from the beginning.

This is controlled via the checkpointAfter setting.

Configuring checkpointAfter

The checkpointAfter setting can be used to control the threshold number of events that must be replicated before a checkpoint is stored, like so:

replicator:
  checkpoint:
    checkpointAfter: 1000

By default, checkpointAfter is configured to store a checkpoint after every 1000 events replicated.

A lower checkpointAfter would mean the replication process has stronger data consistency/duplication guarantees, at the cost of performance.

For example, configuring checkpointAfter: 1 would result in a checkpoint being stored after every replicated event, which would achieve exactly-once processing.

This means in the event of a crash/restart, Replicator is guaranteed to not duplicate any events in the sink database, but comes at the cost of greatly reduced write performance to the sink database.

A higher checkpointAfter would mean writes to the sink database are more performant, at the cost of data consistency/duplication guarantees.

Configuring a higher checkpointAfter improves write performance by ensuring Replicator is not spending so much time saving checkpoints, but introduces a risk of events being duplicated during replication to the sink database in the event of a crash and restart, where a crash ocurred inside of the checkpoint window.

Configure the checkpointAfter to align with your data consistency and performance requirements.

Checkpoint stores

Replicator supports storing checkpoints in different stores. Only one store can be configured per Replicator instance.

If you want to run the replication again, from the same source, using the same Replicator instance and settings, you need to delete the checkpoint from the store.

See the currently supported checkpoint stores below:

6.1 - Checkpoint file

File system checkpoint store

By default, Replicator stores checkpoints in a local file.

The default configuration is:

replicator:
  checkpoint:
    type: file
    path: ./checkpoint
    checkpointAfter: 1000

The path can be configured to store the checkpoint file at a different location, which can be useful for deployments to Kubernetes that may use a custom PVC configuration, for example.

6.2 - Checkpoint in MongoDB

MongoDB checkpoint store

Although in many cases the file-based checkpoint works fine, storing the checkpoint outside the Replicator deployment is more secure. For that purpose you can use the MongoDB checkpoint store. This store writes the checkpoint as a MongoDB document to the specified database. It will unconditionally use the checkpoint collection.

Here are the minimum required settings for storing checkpoints in MongoDB:

replicator:
  checkpoint:
    type: mongo
    path: "mongodb://mongoadmin:secret@localhost:27017"
    checkpointAfter: 1000

The path setting must contain a pre-authenticated connection string.

By default, Replicator will use the replicator database, but can be configured to use another database, like so:

replicator:
  checkpoint:
    database: someOtherDatabase

If you run miltiple Replicator instances that store checkpoints in the same Mongo database, you can configure the instanceId setting to isolate checkpoints stored by other Replicator instances.

By default, Replicator will use default as the instance identifier under the assumption it is the only instance storing checkpoints in the database.

You may configure instanceId like so:

replicator:
  checkpoint:
    instanceId: someUniqueIdentifier