Skip to content

Simply takes the input from the transformer and pushes it to the database (and creates the table if it does not already exist)

Notifications You must be signed in to change notification settings

flowcore-io/postgres-db-transformer

Repository files navigation

Typescript Postgres Insertion Transformer

All this transformer does is look at the incoming data and insert it into a postgres database. You can fine tune which data you are interested in by providing the environment variables demonstrated in the .env.example file.

Schema Structure

When you specify a schema, the transformer will create a table with the specified schema, as well as attempt to fill that table with the appropriate values from the input.

The currently supported schema values are as follows:

{
  "userId": {
    "type": "string",
    "mapFrom": "id"
  },
  "user": {
    "type": "jsonb"
  },
  "submissionId": {
    "type": "string"
  }
}

The initial value (e.g. userId), is the db column name. The sub objects relates to properties associated with that column.

BINARY 64 ENCODING: When you set the TABLE_SCHEMA_BASE64 environment variable, you need to remember to encode the schema to base64. This is because the transformer will decode the schema before using it.

type

The type of the column. This can be any of the following:

  • text
  • string
  • integer
  • decimal
  • boolean
  • json
  • jsonb
  • uuid
  • timestamp
  • binary
  • bigInteger
  • float
  • double
  • increments
  • time
  • date
  • dateTime

Failure to provide one of these types, results in the column being ignored during creation.

WHEN NO SCHEMA IS PROVIDED: If you do not specify a schema, then the transformer will do its best to generate one for you based on the input data. Keep in mind, that the auto generated schema is not at all optimised for performance, and should only be used during testing.

mapFrom

Will map the value from the input to the specified column. For example:

{
  "userId": {
    "type": "string",
    "mapFrom": "id"
  }
}

assumes that the input has a property id, and will insert that value into the userId column.

WHEN mapfrom IS NOT FOUND ON THE INPUT: If the mapFrom value is not found on the input, then the transformer will assume that the value is not present, and ignore it.


Template boilerplate code:

Entrypoint

The main.ts file is the entrypoint for the transformer. It contains the following endpoints:

GET /health

This endpoint is used by the transformer shell to check if the transformer is healthy.

POST /transform

This endpoint is used by the transformer shell to transform data.

📝 Quick start

📑 Generate your own repository from this template

Create a new repo from the template


🚀 Deployment

We try to simplify your development experience, by including a pipeline that will automatically test, build and push the release artifact to the GitHub release. However, if you wish to modify the GitHub Action workflow, or use your own, you are completely free to do so.

🔧 Preparing our pre-made Pipeline

Before you can utilise our pre-built pipeline, you need to authenticate yourself with it. This is done by:

  1. generating a personal access token with the permission of creating artifacts.
  2. adding a new repository secret to your Settings > Secrets and variables > Actions, with the name of RELEASE_GITHUB_TOKEN, and the value of your newly created personal token

You can follow our step-by-step video guide, if you are not accustomed to this process:

Automating the release process for your Transformer with GitHub Actions

1️⃣.2️⃣.0️⃣ Release Artifact Versioning

We utilise Conventional Commits to automatically manage the versioning of your released artifacts. In short:

  • prefixing your commits with feat: will trigger a minor version (1.2.0)
  • prefixing your commits with fix: will trigger a patch version (1.0.3)

As an example: feat: splitting email into username and domain, or fix: email now splits correctly by @ symbol

If you need anything more granular, then you can refer to the link above.

🔄 Release Process

If using the correct conventional commit syntax, then the release process is as follows:

Anything that gets pushed to the main branch, will trigger a pull request; that runs tests to validate the release. Once the pull request has been merged the release will be published, together with the artifact.

💻 Development

We have 2 recommended methods of working on your transformer.

  1. Develop it directly from GitHub
  2. Run it locally

Your choice may vary depending on your development environment, preference and/or complexity.

Option 1: Develop directly in GitHub (Simple)

You can look at how you start the editor directly in your browser here.

Make your changes and push them to your repository. Bear in mind that you need to use conventional commits to be able to release your transformer. You can read more about it under Deployment.

