@yangxi yangxi commented Apr 3, 2019

The Trogdor Runner executes Trogdor ExternalCommandSpec tasks. It can be invoked directly from the command line with ./tests/trogdor_runner --spec ExternalCommandSpecFile, or driven by Trogdor agents. When run from the command line, the Trogdor Runner reads the spec file. When driven by Trogdor agents, the runner waits on stdin for ExternalCommandSpec tasks and then executes them.

  • Add Trogdor Runner (./tests/trogdor/trogdor_runner.py) to execute Trogdor tasks.
  • Add Trogdor Produce Runner (./tests/trogdor/produce_spec_runner.py) to execute Trogdor Produce tasks.
  • Add Trogdor Produce Test to ./tests/run.sh.
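The two invocation modes described above could be sketched roughly as follows. This is an illustration only, not the PR's actual trogdor_runner.py; everything except the --spec option and the idea of reading the spec from a file or stdin is assumed.

```python
import argparse
import json
import sys


def parse_args(argv):
    parser = argparse.ArgumentParser(description="Trogdor runner sketch")
    parser.add_argument("--spec", help="path to an ExternalCommandSpec JSON file")
    return parser.parse_args(argv)


def read_spec(args):
    """Read an ExternalCommandSpec either from a --spec file or from stdin.

    Command-line mode: ./tests/trogdor_runner --spec FILE reads the file.
    Agent-driven mode: a Trogdor agent writes one spec JSON object per
    line to the runner's stdin.
    """
    if args.spec:
        with open(args.spec) as f:
            return json.load(f)
    return json.loads(sys.stdin.readline())
```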
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"bootstrapServers": "localhost:29092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
Contributor:

Is this the total number of messages produced?
If so it seems a bit low to only produce for 5s, we'll want at least 30s of producing to amortize for startup costs.
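The five-second figure follows directly from the spec: the implied runtime is just maxMessages divided by targetMessagesPerSec.

```python
def expected_runtime_s(max_messages, target_messages_per_sec):
    """Approximate produce duration implied by a ProduceBenchSpec."""
    return max_messages / target_messages_per_sec


# 50000 messages at 10000 msgs/s -> 5.0 s; 600000 messages would give 60 s.
print(expected_runtime_s(50000, 10000))
```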

Author:

Yeah, this is just an example spec file demonstrating how to configure the workload. For real production perf tests, this is too short.

Author:

How about this?
Let's keep it as the 5s case. I am working on creating a regression Jenkins job that compares performance between Python clients and Java clients. We will have complex spec files for that Jenkins job. After I finish that, I will add those spec files to this tests/trogdor/ directory.

Contributor:

Sounds good

Contributor:

I don't think there is much value in a perf test that runs for this short, the results will be too noisy. So let's provide a file with proper defaults. At least 30s, preferably 60s of runtime.

"targetMessagesPerSec": 10000,
"maxMessages": 50000,
"activeTopics": {
"foo[1-3]": {
Contributor:

can we use a more descriptive name, "py_trogdor" or something?

Author:

This is a demo spec file. The idea is to keep it similar to the example spec for the Java Trogdor produce runner: https://github.com/apache/kafka/blob/trunk/tests/spec/simple_produce_bench.json

Author:

Changed the names to py_trogdor.

"p99LatencyMs": self.latency_histogram.get_value_at_percentile(99)/100.0,
"maxLatencyMs": self.latency_histogram.get_max_value()/100.0
}
if realQPS:
Contributor:

if realQPS is not None:
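The suggestion matters because a measured rate of 0 (or 0.0) is falsy in Python, so a bare truthiness check would silently drop a legitimate zero reading. A small illustration (helper names are hypothetical):

```python
def record_qps_truthy(realQPS, results):
    # Buggy: skips realQPS == 0 as well as realQPS is None.
    if realQPS:
        results["realQPS"] = realQPS


def record_qps_checked(realQPS, results):
    # Correct: only skips the "no measurement available" case.
    if realQPS is not None:
        results["realQPS"] = realQPS
```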

Author:

Fixed

trogdor_log("ProduceSpecRunner: delivery failed: {} [{}]: {}".format(msg.topic(), msg.partition(), err))
self.nr_failed_messages += 1
now = time.time()
latency = now - sent_time
Contributor:

Should preferably skip the first few messages, or use some priming messages without a delivery report to avoid the initial startup cost (that includes bringing up connections).
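One way the suggestion could look, sketched with assumed names and an assumed warmup count (this is not the PR's actual code): count delivery reports and only start recording latencies once the first N priming messages have been acknowledged.

```python
import time

WARMUP_MESSAGES = 100  # priming messages to exclude (assumed value)


class LatencyRecorder:
    """Collects per-message latencies, discarding warmup messages that pay
    for connection setup, metadata fetches, and other startup costs."""

    def __init__(self, warmup=WARMUP_MESSAGES):
        self.warmup = warmup
        self.seen = 0
        self.latencies = []

    def on_delivery(self, sent_time, now=None):
        self.seen += 1
        if self.seen <= self.warmup:
            return  # priming message: keep it out of the stats
        now = time.time() if now is None else now
        self.latencies.append(now - sent_time)
```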

Author:

I am working on adding an iterations field to the produce spec. With iterations, you can control how many times Trogdor should run the same workload. That's a better way to compare the warmup vs. stable cases.

Contributor:

That is probably true for the cluster itself, but each client instance will need some warmup to amortize for connection setup times, etc, and this should not be included in the final throughput or latency results.

Author:

The warmup will be the first iteration, say the first 5000 messages. Also, the warmup latency is important; I suggest we show that too.

Contributor:

Are the iterations using the same producer instance?
If so, then this is fine, but if new producer instances are used for each iteration we still need a warmup period per iteration to avoid including startup costs in the throughput measurements.

Contributor:

Any comment on this?

Author:

I added the iterations field to the spec file. When the field is > 1, trogdor_runner.py reports a JSON array of performance results, e.g. {"status": [{"totalSent": 50000, "totalRecorded": 50000, "totalError": 0, "planMPS": 10000, "averageLatencyMs": 13814.083504000002, "p50LatencyMs": 13864.95, "p95LatencyMs": 29470.71, "p99LatencyMs": 30719.99, "maxLatencyMs": 31006.71}, {"totalSent": 50000, "totalRecorded": 50000, "totalError": 0, "planMPS": 10000, "averageLatencyMs": 19482.478866999998, "p50LatencyMs": 19394.55, "p95LatencyMs": 35573.75, "p99LatencyMs": 36597.75, "maxLatencyMs": 36823.03}]}
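The shape of that report could be produced along these lines; the field names are taken from the example output above, but the helper itself and the stats collection are assumptions, not the PR's code.

```python
import json


def build_status_report(per_iteration_stats):
    """Wrap one stats dict per iteration into a {"status": [...]} report.

    Each element is expected to carry fields such as totalSent,
    totalRecorded, totalError, planMPS, averageLatencyMs, the
    p50/p95/p99 latency percentiles, and maxLatencyMs.
    """
    return json.dumps({"status": per_iteration_stats})
```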

Contributor:

Are the iterations reusing the same producer instance?
If not then there is no warm-up benefit on the client side, so I really think they should be reused.

Author:

Yeah, they use the same producer.
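With a single producer reused across iterations, only the first iteration pays the connection-setup cost, which is the property the reviewer is after. A stub-based sketch of that structure (a stand-in class replaces confluent_kafka.Producer so the reuse is visible; topic and payload are invented):

```python
class StubProducer:
    """Stand-in for confluent_kafka.Producer; counts instantiations so
    the single-instance property can be checked."""
    instances = 0

    def __init__(self, conf):
        StubProducer.instances += 1
        self.conf = conf

    def produce(self, topic, value):
        pass

    def flush(self):
        pass


def run_iterations(iterations, max_messages, producer_factory=StubProducer):
    # One producer for the whole run: iteration 0 doubles as the warmup.
    producer = producer_factory({"bootstrap.servers": "localhost:29092"})
    reports = []
    for i in range(iterations):
        for _ in range(max_messages):
            producer.produce("py_trogdor1", b"payload")
        producer.flush()
        reports.append({"iteration": i})
    return reports
```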

self.nr_finished_messages += 1

def get_msg_callback(self):
product_time = time.time()
Contributor:

produce_time, or send_time to be consistent with message_on_delivery

Author:

sure.

Author:

Fixed

pre = match.group(1)
start = int(match.group(2))
end = int(match.group(3))
last = match.group(4)
Contributor:

suffix; last could be confused with end.
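For context, the four groups parse active-topic range patterns such as "foo[1-3]". A sketch of that expansion with the suggested suffix naming (the regex and helper are illustrative, not the PR's exact code):

```python
import re

# Matches names like "foo[1-3]" or "foo[1-3]bar": a prefix, the range
# start and end inside the brackets, and an optional suffix after them.
TOPIC_RANGE_RE = re.compile(r"^(.*)\[(\d+)-(\d+)\](.*)$")


def expand_topic_pattern(pattern):
    match = TOPIC_RANGE_RE.match(pattern)
    if match is None:
        return [pattern]  # plain topic name, nothing to expand
    prefix = match.group(1)
    start = int(match.group(2))
    end = int(match.group(3))
    suffix = match.group(4)  # clearer than "last", which reads like "end"
    return ["{}{}{}".format(prefix, i, suffix) for i in range(start, end + 1)]
```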

Author:

Fixed

admin_request_timeout_ms = 25000
create_kafka_conf(bootstrap_servers, common_client_config, admin_client_config)
admin_conf = create_kafka_conf(bootstrap_servers, common_client_config, admin_client_config)
admin_conf["socket.timeout.ms"] = admin_request_timeout_ms
Contributor:

Why is this needed? The defaults should be good.

Author:

Trying to be the same as the Java Trogdor produce worker.

Contributor:

Okay, let's avoid setting specific configs unless we know it is needed.

Contributor:

Please remove the unneeded configuration.

Author:

@edenhill Done.


def create_producer_conn(bootstrap_servers, common_client_config, producer_config):
producer_conf = create_kafka_conf(bootstrap_servers, common_client_config, producer_config)
return Producer(**producer_conf)
Contributor:

no need for double **, the constructor takes a dict as is.
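A small illustration of the point, using a stub in place of confluent_kafka.Producer (whose constructor likewise accepts a plain configuration dict): for a constructor with a single dict parameter, the double-splat is not just unnecessary, it breaks because config keys like "bootstrap.servers" are not valid keyword-argument names.

```python
class StubProducer:
    """Stand-in with the same calling convention being discussed: the
    constructor takes one configuration dict."""

    def __init__(self, conf):
        self.conf = dict(conf)


producer_conf = {"bootstrap.servers": "localhost:29092"}

# Preferred: pass the dict as-is.
p = StubProducer(producer_conf)

# StubProducer(**producer_conf) would raise TypeError here, since
# "bootstrap.servers" cannot bind to any keyword parameter.
```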

Author:

fixed

return Producer(**producer_conf)


def create_admin_conn(bootstrap_servers, common_client_config, admin_client_config):
Contributor:

change conn to client.

Author:

fixed

create_topic(admin_conn, topic_name, topic)


def create_topic(admin_conn, topic_name, topic):
Contributor:

admin_client

Author:

Fixed

@yangxi yangxi requested a review from edenhill April 4, 2019 10:34

yangxi commented Apr 4, 2019

@edenhill Thanks for the review. I addressed most of your comments.

@edenhill (Contributor) left a comment:

Some minor nits, otherwise LGTM!

The per-instance warmup is still a thing though.

self.nr_failed_messages = 0
self.producer = create_producer_conn(self.bootstrap_servers, self.common_client_conf, self.producer_conf)
trogdor_log("Produce {} at message-per-sec {}".format(self.max_messages, self.qps))
trogdor_log("Produce {} at message-per-sec {}".format(self.max_messages, self.mps))
Contributor:

This could be clearer, "Produce {} messages at .."

Author:

Sure.

"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"bootstrapServers": "localhost:29092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
Contributor:

Sounds good

trogdor_log("ProduceSpecRunner: delivery failed: {} [{}]: {}".format(msg.topic(), msg.partition(), err))
self.nr_failed_messages += 1
now = time.time()
latency = now - sent_time
Contributor:

Any comment on this?

admin_request_timeout_ms = 25000
create_kafka_conf(bootstrap_servers, common_client_config, admin_client_config)
admin_conf = create_kafka_conf(bootstrap_servers, common_client_config, admin_client_config)
admin_conf["socket.timeout.ms"] = admin_request_timeout_ms
Contributor:

Please remove the unneeded configuration.

Xi Yang added 8 commits April 19, 2019 15:07
* Execute the produce spec in multiple iterations with the same producer connection.
* When iterations is larger than 1, the trogdor_runner reports a JSON array of performance results showing the performance of each iteration.
@yangxi yangxi requested a review from edenhill June 25, 2019 00:52
@stanislavkozlovski (Contributor) left a comment:

Some small comments. I had actually started reviewing this when you posted the PR but never got around to finishing the review. Sorry!

"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"bootstrapServers": "localhost:29092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
Contributor:

Maybe we should increase this such that it doesn't run for 5 seconds only. Maybe 60s? (600000)

"commandNode": "node0",
"workload": {
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"bootstrapServers": "localhost:29092",
Contributor:

I guess this is a typo, the default broker port is 9092, right?

Contributor:

That is probably the docker mapped port.
@yangxi ?
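If 29092 is indeed a docker-mapped port, a compose fragment along these lines would explain it (service name, image, and mapping are assumptions; a real Kafka compose file additionally needs matching advertised-listener settings, which are omitted here):

```yaml
services:
  kafka:
    image: confluentinc/cp-kafka  # assumed image
    ports:
      - "29092:9092"  # host port 29092 -> broker's in-container port 9092
```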

Author:

@@ -0,0 +1,222 @@
# Copyright 2016 Confluent Inc.
Contributor:

nit: 2019 :P

Contributor:

nit: 2020 :P

Author:

I am still working on this PR, will fix it :)

@edenhill (Contributor) left a comment:

Previous feedback is not fully addressed.
Still open questions regarding iterations and whether they reuse or not reuse the same producer instance (they should).
And it would be good with some doc strings on the trogdor classes to understand what they are doing.

run_trogdor() {
start_cluster
echo "Executing Trogdor"
python ${TEST_SOURCE}/trogdor/trogdor_runner.py --spec ${TEST_SOURCE}/trogdor/example-produce-spec.json
Contributor:

This should not point to the example but to a proper file. The user/dev should copy the example file into place and modify it. Alternatively, we could call this file produce-spec.json, without the example part, to allow it to run out of the box.

"commandNode": "node0",
"workload": {
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"bootstrapServers": "localhost:29092",
Contributor:

That is probably the docker mapped port.
@yangxi ?

"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"bootstrapServers": "localhost:29092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
Contributor:

I don't think there is much value in a perf test that runs for this short, the results will be too noisy. So let's provide a file with proper defaults. At least 30s, preferably 60s of runtime.

trogdor_log("ProduceSpecRunner: delivery failed: {} [{}]: {}".format(msg.topic(), msg.partition(), err))
self.nr_failed_messages += 1
now = time.time()
latency = now - sent_time
Contributor:

Are the iterations reusing the same producer instance?
If not then there is no warm-up benefit on the client side, so I really think they should be reused.

@mhowlett (Contributor):

I'm going to progress the work in this PR over here: https://github.com/mhowlett/confluent-kafka-python/tree/trogdor

I'll add to the comments in this PR, where relevant, but make updates to my own branch. we can sort out anything already raised in comments here, and continue the discussion in my branch when done.


cla-assistant bot commented Aug 15, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Xi Yang seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.
