Learn more about Replicator concepts, features and guidelines.
Documentation
- 1: Overview
- 2: Concepts
- 3: Features
- 3.1: Scavenge
- 3.2: Event filters
- 3.3: Sinks
- 3.3.1: EventStoreDB gRPC Sink
- 3.3.2: EventStoreDB TCP Sink
- 3.3.3: Kafka Sink
- 3.3.4: Sink Partitioning
- 3.4: Additional metadata
- 3.5: Event transformations
- 3.5.1: JavaScript transformation
- 3.5.2: HTTP transformation
- 3.6: Checkpoints
- 3.6.1: Checkpoint file
- 3.6.2: Checkpoint in MongoDB
- 4: Known limitations
- 5: Configuration
- 6: Deployment
- 6.1: Run Replicator in Docker
- 6.2: Kubernetes
- 6.2.1: Helm
- 6.2.2: Monitoring
1 - Overview
What is it?
Event Store Replicator, as the name suggests, aims to address the need to keep one EventStoreDB cluster in sync with another. It does so by establishing connections to both source and target clusters, then reading all the events from the source, and propagating them to the target. During the replication process, the tool can apply some rules, which allow you to exclude some events from being propagated to the target.
Why would you use it?
Common replication scenarios include:
- Cloud migration: When migrating from a self-hosted cluster to Event Store Cloud, you might want to take the data with you. Replicator can help you with that, but it has some limitations on how fast it can copy the historical data.
- Store migration: When your event schema changes severely, or you need to get rid of some obsolete data, you can migrate the whole store using Replicator. You can transform events from one contract to another, and filter out obsolete events. It also allows you to overcome the limitation of not being able to delete events in the middle of a stream. Greg Young promotes a complete store migration with transformation as part of the release cycle, to avoid event versioning issues. You can, for example, listen to him talk about it here.
- Backup: You can also replicate data between two clusters, so in case of catastrophic failure, you will have a working cluster with recent data.
- What is it not yet good for?: Replicator uses client protocols (TCP and gRPC), with all their limitations. For example, to keep the global event order intact, you must use a single writer. As the transaction scope is limited to one stream, you get sequential writes of one event at a time, which doesn’t deliver exceptional speed. Relaxing ordering guarantees helps to increase the performance, but for large databases (hundreds of millions of events and more) with guaranteed global order, it might not be the tool for you.
Where should I go next?
- Features: Check out Replicator features
- Limitations: Make sure you understand the tool limitations
2 - Concepts
Replicator is designed to copy events from one place to another, which sounds like a relatively simple task. Still, there are some concepts you need to understand before using the tool.
Reader
Reader is an adapter for the infrastructure, where you want to copy events from. The reader reads from a source.
Currently, we support readers for EventStoreDB, using TCP and gRPC protocols. Each reader type requires its own configuration, which is usually just a connection string, specific to each reader type.
The reader always reads events in sequence, but all the readers support batched reads.
There is only one reader per running Replicator instance.
Sink and writers
Sink is an adapter for the infrastructure, where you want to copy events to. The sink has one or more writers. By using multiple writers, one sink can improve performance by parallelising writes.
When using one writer for a sink, the order of events in the target remains exactly the same as it was in the source.
When using more than one writer, the global order of events from the source cannot be guaranteed in the target. However, multiple writers also enable partitioning. The default partition key is the stream name, which guarantees the order of events in each stream.
You can only have one sink per running Replicator instance, but it might have multiple writers.
Checkpoint
A running Replicator instance progresses linearly over a given stream of events, so it knows at any time, which events were already processed. As the process might be shut down for different reasons, it needs to maintain the last processed event position, so in case of restart, Replicator will start from there, and not from the very beginning. This way, you don’t get duplicated events in the sink, and you can be sure that the replication process will eventually be completed.
The location of the last processed event in the source is known as the checkpoint. Replicator supports storing the checkpoint in different stores. If you want to run the replication again, from the same source, using the same Replicator instance, you need to delete the stored checkpoint (for example, the checkpoint file).
Filters
As you might want to ignore some events during replication, Replicator supports different filters. Filters allow you to cover cases like preventing some obsolete events from being replicated, or splitting one source to two targets. In the latter case, you can run two Replicator instances with different filters, so events will be distributed to different sinks.
Transforms
After being in production for a while, most systems accumulate legacy data. You might want to remove some of it using filters, but you might also want to keep the data in a different format. Typical scenarios include evolution of event schema, missing fields, incorrect data format, oversharing (sensitive unprotected information), etc.
These cases can be handled by using transforms, which allow you to change any part of the event that comes from the source, before writing it to the sink.
3 - Features
Replicator offers the following features:
3.1 - Scavenge
By default, Replicator watches for events that should no longer be present in the source cluster: events that are considered deleted, but have not yet been removed from the database because it hasn’t been scavenged.
Example cases:
- The database wasn’t scavenged for a while
- A stream was deleted, but not scavenged yet
- A stream was truncated either by max age or max count, but not scavenged yet
All those events will not be replicated.
This feature can be disabled by setting the replicator.scavenge option to false.
3.2 - Event filters
You may want to prevent some events from being replicated. Those could be obsolete or “wrong” events, which your system doesn’t need.
We provide two filters (in addition to the special scavenge filter):
- Event type filter
- Stream name filter
Filter options:
Option | Values | Description |
---|---|---|
type | eventType or streamName | One of the available filters |
include | Regular expression | Filter for allowing events to be replicated |
exclude | Regular expression | Filter for preventing events from being replicated |
For example:
replicator:
  filters:
  - type: eventType
    include: "."
    # Single quotes keep the backslash in \w literal in YAML
    exclude: '((Bad|Wrong)\w+Event)'
You can configure zero or more filters. Scavenge filter is enabled by the scavenge setting and doesn’t need to be present in the filter list. You can specify either include or exclude regular expression, or both. When both include and exclude regular expressions are configured, the filter will check both, so the event must match the inclusion expression and not match the exclusion expression.
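The include/exclude rule can be sketched in a few lines of JavaScript. This is only an illustration of the documented semantics, not Replicator's implementation: the `passesFilter` helper is hypothetical, and JavaScript regular expressions may differ in details from the regex engine Replicator uses.

```javascript
// Hypothetical helper, not part of Replicator: mirrors the documented
// include/exclude rule for a single filter.
function passesFilter(value, filter) {
  // A missing expression imposes no constraint.
  const included = filter.include ? new RegExp(filter.include).test(value) : true;
  const excluded = filter.exclude ? new RegExp(filter.exclude).test(value) : false;
  // The value must match the inclusion expression and not match the exclusion one.
  return included && !excluded;
}

// Same expressions as in the eventType filter example.
const filter = { include: ".", exclude: "((Bad|Wrong)\\w+Event)" };

console.log(passesFilter("OrderPlaced", filter));   // true
console.log(passesFilter("BadOrderEvent", filter)); // false
```

For an eventType filter the value would be the event type, for a streamName filter it would be the stream name.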
Tip
You can also use transformations for advanced filtering.
3.3 - Sinks
3.3.1 - EventStoreDB gRPC Sink
When replicating events to Event Store Cloud, we recommend using the EventStoreDB gRPC sink.
You need to specify two configuration options for it:
- replicator.sink.protocol - set to grpc
- replicator.sink.connectionString - use the target cluster connection string, which you’d use for the gRPC client.
For example, for an Event Store Cloud cluster the connection string would look like:
esdb+discover://<username>:<password>@<cluster_id>.mesdb.eventstore.cloud
Using gRPC gives you more predictable write operation time. For example, on a C4-size instance in Google Cloud Platform, one write would take 4-5 ms, and this number allows you to calculate the replication process throughput, as it doesn’t change much when the database size grows.
3.3.2 - EventStoreDB TCP Sink
The TCP sink should only be used when migrating from one older version cluster to another older version cluster. As Event Store plans to phase out the TCP client and protocol, consider using the gRPC sink instead.
For the TCP sink, you need to specify two configuration options:
- replicator.sink.protocol - set to tcp
- replicator.sink.connectionString - use the target cluster connection string, which you’d use for the TCP client.
Check the connection string format and options in the TCP client documentation.
The risk of using the TCP sink is that you might get unstable write speed. The speed might go down when the database size grows, unlike gRPC sink write speed, which remains stable.
3.3.3 - Kafka Sink
The Kafka sink allows you to set up continuous replication from EventStoreDB to Apache Kafka. It might be useful, for example, to scale out subscriptions, as you can partition events in Kafka. Then, you can have a consumer group with concurrent consumers, which process individual partitions, instead of having a single partition on $all.
There’s no way to specify a custom partition, so the default (random) Kafka partitioner will be used.
The Kafka sink needs to be configured in the sink section of the Replicator configuration.
- replicator.sink.protocol - set to kafka
- replicator.sink.connectionString - Kafka connection string, which is a comma-separated list of connection options
- replicator.sink.partitionCount - the number of Kafka partitions in the target topic
- replicator.sink.router - optional JavaScript function to route events to topics and partitions
Example:
replicator:
  reader:
    connectionString: esdb+discover://admin:[email protected]
    protocol: grpc
  sink:
    connectionString: bootstrap.servers=localhost:9092
    protocol: kafka
    partitionCount: 10
    router: ./config/route.js
Routing
Replicator needs to route events to Kafka. In particular, it needs to know the topic, where to write events to, and the partition key. By default, the topic is the stream “category” (similar to the category projection), which is the part of the stream name before the dash. For example, an event from the Customer-123 stream will be routed to the Customer topic. The stream name is used as the partition key to preserve the order of events within a stream.
It’s possible to customise both topic and partition key by using a routing function. You can supply a JavaScript code file, which will instruct Replicator about routing events to topics and partitions.
The code file must have a function called route, which accepts the following parameters:
- stream - original stream name
- eventType - original event type
- data - event payload (data), only works with JSON
- metadata - event metadata, only works with JSON
The function needs to return an object with two fields:
- topic - target topic
- partitionKey - partition key
For example:
function route(stream, eventType, data, meta) {
  return {
    topic: "myTopic",
    partitionKey: stream
  }
}
The example function will tell Replicator to produce all the events to the myTopic topic, using the stream name as partition key.
You need to specify the name of the file, which contains the route function, in the replicator.sink.router setting. Such a configuration is displayed in the sample configuration YAML snippet above.
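As a further illustration, a routing function can reproduce the default behaviour described earlier (category as the topic, stream name as the partition key) and then be adjusted from there. This is a sketch only: treating the text before the first dash as the category, and falling back to the full stream name when there is no dash, are assumptions of this example.

```javascript
// Sketch of a custom router that mimics the documented default routing.
// Parameter order follows the documented route(stream, eventType, data, metadata).
function route(stream, eventType, data, metadata) {
  // Category = part of the stream name before the first dash (assumption);
  // streams without a dash are used as-is (also an assumption).
  const dash = stream.indexOf("-");
  const category = dash > 0 ? stream.substring(0, dash) : stream;
  return {
    topic: category,
    // Using the stream name keeps events of one stream in one partition.
    partitionKey: stream
  };
}

console.log(route("Customer-123", "CustomerRegistered", {}, {}));
// { topic: 'Customer', partitionKey: 'Customer-123' }
```

Starting from a function like this, you could, for example, change only the topic naming while keeping the per-stream ordering that the stream-name partition key provides.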
3.3.4 - Sink Partitioning
Write modes
Replicator will read events from the source cluster using batched reads of 4096 (default) events per batch. As it reads from $all, one batch will contain events for different streams. Therefore, writing events requires a single write operation per event to ensure the correct order of events written to the target cluster.
Tip
You can change the batch size using the replicator.reader.pageSize setting. The maximum value is 4096, which is also the default value. If you have large events, we recommend lowering this value. For example, you can set it to 1024.
If you don’t care much about the order of events in $all, you can configure Replicator to use concurrent writers, which will increase performance. The tool uses concurrent writers with a configurable concurrency limit. Writes are partitioned, and the order of written events within a partition is kept intact. Read more below about different available partitioning modes.
Note
Partitioning described on this page doesn’t apply to the Kafka sink, as it uses its own routing function.
Partition by stream name
Writers can be partitioned by stream name. This guarantees that events in individual streams will be in the same order as in the source cluster, but the order of $all will be slightly off.
To enable concurrent writers partitioned by stream name, you need to change the replicator.sink.partitionCount setting. The default value is 1, so all the writes are sequential.
Custom partitions
You can also use a JavaScript function to use event data or metadata for partitioning writers. The function must be named partition. It accepts a single argument, which is an object with the following schema:
{
  "stream": "",
  "eventType": "",
  "data": {},
  "metadata": {}
}
The function must return a string, which is then used as a partition key.
For example, the following function will return the Tenant property of the event payload, to be used as the partition key:
function partition(event) {
  return event.data.Tenant;
}
There are two modes for custom partitions, described below.
Partitioning by hash
As with the stream name partitioning, the custom partition key is hashed, and the hash of the key is used to decide which partition will take the event. This method allows having fewer partitions than there are keys.
To use this mode you need to set the partition count using the replicator.sink.partitionCount setting, and also specify the file name of the partitioning function in the replicator.sink.partitioner setting. For example:
replicator:
  sink:
    partitionCount: 10
    partitioner: ./partitioner.js
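To make the idea of hash-based partitioning more concrete, here is a minimal sketch of how a partition key could be mapped to one of a fixed number of partitions. The hash function and the `hashKey` and `partitionIndex` helpers are illustrative assumptions; the hash Replicator actually uses is not described here.

```javascript
// The partitioning function from the example above.
function partition(event) {
  return event.data.Tenant;
}

// Illustrative only: a simple 32-bit rolling hash, not Replicator's real one.
function hashKey(key) {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (Math.imul(hash, 31) + key.charCodeAt(i)) | 0;
  }
  return hash >>> 0; // interpret as an unsigned integer
}

// Hypothetical helper: many keys are folded onto `partitionCount` partitions.
function partitionIndex(key, partitionCount) {
  return hashKey(key) % partitionCount;
}

const event = { stream: "Order-1", eventType: "OrderPlaced", data: { Tenant: "acme" }, metadata: {} };
const index = partitionIndex(partition(event), 10);
console.log(index >= 0 && index < 10); // true
```

Because the same key always produces the same hash, all events with the same key end up in the same partition, which is what keeps their relative order intact.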
Partition by value
In some cases, it’s better to assign a single partition for each partition key. Use this method only if the number of unique values for the partition key is bounded. This strategy works well for partitioning by tenant, for example, if the number of tenants doesn’t exceed a hundred. You can also decide to go beyond this limit, but each partition uses some memory, so you need to allocate enough memory space for a high partition count. In addition, be aware of the performance concerns described in the next section. Those concerns might be less relevant though, as not all the partitions will be active simultaneously if a single page doesn’t contain events for all tenants at once.
To use value-based partitioning, use the same partitioning function signature. The difference is that for each returned partition key there will be a separate partition. For example, if the function deterministically returns 10 different values, there will be 10 partitions. You don’t need to configure the partition count, as partitions will be dynamically created based on the number of unique keys.
The settings file, therefore, only needs the replicator.sink.partitioner setting configured.
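The following sketch models value-based partitioning with one queue per distinct key. The `assign` helper and the `partitions` map are hypothetical and only illustrate how the number of partitions follows the number of unique keys. The fallback to the stream name for events without a Tenant property is also an assumption of this example, added so the function always returns a string.

```javascript
// Defensive partitioner: always returns a string.
function partition(event) {
  const tenant = event.data ? event.data.Tenant : undefined;
  // Fall back to the stream name when the payload has no Tenant (assumption).
  return tenant !== undefined && tenant !== null ? String(tenant) : event.stream;
}

// Hypothetical model: partitions are created on demand, one per unique key.
const partitions = new Map();

function assign(event) {
  const key = partition(event);
  if (!partitions.has(key)) partitions.set(key, []);
  partitions.get(key).push(event);
  return key;
}

assign({ stream: "Order-1", eventType: "OrderPlaced", data: { Tenant: "acme" }, metadata: {} });
assign({ stream: "Order-2", eventType: "OrderPlaced", data: { Tenant: "contoso" }, metadata: {} });
assign({ stream: "Order-1", eventType: "OrderPaid", data: { Tenant: "acme" }, metadata: {} });

console.log(partitions.size); // 2
```

If the key had unbounded cardinality (an order identifier, for example), the map in this model would grow without limit, which is why this mode suits keys with a known upper bound.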
Partition count considerations
Do not set the partition count to a very high value, as it might lead to thread starvation, or overload the target database. For example, using six to ten partitions is reasonable for a C4 Event Store Cloud managed database, but a higher value might cause degraded performance.
3.4 - Additional metadata
In addition to copying the events, Replicator will also copy stream metadata. Therefore, changes in ACLs, truncations, stream deletions, setting max count and max age will all be propagated properly to the target cluster.
However, the max age won’t work properly for events that are about to exceed the max age in the source cluster. That’s because in the target cluster all the events will appear as recently added.
To mitigate the issue, Replicator will add the following metadata to all the copied events:
- $originalCreatedDate
- $originalEventNumber
- $originalEventPosition
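As an illustration of how a consumer of the target cluster might use this metadata, the sketch below checks an event's age against a max age value using the original creation date instead of the date assigned by the target cluster. The `isExpired` helper is hypothetical, and the assumption that $originalCreatedDate is stored as an ISO 8601 string should be verified against the metadata Replicator actually writes.

```javascript
// Hypothetical helper. Assumes $originalCreatedDate is an ISO 8601 timestamp
// string; check the real metadata format in your target cluster.
function isExpired(metadata, maxAgeSeconds, now) {
  const created = Date.parse(metadata["$originalCreatedDate"]);
  // Unknown or unparsable date: treat the event as not expired.
  if (Number.isNaN(created)) return false;
  const ageSeconds = (now.getTime() - created) / 1000;
  return ageSeconds > maxAgeSeconds;
}

const metadata = { "$originalCreatedDate": "2024-01-01T00:00:00Z" };
const now = new Date("2024-01-02T00:00:00Z"); // one day later

console.log(isExpired(metadata, 3600, now));      // true
console.log(isExpired(metadata, 2 * 86400, now)); // false
```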
Note:
Replicator can only add metadata to events, which don’t have metadata, or have metadata in JSON format.
3.5 - Event transformations
During the replication, you might want to transform events using some complex rules. For example, some fields need to be removed, the JSON schema should change, or some data needs to be merged, split, or even enriched with external data.
Transformations allow you to:
- Move events to another stream
- Change the event type
- Manipulate the event data, like changing field names and values, or even the structure
- Same, but for metadata
- [WIP] Split one event into multiple events
For this purpose, you can use the transformation function. Find out more about available transformation options on pages listed below.
3.5.1 - JavaScript transformation
Introduction
When you need to perform simple changes in the event schema, change the stream name or event type based on the existing event details and data, you can use a JavaScript transform.
Warning:
JavaScript transforms only work with JSON payloads.
For this transform, you need to supply a code snippet, written in JavaScript, which does the transformation. The code snippet must be placed in a separate file, and it cannot have any external dependencies. There’s no limitation on how complex the code is. Replicator uses the V8 engine to execute JavaScript code. Therefore, this transform normally doesn’t create a lot of overhead for the replication.
Guidelines
You can configure Replicator to use a JavaScript transformation function using the following parameters:
- replicator.transform.type - must be set to js
- replicator.transform.config - name of the file, which contains the transformation function
For example:
replicator:
  transform:
    type: js
    config: ./transform.js
The function itself must be named transform. Replicator will call it with the following arguments:
- Stream - original stream name
- EventType - original event type
- Data - event payload as an object
- Metadata - event metadata as an object
The function must return an object, which contains Stream, EventType, Data and Metadata fields. Both Data and Metadata must be valid objects, although the Metadata field can be undefined. If you haven’t changed the event data, you can return the Data and Metadata values that the function received.
Logging
You can log from JavaScript code directly to Replicator logs. Use the log object with debug, info, warn and error. You can use string interpolation as usual, or pass templated strings in Serilog format. The first parameter is the template string, plus you can pass up to five additional values, which could be values or objects.
For example:
log.info(
  "Transforming event {@Data} of type {Type}",
  original.Data, original.EventType
);
Remember that the default log level is Information, so debug logs won’t be shown. Enable debug-level logging by setting the REPLICATOR_DEBUG environment variable to true.
Example
Here is an example of a transformation function, which changes the event data, stream name, and event type:
function transform(original) {
  // Log the transform calls
  log.info(
    "Transforming event {Type} from {Stream}",
    original.EventType, original.Stream
  );

  // Ignore some events
  if (original.Stream.length > 7) return undefined;

  // Create a new event version
  const newEvent = {
    // Copy original data
    ...original.Data,
    // Change an existing property value
    Data1: `new${original.Data.Data1}`,
    // Add a new property
    NewProp: `${original.Data.Id} - ${original.Data.Data2}`
  };

  // Return the new proposed event with modified stream and type
  return {
    Stream: `transformed${original.Stream}`,
    EventType: `V2.${original.EventType}`,
    Data: newEvent,
    Meta: original.Meta
  }
}
If the function returns undefined, the event will not be replicated, so the JavaScript transform can also be used as an advanced filter. The same happens if the transform function returns an event, but either the stream name or event type is empty or undefined.
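Another common case is removing sensitive data while leaving everything else untouched. The sketch below follows the shape of the example above (a single `original` argument with Stream, EventType, Data and Meta properties). The CreditCardNumber field and the "Internal" event type prefix are hypothetical names used only for illustration, and logging is left out so the snippet has no dependency on the `log` object.

```javascript
function transform(original) {
  // Use the transform as a filter: drop internal events entirely
  // (the "Internal" prefix is a made-up naming convention).
  if (original.EventType.startsWith("Internal")) return undefined;

  // Copy the payload so the original object is not mutated,
  // then remove the sensitive field (hypothetical field name).
  const data = Object.assign({}, original.Data);
  delete data.CreditCardNumber;

  // Stream, event type and metadata are passed through unchanged.
  return {
    Stream: original.Stream,
    EventType: original.EventType,
    Data: data,
    Meta: original.Meta
  };
}
```

You can try such a function outside Replicator by calling it with a hand-written object, for example `transform({ Stream: "Payment-1", EventType: "PaymentTaken", Data: { Amount: 10, CreditCardNumber: "4111" }, Meta: {} })`, and checking that the returned Data only contains Amount.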
3.5.2 - HTTP transformation
Introduction
An HTTP transformation can be used for more complex changes in event schema or data, which is done in an external process. It allows you to use any language and stack, and also call external infrastructure to enrich events with additional data.
When configured accordingly, Replicator will call an external HTTP endpoint for each event it processes, and expect a converted event back. As event data is delivered as-is (as bytes) to the transformer, there’s no limitation of the event content type and serialisation format.
Note
Right now, events are sent to the transformer one by one. It guarantees the same order of events in the sink store, but it may be slow. We plan to enable event batching to speed up external transformations.
Guidelines
Before using the HTTP transformation, you need to build and deploy the transformation function, which is accessible using an HTTP(S) endpoint. The endpoint is built and controlled by you. It must return a response with a transformed event with 200 status code, or 204 status code with no payload. When the Replicator receives a 204 back, it will not propagate the event, so it also works as an advanced filter.
The transformation configuration has two parameters:
- replicator.transform.type - should be http for HTTP transform
- replicator.transform.config - the HTTP(S) endpoint URL
For example:
replicator:
  transform:
    type: http
    config: http://transform-func.myapp.svc.cluster.local
Replicator doesn’t support any authentication, so the endpoint must be open and accessible. You can host it at the same place as Replicator itself to avoid the network latency, or elsewhere. For example, your transformation service can be running in the same Kubernetes cluster, so you can provide the internal DNS name to its service. Alternatively, you can use a serverless function.
Replicator will call your endpoint using a POST request with JSON body.
The request and response formats are the same:
{
  "eventType": "string",
  "streamName": "string",
  "metadata": "string",
  "payload": "string"
}
Your HTTP transformation can modify any of these four properties.
Both metadata and payload are UTF-8 encoded bytes as a string.
If you store your events in JSON format, payload will be a JSON string that you can deserialize.
metadata is always a JSON string.
If your endpoint returns HTTP status code 204, the event will be ignored and won’t be replicated to the sink.
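The request and response contract can be sketched independently of any web framework. The `handleTransform` function below is a hypothetical core of such an endpoint: it takes the parsed request body and returns the status code and response body to send back. The "ObsoleteEvent" type, the `migrated` flag and the assumption that the payload is JSON are all illustrative.

```javascript
// Hypothetical endpoint logic: maps a request body to a status and response body.
function handleTransform(request) {
  // Returning 204 with no payload tells Replicator to skip the event.
  if (request.eventType === "ObsoleteEvent") {
    return { status: 204, body: null };
  }

  // payload arrives as a string; this sketch assumes it contains JSON.
  const payload = JSON.parse(request.payload);
  payload.migrated = true; // made-up enrichment

  // The response uses the same four properties as the request.
  return {
    status: 200,
    body: {
      eventType: "V2." + request.eventType,
      streamName: request.streamName,
      metadata: request.metadata,
      payload: JSON.stringify(payload)
    }
  };
}

const response = handleTransform({
  eventType: "OrderPlaced",
  streamName: "Order-1",
  metadata: "{}",
  payload: "{\"total\":10}"
});
console.log(response.status, response.body.eventType); // 200 V2.OrderPlaced
```

Keeping the transformation logic in a plain function like this makes it easy to test before wiring it into whichever HTTP server or serverless runtime hosts the endpoint.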
Example
Here is an example of a serverless function in GCP, which transforms each part of the original event:
using System.Text.Json;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System.Threading.Tasks;

namespace HelloWorld {
    public class Function : IHttpFunction {
        public async Task HandleAsync(HttpContext context) {
            var original = await JsonSerializer
                .DeserializeAsync<HttpEvent>(context.Request.Body);

            var payload = JsonSerializer
                .Deserialize<TestEvent>(original.Payload);
            payload.EventProperty1 = $"Transformed {payload.EventProperty1}";

            var metadata = JsonSerializer
                .Deserialize<Metadata>(original.Metadata);
            metadata.MetadataProperty1 = $"Transformed {metadata.MetadataProperty1}";

            var proposed = new HttpEvent {
                StreamName = $"transformed-{original.StreamName}",
                EventType = $"V2.{original.EventType}",
                Metadata = JsonSerializer.Serialize(metadata),
                Payload = JsonSerializer.Serialize(payload)
            };

            await context.Response
                .WriteAsync(JsonSerializer.Serialize(proposed));
        }

        class HttpEvent {
            public string EventType { get; set; }
            public string StreamName { get; set; }
            public string Metadata { get; set; }
            public string Payload { get; set; }
        }

        class Metadata {
            public string MetadataProperty1 { get; set; }
            public string MetadataProperty2 { get; set; }
        }

        class TestEvent {
            public string EventProperty1 { get; set; }
            public string EventProperty2 { get; set; }
        }
    }
}
The TestEvent is the original event contract, which is kept the same. However, you are free to change the event schema too, if necessary.
3.6 - Checkpoints
Replicator stores a checkpoint that represents the current position of replicated events.
This allows Replicator to resume processing after a restart, instead of starting from the beginning.
This is controlled via the checkpointAfter setting.
Configuring checkpointAfter
The checkpointAfter setting can be used to control the threshold number of events that must be replicated before a checkpoint is stored, like so:
replicator:
  checkpoint:
    checkpointAfter: 1000
By default, checkpointAfter is configured to store a checkpoint after every 1000 events replicated.
A lower checkpointAfter would mean the replication process has stronger data consistency/duplication guarantees, at the cost of performance.
For example, configuring checkpointAfter: 1 would result in a checkpoint being stored after every replicated event, which would achieve exactly-once processing.
This means in the event of a crash/restart, Replicator is guaranteed to not duplicate any events in the sink database, but comes at the cost of greatly reduced write performance to the sink database.
A higher checkpointAfter would mean writes to the sink database are more performant, at the cost of data consistency/duplication guarantees.
Configuring a higher checkpointAfter improves write performance by ensuring Replicator is not spending so much time saving checkpoints, but introduces a risk of events being duplicated during replication to the sink database in the event of a crash and restart, where a crash occurred inside of the checkpoint window.
Configure the checkpointAfter to align with your data consistency and performance requirements.
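The trade-off can be illustrated with a simplified model that counts events instead of tracking real log positions. The two helper functions below are hypothetical and assume a checkpoint is stored exactly after every checkpointAfter replicated events. They estimate how many already-written events would be processed again after a crash.

```javascript
// Simplified model (assumption): a checkpoint is stored after every
// `checkpointAfter` replicated events, counted from zero.
function lastCheckpoint(processed, checkpointAfter) {
  return Math.floor(processed / checkpointAfter) * checkpointAfter;
}

// Events written to the sink after the last stored checkpoint are the ones
// that would be read and written again after a restart.
function eventsReplayedAfterCrash(processed, checkpointAfter) {
  return processed - lastCheckpoint(processed, checkpointAfter);
}

console.log(eventsReplayedAfterCrash(2500, 1000)); // 500
console.log(eventsReplayedAfterCrash(2500, 1));    // 0
```

In this model the number of possible duplicates is always smaller than checkpointAfter, which is the window the setting controls.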
Checkpoint seeding
Replicator supports checkpoint seeding, which allows you to start replication from a specific event number. This is optional and the default is to not seed.
replicator:
  checkpoint:
    seeder:
      type: none
When the type of seeder is set to chaser, you can seed a checkpoint store from a chaser.chk file, like so:
replicator:
  checkpoint:
    seeder:
      type: chaser
      path: "path/to/chaser.chk"
This is useful when you want to start replication from the same event number as a backup’s chaser.chk. It’s not recommended to use this feature unless you are sure the chaser.chk file is immutable. This implies that the chaser.chk file of a running EventStoreDB node should not be used.
Note that seeding will only happen if the checkpoint store has no corresponding stored checkpoint.
Checkpoint stores
Replicator supports storing checkpoints in different stores. Only one store can be configured per Replicator instance.
If you want to run the replication again, from the same source, using the same Replicator instance and settings, you need to delete the checkpoint from the store.
See the currently supported checkpoint stores below:
3.6.1 - Checkpoint file
By default, Replicator stores checkpoints in a local file.
The default configuration is:
replicator:
  checkpoint:
    type: file
    path: ./checkpoint
    checkpointAfter: 1000
The path can be configured to store the checkpoint file at a different location, which can be useful for deployments to Kubernetes that may use a custom PVC configuration, for example.
3.6.2 - Checkpoint in MongoDB
Although in many cases the file-based checkpoint works fine, storing the checkpoint outside the Replicator deployment is more secure. For that purpose you can use the MongoDB checkpoint store. This store writes the checkpoint as a MongoDB document to the specified database. It will unconditionally use the checkpoint collection.
Here are the minimum required settings for storing checkpoints in MongoDB:
replicator:
  checkpoint:
    type: mongo
    path: "mongodb://mongoadmin:secret@localhost:27017"
    checkpointAfter: 1000
The path setting must contain a pre-authenticated connection string.
By default, Replicator will use the replicator database, but can be configured to use another database, like so:
replicator:
  checkpoint:
    database: someOtherDatabase
If you run multiple Replicator instances that store checkpoints in the same Mongo database, you can configure the instanceId setting to isolate each instance’s checkpoints from those stored by other Replicator instances.
By default, Replicator will use default as the instance identifier under the assumption it is the only instance storing checkpoints in the database.
You may configure instanceId like so:
replicator:
  checkpoint:
    instanceId: someUniqueIdentifier
4 - Known limitations
Performance
Replicator uses conventional client protocols: TCP and gRPC. We recommend using TCP for the source cluster connection (reading) and gRPC for the sink.
When copying the data, we must ensure that the order of events in the target cluster remains the same. The level of this guarantee depends on the selected write mode (single writer or partitioned concurrent writers), but events are still written one by one, as that’s the write mode supported by all the clients.
These factors impact the overall write performance. Considering the normal latency of a write operation via gRPC (3-6 ms, depending on the cluster instance size and the cloud provider), a single writer can only write 150-300 events per second. Event size, unless it’s very big, doesn’t play much of a role for the latency figure. Partitioned writes, running concurrently, can effectively reach the speed of more than 1000 events per second. Using more than six concurrent writers would not increase the performance as the bottleneck will shift to the server.
Based on the mentioned figures, we can expect to replicate around one million events per hour with a single writer, and 3.5 million events per hour when using concurrent writers. Therefore, the tool mainly aims to help customers with small to medium size databases. Replicating a multi-terabyte database with billions of events would probably never work as it won’t be able to catch up with frequent writes to the source cluster.
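The arithmetic behind these figures can be sketched as follows. The helper functions are hypothetical and assume strictly sequential writes with a fixed latency, so they give an upper bound rather than a prediction. The 500 million event database is a made-up example.

```javascript
// Upper bound for one sequential writer: one write per `latencyMs`.
function eventsPerSecond(latencyMs) {
  return 1000 / latencyMs;
}

function eventsPerHour(perSecond) {
  return perSecond * 3600;
}

// Time to copy a database of `totalEvents` at a sustained rate,
// ignoring new writes arriving at the source in the meantime.
function hoursToReplicate(totalEvents, perSecond) {
  return totalEvents / eventsPerHour(perSecond);
}

console.log(Math.round(eventsPerSecond(3))); // 333
console.log(Math.round(eventsPerSecond(6))); // 167
console.log(eventsPerHour(300));             // 1080000
console.log(eventsPerHour(1000));            // 3600000
console.log(Math.round(hoursToReplicate(500e6, 300))); // 463
```

A result in the hundreds of hours, as in the last line, is a signal to consider relaxing ordering guarantees with partitioned writers or to question whether the tool fits the database size.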
Therefore, an important indicator that replication will complete is the replication gap metric provided by the tool: observe it and ensure the gap is decreasing. If the gap stays constant or is increasing, then the tool is not suitable for your database.
Created date
The system property, which holds the timestamp when the event was physically written to the database, won’t be propagated to the target cluster as it’s impossible to set this value using a conventional client. To mitigate this issue, Replicator will add a metadata field $originalCreatedDate, which will contain the original event creation date.
Note
Replicator can only add metadata to events, which don’t have metadata, or have metadata in JSON format.
Max age stream metadata
Replicator will copy all of the stream metadata. However, the max age set on a stream will not work as expected, because all the events in the target cluster will be assigned a new date. The $originalCreatedDate metadata field might help to mitigate this issue.
Replication of emitted streams
By default, Replicator replicates all emitted streams, which can lead to unintended consequences, including disruptions in target cluster projections. To resolve this:
- Apply Filters: Use filters to specify which streams should and should not be replicated. Properly configured filters enable selective control over the replication of emitted streams, ensuring only necessary data is transferred between clusters or instances.
- Delete and Restart: If necessary, delete the emitted streams and restart the projection. Enabling the track emitted events option allows for resetting the projection, triggering the re-processing and rewriting of all emitted stream events.
5 - Configuration
Replicator uses a configuration file in YAML format. The file must be called appsettings.yaml and located in the config subdirectory, relative to the tool working directory.
The settings file has the replicator root level, and all settings are children of that root. It allows using the same format for the values override file when using Helm.
Available configuration options are:
Option | Description |
---|---|
replicator.reader.connectionString | Connection string for the source cluster or instance |
replicator.reader.protocol | Reader protocol (tcp or grpc) |
replicator.reader.pageSize | Reader page size (only applicable for TCP protocol) |
replicator.sink.connectionString | Connection string for the target cluster or instance |
replicator.sink.protocol | Writer protocol (tcp or grpc) |
replicator.sink.partitionCount | Number of partitioned concurrent writers |
replicator.sink.partitioner | Custom JavaScript partitioner |
replicator.sink.bufferSize | Size of the sink buffer, 1000 events by default |
replicator.scavenge | Enable real-time scavenge |
replicator.runContinuously | Set to false if you want Replicator to stop when it reaches the end of $all stream. Default is true, so the replication continues until you stop it explicitly. |
replicator.filters | Add one or more of provided filters |
replicator.transform | Configure the event transformation |
replicator.transform.bufferSize | Size of the prepare buffer (filtering and transformations), 1000 events by default |
replicator.checkpoint.type | Type of checkpoint store (file or mongo), file by default |
replicator.checkpoint.path | The file path or connection string, ./checkpoint by default |
replicator.checkpoint.checkpointAfter | The number of events that must be replicated before a checkpoint is stored, 1000 events by default |
replicator.checkpoint.database | The name of the Mongo database, replicator by default |
replicator.checkpoint.instanceId | The name of the Replicator instance used to isolate checkpoints within the Mongo database, default by default |
replicator.checkpoint.seeder.type | Type of checkpoint seeder to use (none or chaser), none by default |
replicator.checkpoint.seeder.path | The file path of the chaser.chk, empty by default |
replicator.restartDelayInSeconds | The number of seconds between replication restarts, 5 by default |
replicator.reportMetricsFrequencyInSeconds | The frequency at which to report certain metrics expressed in seconds, 5 by default |
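To show how the checkpoint options fit together, here is a minimal sketch of a MongoDB-backed checkpoint store. It uses only the options listed in the table; the connection string is an assumed local MongoDB address, and the other values simply restate the documented defaults.

```yaml
replicator:
  checkpoint:
    type: mongo
    # Assumed local MongoDB address; with the mongo store, path holds the connection string.
    path: "mongodb://localhost:27017"
    # Documented defaults, written out explicitly for clarity.
    database: replicator
    instanceId: default
    checkpointAfter: 1000
```

Giving each Replicator instance its own instanceId should keep their checkpoints separate when several instances share one Mongo database.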
Enable verbose logging
You can enable debug-level logging by setting the REPLICATOR_DEBUG environment variable to any value.
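For instance, when Replicator runs as a Docker Compose service, the variable could be set in the service definition. This is only a sketch: the service name replicator is hypothetical, and the fragment omits the image, volumes, and other settings a real Compose file needs.

```yaml
services:
  replicator:                # hypothetical service name
    environment:
      # Any value enables debug-level logging; remove the variable to disable it.
      REPLICATOR_DEBUG: "1"
```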
Example configuration
The following example configuration will instruct Replicator to read all the events from a local cluster with three nodes (es1.acme.org, es2.acme.org and es3.acme.org) using TCP protocol, and copy them over to the Event Store Cloud cluster with cluster ID c2etr1lo9aeu6ojco781 using gRPC protocol. Replicator will also call an HTTP transformation function at https://my.acme.org/transform.
The global order of events will be the same, as partitionCount is set to one.
Scavenge filter is disabled, so Replicator will also copy deleted events, which haven’t been scavenged by the server yet.
replicator:
  reader:
    protocol: tcp
    connectionString: "GossipSeeds=es1.acme.org:2113,es2.acme.org:2113,es3.acme.org:2113; HeartBeatTimeout=500; DefaultUserCredentials=admin:changeit; UseSslConnection=false;"
    pageSize: 2048
  sink:
    protocol: grpc
    connectionString: "esdb://admin:[email protected]:2113"
    partitionCount: 1
  transform:
    type: http
    config: https://my.acme.org/transform
  scavenge: false
  filters: []
  checkpoint:
    path: "./checkpoint"
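For a one-off migration rather than continuous replication, a variation of these settings might look like the sketch below. It only overrides options documented in the table above; the reader and sink sections are omitted and would still be required.

```yaml
replicator:
  # Stop once the end of the $all stream is reached instead of running until stopped.
  runContinuously: false
  # Enable real-time scavenge so deleted events that the server has not yet
  # scavenged are not copied.
  scavenge: true
```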
6 - Deployment
Replicator is optimised to run in Kubernetes, where you can deploy it using a Helm chart we provide. However, you can also run it on a virtual machine using Docker Compose.
6.1 - Run Replicator in Docker
You can run Replicator using Docker Compose on any machine that has Docker installed.
We prepared a complete set of files for this scenario. You can find those files in the Replicator repository.
The Compose file includes the following components:
- Replicator itself
- Prometheus, pre-configured to scrape Replicator metrics endpoint
- Grafana, pre-configured to use Prometheus, with the Replicator dashboard included
Configuration
Before spinning up this setup, you need to change the replicator.yml file. Find out about Replicator settings on the Configuration page. We included a sample configuration file in the repository.
The sample configuration enables verbose logging using the REPLICATOR_DEBUG environment variable. For production deployments, you should remove it from the configuration.
Monitoring
When you start all the components using docker-compose up, you can check the Replicator web UI by visiting http://localhost:5000, as well as Grafana at http://localhost:3000. Use the admin/admin default credentials for Grafana. The Replicator dashboard is included in the deployment, so you can find it in the dashboards list.
6.2 - Kubernetes
You can run Replicator in a Kubernetes cluster in the same cloud as your managed EventStoreDB cloud cluster. The Kubernetes cluster workloads must be able to reach the managed EventStoreDB cluster. Usually, with a proper VPC (or VN) peering between your VPC and Event Store Cloud network, it works without issues.
We provide guidelines about connecting managed Kubernetes clusters:
6.2.1 - Helm
The easiest way to deploy Replicator to Kubernetes is by using the provided Helm chart. This page gives detailed instructions for using the Replicator Helm chart.
Add Helm repository
Ensure you have Helm 3 installed on your machine:
$ helm version
version.BuildInfo{Version:"v3.5.2", GitCommit:"167aac70832d3a384f65f9745335e9fb40169dc2", GitTreeState:"dirty", GoVersion:"go1.15.7"}
If you don’t have Helm, follow the Helm installation guide.
Add the Replicator repository:
$ helm repo add es-replicator https://eventstore.github.io/replicator
$ helm repo update
Provide configuration
Configure the Replicator options using a new values.yml file:
replicator:
  reader:
    connectionString: "GossipSeeds=node1.esdb.local:2113,node2.esdb.local:2113,node3.esdb.local:2113; HeartBeatTimeout=500; UseSslConnection=False; DefaultUserCredentials=admin:changeit;"
  sink:
    connectionString: "esdb://admin:changeit@[cloudclusterid].mesdb.eventstore.cloud:2113"
    partitionCount: 6
  filters:
    - type: eventType
      include: "."
      # Single quotes are used because \w is not a valid escape in a double-quoted YAML string
      exclude: '((Bad|Wrong)\w+Event)'
  transform:
    type: http
    config: "http://transform.somenamespace.svc:5000"
prometheus:
  metrics: true
  operator: true
Available options are:
Option | Description | Default |
---|---|---|
replicator.reader.connectionString | Connection string for the source cluster or instance | nil |
replicator.reader.protocol | Reader protocol | tcp |
replicator.reader.pageSize | Reader page size (only applicable for TCP protocol) | 4096 |
replicator.sink.connectionString | Connection string for the target cluster or instance | nil |
replicator.sink.protocol | Writer protocol | grpc |
replicator.sink.partitionCount | Number of partitioned concurrent writers | 1 |
replicator.sink.partitioner | Custom JavaScript partitioner | null |
replicator.sink.bufferSize | Size of the sink buffer, in events | 1000 |
replicator.scavenge | Enable real-time scavenge | true |
replicator.runContinuously | Set to false if you want Replicator to stop when it reaches the end of $all stream. | true |
replicator.filters | Add one or more of provided filters | [] |
replicator.transform | Configure the event transformation | |
replicator.transform.bufferSize | Size of the prepare buffer (filtering and transformations), in events | 1000 |
prometheus.metrics | Enable annotations for Prometheus | false |
prometheus.operator | Create PodMonitor custom resource for Prometheus Operator | false |
resources.requests.cpu | CPU request | 250m |
resources.requests.memory | Memory request | 512Mi |
resources.limits.cpu | CPU limit | 1 |
resources.limits.memory | Memory limit | 1Gi |
pvc.storageClass | Persistent volume storage class name | null |
terminationGracePeriodSeconds | Timeout for the workload graceful shutdown, it must be long enough for the sink buffer to flush | 300 |
jsConfigMaps | List of existing config maps to be used as JS code files (for JS transform, for example) | {} |
Note:
- As Replicator uses the 20.10 TCP client, you have to specify UseSsl=false in the connection string when connecting to an insecure cluster or instance.
- Only increase the partitions count if you don’t care about the $all stream order (regular streams will be in order anyway).
You should at least provide both connection strings and ensure that workloads in your Kubernetes cluster can reach both the source and the target EventStoreDB clusters or instances.
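As an illustration of overriding chart defaults, the values.yml fragment below raises the resource settings and the shutdown timeout together with the sink buffer. The numbers are arbitrary examples, not recommendations; only the option names come from the table above.

```yaml
replicator:
  sink:
    # A larger buffer holds more events that must be flushed on shutdown.
    bufferSize: 2000
resources:
  requests:
    cpu: 500m
    memory: 1Gi
  limits:
    cpu: 2
    memory: 2Gi
# Give the larger sink buffer more time to flush during a graceful shutdown.
terminationGracePeriodSeconds: 600
```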
Configuring a JavaScript transform
Follow the documentation to configure a JavaScript transform in your values.yml file.
Then append the following option to your helm install command:
--set-file transformJs=./transform.js
Configuring a custom partitioner
Follow the documentation to configure a custom partitioner in your values.yml file.
Then append the following option to your helm install command:
--set-file partitionerJs=./partitioner.js
Complete the deployment
When you have the values.yml file complete, deploy the release using Helm. Remember to set the current kubectl context to the cluster you are deploying to.
helm install es-replicator \
  es-replicator/es-replicator \
  --values values.yml \
  --namespace es-replicator
You can choose another namespace, but the namespace must exist before you deploy.
The replication starts immediately after the deployment, assuming that all the connection strings are correct, and the Replicator workload has network access to both source and sink EventStoreDB instances.
$all
stream and will produce duplicate events.
6.2.2 - Monitoring
When the deployment finishes, you should be able to connect to the Replicator service by using port forwarding:
$ kubectl port-forward -n es-replicator svc/es-replicator 5000
The Replicator web interface should then be accessible via http://localhost:5000. The UI will display the replication progress, source read and target write positions, number of events written, and the replication gap. Note that the write rate is shown for a single writer. When you use concurrent writers, the speed will be higher than shown.
Prometheus
If you have Prometheus in your Kubernetes cluster, we recommend enabling the prometheus.metrics option. If the prometheus.operator option is set to false, the deployment will be annotated with prometheus.io/scrape.
If you have Prometheus managed by Prometheus Operator, the scrape annotation won’t work. You can set both prometheus.metrics and prometheus.operator options to true, so the Helm release will include the PodMonitor custom resource. Make sure that your Prometheus custom resource is properly configured with regard to podMonitorNamespaceSelector and podMonitorSelector, so it will not ignore the Replicator pod monitor.
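One permissive way to configure those selectors is sketched below. The resource name and namespace are hypothetical, and the fragment leaves out every other field a real Prometheus resource needs. A cluster with stricter policies would use label-based selectors instead of empty ones.

```yaml
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: prometheus        # hypothetical name
  namespace: monitoring   # hypothetical namespace
spec:
  # An empty selector matches every PodMonitor object.
  podMonitorSelector: {}
  # An empty namespace selector matches all namespaces, so the PodMonitor
  # created in the Replicator namespace is picked up as well.
  podMonitorNamespaceSelector: {}
```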
Grafana
The best way to monitor the replication progress is using Prometheus and Grafana. If the pod is being properly scraped for metrics, you will be able to use the Grafana dashboard, which you can create by importing it from a JSON file.