Option B: Develop locally (Advanced)

1. ⚙️ Installation️

Prerequisites:

  • NodeJS
  • Docker

And you must clone the project on to your machine

install dependencies:

yarn install && yarn build

run the transformer shell

docker-compose -f test/docker/docker-compose.yaml up -d

Create a .env file with the following content:

HOST_ADDRESS=host.docker.internal

2. 💻 Development

To start developing with watch mode run:

yarn build:watch

In another terminal or tab, run:

yarn test:watch

to run the tests on the built transformer.

When changes are made any of the files the transformer will be reloaded and the tests will be run again.

Note: The dist directory needs to be writable by the transformer shell.

🔎 Development Overview

Change the transformer

To change the transformer, edit the transform.entrypoint.ts file. To add functionality on startup edit the start.entrypoint.ts file. To add additional health checks edit the health.entrypoint.ts file.

Change the input and output data

To change the validation of inputs and outputs edit the test/expected.json file. This file specify the event payloads that are sent to the transformer and the expected output. The :uuid: and :date: values for the expected outcome matches to any string.

Further customization

Change the test/app.spec.ts file to add additional tests and more advanced validation. Further change the files in the src directory to add more advanced logic.

🌕 Utilise your Transformer in Flowcore

To use this transformer in the Flowcore platform, create a new adapter and point it to the github release artifact.

The shell will then download the artifact, run it and for each data point post to the transform endpoint.

Migrate from v1 to v2

The v2 of the transformer shell work differently from v1. The main difference is that the v2 shell loads from a config file rather than through en endpoint. This means that the transformer shell needs to be updated to use the new config file.

1. Update the transformer shell docker compose file

change the transformer service in the docker-compose.yaml file to use the new image.

version: "3.8"

services:
  shell:
    container_name: transformer-shell
    image: flowcoreio/adapter-nodejs-transformer-shell:2.1.0
    ports:
      - "3001:3001"
      - "10000:10000"
    environment:
      LOG_LEVEL: debug
      LOG_PRETTY_PRINT: "true"
      TRANSFORMERS: node
      PORT: 3001
      TRANSFORMER_DEV_MODE: "true"
    volumes:
      - ./../../dist:/app/transformers/test-transformer
      - ./../config:/usr/src/app/transformer

add a new transformer.json configuration file at test/config/transformer.json with the following content:

{
  "name": "test-transformer",
  "version": "1.0.0",
  "runtime": "node",
  "artifactUrl": "/app/transformers/test-transformer",
  "entrypoint": "main.js",
  "startTimeTimeout": 10000,
  "port": 10000
}

then finally update the app.spec.ts file to use this new setup.

  1. remove the TRANSFORMER_BLUEPRINT const
const TRANSFORMER_BLUEPRINT: TransformerBlueprint = JSON.parse(fs.readFileSync(path.join(process.cwd(), "test/config", "transformer.json"), "utf-8"));
  1. change the check to use the shell health endpoint in beforeAll

add adapter port const

const adapterPort = 3001;

replace the try catch block that loads the transformer with the following code:

await waitForExpect(async () => {
  console.debug(`Checking if transformer is loaded on http://localhost:${adapterPort}/health`);
  const axiosResponse = await axios.get(
    `http://localhost:${adapterPort}/health`,
  );

  if (axiosResponse.status !== 200) {
    console.debug(`Transformer not loaded on http://localhost:${adapterPort}/health`, axiosResponse.data);
  }

  expect(axiosResponse.status).toEqual(200);
}, 10000, 1000);
  1. remove all references to processId

remove the processId let variable and the processId from the axios.post call. This is the simplified version of the axios.post call:

const processedResult = await axios.post(
  "http://localhost:3001/transform",
  data,
);
  1. and finally remove the unload transformer call in afterAll, it should just look like this:
afterAll(async () => {
  server.close();
});

About

Simply takes the input from the transformer and pushes it to the database (and creates the table if it does not already exist)

Resources

Stars

Watchers

Forks

Packages

No packages published

Contributors 3

  •  
  •  
  •