In this topic I will receive data in IBM InfoSphere Streams from a sending source, using a very lightweight messaging protocol, and store the received data in a local file. The concept used here is called pub/sub, or the publish/subscribe pattern. We need a few components to make this possible:
- IBM InfoSphere Streams (stream computing): you can get your copy here (free for learning and fun; production use is not allowed)
- A messaging protocol (used by the client software to either publish or subscribe)
- A broker (for managing the pub/sub data flows). For more information, see this topic
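Before wiring up the real components, the pattern itself can be sketched in a few lines. This is a toy in-memory broker for illustration only (not MQTT, and the class and topic names are made up): the point is that the broker decouples publishers from subscribers by topic.

```python
# Toy illustration of the publish/subscribe pattern: a broker decouples
# publishers from subscribers by topic. (In-memory only -- not MQTT.)
from collections import defaultdict
from typing import Callable

class ToyBroker:
    def __init__(self):
        # topic -> list of subscriber callbacks
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[str, str], None]):
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: str):
        # the publisher never knows who (if anyone) is listening
        for callback in self._subscribers[topic]:
            callback(topic, payload)

broker = ToyBroker()
received = []
broker.subscribe("streams", lambda t, p: received.append((t, p)))
broker.publish("streams", "RED")
print(received)  # [('streams', 'RED')]
```

In the real setup below, the MQTT broker plays the role of ToyBroker, the MQTT client publishes, and Streams subscribes.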
What is stream computing?
Stream computing delivers real-time analytic processing on constantly changing data in motion. It enables descriptive and predictive analytics to support real-time decisions. Stream computing allows you to capture and analyze all data – all the time, just in time.
Data is all around us – from social media feeds to call data records to videos – but can be difficult to leverage. Sometimes there is simply too much data to collect and store before analyzing it. Sometimes it’s an issue of timing – by the time you store data, analyze it, and respond – it’s too late.
Stream computing changes where, when and how much data you can analyze. Store less, analyze more, and make better decisions, faster with stream computing. For more information: IBM Stream Computing.
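The "analyze data in motion" idea can be made concrete with a tiny sketch: instead of storing a whole feed and analyzing it afterwards, keep a running aggregate with constant memory and have an up-to-date answer after every arriving value. (This is only an illustration of the principle, not Streams code.)

```python
# Sketch of "analyze data in motion": a running aggregate over an
# unbounded stream with constant memory, instead of store-then-analyze.
def running_average(stream):
    count, total = 0, 0.0
    for value in stream:
        count += 1
        total += value
        yield total / count  # an up-to-date answer after every value

readings = [3.0, 5.0, 10.0]  # stands in for an endless feed
averages = list(running_average(readings))
print(averages)  # [3.0, 4.0, 6.0]
```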
The protocol
MQTT is a machine-to-machine (M2M)/”Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For more information: http://mqtt.org/.
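To see why MQTT's code footprint and bandwidth use are so small, it helps to look at the wire format. The sketch below builds a QoS 0 PUBLISH packet following the MQTT 3.1.1 specification: a 1-byte packet type, a variable-length "remaining length" field, a length-prefixed topic, and the raw payload. The topic and payload values are just examples.

```python
# Anatomy of a minimal MQTT QoS 0 PUBLISH packet (MQTT 3.1.1 wire format).
def encode_remaining_length(n: int) -> bytes:
    # MQTT's variable-length encoding: 7 bits per byte, MSB = "more follows"
    out = bytearray()
    while True:
        n, byte = divmod(n, 128)
        out.append(byte | (0x80 if n else 0))
        if not n:
            return bytes(out)

def publish_packet(topic: str, payload: bytes) -> bytes:
    t = topic.encode("utf-8")
    # variable header: 2-byte big-endian topic length + topic, then payload
    body = len(t).to_bytes(2, "big") + t + payload
    # 0x30 = PUBLISH, QoS 0, no DUP, no RETAIN
    return bytes([0x30]) + encode_remaining_length(len(body)) + body

pkt = publish_packet("streams", b"RED")
print(pkt.hex())  # 300c000773747265616d73524544
```

A 3-byte payload on a 7-character topic costs only 14 bytes on the wire, which is exactly what makes MQTT attractive for constrained devices and networks.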
What is happening?
You first have to install IBM InfoSphere Streams (mine is running on CentOS 6.5). It is pretty straightforward: unpack the installation archive to a directory, find the "dependency_checker.sh" script (in the StreamsInstallFiles directory) and run it. It will tell you which libraries are missing, so do a few yum installs and you're done.
After installation, adjust the .bashrc file; this is mine:
# .bashrc

# Source global definitions
if [ -f /etc/bashrc ]; then
    . /etc/bashrc
fi

# User specific aliases and functions
PATH=$PATH:/opt/ibm/java-x86_64-70/jre/bin
export PATH
export JAVA_HOME=/opt/ibm/java-x86_64-70/jre/bin
export STREAMS_MESSAGING_MQTT_HOME=/opt/ibm/MQTT/SDK
source /opt/ibm/InfoSphereStreams/bin/streamsprofile.sh
I do not think you need a separate Java install; it is included with Streams. But to make the MQTT sample work, you need to add the MQTT client 3.15 to your environment. Please download it via Fix Central. Also point the STREAMS_MESSAGING_MQTT_HOME environment variable to the installation directory of the MQTT client. Finally, run streamsprofile.sh each time you log in (hence the last line in the .bashrc above). Do this for each user that will be working with Streams.
Please follow the First Steps guide to finish up the installation and get yourself started with Streams.
After walking through the First Steps guide, you will end up with a project and an SPL application. To make this work, I added com.ibm.streams.messaging to my project as a dependency; we need the MQTT operators from that toolkit. In the SPL application I added three operators:
- MQTTSource
- Parse
- FileSink
For the streams between the operators I used tuples with a single attribute:
- MQTTSource -> Parse: blob
- Parse -> FileSink: rstring
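What the Parse step does between those two streams can be sketched outside of SPL: the raw MQTT message arrives as a blob (bytes) and has to be decoded into rstring (text) tuples. This is a hedged Python illustration, assuming the payload is UTF-8 CSV text as in this sample; the function name is made up.

```python
# Sketch of the Parse step: decode a blob (raw bytes from MQTTSource)
# into rstring (text) tuples, assuming a UTF-8 CSV payload.
def parse_blob(blob: bytes) -> list:
    text = blob.decode("utf-8")
    # one rstring tuple per CSV record, empty lines dropped
    return [line for line in text.splitlines() if line]

tuples = parse_blob(b"RED\nGREEN\nBLACK\n")
print(tuples)  # ['RED', 'GREEN', 'BLACK']
```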
For each of the operators, I used the following parameters (they reappear in the full code below):
- MQTTSource: topics, serverURI, format
- Parse: format
- FileSink: file, format, quoteStrings
The overall code will look like this:
namespace my.mqtt.streaming ;

use com.ibm.streams.messaging.mqtt::MQTTSource ;

composite SimpleStream
{
    graph
        // The tuple attribute names below are placeholders; the types
        // (blob, rstring) match the stream list above.
        (stream<blob data> mqtt_topic) as SimpleMQTTSource = MQTTSource()
        {
            param
                topics : [ "streams" ] ;
                serverURI : "localhost:1883" ;
                format : block ;
        }

        () as SimpleFileSink = FileSink(SimpleParser as inPort0Alias)
        {
            param
                file : "mqtt.topics" ;
                format : csv ;
                quoteStrings : false ;
        }

        (stream<rstring message> SimpleParser) as SimpleParse = Parse(mqtt_topic)
        {
            param
                format : csv ;
        }
}
When your SPL application compiles without errors, you can launch it. It will be deployed to your Streams instance (the one you created with the First Steps guide):
When feeding MQTT messages, you will see the deployed SPL application is working:
Finally, let's take a look at the local file:
This is a very simple sample: we are receiving messages on three MQTT topics, with the values "RED", "GREEN" or "BLACK".
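The end-to-end flow of the sample can be mimicked in a few lines: three topic/value pairs pass through a parse step into a local CSV file, roughly what FileSink with format csv and quoteStrings false writes. The topic names here are made up, and the exact column layout Streams produces may differ; this only mirrors the idea.

```python
# End-to-end sketch of the sample's data flow: messages for three topics
# with values RED, GREEN or BLACK land in a local CSV file, roughly what
# FileSink (format : csv, quoteStrings : false) produces.
import csv
import io

# hypothetical topic names standing in for the real MQTT feed
messages = [("streams/1", "RED"), ("streams/2", "GREEN"), ("streams/3", "BLACK")]

sink = io.StringIO()  # stands in for the local file "mqtt.topics"
writer = csv.writer(sink, quoting=csv.QUOTE_NONE)  # quoteStrings : false
for topic, value in messages:
    writer.writerow([topic, value])

print(sink.getvalue())
```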