Skip to content

Conversation

@piochelepiotr
Copy link
Contributor

@piochelepiotr piochelepiotr commented Nov 24, 2025

What does this PR do?

This PR introduces the kafka_actions integration, which enables one-time administrative and operational actions on Kafka clusters. Note: This integration is exclusively triggered via Remote Configuration and is not meant to run continuously like traditional checks.

Supported Actions:

Action Description
read_messages Read and filter messages with jq-style expressions, supporting streaming and multiple formats (JSON, String, BSON, Avro, Protobuf)
produce_message Produce messages to topics with base64-encoded payloads and headers
create_topic Create topics with custom partitions, replication factor, and configs
update_topic_config Update topic configurations and partition counts
delete_topic Delete topics
delete_consumer_group Delete consumer groups
update_consumer_group_offsets Reset consumer group offsets to specific positions

Key Features:

  • Cluster ID verification before executing actions (prevents accidental operations on wrong clusters)
  • Advanced filtering with operators: ==, !=, >, <, >=, <=, contains, and, or, nested field access
  • Real-time streaming for message consumption with configurable limits (n_messages_retrieved, max_scanned_messages)
  • Centralized configuration validation with detailed error messages

Motivation

Operations teams need the ability to perform targeted, one-time Kafka actions for debugging, incident response, and maintenance—without deploying custom scripts or tools. This integration provides a secure, auditable way to:

  • Read specific messages from production topics for debugging (e.g., "find failed orders from user X")
  • Create/modify topics during migrations or scaling events
  • Reset consumer group offsets during incident recovery
  • Produce test messages for validation

By integrating these capabilities into the Datadog Agent and triggering them via Remote Configuration, teams can perform Kafka operations instantly from the UI.

Review checklist (to be filled by reviewers)

  • Feature or bugfix MUST have appropriate tests (unit, integration, e2e)
  • Add the qa/skip-qa label if the PR doesn't need to be tested during QA.
  • If you need to backport this PR to another branch, you can add the backport/<branch-name> label to the PR and it will automatically open a backport PR once this one is merged
@codecov
Copy link

codecov bot commented Nov 24, 2025

Codecov Report

❌ Patch coverage is 74.20516% with 430 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.85%. Comparing base (b40aa6f) to head (35138c5).
⚠️ Report is 9 commits behind head on master.

Additional details and impacted files
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +171 to +176
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
self.log.debug("Reached end of partition")
continue
else:
raise KafkaException(msg.error())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Handle partition EOF using KafkaError constant

In consume_messages the code checks msg.error().code() == KafkaException._PARTITION_EOF, but KafkaException does not define _PARTITION_EOF (the constant lives on KafkaError). When a consumer hits the end of a partition—an expected condition while tailing topics—this attribute access will raise AttributeError and abort the read_messages action instead of skipping the EOF. Use the KafkaError EOF code (and import it) to avoid crashing on normal partition boundaries.

Useful? React with 👍 / 👎.

@estherk15
Copy link
Contributor

Thanks @piochelepiotr! I created a card for docs editorial review. Let me know if this is on a deadline.

@estherk15 estherk15 added the editorial review Waiting on a more in-depth review from a docs team editor label Nov 24, 2025
Copy link
Contributor

@estherk15 estherk15 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the follow up @piochelepiotr. Left some formatting suggestions, let me know if you have any questions on my comments!

piochelepiotr and others added 14 commits November 25, 2025 14:41
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
estherk15
estherk15 previously approved these changes Nov 25, 2025
Co-authored-by: Esther Kim <esther.kim@datadoghq.com>
@temporal-github-worker-1 temporal-github-worker-1 bot dismissed estherk15’s stale review November 25, 2025 22:09

Review from estherk15 is dismissed. Related teams and files:

  • documentation
    • kafka_actions/README.md
@iliakur iliakur self-assigned this Nov 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment