The use of CNNs for image processing has gained massive popularity in recent years thanks to their accuracy and robustness.
Another tool that has gained enormous acceptance in the robotics community is the all-powerful Robot Operating System 2 (ROS2). The introduction of ROS2 has made it easier to run multiple robots on the same ROS network and has allowed small embedded platforms to participate in the ROS environment.
I believe the marriage of these two popular tools was inevitable for our product, as we at Nymble strive to use cutting-edge technology to give cooking a new life. Our product uses the ROS2 network as its backbone and a deep neural network to detect the contents of dispensing boxes during automated cooking, or the contents of the pan during manual cooking.
One major hurdle we faced was integrating learning-based image detection into the ROS network on a mobile platform. ROS2 is bleeding-edge, and online resources on how to do this integration are scarce. Hence, I decided to draft this post to ease the process for those trying to achieve something similar in their own projects.
We have used the following setup:
For my project, I created a custom service with image data as the request and a string carrying the detection result as the response. The program that sends the image and receives the detection result acts as the ROS client, and the image-detection program acts as the service. I would suggest using a similar architecture instead of a publisher/subscriber network, since it avoids the complexity of matching each image with its result. I wrote the image-detection part in Python and used the message type std_msgs/UInt8MultiArray instead of sensor_msgs/Image: it makes it convenient to fiddle with individual pixel values and easy to change the orientation of the received image.
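For context, the custom service interface could be defined along these lines. The field names im and veg are the ones the service code below expects; treat this as a sketch of the definition rather than the exact file in our package:

```
# ImgStr.srv — request carries the raw image bytes, response carries the label
std_msgs/UInt8MultiArray im
---
std_msgs/String veg
```

The client fills im with the flattened pixel buffer, calls the image_detect service, and reads the detected label back from veg.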
The following code should give you a better picture of how to set up a service for image detection.
#importing the required functions
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import cv2
import numpy as np
import tensorflow as tf
import rclpy
from rclpy.executors import SingleThreadedExecutor

#importing the custom service created
from customsrv.srv import ImgStr

#creating a class for the service definition
class Service:
    def __init__(self):
        self.node = rclpy.create_node('image_client')
        self.srv = self.node.create_service(ImgStr, 'image_detect', self.detect_callback)

    def detect_callback(self, request, response):
        #loading the tensorflow graph
        #(this could be moved to __init__ to avoid reloading on every request)
        model_file = "Tensorflow/Temp_fc/output_graph.pb"
        label_file = "Tensorflow/Temp_fc/output_labels.txt"
        input_layer = "Placeholder"
        output_layer = "final_result"
        graph = load_graph(model_file)
        print("received data")
        #unpacking the flat array into a 224x224 RGB image,
        #swapping rows and columns in the process
        img = np.zeros((224, 224, 3), dtype=np.uint8)
        for i in range(224):
            for j in range(224):
                for k in range(3):
                    img[j, i, k] = request.im.data[k + j*3 + i*224*3]
        #converting the image into a tensor form
        t = read_tensor_from_image_file(img)
        input_name = "import/" + input_layer
        output_name = "import/" + output_layer
        input_operation = graph.get_operation_by_name(input_name)
        output_operation = graph.get_operation_by_name(output_name)
        with tf.Session(graph=graph) as sess:
            results = sess.run(output_operation.outputs[0], {
                input_operation.outputs[0]: t
            })
        results = np.squeeze(results)
        top_k = results.argsort()[-5:][::-1]
        labels = load_labels(label_file)
        #the top prediction goes back to the client
        response.veg.data = labels[top_k[0]]
        for i in top_k:
            print(labels[i], results[i])
        return response

#defining the load graph function
def load_graph(model_file):
    graph = tf.Graph()
    graph_def = tf.GraphDef()
    with open(model_file, "rb") as f:
        graph_def.ParseFromString(f.read())
    with graph.as_default():
        tf.import_graph_def(graph_def)
    return graph

#defining the function which converts the image data into a tensor
def read_tensor_from_image_file(img):
    np_image_data = np.asarray(img)
    #normalizing pixel values to [0, 1]
    np_image_data = np.divide(np_image_data.astype(float), 255)
    #adding the batch dimension the graph expects
    np_final = np.expand_dims(np_image_data, axis=0)
    return np_final

#defining the function which reads the label file
def load_labels(label_file):
    label = []
    proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label

def main(args=None):
    rclpy.init(args=args)
    print("creating service")
    #create the service
    service = Service()
    #spinning the node with a blocking call so it keeps serving requests
    print("spinning")
    executor = SingleThreadedExecutor()
    executor.add_node(service.node)
    executor.spin()

if __name__ == '__main__':
    main()
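The nested unpacking loop in detect_callback is easy to follow but slow at the Python level. The same index arithmetic (k + j*3 + i*224*3) collapses into a single NumPy reshape plus a transpose that swaps rows and columns, which is exactly the orientation change mentioned earlier. A minimal sketch, using a synthetic buffer standing in for request.im.data:

```python
import numpy as np

H = W = 224  # image size assumed by the service callback
C = 3        # RGB channels

# a stand-in for request.im.data: a random flat byte buffer
rng = np.random.default_rng(0)
data = rng.integers(0, 256, size=H * W * C, dtype=np.uint8)

# the element-wise unpacking used in detect_callback
img_loop = np.zeros((H, W, C), dtype=np.uint8)
for i in range(H):
    for j in range(W):
        for k in range(C):
            img_loop[j, i, k] = data[k + j * C + i * W * C]

# the same mapping as one vectorized operation:
# reshape restores the (i, j, k) layout, transpose swaps rows and columns
img_vec = data.reshape(H, W, C).transpose(1, 0, 2)

assert np.array_equal(img_loop, img_vec)
```

The vectorized form avoids roughly 150,000 Python-level iterations per frame, which matters on a small embedded platform.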
We at Nymble are committed to building a team that represents a variety of backgrounds, perspectives, and skills. The more inclusive we are, the better our work will be.
If you are interested in applying learning-based methods in robotics, write to us at hello@nymble.in!