IoT on AWS: Machine Learning Models and Dashboards from Sensor Data
I developed my first IoT project using my notebook as an IoT device and AWS IoT as infrastructure, with this "simple" idea: collect CPU Temperature from my Notebook running on Ubuntu, send to Amazon AWS IoT, save data, make it available for Machine Learning models and dashboards.
By Rubens Zimbres, Data Scientist
Google Colab has open source projects that help Data Scientists everywhere. Inspired by this mindset, I developed my first IoT project using my notebook as an IoT device and AWS IoT as infrastructure.
So, I had a "simple" idea: collect the CPU temperature from my notebook running Ubuntu, send it to AWS IoT, save the data, and make it available for Machine Learning models and dashboards.
However, putting this idea into practice is quite complex. First, develop a Python notebook that runs the Ubuntu 'sensors' command internally to collect the CPU temperature, and that connects to AWS IoT over MQTT with the proper security protocols, without using an MQTT broker like Mosquitto.
It is then necessary to create a Thing in AWS IoT, get the certificates, create and attach the policy, and create a SQL rule to send the data (JSON) to CloudWatch and DynamoDB. Then, create a Data Pipeline from DynamoDB to S3, so that the data becomes available for a Machine Learning model and for an AWS QuickSight dashboard.
Let's get started by installing 'sensors' in Ubuntu 16.04 and 'AWSIoTPythonSDK' library in Anaconda 3:
$ sudo apt-get install lm-sensors
$ sudo service kmod start
Let’s see what the output of the ‘sensors’ command looks like:
Now, install AWSIoTPythonSDK library:
$ pip install AWSIoTPythonSDK
Let's start with the Python notebook: the following function was developed to collect CPU Temperature with a delay of 5 seconds:
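import shlex
import subprocess
import time

def measure_temp():
    # Run 'sensors -u' and return its (stdout, stderr) pair as text
    temp = subprocess.Popen(shlex.split('sensors -u'),
                            stdout=subprocess.PIPE,
                            bufsize=10,
                            universal_newlines=True)
    return temp.communicate()

while True:
    string = measure_temp()[0]
    # On this machine, token 8 of the output is the first temperature reading
    print(string.split()[8])
    time.sleep(5)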
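Picking token number 8 works on my machine, but the position can change with a different chip or a different lm-sensors version. A minimal sketch of a less position-dependent parser is shown below. It assumes the 'sensors -u' output contains lines of the form 'temp1_input: 55.000'. The SAMPLE text is a made-up example of that layout, not a capture from my notebook, and parse_first_temp is a hypothetical helper name.

```python
import re

# Matches lines such as "  temp1_input: 55.000" in `sensors -u` output.
TEMP_INPUT = re.compile(
    r"^[ \t]*temp\d+_input:[ \t]*([-+]?\d+(?:\.\d+)?)[ \t]*$",
    re.MULTILINE,
)

def parse_first_temp(sensors_output):
    """Return the first temp*_input reading as a float, or None if absent."""
    match = TEMP_INPUT.search(sensors_output)
    return float(match.group(1)) if match else None

# Illustrative sample only; real output depends on the hardware.
SAMPLE = """coretemp-isa-0000
Adapter: ISA adapter
Package id 0:
  temp1_input: 55.000
  temp1_max: 100.000
Core 0:
  temp2_input: 53.000
"""

print(parse_first_temp(SAMPLE))
```

In the loop, the result of measure_temp()[0] could be passed to this function instead of calling split()[8].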
Then, we run the notebook from Linux command line:
Good. Now this code is inserted into the basicPubSub.py sample script from the AWSIoTPythonSDK library, like this:
while True:
    if args.mode == 'both' or args.mode == 'publish':
        # Read the sensor once per iteration and reuse the value
        temperature = measure_temp()[0].split()[8]
        mess = {"reported": {"light": "blue",
                             "Temperature": temperature,
                             "timestamp": time.time()},
                "timestamp": 1526519248}
        args.message = mess
        # 'start' is assumed to be set to time.time() before the loop
        print(temperature, (time.time() - start) / 60, 'min')
        print(mess, '\n')
        message = {}
        message['message'] = args.message
        message['sequence'] = loopCount
        messageJson = json.dumps(message)
        # Publish with QoS 1
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if args.mode == 'publish':
            print('Published topic %s: %s\n' % (topic, messageJson))
        loopCount += 1
    time.sleep(5)
Cool. We have a Python script that connects to AWS IoT Core via the MQTT protocol. Now we set up the shadow (a JSON document) in AWS IoT, which is similar to the 'device twin' from Microsoft. Note that, as I had only one device, I didn’t insert a device ID in the JSON file.
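To make the structure of the published message easier to see, here is a small sketch that builds the same JSON outside of the loop. The function name build_message and its parameters are hypothetical. Unlike the loop above, it uses the current time for the outer timestamp instead of a fixed value, which is my assumption about what is usually wanted.

```python
import json
import time

def build_message(temperature, sequence, light="blue", now=None):
    """Build the JSON string that would be published for one reading."""
    # Allow the caller to inject a time value, which makes testing easier.
    now = time.time() if now is None else now
    reported = {"light": light, "Temperature": temperature, "timestamp": now}
    message = {
        "message": {"reported": reported, "timestamp": int(now)},
        "sequence": sequence,
    }
    return json.dumps(message)

print(build_message("55.000", 0))
```

The returned string has the same keys as messageJson in the loop, so it could be passed to the publish call in the same way.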
{
  "desired": {
    "light": "green",
    "Temperature": 55,
    "timestamp": 1526323886
  },
  "reported": {
    "light": "blue",
    "Temperature": 55,
    "timestamp": 1526323886
  },
  "delta": {
    "light": "green"
  }
}
Now we get the certificate files (.pem, .key) and rootCA.pem for a secure connection. We press CTRL+ALT+T in Ubuntu to open a terminal, then run the script and publish to the topic given by '-t':
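The "delta" section holds the desired values that differ from what the device reported. AWS IoT computes it on the service side. The sketch below only illustrates that relationship with the values from the shadow above, and shadow_delta is a hypothetical helper name.

```python
def shadow_delta(desired, reported):
    """Return the desired entries whose values differ from the reported state."""
    return {
        key: value
        for key, value in desired.items()
        if key not in reported or reported[key] != value
    }

desired = {"light": "green", "Temperature": 55, "timestamp": 1526323886}
reported = {"light": "blue", "Temperature": 55, "timestamp": 1526323886}

print(shadow_delta(desired, reported))  # {'light': 'green'}
```

Only "light" appears in the result, because the temperature and timestamp are the same in both sections.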
$ python basicPubSub_adapted.py -e 1212345.iot.us-east-1.amazonaws.com -r rootCA.pem -c 2212345-certificate.pem.crt -k 2212345-private.pem.key -id arn:aws:iot:us-east-1:11231112345:thing/CPUUbuntu -t 'Teste'
We will receive feedback from the AWS IoT connection in the Linux shell, and after 1 minute we can check in the AWS IoT monitoring tool whether the connections were successful:
It is also possible to see if the messages are being published (orange area) and also the protocol used for the connection (on the left):
We can also see that the 'shadow' is being updated (center):
Now we create a SQL rule to send data to CloudWatch and also to DynamoDB, creating the required IAM roles, policies and permissions:
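I created the rule in the AWS console, but the same kind of rule can be described as a plain Python dictionary. The sketch below is one possible shape, not the exact rule I used: it assumes the topic 'Teste' from the command above, a hypothetical table name and role ARN, and the DynamoDBv2 action only (the CloudWatch action is left out). My understanding is that a dictionary like this can be passed as topicRulePayload to boto3's create_topic_rule call, but that call is not made or verified here.

```python
import json

def build_rule_payload(topic, table_name, role_arn):
    """Describe an AWS IoT rule that forwards every message on a topic to DynamoDB."""
    return {
        # AWS IoT SQL: select the whole JSON payload from the topic.
        "sql": "SELECT * FROM '{}'".format(topic),
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "dynamoDBv2": {
                    "roleArn": role_arn,  # hypothetical IAM role
                    "putItem": {"tableName": table_name},  # hypothetical table
                }
            }
        ],
    }

payload = build_rule_payload(
    "Teste",
    "cpu_temperature",
    "arn:aws:iam::123456789012:role/example-iot-role",
)
print(json.dumps(payload, indent=2))
```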
Data is then saved in DynamoDB, as a JSON file. Instead of timestamp, you can use MessageID as the Primary Key.
Now we can visualize Cloud dynamics and data transfer in CloudWatch:
Then we create a Data Pipeline from DynamoDB to S3 to be used by QuickSight:
We also need to create a JSON manifest file and set up IAM permissions so that QuickSight can read from the S3 bucket:
{
  "fileLocations": [
    {
      "URIs": [
        "https://s3.amazonaws.com/your-bucket/2018-05-19-19-41-16/12345-c2712345-12345"
      ]
    },
    {
      "URIPrefixes": [
        "https://s3.amazonaws.com/your-bucket/2018-05-19-19-41-16/12345-c2712345-12345"
      ]
    }
  ],
  "globalUploadSettings": {
    "format": "JSON",
    "delimiter": "\n",
    "textqualifier": "'"
  }
}
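Since each Data Pipeline run writes to a new folder, it can be convenient to generate this manifest instead of editing it by hand. Here is a minimal sketch with a hypothetical build_manifest helper. It only sets the "format" entry by default, and any other upload settings can be passed in through extra_settings.

```python
import json

def build_manifest(uris=(), prefixes=(), fmt="JSON", extra_settings=None):
    """Build a QuickSight S3 manifest as a Python dictionary."""
    locations = []
    if uris:
        locations.append({"URIs": list(uris)})
    if prefixes:
        locations.append({"URIPrefixes": list(prefixes)})
    settings = {"format": fmt}
    settings.update(extra_settings or {})
    return {"fileLocations": locations, "globalUploadSettings": settings}

manifest = build_manifest(
    prefixes=["https://s3.amazonaws.com/your-bucket/2018-05-19-19-41-16/"]
)
print(json.dumps(manifest, indent=2))
```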
Now we have our static plot of CPU temperature in QuickSight.
Also, the S3 data (.JSON file) is now available for Machine Learning models, such as anomaly detection, prediction and classification, making it possible to create a pipeline with SageMaker and Deep Learning libraries = FUN.
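As a taste of what anomaly detection on this data could look like, here is a very simple z-score check in plain Python. This is only an illustrative baseline, not a model from this project. It assumes the temperatures have already been loaded from S3 into a list of floats, and the readings below are invented.

```python
from statistics import mean, pstdev

def zscore_anomalies(values, threshold=3.0):
    """Return (index, value) pairs lying more than `threshold` standard deviations from the mean."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        # All readings are identical, so nothing stands out.
        return []
    return [
        (index, value)
        for index, value in enumerate(values)
        if abs(value - mu) / sigma > threshold
    ]

# Invented readings: twenty normal values and one spike.
readings = [55.0] * 20 + [90.0]
print(zscore_anomalies(readings))  # [(20, 90.0)]
```

A rolling window or a proper model would be more robust, since a single large spike also inflates the mean and the standard deviation used here.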
This was a very nice way to get in touch with AWS services, like EC2, IoT, CloudWatch, DynamoDB, S3, QuickSight and Lambda. It's definitely not easy to set up everything and their dependencies, but this part of the project cost less than 1 USD. And it generated a lot of fun!
This is the flowchart of the first part of the project at AWS:
Project Part 2 – Near Real-Time Dashboard
Now let's develop a second solution, using streaming data from AWS IoT that is sent to Kinesis Firehose, then to AWS Elasticsearch, and finally to Kibana, for a near real-time dashboard. You can opt to clean and extract data with Lambda (or not) using AWS IoT as input and AWS Batch as output to connect with Kinesis. Either way, Kibana is able to interpret your JSON file.
First we must set up another rule for AWS IoT to send telemetry to a Kinesis Firehose delivery stream:
Then create an Elasticsearch domain:
Then set up access for a specific IP address:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-east-1:12345:domain/domain/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": "178.042.222.33"
        }
      }
    }
  ]
}
Then we create the Stream and Stream delivery with Kinesis Firehose.
Finally, we connect AWS Elasticsearch with Kibana, adjusting the index mapping in Kibana’s 'Dev Tools':
PUT /data
{
  "mappings": {
    "doc": {
      "properties": {
        "light": {"type": "text"},
        "Temperature": {"type": "integer"},
        "timestamp": {"type": "integer"}
      }
    }
  }
}
Note that Elasticsearch will provide a Kibana endpoint. Finally, we have our near real-time dashboard of CPU temperature. It’s important to notice that we are almost in a real-time environment. The issue here is that Kibana updates the chart every 5 seconds (or 15 if you want), but Elasticsearch has a minimum latency of 60 seconds.
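The mapping declares Temperature and timestamp as integers, while the device reads the temperature as text and time.time() returns a float. One way to line these up is to convert each record before it is sent. The sketch below uses a hypothetical to_indexable helper and rounds the temperature to whole degrees, which is my assumption. Keeping decimals would need a different field type in the mapping.

```python
def to_indexable(record):
    """Convert a raw reading so its values match the types declared in the mapping."""
    return {
        "light": str(record["light"]),
        # Text such as "55.000" becomes the integer 55.
        "Temperature": int(round(float(record["Temperature"]))),
        # Drop the fractional seconds returned by time.time().
        "timestamp": int(float(record["timestamp"])),
    }

raw = {"light": "blue", "Temperature": "55.000", "timestamp": 1526323886.73}
print(to_indexable(raw))
```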
We can now visualize our fancy dashboard:
More info and files at my GitHub - Repo 2018 (CPU Temperature – IoT Project): https://github.com/RubensZimbres/Repo-2018
Bio: Rubens Zimbres is a Data Scientist, PhD in Business Administration with emphasis in Artificial Intelligence and Cellular Automata. Currently works in Telecommunications area, developing Machine Learning, Deep Learning models and IoT solutions for the financial sector and agriculture.