Tune Throughput and Queue Depth
| Field | Value |
|---|---|
| Difficulty | Advanced |
| Estimated Read Time | 15-20 minutes |
| Labels | performance, tuning, async, queues |
Performance tuning only helps once your correctness baseline is stable; this chapter assumes it is, and turns to the knobs that decide how an async pipeline behaves when work arrives faster than it can be processed. You will set the queue depth, choose what happens when that queue fills, push a deterministic burst of frames non-blockingly, drain the results, and read the measurement report that tells you whether you dropped anything and how long each frame took.
By the end you will have a working harness for measuring an async run under backpressure: enqueue counts, drop counts, outputs pulled, average latency, and push cost. The same loop is the basis for tuning a real pipeline against the heuristics in In Practice.
Walkthrough
Configure the run options
RunOptions is where async behavior under load is decided. We set queue_depth (how many in-flight samples the runtime accepts), overflow_policy (what happens when that queue is full — Block, KeepLatest, or DropIncoming), output_memory = Owned (returned tensors own their data so they survive past the pull), We then build() the graph in Async mode, which gives us a run with independent producer and consumer sides.
The overflow policy is parsed from --drop into simaai::neat::OverflowPolicy::{Block,KeepLatest,DropIncoming}; graph.build(input, opt) returns the run handle.
simaai::neat::RunOptions opt;
opt.queue_depth = queue_depth;
opt.overflow_policy = parse_drop_policy(argc, argv);
opt.output_memory = simaai::neat::OutputMemory::Owned;
auto run = graph.build(std::vector<cv::Mat>{rgb}, opt);
Push the workload and drain
This is where the queue policy is exercised. We call try_push(...) in a tight loop — a non-blocking push that simply returns whether the sample was accepted, so a full queue under DropIncoming/KeepLatest shows up as rejected pushes rather than a stall. After the burst we call close_input() to signal no more inputs, then drain the consumer side with a pull(...) loop until it returns empty. Pairing try_push with close_input plus a drain loop is the canonical non-blocking async pattern.
// try_push never blocks; pair it with close_input + drain pull loop.
simaai::neat::MeasureOptions measure_opt;
measure_opt.title = "tutorial 016 throughput";
auto scope = run.start_measurement(measure_opt);
for (int i = 0; i < iters; ++i)
(void)run.try_push(std::vector<cv::Mat>{rgb});
run.close_input();
int pulled = 0;
while (run.pull(/*timeout_ms=*/1000).has_value())
++pulled;
const auto measured = scope.stop();
Read the measurement report
With the run drained, we stop the measurement scope. The report counters group gives runtime-side numbers — inputs enqueued, inputs dropped, outputs pulled — while input gives push-side numbers such as average push cost and input renegotiations. Together, these tell you whether your queue depth and overflow policy did what you intended: did frames drop, did latency climb, was the push path cheap.
std::cout << "inputs_enqueued=" << measured.counters.inputs_enqueued << "\n";
std::cout << "inputs_dropped=" << measured.counters.inputs_dropped << "\n";
std::cout << "outputs_pulled=" << pulled << "\n";
std::cout << "avg_latency_ms=" << measured.end_to_end.avg_ms << "\n";
std::cout << "avg_push_us=" << measured.input.avg_push_us << "\n";
std::cout << "renegotiations=" << measured.input.renegotiations << "\n";
Run
This chapter needs no model archive. Run the Python and C++ (prebuilt) commands from the Neat install root (the directory that contains share/ and lib/); run the build from source commands from the repo root.
C++ (prebuilt):
./lib/sima-neat/tutorials/tutorial_016_tune_throughput_and_queues \
--iters 32 --queue 4 --drop block
C++ (build from source):
./build.sh --target tutorial_016_tune_throughput_and_queues
./build/tutorials-standalone/tutorial_016_tune_throughput_and_queues \
--iters 32 --queue 4 --drop block
Expected output (exact counts and timings depend on the host and policy):
inputs_enqueued=32
inputs_dropped=0
outputs_pulled=32
avg_latency_ms=0.42
avg_push_us=18.0
renegotiations=0
[OK] 016_tune_throughput_and_queues
(The Python build prints the same keys without the trailing [OK] line.)
To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.
In Practice
Practical guidance for queue sizing, drop policies, presets, and output-lifetime safety.
Queue sizing (queue_depth)
Heuristics:
- Start with
queue_depth = 4–16for low‑latency pipelines. - Increase queues if your producer is bursty or if downstream elements have variable latency (decode/MLA/postproc).
- Keep queues small if you need freshest frames (e.g., live camera preview).
Overflow policy (RunOptions::overflow_policy)
Block: safest for correctness; producer waits when queue is full.DropIncoming: keep queued work, drop incoming samples when saturated.KeepLatest: prefer freshest frames, drop the oldest queued samples.
For live feeds, KeepLatest usually yields the lowest end-to-end latency.
Presets and renegotiation
Use RunOptions::preset to control latency/safety tradeoffs:
Realtime: lowest latency, aggressive freshness behavior.Balanced: starts zero-copy when possible, runs startup probe checks, and falls back to copy mode if reliability trips.Reliable: conservative behavior and stable output ownership.
Input shape renegotiation is automatic for dynamic inputs (the renegotiations counter above reports how often it happened).
Output lifetimes (output_memory)
output_memory = Owned: returnedTensorowns its data.output_memory = ZeroCopy: tensor may reference runtime buffers reused after pull.output_memory = Auto: runtime chooses zero-copy first and falls back to owned where reliability requires it.
If you need to keep tensor data beyond the current step, call clone() or cpu().contiguous().
Buffer pool safety
RunAdvancedOptions::max_input_bytessets a hard upper bound on input buffer allocation.- If a larger buffer is required, the runtime fails fast with an explicit error.
Use these to protect long‑running processes from unbounded allocations when inputs change size.
Full source
Show the complete C++ and Python programs
// Tune async Graph throughput via RunOptions and MeasureReport.
//
// Usage:
// tutorial_016_tune_throughput_and_queues [--iters 32] [--queue 4] [--drop block|latest|incoming]
#include "neat.h"
#include <opencv2/core.hpp>
#include <iostream>
#include <stdexcept>
#include <string>
namespace {
bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
for (int i = 1; i + 1 < argc; ++i) {
if (key == argv[i]) {
out = argv[i + 1];
return true;
}
}
return false;
}
int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
std::string value;
if (!get_arg(argc, argv, key, value))
return def;
return std::stoi(value);
}
simaai::neat::OverflowPolicy parse_drop_policy(int argc, char** argv) {
std::string mode;
if (!get_arg(argc, argv, "--drop", mode))
return simaai::neat::OverflowPolicy::Block;
if (mode == "latest")
return simaai::neat::OverflowPolicy::KeepLatest;
if (mode == "incoming")
return simaai::neat::OverflowPolicy::DropIncoming;
return simaai::neat::OverflowPolicy::Block;
}
} // namespace
int main(int argc, char** argv) {
try {
const int iters = parse_int_arg(argc, argv, "--iters", 32);
const int queue_depth = parse_int_arg(argc, argv, "--queue", 4);
cv::Mat rgb(120, 160, CV_8UC3, cv::Scalar(70, 20, 200));
if (!rgb.isContinuous())
rgb = rgb.clone();
simaai::neat::Graph graph;
simaai::neat::InputOptions in;
in.format = "RGB";
in.width = rgb.cols;
in.height = rgb.rows;
in.depth = rgb.channels();
in.is_live = true;
graph.add(simaai::neat::nodes::Input(in));
graph.add(simaai::neat::nodes::Output());
// CORE LOGIC
// RunOptions controls how the async runner buffers and drops frames.
simaai::neat::RunOptions opt;
opt.queue_depth = queue_depth;
opt.overflow_policy = parse_drop_policy(argc, argv);
opt.output_memory = simaai::neat::OutputMemory::Owned;
auto run = graph.build(std::vector<cv::Mat>{rgb}, opt);
// try_push never blocks; pair it with close_input + drain pull loop.
simaai::neat::MeasureOptions measure_opt;
measure_opt.title = "tutorial 016 throughput";
auto scope = run.start_measurement(measure_opt);
for (int i = 0; i < iters; ++i)
(void)run.try_push(std::vector<cv::Mat>{rgb});
run.close_input();
int pulled = 0;
while (run.pull(/*timeout_ms=*/1000).has_value())
++pulled;
const auto measured = scope.stop();
std::cout << "inputs_enqueued=" << measured.counters.inputs_enqueued << "\n";
std::cout << "inputs_dropped=" << measured.counters.inputs_dropped << "\n";
std::cout << "outputs_pulled=" << pulled << "\n";
std::cout << "avg_latency_ms=" << measured.end_to_end.avg_ms << "\n";
std::cout << "avg_push_us=" << measured.input.avg_push_us << "\n";
std::cout << "renegotiations=" << measured.input.renegotiations << "\n";
std::cout << "[OK] 016_tune_throughput_and_queues\n";
return 0;
} catch (const std::exception& e) {
std::cerr << "[FAIL] " << e.what() << "\n";
return 1;
}
}