Brain Monitoring with Kafka, OpenTSDB, and Grafana

Interested in using open source software to monitor brain activity and control your devices? Sure you are! Read this post for some insight and direction.



Source code details

 
We’ll highlight key pieces of information we had to figure out, in many cases by looking at Confluent’s source code, since they weren’t well covered in either Kafka’s or Confluent’s documentation.

Streams application

 
Importing libraries

To use Confluent’s unit testing utility classes, you’ll need to build a couple of JAR files and install them in your local Maven repository, since they are not yet (at the time of this writing) published in any public Maven repository.

In the build.sbt file, the unit testing utility classes are referenced in the “libraryDependencies” section, scoped to the “test” configuration and, where the dependency is a separately built test JAR, marked with the “tests” classifier. Your local Maven repository is referenced in the “resolvers” section of the same file.
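For concreteness, here is a hedged sketch of what those build.sbt fragments might look like; the artifact names and versions below are assumptions that depend on which Confluent/Kafka release you built and installed locally.

```scala
// build.sbt fragment (illustrative sketch; artifact names and versions are assumptions)

// Point sbt at the local Maven repository where the test JARs were installed.
resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

val kafkaVersion = "0.10.0.1"
val confluentVersion = "3.0.1"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-streams" % kafkaVersion,
  // Test-only dependencies: scoped to the "test" configuration, with a classifier
  // where the dependency is a separately built test JAR.
  "org.apache.kafka" %% "kafka" % kafkaVersion % "test" classifier "test",
  "io.confluent" % "kafka-streams-examples" % confluentVersion % "test" classifier "tests"
)
```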

Installing the JAR files into your local Maven repository requires cloning a couple of Confluent’s GitHub repositories and running the “mvn install” command, which is handled in the “test_prep.sh” shell script. Note that we’re using a forked version of Confluent’s “examples” GitHub repository because we needed additional Maven configuration settings to build a JAR file of the test utility classes.

Avro class generation

Avro is used to convert the CSV input text and serialize it into a compact binary format. We import sbt-avrohugger in “build.sbt” and “project/plugins.sbt” for auto-generating a Scala case class from the Avro schema definition, which represents an OpenTSDB record. The Avro schema follows OpenTSDB’s data specification—metric name, timestamp, measurement value, and a key-value map of tags (assumed to be strings). The generated class is used in the Streams application during conversion from a CSV line to Avro messages in the “eeg” output Kafka topic, where the schema of the messages is enforced by Confluent’s schema registry.
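As a point of reference, the generated case class looks roughly like the sketch below; the class and field names here are illustrative assumptions, and the real class is whatever avrohugger derives from the schema.

```scala
// Illustrative sketch of the case class sbt-avrohugger generates from the Avro schema.
// The class and field names are assumptions; the shape follows OpenTSDB's data spec.
case class EegRecord(
  metric: String,             // OpenTSDB metric name, e.g. "eeg.<sensor>"
  timestamp: Long,            // epoch milliseconds
  value: Double,              // the sensor measurement
  tags: Map[String, String]   // key-value tags, assumed to be strings
)
```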

Unit testing

Note the use of the “EmbeddedSingleNodeKafkaCluster” and “IntegrationTestUtils” classes in the test suite. Those are the utility classes we needed to import from Confluent’s “examples” source code repository earlier. For testability, the Streams application’s topology logic is wrapped in the “buildAndStartStreamingTopology” method.
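A stripped-down sketch of how that harness fits together is shown below; the spec class, topic names, and package path are assumptions, while the embedded cluster and topology method names come from the code described above.

```scala
// Package path may differ depending on the version of the Confluent examples you built.
import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Stripped-down test harness sketch (class and topic names are assumptions).
class EegStreamSpec extends FunSuite with BeforeAndAfterAll {
  private val cluster = new EmbeddedSingleNodeKafkaCluster()

  override def beforeAll(): Unit = {
    cluster.start()
    cluster.createTopic("eeg-input")
    cluster.createTopic("eeg")
  }

  test("CSV lines become Avro OpenTSDB records") {
    // 1. Produce a few CSV lines to the input topic with IntegrationTestUtils.
    // 2. Call buildAndStartStreamingTopology(...) against cluster.bootstrapServers().
    // 3. Consume the "eeg" topic and assert on the resulting Avro records.
  }
}
```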

Command line options

After seeing a few examples in its documentation, we found Scopt to be a convenient Scala library for defining command line options. The first block of code in the main Streams application uses Scopt to define the command line parameters.
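A minimal Scopt sketch along those lines might look like this; the option names and defaults are assumptions, not the application’s actual flags.

```scala
// Scopt (3.x) sketch; option names and defaults are assumptions.
case class AppConfig(
  brokers: String = "localhost:9092",
  schemaRegistryUrl: String = "http://localhost:8081",
  inputFile: String = ""
)

object CommandLine {
  val parser = new scopt.OptionParser[AppConfig]("eeg-streams") {
    opt[String]("brokers")
      .action((x, c) => c.copy(brokers = x))
      .text("Kafka bootstrap servers")
    opt[String]("schema-registry")
      .action((x, c) => c.copy(schemaRegistryUrl = x))
      .text("Confluent schema registry URL")
    opt[String]("input")
      .required()
      .action((x, c) => c.copy(inputFile = x))
      .text("path to the EEG CSV file")
  }

  // Returns None (and prints usage) if the arguments don't parse.
  def parse(args: Array[String]): Option[AppConfig] = parser.parse(args, AppConfig())
}
```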

Different input and output serdes

The default Kafka message key and value are assumed to be Strings, so we set the StreamsConfig key and value serdes (serializers/deserializers) to the String serde. However, since we want to output Avro messages to the “eeg” topic, we override the outbound serde when the “.to()” method is called.
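A sketch of the relevant configuration, assuming the Kafka 0.10-era Streams API that was current when this was written:

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Default (inbound) serdes are Strings; Kafka 0.10-era config keys shown.
val streamsProps = new Properties()
streamsProps.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
streamsProps.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

// When writing Avro to the "eeg" topic, the outbound serdes are overridden at the sink,
// e.g. (older KStream API; avroSerde is sketched in the next section):
//   eegRecords.to(Serdes.String(), avroSerde, "eeg")
```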

Avro serde and schema registry

Serializing and de-serializing Avro requires Confluent’s KafkaAvroSerializer and KafkaAvroDeserializer classes. Their constructors require the schema registry client as a parameter, so we create an instance of the CachedSchemaRegistryClient class.
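For illustration, wiring those classes together into a serde might look like the following; the registry URL and the cache size passed to the client are assumptions.

```scala
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.kafka.common.serialization.{Serde, Serdes}

// Sketch of building an Avro serde backed by the schema registry.
// The registry URL and the identity-map cache size (100) are assumptions.
val schemaRegistryUrl = "http://schema-registry:8081"
val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100)

val avroSerializer = new KafkaAvroSerializer(schemaRegistryClient)
val avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient)

// Wrap the pair into a Serde usable by the Streams DSL.
val avroSerde: Serde[AnyRef] = Serdes.serdeFrom[AnyRef](avroSerializer, avroDeserializer)
```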

EEG CSV format and downsampling

The data originating from the EEG CSV file includes measurements from each sensor of the headset. Each of those sensor readings is converted into an Avro object representing an OpenTSDB record, with the name of the sensor included in the OpenTSDB metric name.

Although there is a CSV column for the original event’s timestamp, it’s not a Unix timestamp. So the system processing time (i.e., “System.currentTimeMillis()”) is used for simplicity’s sake.
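A hedged sketch of that per-sensor expansion, reusing the EegRecord case class sketched earlier; the sensor names, the “eeg.” metric prefix, and the tag are illustrative assumptions.

```scala
// Expand one CSV line into per-sensor OpenTSDB records (illustrative assumptions only).
val sensorNames = Seq("af3", "f7")   // illustrative subset of headset channels

def toRecords(csvLine: String): Seq[EegRecord] = {
  val columns = csvLine.split(",").map(_.trim)
  val now = System.currentTimeMillis()            // processing time, not the CSV timestamp
  sensorNames.zipWithIndex.map { case (sensor, i) =>
    EegRecord(
      metric    = s"eeg.$sensor",
      timestamp = now,
      value     = columns(i + 1).toDouble,        // column 0 is the counter
      tags      = Map("source" -> "headset")
    )
  }
}
```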

The very first column of the CSV is the “counter”, which cycles through the numbers from 0 through 128. We downsample by filtering on the counter since the demo Docker cluster setup can’t handle the full volume of the data stream.
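The downsampling predicate itself is simple; here it is sketched on a plain Scala collection for brevity, where the keep-every-Nth rate and the sample lines are assumptions (in the Streams application the same predicate sits inside a KStream filter).

```scala
// Sample CSV lines; values are placeholders, not real headset output.
val csvLines: Seq[String] = Seq(
  "0,4321.5,4298.2",
  "1,4322.0,4297.8",
  "4,4320.1,4299.0"
)

// Keep only rows whose counter is a multiple of N (rate chosen arbitrarily here).
val keepEveryNth = 4
val downsampled: Seq[String] = csvLines.filter { line =>
  val counter = line.split(",")(0).trim.toInt
  counter % keepEveryNth == 0
}
// downsampled now contains only the lines with counters 0 and 4.
```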

Sink connector

 
This section highlights some key points in our sink connector source code. We’ll cover what was needed to define custom configuration properties for connecting to a particular OpenTSDB host, as well as the changes required in the OpenTSDB server’s own settings.

The “taskConfigs” method

When defining a Kafka connector, each task created by the connector receives a configuration. Even if your connector has no task-specific settings, the “taskConfigs” method must return a list containing at least one element (an empty configuration Map will do) in order for tasks to be instantiated. Otherwise, your connector won’t create any tasks and no data will be written to OpenTSDB.
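A minimal connector sketch with made-up class names is shown below; the only point of interest is the “taskConfigs” override.

```scala
import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.{SinkConnector, SinkTask}

// Minimal sketch; class names are assumptions, not the actual source.
class OpenTsdbSinkConnectorSketch extends SinkConnector {
  private var configProps: util.Map[String, String] = _

  override def start(props: util.Map[String, String]): Unit = configProps = props
  override def stop(): Unit = ()
  override def version(): String = "0.1.0"
  override def config(): ConfigDef = new ConfigDef()
  override def taskClass(): Class[_ <: Task] = classOf[SinkTask]  // replace with the real task class

  // Return at least one (possibly empty) config map, or Connect creates no tasks.
  override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] =
    List.fill(math.max(1, maxTasks)) {
      new util.HashMap[String, String](configProps): util.Map[String, String]
    }.asJava
}
```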

Defining config settings for OpenTSDB host & port

As you may have inferred from the Docker file, the sink connector settings are in a properties file that is read when the Kafka Connect worker starts. We’ve defined the property keys for the OpenTSDB host and port, plus their default values, in the “OpenTsdbConnectorConfig” class. The default values are overridden in the properties file to match the host and port defined in the main docker-compose.yml configuration file. The property settings are propagated to each Kafka Connect task via the “props” parameter in the overridden “start()” method of the sink task class.
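A sketch of how such a config class might be defined; the property key names and defaults below are assumptions chosen to mirror the description above, with the real values living in the properties file and docker-compose.yml.

```scala
import java.util
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

// Connector config sketch; keys and defaults are assumptions.
object OpenTsdbConnectorConfigSketch {
  val OpenTsdbHost = "opentsdb.host"
  val OpenTsdbPort = "opentsdb.port"

  val Definition: ConfigDef = new ConfigDef()
    .define(OpenTsdbHost, Type.STRING, "localhost", Importance.HIGH, "OpenTSDB host name")
    .define(OpenTsdbPort, Type.INT, 4242, Importance.HIGH, "OpenTSDB HTTP API port")
}

class OpenTsdbConnectorConfigSketch(props: util.Map[String, String])
    extends AbstractConfig(OpenTsdbConnectorConfigSketch.Definition, props)
```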

Writing to OpenTSDB

OpenTSDB’s documentation recommends inserting records through its HTTP API. We used the Play framework’s WS API to send HTTP POST requests containing data read from the Avro messages in the “eeg” Kafka topic. Currently, the task class has no error handling for cases where the HTTP request returns an error response or times out.
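The POST itself might look roughly like the following standalone sketch, using the Play 2.5-era WS client; the host, port, and sample data point are assumptions, and the real task builds this JSON from the Avro records it receives in “put()”.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import play.api.libs.json.Json
import play.api.libs.ws.ahc.AhcWSClient

import scala.concurrent.ExecutionContext.Implicits.global

// Standalone sketch of posting one data point to OpenTSDB's /api/put endpoint.
// Host, port, and the data point values are assumptions.
object OpenTsdbHttpPostSketch extends App {
  implicit val system = ActorSystem("opentsdb-sketch")
  implicit val materializer = ActorMaterializer()
  private val ws = AhcWSClient()

  private val dataPoint = Json.obj(
    "metric"    -> "eeg.af3",
    "timestamp" -> System.currentTimeMillis(),
    "value"     -> 4321.5,
    "tags"      -> Json.obj("source" -> "headset")
  )

  ws.url("http://localhost:4242/api/put")
    .post(dataPoint)
    .map(response => println(s"OpenTSDB responded with status ${response.status}"))
    .andThen { case _ => ws.close(); system.terminate() }
}
```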

OpenTSDB configuration changes needed for Connector

There are a couple of configuration settings in the OpenTSDB server itself that we needed to override. Both are set to true.

tsd.storage.fix_duplicates
This needs to be set to “true” because different measurements from the same sensor may have the same timestamp assigned during processing time.

tsd.http.request.enable_chunked
Chunked request support should be enabled so that large batches of new data points are still processed when a large HTTP request arrives split across multiple chunks.

Kafka Connect Standalone & Distributed Properties

In the Kafka Connect worker’s Docker file, note that the CLASSPATH environment variable must be set so that the Kafka Connect worker can find the OpenTSDB connector JAR file. The image also includes property files for both standalone and distributed modes, but only standalone mode is enabled in the Docker image.

Grafana

Lastly, Grafana’s visualization settings and its connection settings to OpenTSDB are pre-loaded in its Docker container image.

Conclusion

 
We’ve given a whirlwind tour of our brain EEG streaming application, highlighting the usage of Confluent’s unit testing utility classes and many other components in the system. I’d like to thank my SVDS colleagues Matt Mollison, Matt Rubashkin, and Ming Tsai, who were my teammates in the Kafka Hackathon.

Bio: Jeff Lam has over a decade of experience in companies of various sizes and industries - from startups to giants like General Electric. He enjoys achieving new heights in data infrastructure performance for customers, and has a range of experience from server-side to mobile app development.

Original. Reposted with permission.
