This the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Event transformations

Transform the event schema, add or remove the data, enrich, or filter out events using complex rules.

During the replication, you might want to transform events using some complex rules. For example, some fields need to be removed, the JSON schema should change, or some data need to be merged, split, or even enriched with external data.

Transformations allow you:

  • Move events to another stream
  • Change the event type
  • Manipulate the event data, like changing field names and values, or even the structure
  • Same, but for metadata
  • [WIP] Slit one event into multiple events

For this purpose, you can use the transformation function. Find out more about available transformation options on pages listed below.

1 - JavaScript transformation

Transform and filter events using in-proc JavaScript function

Introduction

When you need to perform simple changes in the event schema, change the stream name or event type based on the existing event details and data, you can use a JavaScript transform.

For this transform, you need to supply a code snippet, written in JavaScript, which does the transformation. The code snippet must be placed in a separate file, and it cannot have any external dependencies. There’s no limitation on how complex the code is. Replicator uses the V8 engine to execute JavaScript code. Therefore, this transform normally doesn’t create a lot of overhead for the replication.

Guidelines

You can configure Replicator to use a JavaScript transformation function using the following parameters:

  • replicator.transform.type - must be set to js
  • replicator.transform.config - name of the file, which contains the transformation function

For example:

replicator:
  transform:
    type: js
    config: ./transform.js

The function itself must be named transform. Replicator will call it with the following arguments:

  • Stream - original stream name
  • EventType - original event type
  • Data - event payload as an object
  • Metadata - event metadata as an object

The function must return an object, which contains Stream, EventType, Data and Metadata fields. Both Data and Metadata must be valid objects, the Metadata field can be undefined. If you haven’t changed the event data, you can pass Data and Metadata arguments, which the function receives as arguments.

Logging

You can log from JavaScript code directly to Replicator logs. Use the log object with debug, info, warn and error. You can use string interpolation as usual, or pass templated strings in Serilog format. The first parameter is the template string, plus you can pass up to five additional values, which could be values or objects.

For example:

log.info(
    "Transforming event {@Data} of type {Type}", 
    original.data, original.eventType
);

Remember that the default log level is Information, so debug logs won’t be shown. Enable debug-level logging by setting the REPLICATOR_DEBUG environment variable to true.

Example

Here is an example of a transformation function, which changes the event data, stream name, and event type:

function transform(original) {
    // Log the transform calls
    log.info(
        "Transforming event {Type} from {Stream}", 
        original.EventType, original.Stream
    );

    // Ignore some events
    if (original.Stream.length > 7) return undefined;

    // Create a new event version
    const newEvent = {
        // Copy original data
        ...original.Data,
        // Change an existing property value 
        Data1: `new${original.Data.Data1}`,
        // Add a new property
        NewProp: `${original.Data.Id} - ${original.Data.Data2}`
    };
    
    // Return the new proposed event with modified stream and type
    return {
        Stream: `transformed${original.Stream}`,
        EventType: `V2.${original.EventType}`,
        Data: newEvent,
        Meta: original.Meta
    }
}

If the function returns undefined, the event will not be replicated, so the JavaScript transform can also be used as an advanced filter. The same happens if the transform function returns an event, but either the stream name or event type is empty or undefined.

2 - HTTP transformation

Transform and filter events using out-of-process HTTP server

Introduction

An HTTP transformation can be used for more complex changes in event schema or data, which is done in an external process. It allows you to use any language and stack, and also call external infrastructure to enrich events with additional data.

When configured accordingly, Replicator will call an external HTTP endpoint for each event it processes, and expect a converted event back. As event data is delivered as-is (as bytes) to the transformer, there’s no limitation of the event content type and serialisation format.

Guidelines

Before using the HTTP transformation, you need to build and deploy the transformation function, which is accessible using an HTTP(S) endpoint. The endpoint is built and controlled by you. It must return a response with a transformed event with 200 status code, or 204 status code with no payload. When the Replicator receives a 204 back, it will not propagate the event, so it also works as an advanced filter.

The transformation configuration has two parameters:

  • replicator.transform.type - should be http for HTTP transform
  • replicator.transform.config - the HTTP(S) endpoint URL

For example:

replicator:
  transform:
    type: http
    config: http://transform-func.myapp.svc.cluster.local

Replicator doesn’t support any authentication, so the endpoint must be open and accessible. You can host it at the same place as Replicator itself to avoid the network latency, or elsewhere. For example, your transformation service can be running in the same Kubernetes cluster, so you can provide the internal DNS name to its service. Alternatively, you can use a serverless function.

Replicator will call your endpoint using a POST request with JSON body.

The request and response formats are the same:

{
    "eventType": "string",
    "streamName": "string",
    "metadata": "string",
    "payload": "string"
}

Your HTTP transformation can modify any of these four properties.

Both metadata and payload are UTF-8 encoded bytes as a string.

If you store your events in JSON format, payload will be a JSON string that you can deserialize.

metadata is always a JSON string.

If your endpoint returns HTTP status code 204, the event will be ignored and wont be replicated to the sink.

Example

Here is an example of a serverless function in GCP, which transforms each part of the original event:

using System.Text.Json;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System.Threading.Tasks;

namespace HelloWorld {
    public class Function : IHttpFunction {
        public async Task HandleAsync(HttpContext context) {
            var original = await JsonSerializer
                .DeserializeAsync<HttpEvent>(context.Request.Body);

            var payload = JsonSerializer
                .Deserialize<TestEvent>(original.Payload);
            payload.EventProperty1 = $"Transformed {payload.EventProperty1}";

            var metadata = JsonSerializer
                .Deserialize<Metadata>(original.Metadata);
            metadata.MetadataProperty1 = $"Transformed {metadata.MetadataProperty1}";

            var proposed = new HttpEvent {
                StreamName = $"transformed-{original.StreamName}",
                EventType  = $"V2.{original.EventType}",
                Metadata   = JsonSerializer.Serialize(metadata),
                Payload    = JsonSerializer.Serialize(payload)
            };

            await context.Response
                .WriteAsync(JsonSerializer.Serialize(proposed));
        }

        class HttpEvent {
            public string EventType  { get; set; }
            public string StreamName { get; set; }
            public string Metadata   { get; set; }
            public string Payload    { get; set; }
        }

        class Metadata {
            public string MetadataProperty1 { get; set; }
            public string MetadataProperty2 { get; set; }
        }

        class TestEvent {
            public string EventProperty1 { get; set; }
            public string EventProperty2 { get; set; }
        }
    }
}

The TestEvent is the original event contract, which is kept the same. However, you are free to change the event schema too, if necessary.