rpk topic consume
Consume records from topics.
Consuming records reads from any amount of input topics, formats each record
according to --format
, and prints them to STDOUT
. The output formatter
understands a wide variety of formats.
The default output format --format json
is a special format that outputs each
record as JSON.
Formatting
Formatting is based on percent escapes and modifiers. Slashes can be used for common escapes:
Escape | Description |
---|---|
|
Tabs |
|
Newlines |
|
Carriage returns |
|
Slashes |
|
Hex encoded characters |
The percent encodings are represented like this:
Percent encoding | Description |
---|---|
|
Topic |
|
Topic length |
|
Key |
|
Key length |
|
Value |
|
Value length |
|
Begin the header specification |
|
Number of headers |
|
Partition |
|
Offset |
|
Leader epoch |
|
Timestamp (formatting described below) |
|
Producer ID |
|
Producer epoch |
|
Partition log start offset |
|
Partition last stable offset |
|
Partition high watermark |
|
Record attributes (formatting described below) |
|
Percent sign |
|
Left brace |
|
Right brace |
|
Number of records formatted |
Modifiers
Text and numbers can be formatted in many different ways, and the default
format can be changed within brace modifiers. %v
prints a value, while %v{hex}
prints the value hex encoded. %T
prints the length of a topic in ASCII, while
%T{big8}
prints the length of the topic as an eight byte big endian.
All modifiers go within braces following a percent-escape.
Numbers
Formatting number values can have the following modifiers:
Format | Description |
---|---|
|
Print the number as ASCII (default) |
|
Sixteen hex characters |
|
Eight hex characters |
|
Four hex characters |
|
Two hex characters |
|
One hex character |
|
Eight byte big endian number |
|
Four byte big endian number |
|
Two byte big endian number |
|
Alias for byte |
|
Eight byte little endian number |
|
Four byte little endian number |
|
Two byte little endian number |
|
Alias for byte |
|
One byte number |
|
|
All numbers are truncated as necessary per the modifier. Printing %V{byte}
for
a length 256 value prints a single null, whereas printing %V{big8}
prints the bytes 1 and 0.
When writing number sizes, the size corresponds to the size of the raw values,
not the size of encoded values. %T% t{hex}
for the topic foo
prints
3 666f6f
, not 6 666f6f
.
Timestamps
By default, the timestamp field is printed as a millisecond number value. In
addition to the number modifiers above, timestamps can be printed with either
Go
formatting:
%d{go[2006-01-02T15:04:05Z07:00]}
Or strftime
formatting:
%d{strftime[%F]}
An arbitrary amount of brackets (or braces, or # symbols) can wrap your date formatting:
%d{strftime=== [%F] ===}
This prints [YYYY-MM-DD]
, while the surrounding three # on each
side are used to wrap the formatting.
For more information on Go time formatting, see the Go documentation.
For more information on strftime
formatting, run man strftime
.
Attributes
Each record (or batch of records) has a set of possible attributes. Internally, these are packed into bit flags. Printing an attribute requires first selecting which attribute you want to print, and then optionally specifying how you want it to be printed:
%a{compression}
%a{compression;number}
%a{compression;big64}
%a{compression;hex8}
Compression is by default printed as text (none
, gzip
, …). Compression
can be printed as a number with ;number
, where number is any number
formatting option described above. No compression is 0
, gzip is 1
, etc.
%a{timestamp-type}
%a{timestamp-type;big64}
The record’s timestamp type prints as:
-
-1
for very old records (before timestamps existed) -
0
for client-generated timestamps -
1
for broker-generated timestamps
Number formatting can be controlled with ;number .
|
%a{transactional-bit}
%a{transactional-bit;bool}
Prints 1
if the record is a part of a transaction or 0
if it is not.
%a{control-bit}
%a{control-bit;bool}
Prints 1
if the record is a commit marker or 0
if it is not.
Text
Text fields without modifiers default to writing the raw bytes. Alternatively, there are the following modifiers:
Modifier | Description |
---|---|
|
Hex encoding |
|
Base64 standard encoding |
|
Base64 encoding raw |
|
The unpack modifier has a further internal specification, similar to timestamps above. |
Unpacking text can allow translating binary input into readable output. If a
value is a big-endian uint32, %v
prints the raw four bytes, while
%v{unpack[>I]}
prints the number in as ASCII. If unpacking exhausts the
input before something is unpacked fully, an error message is appended to the
output.
Headers
Headers are formatted with percent encoding inside of the modifier:
%h{%k=%v{hex}}
This prints all headers with a space before the key and after the value, an
equals sign between the key and value, and with the value hex encoded. Header
formatting actually just parses the internal format as a record format, so all
of the above rules about %K
, %V
, text, and numbers apply.
Values
Values for consumed records can be omitted by using the --meta-only
flag.
Tombstone records (records with a null
value) have their value omitted from the JSON output by default. All other records, including those with an empty-string value (""
), will have their values printed.
Offsets
The --offset
flag allows for specifying where to begin consuming, and
optionally, where to stop consuming. The literal words start
and end
specify consuming from the start and the end.
Offset | Description |
---|---|
|
Consume from the beginning |
|
Consume from the end |
|
Consume until the current end |
|
Consume oo after the current start offset |
|
Consume oo before the current end offset |
|
Consume after an exact offset |
|
Alias for oo |
|
Consume until an exact offset |
|
Consume from exact offset o1 until exact offset o2 |
|
Consume starting from a given timestamp |
|
alias for @t |
|
Consume until a given timestamp |
|
Consume from timestamp t1 until timestamp t2 |
Each timestamp option is evaluated until one succeeds.
Timestamp | Description |
---|---|
13 digits |
Parsed as a unix millisecond |
9 digits |
Parsed as a unix second |
YYYY-MM-DD |
Parsed as a day, UTC |
YYYY-MM-DDTHH:MM:SSZ |
Parsed as RFC3339, UTC; fractional seconds optional (.MMM) |
-dur |
Duration; from now (as t1) or from t1 (as t2) |
dur |
For t2 in @t1:t2, relative duration from t1 |
end |
For t2 in @t1:t2, the current end of the partition |
Durations are parsed simply:
3ms three milliseconds
10s ten seconds
9m nine minutes
1h one hour
1m3ms one minute and three milliseconds
For example:
-o @2022-02-14:1h consume 1h of time on Valentine's Day 2022
-o @-48h:-24h consume from 2 days ago to 1 day ago
-o @-1m:end consume from 1m ago until now
-o @:-1hr consume from the start until an hour ago
Examples
A key and value, separated by a space and ending in newline:
-f '%k %v\n'
A key length as four big endian bytes and the key as hex:
-f '%K{big32}%k{hex}'
A little endian uint32 and a string unpacked from a value:
-f '%v{unpack[is$]}'
Flags
Value | Type | Description |
---|---|---|
|
string |
Group balancer to use if group consuming (range, roundrobin, sticky, cooperative-sticky) (default "cooperative-sticky"). |
|
int32 |
Maximum amount of bytes per fetch request per broker (default 1048576). |
|
duration |
Maximum amount of time to wait when fetching from a broker before the broker replies (default 5s). |
|
string |
Output format (see --help for details) (default "json"). |
|
string |
Group to use for consuming (incompatible with -p). |
|
- |
Help for consume. |
|
- |
Print all record info except the record value (for -f json). |
|
int |
Quit after consuming this number of records (0 is unbounded). |
|
string |
Offset to consume from / to (start, end, 47, +2, -3) (default "start"). |
|
int32 |
int32Slice Comma delimited list of specific partitions to consume (default []). |
|
- |
Pretty print each record over multiple lines (for -f json) (default true). |
|
- |
Opt in to printing control records. |
|
string |
Rack to use for consuming, which opts into follower fetching. |
|
- |
Opt in to reading only committed offsets. |
|
- |
Parse topics as regex; consume any topic that matches any expression. |
|
strings |
[=key,value] If present, |
|
string |
Redpanda or |
|
stringArray |
Override |
|
string |
Profile to use. See |
|
- |
Enable verbose logging. |