Kyte's Journey of Temporal Workflows

Natalya Reznik Nov 22, 2022

Kyte is a new generation transportation company. We bring you a car just when you need it, for as long as you want, unlocking your freedom to go places without having to wait in line or waste time at a rental counter.

At Kyte, we perform numerous operations offline. Once a user books a car via the Kyte website or mobile app, a number of tasks need to be completed before, during, and after the rental. An example is the multiple reminders and notifications sent at predefined times or at a predefined cadence. Historically, we added ECS cron jobs to do this asynchronous processing in a batch-like fashion. Over time, Kyte Engineering found that this approach presented various problems with tracking, scaling, and maintaining these flows. As our volume kept increasing, the problems became more and more pressing, so we started investigating workflow engines as an architectural solution.

Problems

The sequential batch processing approach does not scale. Each booking’s processing is delayed by the processing of all bookings ahead of it. If a task is time sensitive, by the time it is executed it may no longer be relevant, or it may delay other critical tasks it triggers. Additionally, the ability to retry a failed task is limited, because each retry further delays the processing of other tasks. The logic to process tasks for multiple bookings would be (sketched after Diagram 1):

    • give me all bookings that satisfy certain criteria;

    • loop through the bookings and perform the specified task on each.

Diagram 1
Before workflows.
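
A rough Python sketch of what one of these cron jobs amounted to; fetch_bookings and process_booking are hypothetical placeholders, not our actual code:

from typing import Iterable


def fetch_bookings(status: str) -> Iterable[dict]:
    # Hypothetical placeholder: "give me all bookings that satisfy certain criteria".
    return []


def process_booking(booking: dict) -> None:
    # Hypothetical placeholder: the per-booking task (e.g. send a reminder).
    ...


def run_batch_job() -> None:
    # Sequential processing: each booking waits for every booking ahead of it,
    # and retrying a failed item only delays the rest of the batch further.
    for booking in fetch_bookings(status="upcoming"):
        process_booking(booking)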

Parallelizing the processing may solve the scalability issue above, but comes with other issues:

    • design and maintain parallelization;

    • keep track of retries;

    • persist data and state across restarts;

    • recover from individual failures.

Solution

A widely accepted architectural industry standard intended to solve the stated problems is a workflow engine. A workflow engine takes care of retries, statefulness, and scalability while allowing the developer to focus on the business logic. Additionally, in a workflow paradigm we can implement a workflow of a certain type and launch one instance of it per entity (in our domain, this can be a rental booking, a trip leg, a subscription, etc.).
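
For instance, with the Temporal Python SDK we eventually adopted (more on that choice below), launching one workflow instance per booking looks roughly like the sketch below; the workflow type name, workflow IDs, and task queue are hypothetical:

import asyncio

from temporalio.client import Client


async def start_booking_workflows(booking_ids: list[str]) -> None:
    client = await Client.connect("localhost:7233")
    for booking_id in booking_ids:
        # One workflow instance per entity; a deterministic ID prevents duplicate instances.
        await client.start_workflow(
            "BookingWorkflow",            # hypothetical workflow type
            booking_id,
            id=f"booking-{booking_id}",
            task_queue="bookings",        # hypothetical task queue
        )


if __name__ == "__main__":
    asyncio.run(start_booking_workflows(["b-123", "b-456"]))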

Diagram 2
After workflows.

The setup above allows us to

    • configure a retry policy for the activities executed on each entity (e.g. booking, user, trip leg), with the retries fully handled by the workflow engine (see the sketch after Diagram 3);

    • design a process as a set of atomic actions (activities) that run exactly once on success and are never re-run;

    • not worry about the state of the flow across restarts;

    • easily track processing of a specific entity via the workflow UI:

Diagram 3
Workflows chart.
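
To illustrate the first two points, here is a minimal workflow sketch with a per-activity retry policy using the Temporal Python SDK; ChargeDepositWorkflow and charge_deposit are hypothetical names, not our actual code:

from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@activity.defn
async def charge_deposit(booking_id: str) -> None:
    # Hypothetical activity: charge the deposit for this booking via the payment provider.
    ...


@workflow.defn
class ChargeDepositWorkflow:
    @workflow.run
    async def run(self, booking_id: str) -> None:
        # The engine persists workflow state, so a restart resumes here,
        # and an activity that already succeeded is not executed again on replay.
        await workflow.execute_activity(
            charge_deposit,
            booking_id,
            start_to_close_timeout=timedelta(minutes=2),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=5),
                backoff_coefficient=2.0,
                maximum_attempts=5,
            ),
        )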

Why Temporal?

We evaluated a number of existing workflow engines against a set of weighted criteria, such as the availability and maturity of a Python SDK (we are currently mostly a Python shop), the availability and completeness of documentation, and the cost of server setup and maintenance. Temporal won as an open-source yet stable and extensively used product with a highly involved and expanding community, a great level of support, and a Python SDK in development nearing GA.

Unlike some other workflow engines out there (e.g. Airflow), Temporal is not designed as a data processing engine. A Temporal workflow is not a pre-defined pipeline or DAG; it is a function written in a programming language of choice and can be event driven (i.e. it can listen to signals). This is extremely important for Kyte’s business case, where a different course of action or a different code path needs to be taken depending on the real-time, real-world situation.
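
As a small illustration of the event-driven model, here is a sketch of a workflow that changes its behavior when a signal arrives, using the Python SDK; CarHandoffWorkflow and car_returned are hypothetical names:

from temporalio import workflow


@workflow.defn
class CarHandoffWorkflow:
    def __init__(self) -> None:
        self._car_returned = False

    @workflow.signal
    def car_returned(self) -> None:
        # Signals can arrive at any time and steer the workflow down a different path.
        self._car_returned = True

    @workflow.run
    async def run(self, booking_id: str) -> str:
        # Durably wait for the real-world event; the wait survives Worker restarts.
        await workflow.wait_condition(lambda: self._car_returned)
        return f"booking {booking_id}: car returned"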

Kyte Setup

We started with a self-hosted Temporal cluster as a proof of concept. However, with limited infrastructure development resources, we decided that maintaining our own EKS cluster (including deployment, upgrades, scaling, and user authorization) was not something we could afford to spend enough time on, and that we would be more efficient by delegating that work to Temporal. We soon moved to managed Temporal, hosted by temporal.io.

We (Kyte) deploy and manage our Temporal Workers (the client processes that run the workflows), while Temporal fully manages and maintains the server: user authentication and authorization, server versioning, scaling and tuning, and environments.

Diagram 4
Kyte's Temporal setup.

Monitoring

The Temporal SDK emits a number of useful metrics via Prometheus by default. We initialize the metrics on startup as follows:

prometheus_http_port: str = os.getenv("TEMPORAL_PROMETHEUS_HTTP_PORT") or "18081"
_init_metrics("0.0.0.0:" + prometheus_http_port)
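
_init_metrics is our own helper; a sketch of what such a helper could look like with the SDK's telemetry configuration (assuming a recent temporalio release; our actual implementation may differ):

from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig


async def connect_with_metrics(target: str, prometheus_address: str) -> Client:
    # Expose the SDK's metrics on a Prometheus endpoint at the given address
    # and attach the telemetry-enabled runtime to the client connection.
    runtime = Runtime(
        telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address=prometheus_address))
    )
    return await Client.connect(target, runtime=runtime)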

We collect the metrics in DataDog by scraping Prometheus endpoints. The metrics are used to create dashboards as well as set up monitors and alerts. We currently have the following types of monitors:

    • workflow heartbeat monitor (based on the SDK's heartbeat metric);

    • workflow activity latency is too high (based on the SDK's activity latency metric);

    • workflow failed (based on the SDK's workflow failure metric).

Local Testing

When making changes it is useful to quickly test the orchestration part of the workflow without the overhead of connecting to Temporal. For local testing we are using Temporalite, a distribution of Temporal running in a single process with no run-time dependencies.

Our docker-compose-temporal.yml contains the following:

version: "3.5" services: temporalite: container_name: temporal build: https://github.com/temporalio/temporalite.git#main networks: - kyte ports: - 8233:8233 - 7233:7233 networks: kyte: name: kyte_default driver: bridge

Pitfalls

Worker Scalability

As we started scaling out, we noticed that our Workers could not handle the load. The workflow in question was designed to estimate and flag the risk of a delivery or pickup not happening on time, so one workflow was spawned per rental trip leg (most rental trips have at least two legs: delivery and return). That summed up to about a thousand workflows per day, each running a few activities in parallel. Although this was not a very high load, some workflows seemed “stuck” while waiting to get executed. Adding resources to the Worker processes did not seem to solve the problem.

A discussion with Temporal engineers revealed a configuration issue. A Worker object created via the Temporal API is configured to handle a certain number of parallel activities (in the beta version of the Python SDK the default is low). With a large number of workflows each starting a large number of activities simultaneously, Workers reach the limit and stop asking the server for work. Increasing the max_concurrent_activities setting when creating Workers solved the issue without adding computing resources.
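
A sketch of the Worker setup with the raised limit; the workflow, activity, and task-queue names and the exact limit are illustrative, not our actual configuration:

from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def estimate_leg_risk(leg_id: str) -> None:
    # Hypothetical activity: estimate the on-time risk for one trip leg.
    ...


@workflow.defn
class TripLegRiskWorkflow:
    @workflow.run
    async def run(self, leg_id: str) -> None:
        await workflow.execute_activity(
            estimate_leg_risk, leg_id, start_to_close_timeout=timedelta(minutes=5)
        )


async def run_worker() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="trip-leg-risk",         # hypothetical task queue
        workflows=[TripLegRiskWorkflow],
        activities=[estimate_leg_risk],
        max_concurrent_activities=200,      # raise the low default concurrency limit
    )
    await worker.run()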

Long-running Activities

Because a car can be rented for days, weeks, or months, some activities mapped to bookings are also long-running, since they run for the duration of the booking. An example is an activity that continuously checks the status and location of the car.

If a worker goes away in the middle of executing an activity, there are two ways the server will detect that and hand the activity to another worker:

    • when the start_to_close_timeout is exceeded (i.e. when the activity takes longer than it is supposed to);

    • when the heartbeat_timeout is exceeded (i.e. when the Worker stops checking in with the server using the heartbeat API).

Because we do a full deployment on every code change, our Workers are frequently brought down. If a Worker had started executing an activity and a subsequent deployment replaced that Worker, then without a heartbeat the Temporal server would wait for the start_to_close_timeout (which could be days) before failing and re-running the activity. Adding heartbeats to our activities, in combination with heartbeat timeouts, resolved another case of seemingly “stuck” workflows.
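
A sketch of the pattern; check_car_status and its helpers are hypothetical. The activity heartbeats on every poll, and the caller sets a heartbeat_timeout so a lost Worker is detected within minutes instead of days:

import asyncio
from datetime import timedelta

from temporalio import activity, workflow


async def _booking_finished(booking_id: str) -> bool:
    # Hypothetical placeholder: has the booking ended?
    return False


async def _check_status_and_location(booking_id: str) -> None:
    # Hypothetical placeholder: query the car's telemetry.
    ...


@activity.defn
async def check_car_status(booking_id: str) -> None:
    # Long-running activity: poll for the life of the booking, heartbeating each time.
    while not await _booking_finished(booking_id):
        await _check_status_and_location(booking_id)
        activity.heartbeat()
        await asyncio.sleep(60)


@workflow.defn
class CarMonitoringWorkflow:
    @workflow.run
    async def run(self, booking_id: str) -> None:
        await workflow.execute_activity(
            check_car_status,
            booking_id,
            start_to_close_timeout=timedelta(days=90),  # bounded by the longest booking
            heartbeat_timeout=timedelta(minutes=5),     # detect a lost Worker quickly
        )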

Non-determinism and Backward Compatibility

A workflow’s code must be fully deterministic: if a Worker dies while executing a workflow and a new Worker picks it up, the workflow is replayed to the point where it previously stopped, so it must do exactly the same thing on every execution. If the workflow code is changed between deployments, the changes may lead to non-determinism errors. Code in the workflow itself can also introduce non-determinism (e.g. calling datetime.now() directly in workflow code).

The right way to deal with this is to use the Temporal versioning API for any non-backward-compatible changes and to always check the workflow code for any non-determinism issues before deploying. It is possible to test for non-determinism using the Temporal workflow Replayer API.
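
In the Python SDK, the versioning API is workflow.patched; a sketch of gating a changed code path so that in-flight histories keep replaying the old branch (the patch id and activities are hypothetical):

from datetime import timedelta

from temporalio import activity, workflow


@activity.defn
async def old_notification(booking_id: str) -> None:
    ...  # hypothetical original activity


@activity.defn
async def new_notification(booking_id: str) -> None:
    ...  # hypothetical replacement activity


@workflow.defn
class NotificationWorkflow:
    @workflow.run
    async def run(self, booking_id: str) -> None:
        # Histories recorded before the change replay the old branch;
        # new executions record the patch marker and take the new branch.
        notify = new_notification if workflow.patched("use-new-notification") else old_notification
        await workflow.execute_activity(
            notify, booking_id, start_to_close_timeout=timedelta(minutes=5)
        )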

However, if you already have hundreds or thousands of active workflows in deployment failing with non-determinism errors, what do you do? One way to mitigate this is to reset all the workflows in a batch fashion. Our workflows are designed to be fully idempotent. An example is the deposit charge and refund workflow: if a workflow charges the deposit for a booking and is restarted, it detects that the deposit has already been charged and does not charge it again.

We use the following tctl (Temporal command-line tool) script to reset the workflows and start them anew:

workflows=$(tctl --tls_cert_path $temporal_api_cert_path --tls_key_path $temporal_api_key_path --address $temporal_server:$temporal_port --namespace $temporal_namespace wf listall --workflow_type $workflow --open)
workflow_list=$(echo ${workflows} | awk -F " *" 'NR>1 { print $4}')
echo $workflow_list > workflowlist
tctl --tls_cert_path temporalsdk.pem --tls_key_path temporalsdk.key --address $temporal_server:$temporal_port --namespace $temporal_namespace workflow reset-batch --input_file workflowlist --reset_type FirstWorkflowTask --reason "Ensure backward compatibility"

The good news is that the Temporal engineering and support teams are extremely helpful and responsive, and as we keep rolling out Temporal to new use cases, we gain more Temporal knowledge, so the experience gets smoother with each new workflow.

Natalya Reznik is a Software Engineer at Kyte.