Trifecta
Trifecta is a Command Line Interface (CLI) tool that enables users to quickly and easily inspect, verify and even query Kafka messages. In addition, Trifecta offers data import/export functions for transferring data between Kafka topics and many other Big Data Systems (including Cassandra, ElasticSearch, MongoDB and others).
Table of Contents
- Motivations
- Features
- Development
- Downloads
- What's New
- Trifecta UI
- Trifecta CLI
Motivations
The motivations behind creating Trifecta are simple; testing, verifying and managing Kafka topics and Zookeeper key-value pairs is an arduous task. The goal of this project is to ease the pain of developing applications that make use of Kafka and ZooKeeper via a console-based tool using simple Unix-like (or SQL-like) commands.
Features
- Avro integration
- Kafka — Avro support
-
Elastic Search — Avro support
- Copy Avro-encoded messages from Kafka to Elastic Search as JSON
- Zookeeper — Avro support (coming soon)
- Elastic Search integration (experimental)
- Kafka integration
- Storm integration
- Zookeeper integration
Development
Build Requirements
Configuring the project for your IDE
Generating an Eclipse project
$ sbt eclipse
Generating an Intellij Idea project
$ sbt gen-idea
Building the code
$ sbt clean assembly
Running the tests
$ sbt clean test
Configuring the application
On startup, Trifecta reads $HOME/.trifecta/config.properties (or creates the file if it doesn't exist). This file contains the configuration properties and connection strings for all supported systems.
# common properties
trifecta.common.autoSwitching = true
trifecta.common.columns = 25
trifecta.common.debugOn = false
trifecta.common.encoding = UTF-8
# Kafka/Zookeeper properties
trifecta.zookeeper.host = localhost:2181
# indicates whether Storm Partition Manager-style consumers should be read from Zookeeper
trifecta.storm.kafka.consumers.partitionManager = false
# Cassandra properties
trifecta.cassandra.hosts = localhost
# ElasticSearch properties
trifecta.elasticsearch.hosts = localhost:9200
# MongoDB properties
trifecta.mongodb.hosts = localhost
# Storm properties
trifecta.storm.hosts = localhost
Run the application
To start the Trifecta REPL:
$ java -jar trifecta.jar
Optionally, you can execute Trifecta instructions (commands) right from the command line:
$ java -jar trifecta.jar kls -l
Downloads
Trifecta binaries are available for immediate download.
- Trifecta Bundle v0.18.13 (66MB)
What's New
v0.18.13
- Trifecta UI
- Added real-time message streaming capability to the Inspect tab
v0.18.12
- Trifecta UI
- Updated Decoder and Query views for consistency
v0.18.1 to v0.18.11
- Trifecta UI
- Added capability to navigate directly from a message (in the Inspect tab) to its decoder (in the Decoders tab)
- Decoder tab user interface improvements
-
Observe tab user interface improvements
- The Consumers section has been enhanced to display topic and consumer offset deltas
- Redesigned the Replicas view to report under-replicated partitions.
- The Topics section has been enhanced to display topic offset deltas
-
Query tab user interface improvements
- Multiple queries can be executed concurrently
- Kafka Query language (KQL) (formerly Big-Data Query Language/BDQL) has grammar simplification
- The "with default" clause is no longer necessary
- The embedded web server now supports asynchronous request/response flows
Trifecta UI
Trifecta offers a single-page web application (via Angular.js) with a REST service layer and web-socket support, which offers a comprehensive and powerful set of features for inspecting Kafka topic partitions and messages.
Starting the embedded web server
To start the embedded web server, issue the following from the command line:
$ java -jar trifecta.jar --http-start
You'll see a few seconds of log messages, then a prompt indicating the web interface is ready for use.
Open your browser and navigate to http://localhost:8888
Configuring Trifecta UI
Additionally, Trifecta UI introduces a few new properties to the application configuration file (located in $HOME/.trifecta/config.properties). NOTE: The property values shown below are the default values.
# the embedded web server host/IP and port for client connections
trifecta.web.host = localhost
trifecta.web.port = 8888
# the interval (in seconds) that changes to consumer offsets will be pushed to web-socket clients
trifecta.web.push.interval.consumer = 15
# the interval (in seconds) that sampling messages will be pushed to web-socket clients
trifecta.web.push.interval.sampling = 2
# the interval (in seconds) that changes to topics (new messages) will be pushed to web-socket clients
trifecta.web.push.interval.topic = 15
# the number of actors to create for servicing requests
trifecta.web.actor.concurrency = 10
Configuring default Avro Decoders
Trifecta UI supports decoding Avro-encoded messages and displaying them in JSON format. To associate an Avro schema to a
Kafka topic, place the schema file in a subdirectory with the same name as the topic. For example, if I wanted to
associate an Avro file named quotes.avsc
to the Kafka topic shocktrade.quotes.avro
, I'd setup the following
file structure:
$HOME/.trifecta/decoders/shocktrade.quotes.avro/quotes.avsc
The name of the actual schema file can be anything you'd like. Once the file has been placed in the appropriate location, restart Trifecta UI, and your messages will be displayed in JSON format.
Additionally, once a "default" decoder is configured for a Kafka topic, the CLI application can use them as well. For more details about using default decoders with the CLI application click here.
Inspecting Kafka Messages
Trifecta UI has powerful support for viewing Kafka messages, and when the messages are in either JSON or Avro format Trifecta displays them as human readable (read: pretty) JSON documents.
Replicas
Trifecta UI provides a comprehensive view of the current state of replication for each topic partition.
Queries
Trifecta UI also provides a way to execute queries against Avro-encoded topics using the Kafka Query Language (KQL). KQL is a SQL-like language with syntax as follows:
select <fields> from <topic> [with <decoder>]
[where <condition>]
[limit <count>]
Consider the following example:
select symbol, exchange, lastTrade, open, close, high, low
from "shocktrade.quotes.avro"
where lastTrade <= 1 and volume >= 1,000,000
limit 25
The above query retrieves the symbol
, exchange
, lastTrade
, open
, close
, high
and low
fields from messages
within the Kafka topic shocktrade.quotes.avro
(using the default decoder) filtering for only messages where the
lastTrade
is less than or equal to 1
, the volume
is greater than or equal to 1,000,000
, and limiting the
number of results to 25
.
Now consider a similar example, except here we'll specify a custom decoder file (avro/quotes.avsc
):
select symbol, exchange, lastTrade, open, close, high, low
from "shocktrade.quotes.avro" with "avro:file:avro/quotes.avsc"
where lastTrade <= 1 and volume >= 1,000,000
limit 25
For more detailed information about KQL queries, click here.
Trifecta CLI
Trifecta CLI (Command Line Interface) is a REPL tool that simplifies inspecting Kafka messages, Zookeeper data, and optionally Elastic Search documents, MongoDB documents and Storm topologies via simple UNIX-like commands (or SQL-like queries).
Core Module
Trifecta exposes its commands through modules. At any time to see which modules are available one could issue the modules
command.
core:/home/ldaniels> modules
+ ------------------------------------------------------------------------------------- +
| name className loaded active |
+ ------------------------------------------------------------------------------------- +
| cassandra com.ldaniels528.trifecta.modules.CassandraModule true false |
| core com.ldaniels528.trifecta.modules.CoreModule true true |
| elasticSearch com.ldaniels528.trifecta.modules.ElasticSearchModule true false |
| kafka com.ldaniels528.trifecta.modules.KafkaModule true false |
| mongodb com.ldaniels528.trifecta.modules.MongoModule true false |
| storm com.ldaniels528.trifecta.modules.StormModule true false |
| zookeeper com.ldaniels528.trifecta.modules.ZookeeperModule true false |
+ ------------------------------------------------------------------------------------- +
To execute local system commands, enclose the command you'd like to execute using the back-ticks (`) symbol:
core:/home/ldaniels> `netstat -ptln`
To see all available commands, use the help
command (?
is a shortcut):
core:/home/ldaniels> ?
+ ---------------------------------------------------------------------------------------------------------------------- +
| command module description |
+ ---------------------------------------------------------------------------------------------------------------------- +
| ! core Executes a previously issued command |
| $ core Executes a local system command |
| ? core Provides the list of available commands |
| autoswitch core Automatically switches to the module of the most recently executed command |
. .
. .
. .
| ztree zookeeper Retrieves Zookeeper directory structure |
+ ---------------------------------------------------------------------------------------------------------------------- +
To see the syntax/usage of a command, use the syntax
command:
core:/home/ldaniels> syntax kget
Description: Retrieves the message at the specified offset for a given topic partition
Usage: kget [-o outputSource] [-d YYYY-MM-DDTHH:MM:SS] [-a avroSchema] [topic] [partition] [offset]
Kakfa Module
To view all of the Kafka commands, use the -m
switch and the module name
kafka:/> ? -m kafka
+ ------------------------------------------------------------------------------------------------------------------- +
| command module description |
+ ------------------------------------------------------------------------------------------------------------------- +
| kbrokers kafka Returns a list of the brokers from ZooKeeper |
| kcommit kafka Commits the offset for a given topic and group |
| kconsumers kafka Returns a list of the consumers from ZooKeeper |
| kcount kafka Counts the messages matching a given condition |
| kcursor kafka Displays the message cursor(s) |
| kfetch kafka Retrieves the offset for a given topic and group |
| kfetchsize kafka Retrieves or sets the default fetch size for all Kafka queries |
| kfind kafka Finds messages matching a given condition and exports them to a topic |
| kfindone kafka Returns the first occurrence of a message matching a given condition |
| kfirst kafka Returns the first message for a given topic |
| kget kafka Retrieves the message at the specified offset for a given topic partition |
| kgetkey kafka Retrieves the key of the message at the specified offset for a given topic partition |
| kgetminmax kafka Retrieves the smallest and largest message sizes for a range of offsets for a given partition |
| kgetsize kafka Retrieves the size of the message at the specified offset for a given topic partition |
| kinbound kafka Retrieves a list of topics with new messages (since last query) |
| klast kafka Returns the last message for a given topic |
| kls kafka Lists all existing topics |
| knext kafka Attempts to retrieve the next message |
| kprev kafka Attempts to retrieve the message at the previous offset |
| kreset kafka Sets a consumer group ID to zero for all partitions |
| kstats kafka Returns the partition details for a given topic |
| kswitch kafka Switches the currently active topic cursor |
+ ------------------------------------------------------------------------------------------------------------------- +
Kafka Brokers
To list the replica brokers that Zookeeper is aware of:
kafka:/> kbrokers
+ ---------------------------------------------------------- +
| jmx_port timestamp host version port |
+ ---------------------------------------------------------- +
| 9999 2014-08-23 19:33:01 PDT dev501 1 9093 |
| 9999 2014-08-23 18:41:07 PDT dev501 1 9092 |
| 9999 2014-08-23 18:41:07 PDT dev501 1 9091 |
| 9999 2014-08-23 20:05:17 PDT dev502 1 9093 |
| 9999 2014-08-23 20:05:17 PDT dev502 1 9092 |
| 9999 2014-08-23 20:05:17 PDT dev502 1 9091 |
+ ---------------------------------------------------------- +
Kafka Topics
To list all of the Kafka topics that Zookeeper is aware of:
kafka:/> kls
+ ------------------------------------------------------ +
| topic partitions replicated |
+ ------------------------------------------------------ +
| Shocktrade.quotes.csv 5 100% |
| shocktrade.quotes.avro 5 100% |
| test.Shocktrade.quotes.avro 5 100% |
| hft.Shocktrade.quotes.avro 5 100% |
| test2.Shocktrade.quotes.avro 5 100% |
| test1.Shocktrade.quotes.avro 5 100% |
| test3.Shocktrade.quotes.avro 5 100% |
+ ------------------------------------------------------ +
To see a subset of the topics (matches any topic that starts with the given search term):
kafka:/> kls shocktrade.quotes.avro
+ ------------------------------------------------ +
| topic partitions replicated |
+ ------------------------------------------------ +
| shocktrade.quotes.avro 5 100% |
+ ------------------------------------------------ +
To see a detailed list including all partitions, use the -l
flag:
kafka:/> kls -l shocktrade.quotes.avro
+ ------------------------------------------------------------------ +
| topic partition leader replicas inSync |
+ ------------------------------------------------------------------ +
| shocktrade.quotes.avro 0 dev501:9092 1 1 |
| shocktrade.quotes.avro 1 dev501:9093 1 1 |
| shocktrade.quotes.avro 2 dev502:9091 1 1 |
| shocktrade.quotes.avro 3 dev502:9092 1 1 |
| shocktrade.quotes.avro 4 dev502:9093 1 1 |
+ ------------------------------------------------------------------ +
To see the statistics for a specific topic, use the kstats
command:
kafka:/> kstats Shocktrade.quotes.csv
+ --------------------------------------------------------------------------------- +
| topic partition startOffset endOffset messagesAvailable |
+ --------------------------------------------------------------------------------- +
| Shocktrade.quotes.csv 0 5945 10796 4851 |
| Shocktrade.quotes.csv 1 5160 10547 5387 |
| Shocktrade.quotes.csv 2 3974 8788 4814 |
| Shocktrade.quotes.csv 3 3453 7334 3881 |
| Shocktrade.quotes.csv 4 4364 8276 3912 |
+ --------------------------------------------------------------------------------- +
Kafka Navigable Cursor
The Kafka module offers the concept of a navigable cursor. Any command that references a specific message offset
creates a pointer to that offset, called a navigable cursor. Once the cursor has been established, with a single command,
you can navigate to the first, last, previous, or next message using the kfirst
, klast
, kprev
and knext
commands
respectively. Consider the following examples:
To retrieve the first message of a topic partition:
kafka:/> kfirst Shocktrade.quotes.csv 0
[5945:000] 22.47.44.46.22.2c.31.30.2e.38.31.2c.22.39.2f.31.32.2f.32.30.31.34.22.2c.22 | "GDF",10.81,"9/12/2014"," |
[5945:025] 34.3a.30.30.70.6d.22.2c.4e.2f.41.2c.4e.2f.41.2c.2d.30.2e.31.30.2c.22.2d.30 | 4:00pm",N/A,N/A,-0.10,"-0 |
[5945:050] 2e.31.30.20.2d.20.2d.30.2e.39.32.25.22.2c.31.30.2e.39.31.2c.31.30.2e.39.31 | .10 - -0.92%",10.91,10.91 |
[5945:075] 2c.31.30.2e.38.31.2c.31.30.2e.39.31.2c.31.30.2e.38.30.2c.33.36.35.35.38.2c | ,10.81,10.91,10.80,36558, |
[5945:100] 4e.2f.41.2c.22.4e.2f.41.22 | N/A,"N/A" |
The previous command resulted in the creation of a navigable cursor (notice below how our prompt has changed).
kafka:Shocktrade.quotes.csv/0:5945> _
Let's view the cursor:
kafka:Shocktrade.quotes.csv/0:5945> kcursor
+ ------------------------------------------------------------------- +
| topic partition offset nextOffset decoder |
+ ------------------------------------------------------------------- +
| Shocktrade.quotes.csv 0 5945 5946 |
+ ------------------------------------------------------------------- +
Let's view the next message for this topic partition:
kafka:Shocktrade.quotes.csv/0:5945> knext
[5946:000] 22.47.44.50.22.2c.31.38.2e.35.31.2c.22.39.2f.31.32.2f.32.30.31.34.22.2c.22 | "GDP",18.51,"9/12/2014"," |
[5946:025] 34.3a.30.31.70.6d.22.2c.4e.2f.41.2c.4e.2f.41.2c.2d.30.2e.38.39.2c.22.2d.30 | 4:01pm",N/A,N/A,-0.89,"-0 |
[5946:050] 2e.38.39.20.2d.20.2d.34.2e.35.39.25.22.2c.31.39.2e.34.30.2c.31.39.2e.32.37 | .89 - -4.59%",19.40,19.27 |
[5946:075] 2c.31.38.2e.35.31.2c.31.39.2e.33.32.2c.31.38.2e.33.30.2c.31.35.31.36.32.32 | ,18.51,19.32,18.30,151622 |
[5946:100] 30.2c.38.32.32.2e.33.4d.2c.22.4e.2f.41.22 | 0,822.3M,"N/A" | | M,"N/A" |
Let's view the last message for this topic partition:
kafka:Shocktrade.quotes.csv/0:5945> klast
[10796:000] 22.4e.4f.53.50.46.22.2c.30.2e.30.30.2c.22.4e.2f.41.22.2c.22.4e.2f.41.22.2c | "NOSPF",0.00,"N/A","N/A", |
[10796:025] 4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.22.4e.2f.41.20.2d.20.4e.2f.41.22.2c.4e | N/A,N/A,N/A,"N/A - N/A",N |
[10796:050] 2f.41.2c.4e.2f.41.2c.30.2e.30.30.2c.4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.4e | /A,N/A,0.00,N/A,N/A,N/A,N |
[10796:075] 2f.41.2c.22.54.69.63.6b.65.72.20.73.79.6d.62.6f.6c.20.68.61.73.20.63.68.61 | /A,"Ticker symbol has cha |
[10796:100] 6e.67.65.64.20.74.6f.3a.20.3c.61.20.68.72.65.66.3d.22.2f.71.3f.73.3d.4e.4f | nged to: <a href="/q?s=NO |
[10796:125] 53.50.46.22.3e.4e.4f.53.50.46.3c.2f.61.3e.22 | SPF">NOSPF</a>" | | ,N/A,"N/A" |
Notice above we didn't have to specify the topic or partition because it's defined in our cursor. Let's view the cursor again:
kafka:Shocktrade.quotes.csv/0:10796> kcursor
+ ------------------------------------------------------------------- +
| topic partition offset nextOffset decoder |
+ ------------------------------------------------------------------- +
| Shocktrade.quotes.csv 0 10796 10797 |
+ ------------------------------------------------------------------- +
Now, let's view the previous record:
kafka:Shocktrade.quotes.csv/0:10796> kprev
[10795:000] 22.4d.4c.50.4b.46.22.2c.30.2e.30.30.2c.22.4e.2f.41.22.2c.22.4e.2f.41.22.2c | "MLPKF",0.00,"N/A","N/A", |
[10795:025] 4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.22.4e.2f.41.20.2d.20.4e.2f.41.22.2c.4e | N/A,N/A,N/A,"N/A - N/A",N |
[10795:050] 2f.41.2c.4e.2f.41.2c.30.2e.30.30.2c.4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.4e | /A,N/A,0.00,N/A,N/A,N/A,N |
[10795:075] 2f.41.2c.22.54.69.63.6b.65.72.20.73.79.6d.62.6f.6c.20.68.61.73.20.63.68.61 | /A,"Ticker symbol has cha |
[10795:100] 6e.67.65.64.20.74.6f.3a.20.3c.61.20.68.72.65.66.3d.22.2f.71.3f.73.3d.4d.4c | nged to: <a href="/q?s=ML |
[10795:125] 50.4b.46.22.3e.4d.4c.50.4b.46.3c.2f.61.3e.22 | PKF">MLPKF</a>" | | N/A,"N/A" |
To retrieve the start and end offsets and number of messages available for a topic across any number of partitions:
kafka:Shocktrade.quotes.csv/0:10795> kstats
+ --------------------------------------------------------------------------------- +
| topic partition startOffset endOffset messagesAvailable |
+ --------------------------------------------------------------------------------- +
| Shocktrade.quotes.csv 0 5945 10796 4851 |
| Shocktrade.quotes.csv 1 5160 10547 5387 |
| Shocktrade.quotes.csv 2 3974 8788 4814 |
| Shocktrade.quotes.csv 3 3453 7334 3881 |
| Shocktrade.quotes.csv 4 4364 8276 3912 |
+ --------------------------------------------------------------------------------- +
NOTE: Above kstats
is equivalent to both kstats Shocktrade.quotes.csv
and
kstats Shocktrade.quotes.csv 0 4
. However, because of the cursor we previously established, those arguments
could be omitted.
Kafka Consumer Groups
To see the current offsets for all consumer group IDs:
kafka:Shocktrade.quotes.csv/0:10795> kconsumers
+ ------------------------------------------------------------------------------------- +
| consumerId topic partition offset topicOffset messagesLeft |
+ ------------------------------------------------------------------------------------- +
| dev Shocktrade.quotes.csv 0 5555 10796 5241 |
| dev Shocktrade.quotes.csv 1 0 10547 10547 |
| dev Shocktrade.quotes.csv 2 0 8788 8788 |
| dev Shocktrade.quotes.csv 3 0 7334 7334 |
| dev Shocktrade.quotes.csv 4 0 8276 8276 |
+ ------------------------------------------------------------------------------------- +
Let's change the committed offset for the current topic/partition (the one to which our cursor is pointing) to 6000
kafka:Shocktrade.quotes.csv/0:10795> kcommit dev 6000
Let's re-examine the consumer group IDs:
kafka:Shocktrade.quotes.csv/0:10795> kconsumers
+ ------------------------------------------------------------------------------------- +
| consumerId topic partition offset topicOffset messagesLeft |
+ ------------------------------------------------------------------------------------- +
| dev Shocktrade.quotes.csv 0 6000 10796 4796 |
| dev Shocktrade.quotes.csv 1 0 10547 10547 |
| dev Shocktrade.quotes.csv 2 0 8788 8788 |
| dev Shocktrade.quotes.csv 3 0 7334 7334 |
| dev Shocktrade.quotes.csv 4 0 8276 8276 |
+ ------------------------------------------------------------------------------------- +
Notice that the committed offset, for consumer group dev, has been changed to 6000 for partition 0.
Finally, let's use the kfetch
to retrieve just the offset for the consumer group ID:
kafka:Shocktrade.quotes.csv/0:10795> kfetch dev
6000
The Kafka Module also provides the capability for watching messages (in near real-time) as they become available. When
you watch a topic, a watch cursor is created, which allows you to move forward through the topic as new messages appear.
This differs from navigable cursors, which allow you to move back and forth through a topic at will. You can take
advantage of this feature by using two built-in commands: kwatch
and kwatchnext
.
First, kwatch
is used to create a connection to a consumer group:
kafka:Shocktrade.quotes.csv/0:10795> kwatch shocktrade.quotes.avro dev -a file:avro/quotes.avsc
Some output will likely follow as the connection (and watch cursor) are established. Next, if a message is already available, it will be returned immediately:
{
"symbol":"MNLDF",
"exchange":"OTHER OTC",
"lastTrade":1.18,
"tradeDate":null,
"tradeTime":"1:50pm",
"ask":null,
"bid":null,
"change":0.0,
"changePct":0.0,
"prevClose":1.18,
"open":null,
"close":1.18,
"high":null,
"low":null,
"volume":0,
"marketCap":null,
"errorMessage":null
}
kafka:[w]shocktrade.quotes.avro/4:23070> _
To retrieve any subsequent messages, you use the kwatchnext
command:
kafka:[w]shocktrade.quotes.avro/4:23070> kwatchnext
{
"symbol":"PXYN",
"exchange":"OTHER OTC",
"lastTrade":0.075,
"tradeDate":null,
"tradeTime":"3:57pm",
"ask":null,
"bid":null,
"change":0.01,
"changePct":15.38,
"prevClose":0.065,
"open":0.064,
"close":0.075,
"high":0.078,
"low":0.064,
"volume":1325779,
"marketCap":2.45E7,
"errorMessage":null
}
kafka:[w]shocktrade.quotes.avro/4:23071> _
Did you notice the "[w]" you see in the prompt? This indicates a watch cursor is active. However, because each call to
kwatchnext
will also update the navigable cursor, you are free to also use the bi-directional navigation commands
knext
and kprev
(as well as kfirst
and klast
). Be aware though, that changes to the navigable cursor do not
affect the watch cursor. Thus after a subsequent use of kwatchnext
, the navigable cursor will be overwritten.
Kafka Inbound Traffic
To retrieve the list of topics with new messages (since your last query):
kafka:/> kinbound
+ ----------------------------------------------------------------------------------------------------------- +
| topic partition startOffset endOffset change msgsPerSec lastCheckTime |
+ ----------------------------------------------------------------------------------------------------------- +
| Shocktrade.quotes.avro 3 0 657 65 16.3 09/13/14 06:37:03 PDT |
| Shocktrade.quotes.avro 0 0 650 64 16.0 09/13/14 06:37:03 PDT |
| Shocktrade.quotes.avro 1 0 618 56 14.0 09/13/14 06:37:03 PDT |
| Shocktrade.quotes.avro 2 0 618 49 12.3 09/13/14 06:37:03 PDT |
| Shocktrade.quotes.avro 4 0 584 40 10.0 09/13/14 06:37:03 PDT |
+ ----------------------------------------------------------------------------------------------------------- +
Next, we wait a few moments and run the command again:
kafka:/> kinbound
+ ----------------------------------------------------------------------------------------------------------- +
| topic partition startOffset endOffset change msgsPerSec lastCheckTime |
+ ----------------------------------------------------------------------------------------------------------- +
| Shocktrade.quotes.avro 1 0 913 295 15.6 09/13/14 06:37:21 PDT |
| Shocktrade.quotes.avro 3 0 952 295 15.6 09/13/14 06:37:21 PDT |
| Shocktrade.quotes.avro 2 0 881 263 13.9 09/13/14 06:37:21 PDT |
| Shocktrade.quotes.avro 4 0 846 262 13.8 09/13/14 06:37:21 PDT |
| Shocktrade.quotes.avro 0 0 893 243 12.8 09/13/14 06:37:21 PDT |
+ ----------------------------------------------------------------------------------------------------------- +
Kafka & Avro Integration
Trifecta supports Avro integration for Kafka. The next few examples make use of the following Avro schema:
{
"type": "record",
"name": "StockQuote",
"namespace": "Shocktrade.avro",
"fields":[
{ "name": "symbol", "type":"string", "doc":"stock symbol" },
{ "name": "lastTrade", "type":["null", "double"], "doc":"last sale price", "default":null },
{ "name": "tradeDate", "type":["null", "long"], "doc":"last sale date", "default":null },
{ "name": "tradeTime", "type":["null", "string"], "doc":"last sale time", "default":null },
{ "name": "ask", "type":["null", "double"], "doc":"ask price", "default":null },
{ "name": "bid", "type":["null", "double"], "doc":"bid price", "default":null },
{ "name": "change", "type":["null", "double"], "doc":"price change", "default":null },
{ "name": "changePct", "type":["null", "double"], "doc":"price change percent", "default":null },
{ "name": "prevClose", "type":["null", "double"], "doc":"previous close price", "default":null },
{ "name": "open", "type":["null", "double"], "doc":"open price", "default":null },
{ "name": "close", "type":["null", "double"], "doc":"close price", "default":null },
{ "name": "high", "type":["null", "double"], "doc":"day's high price", "default":null },
{ "name": "low", "type":["null", "double"], "doc":"day's low price", "default":null },
{ "name": "volume", "type":["null", "long"], "doc":"day's volume", "default":null },
{ "name": "marketCap", "type":["null", "double"], "doc":"market capitalization", "default":null },
{ "name": "errorMessage", "type":["null", "string"], "doc":"error message", "default":null }
],
"doc": "A schema for stock quotes"
}
Let's retrieve the first message from topic Shocktrade.quotes.avro (partition 0) using an Avro schema as our optional message decoder:
kafka:/> kfirst Shocktrade.quotes.avro 0 -a file:avro/quotes.avsc
{
"symbol":"GES",
"exchange":"NYSE",
"lastTrade":21.75,
"tradeDate":null,
"tradeTime":"4:03pm",
"ask":null,
"bid":null,
"change":-0.41,
"changePct":-1.85,
"prevClose":22.16,
"open":21.95,
"close":21.75,
"high":22.16,
"low":21.69,
"volume":650298,
"marketCap":1.853E9,
"errorMessage":null
}
Let's view the cursor:
kafka:shocktrade.quotes.avro/2:9728> kcursor
+ ---------------------------------------------------------------------------------- +
| topic partition offset nextOffset decoder |
+ ---------------------------------------------------------------------------------- +
| Shocktrade.quotes.avro 0 0 1 AvroDecoder(schema) |
+ ---------------------------------------------------------------------------------- +
The kfirst
, klast
, kprev
and knext
commands also work with the Avro integration:
kafka:shocktrade.quotes.avro/2:9728> knext
{
"symbol":"GEO",
"exchange":"NYSE",
"lastTrade":41.4,
"tradeDate":null,
"tradeTime":"4:03pm",
"ask":null,
"bid":null,
"change":0.76,
"changePct":1.87,
"prevClose":40.64,
"open":40.55,
"close":41.4,
"high":41.49,
"low":40.01,
"volume":896757,
"marketCap":2.973E9,
"errorMessage":null
}
Since Avro is based on JSON, we can also express the same data in Avro compatible JSON format:
kafka:shocktrade.quotes.avro/2:9728> kget -f avro_json
{
"symbol":"GEO",
"exchange":{
"string":"NYSE"
},
"lastTrade":{
"double":41.4
},
"tradeDate":null,
"tradeTime":{
"string":"4:03pm"
},
"ask":null,
"bid":null,
"change":{
"double":0.76
},
"changePct":{
"double":1.87
},
"prevClose":{
"double":40.64
},
"open":{
"double":40.55
},
"close":{
"double":41.4
},
"high":{
"double":41.49
},
"low":{
"double":40.01
},
"volume":{
"long":896757
},
"marketCap":{
"double":2.973E9
},
"errorMessage":null
}
Default Avro Decoders
You can also associate a Kafka topic to a "default" Avro schema. To associate an Avro schema to the topic, place the
schema file in a subdirectory with the same name as the topic. For example, if I wanted to associate an Avro file named
quotes.avsc
to the Kafka topic shocktrade.quotes.avro
, I'd setup the following file structure:
$HOME/.trifecta/decoders/Shocktrade.quotes.avro/quotes.avsc
The name of the actual schema file can be anything you'd like. Once the file has been placed in the appropriate location, restart Trifecta, and default decoder will be ready for use.
In our last example, we did the following:
kafka:/> kfirst Shocktrade.quotes.avro 0 -a file:avro/quotes.avsc
With a default decoder, we can do this instead:
kafka:/> kfirst Shocktrade.quotes.avro 0 -a default
Kafka Search by Key
You can view the key for any message by using the kgetkey
command. Let's start by retrieving the last available message
for a topic/partition.
kafka:/> klast Shocktrade.quotes.csv 0
[10796:000] 22.4e.4f.53.50.46.22.2c.30.2e.30.30.2c.22.4e.2f.41.22.2c.22.4e.2f.41.22.2c | "NOSPF",0.00,"N/A","N/A", |
[10796:025] 4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.22.4e.2f.41.20.2d.20.4e.2f.41.22.2c.4e | N/A,N/A,N/A,"N/A - N/A",N |
[10796:050] 2f.41.2c.4e.2f.41.2c.30.2e.30.30.2c.4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.4e | /A,N/A,0.00,N/A,N/A,N/A,N |
[10796:075] 2f.41.2c.22.54.69.63.6b.65.72.20.73.79.6d.62.6f.6c.20.68.61.73.20.63.68.61 | /A,"Ticker symbol has cha |
[10796:100] 6e.67.65.64.20.74.6f.3a.20.3c.61.20.68.72.65.66.3d.22.2f.71.3f.73.3d.4e.4f | nged to: <a href="/q?s=NO |
[10796:125] 53.50.46.22.3e.4e.4f.53.50.46.3c.2f.61.3e.22 | SPF">NOSPF</a>" |
Now let's view the key for message using the kgetkey
command:
kafka:Shocktrade.quotes.csv/0:10796> kgetkey
[000] 31.34.31.30.35.36.33.37.31.34.34.36.35 | 1410563714465 |
To ensure you'll notice the change in the cursor's position, let's reposition the cursor to the beginning of the topic/partition:
kafka:Shocktrade.quotes.csv/0:10796> kfirst
[5945:000] 22.47.44.46.22.2c.31.30.2e.38.31.2c.22.39.2f.31.32.2f.32.30.31.34.22.2c.22 | "GDF",10.81,"9/12/2014"," |
[5945:025] 34.3a.30.30.70.6d.22.2c.4e.2f.41.2c.4e.2f.41.2c.2d.30.2e.31.30.2c.22.2d.30 | 4:00pm",N/A,N/A,-0.10,"-0 |
[5945:050] 2e.31.30.20.2d.20.2d.30.2e.39.32.25.22.2c.31.30.2e.39.31.2c.31.30.2e.39.31 | .10 - -0.92%",10.91,10.91 |
[5945:075] 2c.31.30.2e.38.31.2c.31.30.2e.39.31.2c.31.30.2e.38.30.2c.33.36.35.35.38.2c | ,10.81,10.91,10.80,36558, |
[5945:100] 4e.2f.41.2c.22.4e.2f.41.22 | N/A,"N/A" |
kafka:Shocktrade.quotes.csv/0:5945> _
Next, using the kfindone
command let's search for the last message by key (using hexadecimal dot-notation):
kafka:Shocktrade.quotes.csv/0:5945> kfindone key is 31.34.31.30.35.36.33.37.31.34.34.36.35
Task is now running in the background (use 'jobs' to view)
You may have received the message Task is now running in the background (use 'jobs' to view). This happens whenever a
query is executed that requires more than 5 seconds to complete. You may use the jobs
command to check the status of a
background task; however, either way, after a few moments the results should appear on the console:
kafka:Shocktrade.quotes.csv/0:5945> Job #1006 completed
[10796:000] 22.4e.4f.53.50.46.22.2c.30.2e.30.30.2c.22.4e.2f.41.22.2c.22.4e.2f.41.22.2c | "NOSPF",0.00,"N/A","N/A", |
[10796:025] 4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.22.4e.2f.41.20.2d.20.4e.2f.41.22.2c.4e | N/A,N/A,N/A,"N/A - N/A",N |
[10796:050] 2f.41.2c.4e.2f.41.2c.30.2e.30.30.2c.4e.2f.41.2c.4e.2f.41.2c.4e.2f.41.2c.4e | /A,N/A,0.00,N/A,N/A,N/A,N |
[10796:075] 2f.41.2c.22.54.69.63.6b.65.72.20.73.79.6d.62.6f.6c.20.68.61.73.20.63.68.61 | /A,"Ticker symbol has cha |
[10796:100] 6e.67.65.64.20.74.6f.3a.20.3c.61.20.68.72.65.66.3d.22.2f.71.3f.73.3d.4e.4f | nged to: <a href="/q?s=NO |
[10796:125] 53.50.46.22.3e.4e.4f.53.50.46.3c.2f.61.3e.22 | SPF">NOSPF</a>" |
The result is the last message of the topic. Notice that our cursor has changed reflecting the move from offset 5945 to 10796.
kafka:Shocktrade.quotes.csv/0:10796> _
Kafka Advanced Search
Building on the Avro Integration, Trifecta offers the ability to execute queries against structured data.
Suppose you want to know how many messages contain a volume greater than 1,000,000, you could issue the kcount
command:
kafka:Shocktrade.quotes.avro/0:1> kcount volume > 1000000
1350
The response was 1350, meaning there are 1350 messages containing a volume greater than 1,000,000.
Suppose you want to find a message for Apple (ticker: "AAPL"), you could issue the kfindone
command:
kafka:Shocktrade.quotes.avro/0:1> kfindone symbol == "AAPL"
{
"symbol":"AAPL",
"exchange":"NASDAQNM",
"lastTrade":109.01,
"tradeDate":null,
"tradeTime":"4:00pm",
"ask":109.03,
"bid":109.01,
"change":0.31,
"changePct":0.29,
"prevClose":108.7,
"open":108.72,
"close":109.01,
"high":109.32,
"low":108.55,
"volume":33691536,
"marketCap":6.393E11,
"errorMessage":null
}
You can also specify complex queries by combining multiple expressions with the and
keyword:
kafka:Shocktrade.quotes.avro/1234:4> kfindone lastTrade < 1 and volume > 1000000 -a file:avro/quotes.avsc
{
"symbol":"MDTV",
"exchange":"OTHER OTC",
"lastTrade":0.022,
"tradeDate":null,
"tradeTime":"3:27pm",
"ask":null,
"bid":null,
"change":0.0099,
"changePct":81.82,
"prevClose":0.0121,
"open":0.015,
"close":0.022,
"high":0.027,
"low":0.015,
"volume":1060005,
"marketCap":125000.0,
"errorMessage":null
}
Now suppose you want to copy the messages having high volume (1,000,000 or more) to another topic:
kafka:Shocktrade.quotes.avro/3300:3> kfind volume >= 1000000 -o topic:hft.Shocktrade.quotes.avro
Finally, let's look at the results:
kafka:Shocktrade.quotes.avro/3:0> kstats hft.Shocktrade.quotes.avro
+ ---------------------------------------------------------------------------------- +
| topic partition startOffset endOffset messagesAvailable |
+ ---------------------------------------------------------------------------------- +
| hft.Shocktrade.quotes.avro 0 0 283 283 |
| hft.Shocktrade.quotes.avro 1 0 287 287 |
| hft.Shocktrade.quotes.avro 2 0 258 258 |
| hft.Shocktrade.quotes.avro 3 0 245 245 |
| hft.Shocktrade.quotes.avro 4 0 272 272 |
+ ---------------------------------------------------------------------------------- +
Let's see how these statistics compares to the original:
kafka:Shocktrade.quotes.avro/3:0> kstats Shocktrade.quotes.avro
+ ----------------------------------------------------------------------------------- +
| topic partition startOffset endOffset messagesAvailable |
+ ----------------------------------------------------------------------------------- +
| Shocktrade.quotes.avro 0 0 4539 4539 |
| Shocktrade.quotes.avro 1 0 4713 4713 |
| Shocktrade.quotes.avro 2 0 4500 4500 |
| Shocktrade.quotes.avro 3 0 4670 4670 |
| Shocktrade.quotes.avro 4 0 4431 4431 |
+ ----------------------------------------------------------------------------------- +
Searching By Query
Trifecta provides the ability to perform SQL-like queries against Avro-encoded Kafka topics (Read more about Trifecta's Kafka-Avro integration here). The syntax is very similar to SQL except for a few minor differences. Here's the basic syntax:
select <fieldsToDisplay>
from <topic> with <decoder>
where <searchCriteria>
limit <maximumNumberOfResultsToReturn>
Consider the following example:
kafka:shocktrade.quotes.avro/0:32050> select symbol, exchange, open, close, high, low
from "topic:shocktrade.quotes.avro"
with "avro:file:avro/quotes.avsc"
where symbol == "AAPL"
NOTE: Because we didn't specify a limit for the number of results that could be returned, the default value (25) is used.
As with most potentially long-running statements in Trifecta, if the query takes longer than a few seconds to complete, it will be executed in the background.
kafka:shocktrade.quotes.avro/0:32050> Job #607 completed (use 'jobs -v 607' to view results)
+ --------------------------------------------------------------------- +
| partition offset symbol exchange open close high low |
+ --------------------------------------------------------------------- +
| 0 32946 AAPL NASDAQNM 108.72 109.01 109.32 108.55 |
+ --------------------------------------------------------------------- +
NOTE: Although the partition
and offset
fields weren't specified in the query, they are always included in
the query results.
Let's look at another example:
kafka:shocktrade.quotes.avro/0:32050> select symbol, exchange, lastTrade, open, close, high, low
from "topic:shocktrade.quotes.avro"
with "avro:file:avro/quotes.avsc"
where lastTrade <= 1 and volume >= 1,000,000
limit 25
Task is now running in the background (use 'jobs' to view)
kafka:shocktrade.quotes.avro/0:32050> Job #873 completed (use 'jobs -v 873' to view results)
+ --------------------------------------------------------------------------------- +
| partition offset symbol exchange lastTrade open close high low |
+ --------------------------------------------------------------------------------- +
| 3 34047 NIHDQ OTHER OTC 0.0509 0.0452 0.0509 0.0549 0.0452 |
| 3 33853 IMRS NASDAQNM 0.2768 0.25 0.2768 0.28 0.2138 |
| 3 33818 VTMB OTHER OTC 0.0014 0.0013 0.0014 0.0014 0.0012 |
| 3 33780 MLHC OTHER OTC 4.0E-4 5.0E-4 4.0E-4 5.0E-4 3.0E-4 |
| 3 33709 ECDC OTHER OTC 1.0E-4 1.0E-4 1.0E-4 1.0E-4 1.0E-4 |
| 3 33640 PWDY OTC BB 0.0037 0.0032 0.0037 0.0043 0.003 |
| 3 33534 BPZ NYSE 0.9599 1.02 0.9599 1.0201 0.92 |
| 3 33520 TAGG OTHER OTC 2.0E-4 1.0E-4 2.0E-4 2.0E-4 1.0E-4 |
| 3 33515 MDMN OTHER OTC 0.055 0.051 0.055 0.059 0.051 |
| 3 33469 MCET OTHER OTC 5.0E-4 5.0E-4 5.0E-4 5.0E-4 5.0E-4 |
| 3 33460 GGSM OTHER OTC 3.0E-4 3.0E-4 3.0E-4 4.0E-4 3.0E-4 |
| 3 33404 TDCP OTHER OTC 0.0041 0.0041 0.0041 0.0041 0.0038 |
| 3 33337 GSS AMEX 0.305 0.27 0.305 0.31 0.266 |
| 3 33254 MDTV OTHER OTC 0.022 0.015 0.022 0.027 0.015 |
| 2 33246 AMRN NGM 0.9 0.9128 0.9 0.93 0.88 |
| 2 33110 TRTC OTHER OTC 0.38 0.3827 0.38 0.405 0.373 |
| 2 33068 AEMD OTHER OTC 0.2419 0.26 0.2419 0.2625 0.23 |
| 2 33060 ZBB AMEX 0.6101 0.65 0.6101 0.65 0.55 |
| 2 33058 TUNG OTHER OTC 0.0019 0.0021 0.0019 0.0021 0.0016 |
| 2 33011 DKAM OTHER OTC 1.0E-4 2.0E-4 1.0E-4 2.0E-4 1.0E-4 |
| 2 32984 ADMD OTHER OTC 0.001 0.001 0.001 0.0011 9.0E-4 |
| 2 32905 GLDG OTHER OTC 2.0E-4 2.0E-4 2.0E-4 2.0E-4 2.0E-4 |
| 2 32808 RBY AMEX 0.9349 0.821 0.9349 0.94 0.821 |
| 2 32751 PZG AMEX 0.708 0.6 0.708 0.73 0.5834 |
| 2 32731 PAL AMEX 0.15 0.15 0.15 0.1555 0.145 |
+ --------------------------------------------------------------------------------- +
Zookeeper Module
Zookeeper: Navigating directories and keys
To view the Zookeeper keys at the current hierarchy level:
zookeeper@dev501:2181:/> zls
consumers
storm
controller_epoch
admin
controller
brokers
To change the current Zookeeper hierarchy level:
zookeeper:localhost:2181:/> zcd brokers
/brokers
Now view the keys at this level:
zookeeper:localhost:2181:/brokers> zls
topics
ids
Let's look at the entire Zookeeper hierarchy recursively from our current path:
zookeeper:localhost:2181/brokers> ztree
/brokers
/brokers/topics
/brokers/topics/shocktrade.quotes.avro
/brokers/topics/shocktrade.quotes.avro/partitions
/brokers/topics/shocktrade.quotes.avro/partitions/0
/brokers/topics/shocktrade.quotes.avro/partitions/0/state
/brokers/topics/shocktrade.quotes.avro/partitions/1
/brokers/topics/shocktrade.quotes.avro/partitions/1/state
/brokers/topics/shocktrade.quotes.avro/partitions/2
/brokers/topics/shocktrade.quotes.avro/partitions/2/state
/brokers/topics/shocktrade.quotes.avro/partitions/3
/brokers/topics/shocktrade.quotes.avro/partitions/3/state
/brokers/topics/shocktrade.quotes.avro/partitions/4
/brokers/topics/shocktrade.quotes.avro/partitions/4/state
/brokers/ids
/brokers/ids/3
/brokers/ids/2
/brokers/ids/1
/brokers/ids/6
/brokers/ids/5
/brokers/ids/4
Zookeeper: Getting and setting key-value pairs
Let's view the contents of one of the keys:
zookeeper:localhost:2181/brokers> zget topics/Shocktrade.quotes.csv/partitions/4/state
[00] 7b.22.63.6f.6e.74.72.6f.6c.6c.65.72.5f.65.70.6f.63.68.22.3a.31.2c.22.6c.65 | {"controller_epoch":1,"le
[25] 61.64.65.72.22.3a.35.2c.22.76.65.72.73.69.6f.6e.22.3a.31.2c.22.6c.65.61.64 | ader":5,"version":1,"lead
[50] 65.72.5f.65.70.6f.63.68.22.3a.30.2c.22.69.73.72.22.3a.5b.35.5d.7d | er_epoch":0,"isr":[5]}
Since we now know the contents of the key is text-based (JSON in this case), let's look at the JSON value using
the format flag (-f json
) to transform the JSON in a nice human-readable format.
zookeeper:localhost:2181/brokers> zget topics/Shocktrade.quotes.csv/partitions/4/state -f json
{
"controller_epoch": 3,
"leader": 6,
"version": 1,
"leader_epoch": 2,
"isr": [6]
}
Next, let's set a key-value pair in Zookeeper:
zookeeper:localhost:2181/> zput /test/message "Hello World"
To retrieve the value we've just set, we can use the zget
command again:
zookeeper:localhost:2181/> zget /test/message
[00] 48.65.6c.6c.6f.20.57.6f.72.6c.64 | Hello World
The zput
command also allows us to set other types of values besides strings. The following example demonstrates
setting a binary array literal using hexadecimal dot-notation.
zookeeper:localhost:2181/> zput /test/data de.ad.be.ef.ca.fe.ba.be
The default value types for integer and decimal values are long
and double
respectively. However, you can also
explicitly set the value type using the type flag (-t
).
zookeeper:localhost:2181/> zput /test/data2 123.45 -t float
The valid value types are:
- bytes
- char
- double
- float
- integer
- json
- long
- short
- string
To verify that all of the key-value pairs were inserted we use the zls
command again:
zookeeper:localhost:2181/> zls /test
data
message
data2
To view all of the Zookeeper commands, use the -m
switch and the module name (zookeeper
in this case):
zookeeper:localhost:2181/> ? -m zookeeper
+ ----------------------------------------------------------------------------------------- +
| command module description |
+ ----------------------------------------------------------------------------------------- +
| zcd zookeeper Changes the current path/directory in ZooKeeper |
| zexists zookeeper Verifies the existence of a ZooKeeper key |
| zget zookeeper Retrieves the contents of a specific Zookeeper key |
| zls zookeeper Retrieves the child nodes for a key from ZooKeeper |
| zmk zookeeper Creates a new ZooKeeper sub-directory (key) |
| zput zookeeper Sets a key-value pair in ZooKeeper |
| zreconnect zookeeper Re-establishes the connection to Zookeeper |
| zrm zookeeper Removes a key-value from ZooKeeper (DESTRUCTIVE) |
| zruok zookeeper Checks the status of a Zookeeper instance (requires netcat) |
| zstats zookeeper Returns the statistics of a Zookeeper instance (requires netcat) |
| ztree zookeeper Retrieves Zookeeper directory structure |
+ ----------------------------------------------------------------------------------------- +
Cassandra Module
Trifecta has basic built-in support for querying and inspecting the state of Apache Cassandra clusters. To establish a
connection to a Cassandra cluster, use the cqconnect
command:
core:/home/ldaniels> cqconnect dev528
From this point, you need to select a keyspace to execute queries against. To do this, use the keyspace
command:
cql:cluster1:/> keyspace shocktrade
Now that a keyspace has been selected, you can execute queries against the column families (tables) found within the keyspace.
cql:cluster1:shocktrade> cql "select name, symbol, exchange, lastTrade, volume from stocks where symbol = 'AAPL' limit 5"
+ --------------------------------------------------- +
| name lasttrade exchange symbol volume |
+ --------------------------------------------------- +
| Apple Inc. 557.36 NASDAQ AAPL 14067517 |
+ --------------------------------------------------- +
You can view basic cluster information by issuing the clusterinfo
command:
cql:cluster1:shocktrade> clusterinfo
+ -------------------------------------------------------------------- +
| name value |
+ -------------------------------------------------------------------- +
| Cluster Name cluster1 |
| Partitioner org.apache.cassandra.dht.Murmur3Partitioner |
| Consistency Level ONE |
| Fetch Size 5000 |
| JMX Reporting Enabled true |
+ -------------------------------------------------------------------- +
You can describe keyspaces and tables with the describe
command:
cql:cluster1:shocktrade> describe keyspace shocktrade
CREATE KEYSPACE shocktrade WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy',
'replication_factor': '3' } AND DURABLE_WRITES = true;
You can retrieve the list of all the commands that Cassandra Module offers with the help (?) command.
? -m cassandra
Elastic Search Module
To establish a connection to a local/remote Elastic Search peer, use the econnect
command:
core:/home/ldaniels> econnect dev501:9200
+ ----------------------------------- +
| name value |
+ ----------------------------------- +
| Cluster Name ShockTrade |
| Status green |
| Timed Out false |
| Number of Nodes 5 |
| Number of Data Nodes 5 |
| Active Shards 10 |
| Active Primary Shards 5 |
| Initializing Shards 0 |
| Relocating Shards 0 |
| Unassigned Shards 0 |
+ ----------------------------------- +
Once connected, the server statistics above will be returned.
To create a document, use the eput
command:
elasticSearch:localhost:9200/> eput /quotes/quote/AMD { "symbol":"AMD", "lastSale":3.33 }
+ --------------------------------------- +
| created _index _type _id _version |
+ --------------------------------------- +
| true quotes quote AMD 3 |
+ --------------------------------------- +
To retrieve the document we've just created, use the eget
command:
elasticSearch:localhost:9200/quotes/quote/AMD> eget /quotes/quote/AMD
{
"symbol":"AMD",
"lastSale":3.55
}
Elastic Search: Avro to JSON
Now let's do something slightly more advanced. Let's use Trifecta's powerful search and copy features to copy a message from a Kafka Topic to create (or update) an Elastic Search document. NOTE: Some setup steps have been omitted for brevity. See Kafka Advanced Search for full details.
elasticSearch:localhost:9200/quotes/quote/AMD> kfindone symbol == "AAPL" -o es:/quotes/quote/AAPL
{
"symbol":"AAPL",
"exchange":"NASDAQNM",
"lastTrade":109.01,
"tradeDate":null,
"tradeTime":"4:00pm",
"ask":109.03,
"bid":109.01,
"change":0.31,
"changePct":0.29,
"prevClose":108.7,
"open":108.72,
"close":109.01,
"high":109.32,
"low":108.55,
"volume":33691536,
"marketCap":6.393E11,
"errorMessage":null
}
Finally, let's view the document we've created:
kafka:shocktrade.quotes.avro/4:5429> eget /quotes/quote/AAPL
{
"symbol":"AAPL",
"exchange":"NASDAQNM",
"lastTrade":109.01,
"tradeDate":null,
"tradeTime":"4:00pm",
"ask":109.03,
"bid":109.01,
"change":0.31,
"changePct":0.29,
"prevClose":108.7,
"open":108.72,
"close":109.01,
"high":109.32,
"low":108.55,
"volume":33691536,
"marketCap":6.393E11,
"errorMessage":null
}
Please note we could also use any of the Kafka message retrieve commands (kget
, kfirst
, knext
, kprev
and klast
)
to copy a Kafka message as an Elastic Search document. See the following example:
kafka:shocktrade.quotes.avro/4:5429> kget -o es:/quotes/quote/AAPL
Sometimes we need to copy a set of messages, and copying them one-by-one can be tedious and time-consuming; however, there
is a command for just this sort of use-case. The copy
command can be used to copy messages from any Avro- or JSON-capable
input source (e.g. Kafka or Elastic Search) to any Avro- or JSON-capable output source. Consider the following example.
Here we're copying messages from a Kafka topic to an Elastic Search index using the symbol field of the message as the
ID for the Elastic Search document.
es:localhost:9200/> copy -i topic:shocktrade.quotes.avro -o es:/quotes/quote/$symbol -a file:avro/quotes.avsc
Once the operation has completed, the copy statistics are displayed:
es:localhost:9200/> Job #1845 completed (use 'jobs -v 1845' to view results)
+ -------------------------------------------------- +
| runTimeSecs records failures recordsPerSecond |
+ -------------------------------------------------- +
| 27.0 3367 0 143.3 |
+ -------------------------------------------------- +
MongoDB Module
Let's start by connecting to a MongoDB instance:
mongo:mongodb$> mconnect dev601
Next, let's choose a database to work in:
mongo:mongodb$> use shocktrade
Finally, let's retrieve a document. In this example, we'll retrieve a stock quote for Apple Inc. (ticker: AAPL)
mongo:mongodb$> mget Stocks { "symbol" : "AAPL" }
{
"_id":{
"$oid":"51002b2d84aebf0342cfb659"
},
"EBITDA":5.913E10,
"active":true,
"ask":98.77,
"askSize":null,
"assetClass":"Equity",
"assetType":"Common Stock",
"avgVolume":null,
"avgVolume10Day":59306900,
"avgVolume3Month":54995300,
"baseSymbol":null,
"beta":1.03,
"bid":98.76,
"bidSize":null,
"bookValuePerShare":20.19,
"businessSummary":"\n Apple Inc. designs, manufactures, and markets personal computers and related personal computing and mobile communication devices along with a variety of related software, services, peripherals, and networking solutions. The Company sells its products worldwide through its online stores, its retail stores, its direct sales force, third-party wholesalers, and resellers.\n ",
"change":-0.42,
"change52Week":44.37,
"change52WeekHigh":null,
"change52WeekLow":null,
"change52WeekSNP500":16.41,
"changePct":-0.42,
"cikNumber":320193,
"close":98.76
}
To view all of the MongoDB commands, use the -m
switch and the module name (mongodb
in this case):
mongo:mongodb$> ? -m mongodb
+ ------------------------------------------------------------------ +
| command module description |
+ ------------------------------------------------------------------ +
| mconnect mongodb Establishes a connection to a MongoDB cluster |
| mfindone mongodb Retrieves a document from MongoDB |
| mget mongodb Retrieves a document from MongoDB |
| mput mongodb Inserts a document into MongoDB |
| use mongodb Sets the current MongoDB database |
+ ------------------------------------------------------------------ +
Storm Module
Let's view the currently running topologies:
storm:localhost> sls
+ ---------------------------------------------------------------------------------------------------------------------------------------------- +
| name topologyId status workers executors tasks uptimeSecs |
+ ---------------------------------------------------------------------------------------------------------------------------------------------- +
| AvroSummaryMetricCounterTopology AvroSummaryMetricCounterTopology-42-1410749701 ACTIVE 4 48 48 93153 |
| Hydra-Listener-Traffic-Rates Hydra-Listener-Traffic-Rates-3-1409671097 ACTIVE 4 30 30 1171757 |
| TopTalkersTopologyV5 TopTalkersTopologyV5-44-1410803756 ACTIVE 4 53 53 39098 |
| NetworkMonitoringTrafficRateAggregation NetworkMonitoringTrafficRateAggregation-2-1409671007 ACTIVE 4 22 22 1171847 |
+ ---------------------------------------------------------------------------------------------------------------------------------------------- +
Next, let's look at the details of one of the topologies by ID:
storm:localhost> sget TopTalkersTopologyV5-44-1410803756
+ --------------------------------------------------- +
| topologyId bolts spouts |
+ --------------------------------------------------- +
| TopTalkersTopologyV5-44-1410803756 7 1 |
+ --------------------------------------------------- +
Let's look at the Topology's bolts:
storm:localhost> sbolts TopTalkersTopologyV5-44-1410803756
+ ------------------------------------------------------------------------------------------------------------------------------------------- +
| topologyId name parallelism input groupingFields groupingType tickTupleFreq |
+ ------------------------------------------------------------------------------------------------------------------------------------------- +
| TopTalkersTopologyV5-44-1410803756 decoderBolt 10 kafkaSpout Local/Shuffle |
| TopTalkersTopologyV5-44-1410803756 kafkaSinkVipOnlyBolt 1 summaryByVipOnlyBolt Local/Shuffle |
| TopTalkersTopologyV5-44-1410803756 kafkaSinkVipSiteBolt 1 summaryByVipSiteBolt Local/Shuffle |
| TopTalkersTopologyV5-44-1410803756 summaryByVipOnlyBolt 20 decoderBolt vip Fields 60 |
| TopTalkersTopologyV5-44-1410803756 summaryByVipSiteBolt 20 decoderBolt vip, site Fields 60 |
+ ------------------------------------------------------------------------------------------------------------------------------------------- +
Let's look at the Topology's spouts:
storm:localhost> spouts TopTalkersTopologyV5-44-1410803756
+ ------------------------------------------------------------- +
| topologyId name parallelism |
+ ------------------------------------------------------------- +
| TopTalkersTopologyV5-44-1410803756 kafkaSpout 1 |
+ ------------------------------------------------------------- +
Finally, let's take a look at the connection properties for this session:
storm:localhost> sconf
+ ---------------------------------------------------------------------------------------------------------- +
| key value |
+ ---------------------------------------------------------------------------------------------------------- +
| nimbus.childopts -Xmx1024m |
| nimbus.host localhost |
| nimbus.inbox.jar.expiration.secs 3600 |
. .
. .
. .
| worker.heartbeat.frequency.secs 1 |
| zmq.hwm 0 |
| zmq.linger.millis 5000 |
| zmq.threads 1 |
+ ---------------------------------------------------------------------------------------------------------- +
To view all of the Storm commands, use the -m
switch and the module name (storm
in this case):
storm:localhost> ? -m storm
+ -------------------------------------------------------------------------------------- +
| command module description |
+ -------------------------------------------------------------------------------------- +
| sbolts storm Retrieves the list of bolts for s given topology by ID |
| sconf storm Lists, retrieves or sets the configuration keys |
| sconnect storm Establishes (or re-establishes) a connect to the Storm Nimbus Host |
| sget storm Retrieves the information for a topology |
| skill storm Kills a running topology |
| sls storm Lists available topologies |
| spouts storm Retrieves the list of spouts for a given topology by ID |
+ -------------------------------------------------------------------------------------- +