Apache Flink Kubernetes Operator THIS IS NOT A CONTRIBUTION
In a nutshell Manage Flink Application and Session clusters/jobs Deploy, Upgrade, Savepoint, Monitor
Intro
Why another Flink operator •Existing OSS solutions •Fragmentation, unreliable maintenance •Contribution barrier •FLIP-212: Introduce Flink Kubernetes Operator
Supported Deployment Modes •Flink deployments in application or session mode •Flink application managed through FlinkDeployment •Empty Flink session managed through FlinkDeployment + jobs managed through FlinkSessionJob •Job submissions against a session cluster •Foundation for the common workload types and languages (Java, SQL, Python)
Project Status •4 releases •Active community with contributors from multiple organizations
 •Production ready
Acknowledgement • Gyula Fora (gyfora) • Matyas Orhidi (morhidi) • Aitozi • Marton Balassi (mbalassi) • Yang Wang (wangyang0918) • Nicholas Jiang (SteNicholas) • Biao Geng (bgeng777) • Thomas Weise (tweise) • Hao Xin (haoxins) • Usamah Jassat (usamj) • James Busche (jbusche) • Junfan Zhang (zuston) • zeus1ammon 27 Contributors
The basics
Control Loop
Deployment Flow 1. Submit FlinkDeployment (CR) 2. Operator creates JobManager 3. JobManager creates task manager pods 4. JobManager submits job
FlinkDeployment Status Status captures everything the operator knows about the application What’s in it? • JobManager Deployment Status • Job status • Basic Job Details • Savepoint Info • Reconciliation Status • Last reconciled spec • Success / Error information
Observing FlinkDeployment status The Observer component is responsible for determining the status Observe fl ow 1. Observe JobManager Deployment Status (exists, errors, Flink ports) 2. Observe Job Status (Rest API accessible, job status) 3. Observe Savepoint Status (pending savepoints) It takes a few reconcile loops to reach steady state after deployment…
Once the job is running… • Check the Flink UI -> Metrics, Flame Graphs, Memory Utilisation • Check the logs • Operator log -> Reconcile / deployment errors • JM/TM logs -> Job errors, warnings etc. • Trigger Savepoints • Perform upgrades • Suspend / Resume processing
Lifecycle management
Resource Lifecycle
Upgrade / Suspend Applications Jobs can be upgraded by simply submitting a new spec. What happens then? 1. Suspend running job (keep state) 2. Restore using the new spec (using state from last run) If job.state is set to SUSPENDED the job will be paused.
Application Upgrade Modes Controls how the streaming job will be suspended and restarted on spec changes Available modes • Stateless • Last State • Savepoint
Application Upgrade Modes Stateless Last State Savepoint Con fi g Requirement None • Checkpointing Enabled • Kubernetes HA Enabled • Savepoint directory de fi ned Job Status Requirement None* None* Deployment healthy Job Running Suspend Mechanism Cancel / Delete Delete Flink deployment (keep HA data) Cancel with savepoint Restore Mechanism Deploy from empty state Deploy job -> recover state from HA data Restore From savepoint When to use? Stateless jobs, prototyping Most stateful jobs Job Migration / Forking * No savepoint in progress
Manual Savepoints • Allows you to keep “backups” of your application state Trigger by changing job.savepointTriggerNonce 
 • Use job.initialSavepointPath to start from a speci fi c savepoint on new deployments
 • Savepoints are cleaned up automatically
Automatic Savepoint Management •Periodic savepoints •Con fi g: kubernetes.operator.periodic.savepoint.interval •Savepoints triggered as part of regular reconcile loop •Savepoint history •Count and age based •Disposal of savepoints
Configuration
Zero Downtime Changes Initial con fi guration through helm chart •How to apply changes? Dynamic changes without control plane interruption •Clusters with many concurrent reconciliations •Reload operator con fi guration from con fi g map Namespaces to watch •List of namespaces + dynamic con fi g change
Operator (System) Level •General operator wide con fi guration •Cannot be overridden on a per-resource basis Examples: •Timeout for the observer to wait the Flink REST client to return •Interval for the controller to reschedule the reconcile process •Maximum number of threads running the reconciliation loop https://nightlies.apache.org/ fl ink/ fl ink-kubernetes-operator-docs-main/docs/operations/con fi guration/#system-con fi guration
Resource (User) Level •Settings that a ff ect a single deployment •“Extend” the CR (but don’t require CRD changes!) Examples: •Enable recovery of missing/deleted jobmanager deployments •Timeout for deployments to become ready/stable before being rolled back •Interval before a savepoint trigger attempt is marked as unsuccessful https://nightlies.apache.org/ fl ink/ fl ink-kubernetes-operator-docs-main/docs/operations/con fi guration/#resourceuser-con fi guration
Flink Pod Templates CR with limited direct settings (like memory and cpu resource) Maximum fl exibility through fl inkCon fi guration and pod templates (init, sidecar, storage etc.) Common template with job/task manager override https://nightlies.apache.org/ fl ink/ fl ink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/
Observability
Metrics Flink metric reporters (default reporters shipped with image) •Prometheus (example)
 Metric scopes •Operator: JVM, k8s client metrics •Watched Namespace: Deployment count, Status count •Resource: Reconciliation count, .. (JOSDK metrics)
