Deploying to Kubernetes
Kubernetes is a first-class deployment target for Malstrom jobs. The recommended way to deploy to Kubernetes is through the Malstrom Kubernetes operator.
Installing the Operator
The operator can be installed via Helm.
First add the Malstrom Helm repository:
helm repo add malstrom https://malstrom.io/helm
Fetch the repository's Helm charts:
helm repo update malstrom
Install the operator (see below for supported config values):
helm install malstrom-operator malstrom/malstrom-operator -f values.yaml
Using the Kubernetes Job Runtime
The runtime for executing Malstrom jobs on Kubernetes is supplied via the malstrom-k8s crate. You use it just as you would use, for example, the MultiThreadRuntime when running locally. In fact, we recommend letting your program use both, depending on its environment. This makes development simpler, as you can easily execute the job on your own machine.
See how this example program uses the MultiThreadRuntime when run normally, but uses the KubernetesRuntime when the IS_K8S env var is set to "true":
//! A very basic program which can run locally or on Kubernetes depending on
//! the IS_K8S environment variable.
use malstrom::operators::*;
use malstrom::runtime::MultiThreadRuntime;
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;
use malstrom_k8s::KubernetesRuntime;

fn main() {
    let run_k8s = std::env::var("IS_K8S") == Ok("true".to_string());
    if run_k8s {
        KubernetesRuntime::builder()
            .persistence(NoPersistence)
            .build(build_dataflow)
            .execute_auto()
            .unwrap()
    } else {
        MultiThreadRuntime::builder()
            .persistence(NoPersistence)
            .parrallelism(2)
            .build(build_dataflow)
            .execute()
            .unwrap()
    }
}

fn build_dataflow(provider: &mut dyn StreamProvider) -> () {
    provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .map("double", |x| x * 2)
        .inspect("print", |x, _| println!("{}", x.value));
}
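The environment check in main can also be factored into a small helper, which makes the switch easy to test without a cluster. The sketch below uses only the standard library. Target and select_target are hypothetical names introduced for illustration and are not part of malstrom or malstrom-k8s. It mirrors the example above: only the exact value "true" selects Kubernetes.

```rust
/// Where the job should run. Hypothetical type for illustration only;
/// it is not part of the malstrom or malstrom-k8s crates.
#[derive(Debug, PartialEq, Eq)]
enum Target {
    Local,
    Kubernetes,
}

/// Decide the target from the raw value of the `IS_K8S` variable.
/// Like the example program, only the exact string "true" selects
/// Kubernetes; an unset variable or any other value means local execution.
fn select_target(is_k8s: Option<&str>) -> Target {
    match is_k8s {
        Some("true") => Target::Kubernetes,
        _ => Target::Local,
    }
}

fn main() {
    // `.ok()` turns a missing or non-unicode variable into `None`.
    let raw = std::env::var("IS_K8S").ok();
    let target = select_target(raw.as_deref());
    println!("selected target: {:?}", target);
}
```

In a real job, the two arms of the match would build and execute the KubernetesRuntime and the MultiThreadRuntime respectively, as in the example program.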
Deploying an Example job
For production use cases we recommend building an image which includes your job's compiled binary. For testing you can instead enable the operator's "artifact-manager", which lets you upload jobs to the operator directly. That is what we will do for the sake of this example.
By default the "artifact-manager" is disabled. It can be enabled via the values.yaml when installing with Helm:
artifactManager:
  deploy: true
Next we will compile our program: cargo build --release
By default the compiled binary is placed at target/release/<app-name>.
Next we will need a connection to the "artifact-manager" running on Kubernetes. We will use kubectl to port-forward from the Kubernetes cluster to a port on our local machine. This command makes the "artifact-manager" available under localhost:29918:
kubectl port-forward svc/artifact-manager 29918:80
Next we upload our compiled binary, making it available under the name "foobar":
curl -F \
"file=@./target/release/<app-name>" \
"localhost:29918/foobar"
Note that you might have to cross-compile to the architecture your cluster's nodes are running on.
To actually run the job we will create a Kubernetes resource of the type MalstromJob:
apiVersion: malstrom.io/v1alpha
kind: MalstromJob
metadata:
  name: test-job
spec:
  replicas: 2
  jobState: Running
  binary:
    source: http://artifact-manager.default.svc.cluster.local/foobar
    destination: /artifact/executable
  podSpecTemplate:
    containers:
      - name: main
        image: alpine:3.21
        env:
          - name: IS_K8S
            value: "true"
    initContainers:
      - name: artifact-downloader
        image: ghcr.io/malstromdevelopers/artifact-downloader:latest
Applying this manifest with kubectl apply -f path/to/manifest.yaml creates and starts a Malstrom job with a parallelism of 2. You should now see the running pods.
Scaling a job
A job can be rescaled easily: change spec.replicas in the MalstromJob definition to any number greater than 0. The operator then takes care of rescaling the job automatically.
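The "greater than 0" constraint can be checked before a new replica count ever reaches the cluster. The following sketch uses only the standard library: it validates a count with NonZeroU32 and renders a JSON merge patch that touches nothing but spec.replicas. The function name replica_patch is hypothetical and not part of Malstrom, and whether the operator enforces any limits beyond "greater than 0" is not covered here.

```rust
use std::num::NonZeroU32;

/// Build a JSON merge patch that sets only `spec.replicas`.
/// Returns `None` for 0, since a job needs at least one replica.
/// Hypothetical helper for illustration; not part of Malstrom.
fn replica_patch(replicas: u32) -> Option<String> {
    // `NonZeroU32::new` yields `None` for 0, which `?` propagates.
    let n = NonZeroU32::new(replicas)?;
    // Doubled braces are literal braces inside `format!`.
    Some(format!(r#"{{"spec":{{"replicas":{}}}}}"#, n))
}

fn main() {
    for requested in [0, 3] {
        match replica_patch(requested) {
            Some(patch) => println!("{requested} -> {patch}"),
            None => println!("{requested} -> rejected: replicas must be greater than 0"),
        }
    }
}
```

A patch like this could be passed to kubectl patch with --type merge. The resource name to use on the command line depends on how the operator registers its custom resource, so treat that part as an assumption. Editing the manifest and re-applying it works just as well.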
Operator values
WARNING
Currently the operator only supports modifying the job replica count. Any other modification requires manually deleting and re-creating the job. See issue 25.
The operator helm chart supports these configuration values:
# Default values for malstrom-operator.
name: malstrom-operator # operator name
namespace: default # deployment namespace for operator pods
logLevel: info # logging level, operator will log at this level to stdout
operator:
  image:
    repository: ghcr.io/malstromdevelopers/malstrom-operator
    tag: latest
  imagePullPolicy: IfNotPresent
  resources: {}
# artifact manager allows directly uploading malstrom jobs as binaries
# not recommended for production
artifactManager:
  deploy: false
  image:
    repository: ghcr.io/malstromdevelopers/artifact-manager
    tag: latest
  imagePullPolicy: IfNotPresent
  resources: {}
  service:
    type: ClusterIP
    port: 80
  storageClassName: null
  size: 1Gi
serviceAccount: # service account used by operator
  name: malstrom-operator
  namespaces: ["default"]
imagePullSecrets: [] # names of Kubernetes secrets used for pulling the operator images
affinity: {}