Logging Con fi gured via Con fi gMap (default in helm chart)

Kubernetes Events Important changes (and errors) recorded as events kubectl describe flinkdeployment basic-example Events can be forwarded to infrastructure speci fi c collectors
When things go wrong
Error Scenarios Typical causes •Operator service account access problem •Invalid Flink deployment con fi guration •Operator failure / bug Where to look •Operator log •Service accounts / roles / role bindings FlinkDeployment Not Created
Error Scenarios Typical causes •Flink service account access problem •Flink image pull error •Pod template / other Kubernetes issues Where to look •FlinkDeployment (CR) events •Describe JobManager replicaset JobManager pod not created
Error Scenarios Typical causes •Flink service account access problem •TM pod template problems •Insu ffi cient resources Where to look •JobManager pod logs •Describe pending task manager pod TaskManager pods not ready
Roadmap
Roadmap •Version 1.1 •Dynamic change of watched namespaces •Pluggable Status and Event reporters (integration point for control planes) •Improved savepoint management & periodic triggering •Experimental auto-scaling using Horizontal Pod AutoScaler •Version 1.2 •Standalone deployment mode support (FLIP-225, support for older Flink versions) •Improved scaling and autoscaling support •Improved rollback mechanism •Roadmap documentation page
Q&A

Introducing the Apache Flink Kubernetes Operator

  • 1.
  • 2.
    In a nutshell ManageFlink Application and Session clusters/jobs Deploy, Upgrade, Savepoint, Monitor
  • 3.
  • 4.
    Why another Flinkoperator •Existing OSS solutions •Fragmentation, unreliable maintenance •Contribution barrier •FLIP-212: Introduce Flink Kubernetes Operator
  • 5.
    Supported Deployment Modes •Flinkdeployments in application or session mode •Flink application managed through FlinkDeployment •Empty Flink session managed through FlinkDeployment + jobs managed through FlinkSessionJob •Job submissions against a session cluster •Foundation for the common workload types and languages (Java, SQL, Python)
  • 6.
    Project Status •4 releases •Activecommunity with contributors from multiple organizations
 •Production ready
  • 7.
    Acknowledgement • Gyula Fora(gyfora) • Matyas Orhidi (morhidi) • Aitozi • Marton Balassi (mbalassi) • Yang Wang (wangyang0918) • Nicholas Jiang (SteNicholas) • Biao Geng (bgeng777) • Thomas Weise (tweise) • Hao Xin (haoxins) • Usamah Jassat (usamj) • James Busche (jbusche) • Junfan Zhang (zuston) • zeus1ammon 27 Contributors
  • 8.
  • 9.
  • 10.
    Deployment Flow 1. SubmitFlinkDeployment (CR) 2. Operator creates JobManager 3. JobManager creates task manager pods 4. JobManager submits job
  • 11.
    FlinkDeployment Status Status captureseverything the operator knows about the application What’s in it? • JobManager Deployment Status • Job status • Basic Job Details • Savepoint Info • Reconciliation Status • Last reconciled spec • Success / Error information
  • 12.
    Observing FlinkDeployment status TheObserver component is responsible for determining the status Observe fl ow 1. Observe JobManager Deployment Status (exists, errors, Flink ports) 2. Observe Job Status (Rest API accessible, job status) 3. Observe Savepoint Status (pending savepoints) It takes a few reconcile loops to reach steady state after deployment…
  • 13.
    Once the jobis running… • Check the Flink UI -> Metrics, Flame Graphs, Memory Utilisation • Check the logs • Operator log -> Reconcile / deployment errors • JM/TM logs -> Job errors, warnings etc. • Trigger Savepoints • Perform upgrades • Suspend / Resume processing
  • 15.
  • 16.
  • 17.
    Upgrade / SuspendApplications Jobs can be upgraded by simply submitting a new spec. What happens then? 1. Suspend running job (keep state) 2. Restore using the new spec (using state from last run) If job.state is set to SUSPENDED the job will be paused.
  • 18.
    Application Upgrade Modes Controlshow the streaming job will be suspended and restarted on spec changes Available modes • Stateless • Last State • Savepoint
  • 19.
    Application Upgrade Modes StatelessLast State Savepoint Con fi g Requirement None • Checkpointing Enabled • Kubernetes HA Enabled • Savepoint directory de fi ned Job Status Requirement None* None* Deployment healthy Job Running Suspend Mechanism Cancel / Delete Delete Flink deployment (keep HA data) Cancel with savepoint Restore Mechanism Deploy from empty state Deploy job -> recover state from HA data Restore From savepoint When to use? Stateless jobs, prototyping Most stateful jobs Job Migration / Forking * No savepoint in progress
  • 20.
    Manual Savepoints • Allows youto keep “backups” of your application state Trigger by changing job.savepointTriggerNonce 
 • Use job.initialSavepointPath to start from a speci fi c savepoint on new deployments
 • Savepoints are cleaned up automatically
  • 21.
    Automatic Savepoint Management •Periodicsavepoints •Con fi g: kubernetes.operator.periodic.savepoint.interval •Savepoints triggered as part of regular reconcile loop •Savepoint history •Count and age based •Disposal of savepoints
  • 22.
  • 23.
    Zero Downtime Changes Initialcon fi guration through helm chart •How to apply changes? Dynamic changes without control plane interruption •Clusters with many concurrent reconciliations •Reload operator con fi guration from con fi g map Namespaces to watch •List of namespaces + dynamic con fi g change
  • 24.
    Operator (System) Level •Generaloperator wide con fi guration •Cannot be overridden on a per-resource basis Examples: •Timeout for the observer to wait the Flink REST client to return •Interval for the controller to reschedule the reconcile process •Maximum number of threads running the reconciliation loop https://nightlies.apache.org/ fl ink/ fl ink-kubernetes-operator-docs-main/docs/operations/con fi guration/#system-con fi guration
  • 25.
    Resource (User) Level •Settingsthat a ff ect a single deployment •“Extend” the CR (but don’t require CRD changes!) Examples: •Enable recovery of missing/deleted jobmanager deployments •Timeout for deployments to become ready/stable before being rolled back •Interval before a savepoint trigger attempt is marked as unsuccessful https://nightlies.apache.org/ fl ink/ fl ink-kubernetes-operator-docs-main/docs/operations/con fi guration/#resourceuser-con fi guration
  • 26.
    Flink Pod Templates CRwith limited direct settings (like memory and cpu resource) Maximum fl exibility through fl inkCon fi guration and pod templates (init, sidecar, storage etc.) Common template with job/task manager override https://nightlies.apache.org/ fl ink/ fl ink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/
  • 27.
  • 28.
    Metrics Flink metric reporters(default reporters shipped with image) •Prometheus (example)
 Metric scopes •Operator: JVM, k8s client metrics •Watched Namespace: Deployment count, Status count •Resource: Reconciliation count, .. (JOSDK metrics)
  • 29.
    Logging Con fi gured via Con fi gMap(default in helm chart)

  • 30.
    Kubernetes Events Important changes(and errors) recorded as events kubectl describe flinkdeployment basic-example Events can be forwarded to infrastructure speci fi c collectors
  • 31.
  • 32.
    Error Scenarios Typical causes •Operatorservice account access problem •Invalid Flink deployment con fi guration •Operator failure / bug Where to look •Operator log •Service accounts / roles / role bindings FlinkDeployment Not Created
  • 33.
    Error Scenarios Typical causes •Flinkservice account access problem •Flink image pull error •Pod template / other Kubernetes issues Where to look •FlinkDeployment (CR) events •Describe JobManager replicaset JobManager pod not created
  • 34.
    Error Scenarios Typical causes •Flinkservice account access problem •TM pod template problems •Insu ffi cient resources Where to look •JobManager pod logs •Describe pending task manager pod TaskManager pods not ready
  • 35.
  • 36.
    Roadmap •Version 1.1 •Dynamic changeof watched namespaces •Pluggable Status and Event reporters (integration point for control planes) •Improved savepoint management & periodic triggering •Experimental auto-scaling using Horizontal Pod AutoScaler •Version 1.2 •Standalone deployment mode support (FLIP-225, support for older Flink versions) •Improved scaling and autoscaling support •Improved rollback mechanism •Roadmap documentation page
  • 37.