openhab.conf, connection to mosquitto
mqtt:mysensor.url=tcp://10.0.1.10:1883 mqtt:mysensor.clientId=MQTT mqtt:mysensor.qos=0 mqtt:mysensor.retain=true mqtt:mysensor.async=false
Perl script
#!/usr/bin/perl -w
#
# MQTT Gateway - A gateway to exchange MySensors messages from a
# serial gateway with an MQTT broker.
#
#
# Tested with MySensors 1.4beta (July 27, 2014) (www.mysensors.org)
# and Mosquitto MQTT Broker (mosquitto.org) on Ubuntu Linux 12.04.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Ask AnyEvent::MQTT for debug output through the environment.
# NOTE(review): this assignment executes at run time, i.e. after the
# `use AnyEvent::MQTT` statement further down has already been compiled. If the
# module reads the variable while it is being loaded, the assignment needs to
# move into a BEGIN block to have any effect -- TODO confirm.
local $ENV{ANYEVENT_MQTT_DEBUG} = 1;
# Transport selector: the code below compares it with 1 (serial port) and
# 0 (TCP socket to the gateway).
my $useSerial = 1;
# AnyEvent::Handle wrapping the serial port; assigned in the main loop.
my $serialHandle;
use strict;
use Cwd;
use IO::Socket::INET;
use Net::MQTT::Constants;
#use AnyEvent::IO;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::MQTT;
use AnyEvent::Strict; # check parameters passed to callbacks
#use WebSphere::MQTT::Client;
use Data::Dumper;
use List::Util qw(first);
use Storable;
use v5.14; # Requires support for given/when constructs
use Device::SerialPort;
# Enable autoflush on both standard streams so that diagnostics and regular
# output show up immediately and in the order they were produced.
# $| applies to the currently selected handle, hence the select() calls;
# STDOUT is left selected as the default output handle.
select(STDERR);
$| = 1;
select(STDOUT);
$| = 1;
#-- Message types
# Numeric command codes, numbered from 0. parseMsg stores the third field of a
# gateway line under the 'cmd' key, which is compared against these values.
use enum qw { C_PRESENTATION=0 C_SET C_REQ C_INTERNAL C_STREAM };
# Command names listed in the same order as the enum above, so that a numeric
# command code can be used directly as an index (see dumpMsg).
use constant commandToStr => qw( C_PRESENTATION C_SET C_REQ C_INTERNAL C_STREAM );
#-- Variable types
# Numeric sub-type codes, numbered from 0. subTypeToStr uses them for C_SET and
# C_REQ messages. The order must stay identical to variableTypesToStr, which
# is indexed by these values.
use enum qw { V_TEMP=0 V_HUM V_LIGHT V_DIMMER V_PRESSURE V_FORECAST V_RAIN
V_RAINRATE V_WIND V_GUST V_DIRECTION V_UV V_WEIGHT V_DISTANCE
V_IMPEDANCE V_ARMED V_TRIPPED V_WATT V_KWH V_SCENE_ON V_SCENE_OFF
V_HEATER V_HEATER_SW V_LIGHT_LEVEL V_VAR1 V_VAR2 V_VAR3 V_VAR4 V_VAR5
V_UP V_DOWN V_STOP V_IR_SEND V_IR_RECEIVE V_FLOW V_VOLUME V_LOCK_STATUS
V_DUST_LEVEL };
# Variable type names, listed in the same order as the V_* enum so that the
# numeric value of a variable type is also its index in this list.
use constant variableTypesToStr => qw{ V_TEMP V_HUM V_LIGHT V_DIMMER V_PRESSURE V_FORECAST V_RAIN
V_RAINRATE V_WIND V_GUST V_DIRECTION V_UV V_WEIGHT V_DISTANCE
V_IMPEDANCE V_ARMED V_TRIPPED V_WATT V_KWH V_SCENE_ON V_SCENE_OFF
V_HEATER V_HEATER_SW V_LIGHT_LEVEL V_VAR1 V_VAR2 V_VAR3 V_VAR4 V_VAR5
V_UP V_DOWN V_STOP V_IR_SEND V_IR_RECEIVE V_FLOW V_VOLUME V_LOCK_STATUS
V_DUST_LEVEL };

# Translate a variable type name (e.g. "V_TEMP") into its numeric index.
#
# Arguments: $var - the name to look up.
# Returns:   the zero-based position of $var in variableTypesToStr, or undef
#            when $var is undefined or is not a known variable type.
#            Exactly one value is returned in every case, because the sub is
#            called inside a hash constructor (list context).
sub variableTypeToIdx
{
    my $var = shift;
    return undef unless defined $var;

    # Scan valid indices only. Going one past the end compares against undef,
    # which warns and lets an empty name "match" a slot that does not exist.
    my @names = (variableTypesToStr);
    return first { $names[$_] eq $var } 0 .. $#names;
}
#-- Internal messages
# Numeric sub-type codes, numbered from 0, that subTypeToStr applies to
# C_INTERNAL messages.
use enum qw { I_BATTERY_LEVEL=0 I_TIME I_VERSION I_ID_REQUEST I_ID_RESPONSE
I_INCLUSION_MODE I_CONFIG I_FIND_PARENT I_FIND_PARENT_RESPONSE
I_LOG_MESSAGE I_CHILDREN I_SKETCH_NAME I_SKETCH_VERSION
I_REBOOT };
# Names in the same order as the enum above; indexed by the numeric sub-type.
use constant internalMessageTypesToStr => qw{ I_BATTERY_LEVEL I_TIME I_VERSION I_ID_REQUEST I_ID_RESPONSE
I_INCLUSION_MODE I_CONFIG I_FIND_PARENT I_FIND_PARENT_RESPONSE
I_LOG_MESSAGE I_CHILDREN I_SKETCH_NAME I_SKETCH_VERSION
I_REBOOT };
#-- Sensor types
# Numeric sub-type codes, numbered from 0, that subTypeToStr applies to
# C_PRESENTATION messages.
use enum qw { S_DOOR=0 S_MOTION S_SMOKE S_LIGHT S_DIMMER S_COVER S_TEMP S_HUM S_BARO S_WIND
S_RAIN S_UV S_WEIGHT S_POWER S_HEATER S_DISTANCE S_LIGHT_LEVEL S_ARDUINO_NODE
S_ARDUINO_REPEATER_NODE S_LOCK S_IR S_WATER S_AIR_QUALITY S_CUSTOM S_DUST
S_SCENE_CONTROLLER };
# Names in the same order as the enum above; indexed by the numeric sub-type.
use constant sensorTypesToStr => qw{ S_DOOR S_MOTION S_SMOKE S_LIGHT S_DIMMER S_COVER S_TEMP S_HUM S_BARO S_WIND
S_RAIN S_UV S_WEIGHT S_POWER S_HEATER S_DISTANCE S_LIGHT_LEVEL S_ARDUINO_NODE
S_ARDUINO_REPEATER_NODE S_LOCK S_IR S_WATER S_AIR_QUALITY S_CUSTOM S_DUST
S_SCENE_CONTROLLER };
#-- Datastream types
# Numeric codes, numbered from 0.
# NOTE(review): presumably the sub-types of C_STREAM messages -- confirm.
use enum qw { ST_FIRMWARE_CONFIG_REQUEST=0 ST_FIRMWARE_CONFIG_RESPONSE ST_FIRMWARE_REQUEST ST_FIRMWARE_RESPONSE
ST_SOUND ST_IMAGE };
# Names in the same order as the enum above.
use constant datastreamTypesToStr => qw{ ST_FIRMWARE_CONFIG_REQUEST ST_FIRMWARE_CONFIG_RESPONSE ST_FIRMWARE_REQUEST ST_FIRMWARE_RESPONSE
ST_SOUND ST_IMAGE };
#-- Payload types
# Numeric payload type codes, numbered from 0.
use enum qw { P_STRING=0 P_BYTE P_INT16 P_UINT16 P_LONG32 P_ULONG32 P_CUSTOM P_FLOAT32 };
# Names in the same order as the enum above.
use constant payloadTypesToStr => qw{ P_STRING P_BYTE P_INT16 P_UINT16 P_LONG32 P_ULONG32 P_CUSTOM P_FLOAT32 };
# First level of every MQTT topic handled here. createTopic joins it to the
# rest with '/', and parseTopicMessage strips it from the start of the topic,
# so it must be given without a leading and without a trailing slash.
use constant topicRoot => 'MyMQTT';
my $mqttHost = 'localhost'; # Hostname of the MQTT broker (mosquitto)
my $mqttPort = 1883; # Port of the MQTT broker (mosquitto)
my $mysnsHost = '192.168.1.10'; # IP of the MySensors Ethernet gateway; only used when $useSerial is 0
my $mysnsPort = 5003; # Port of the MySensors Ethernet gateway; only used when $useSerial is 0
my $retain = 1; # MQTT retain flag (1 = ask the broker to retain messages)
my $qos = MQTT_QOS_AT_LEAST_ONCE; # QoS level passed to subscribe and publish calls
my $keep_alive_timer = 120; # Passed to AnyEvent::MQTT as keep_alive_timer
my $subscriptionStorageFile = "subscriptions.storage"; # Subscriptions are stored here on shutdown and restored on start
my $cv; # AnyEvent condition variable the main loop waits on
my $socktx; # Outgoing TCP socket to the gateway; only opened when $useSerial is 0
my $serialPort = "/dev/ttyACM0"; # Arduino serial port
my $serialDevice; # Device::SerialPort object created by initialiseSerialPort
my $nodeFile = "nodes.txt"; # "address=name" lines, one per known node
my %knownNodes; # Node address => sketch name
my @subscriptions; # MQTT topics currently subscribed to
# Print a diagnostic line on the currently selected output handle: "##"
# followed by all arguments joined with single spaces and a newline.
sub debug
{
    my $text = join(" ", @_);
    print "##$text\n";
}
# Record the name a node reported for itself and persist the node table.
#
# Arguments: $message - parsed message hash ref; its 'radioId' is used as the
#                       node address and its 'payload' as the node name.
# Prints "Possible duplicate" when the address is already known under a
# different name, then stores the new name and rewrites the node file.
sub saveNode
{
    my ($message) = @_;
    my ($address, $name) = ($message->{'radioId'}, $message->{'payload'});
    chomp $name;

    if (exists $knownNodes{$address}) {
        print "Possible duplicate\n" unless $knownNodes{$address} eq $name;
    }

    $knownNodes{$address} = $name;
    writeNodeFile();
}
# Write the known-node table to $nodeFile, one "address=name" line per node,
# sorted by address. The file is overwritten.
#
# Dies with a message naming the file when it cannot be opened or when the
# final flush/close fails (buffered write errors only surface at close).
sub writeNodeFile
{
    # Lexical handle: no global bareword handle that other code could clobber.
    open(my $out, '>', $nodeFile)
        or die "Cannot open $nodeFile for writing: $!";

    for my $address (sort keys %knownNodes) {
        print {$out} $address . "=" . $knownNodes{$address} . "\n";
    }

    close($out)
        or die "Cannot finish writing $nodeFile: $!";
}
# Load "address=name" lines from $nodeFile into %knownNodes.
#
# Existing entries are kept and overwritten per address; lines without '=' are
# skipped. A missing or unreadable file is not an error: the sub just returns.
sub readNodeFile
{
    open(my $in, '<', $nodeFile) or return;

    # A named loop variable keeps the caller's $_ intact.
    while (my $line = <$in>) {
        next if index($line, '=') < 0;
        chomp $line;
        # Split on the first '=' only, so names that contain '=' stay whole.
        my ($address, $name) = split /=/, $line, 2;
        $knownNodes{$address} = $name;
    }

    close($in);
    return;
}
# Pick the lowest node id in 1..254 that is not a key of %knownNodes.
#
# The node file is re-read first. Returns the free id, or 255 when every id in
# the range is taken. Progress is reported on the selected output handle.
sub findFreeID
{
    readNodeFile();
    print "Finding free ID\n";

    my $candidate = first { !exists $knownNodes{$_} } 1 .. 254;
    if (defined $candidate) {
        print "Found $candidate\n";
        return $candidate;
    }

    print "Found nothing, returning 255\n";
    return 255;
}
# Open the serial port named by $serialPort, configure it for 115200 baud,
# 8 data bits, no parity and 1 stop bit, and then echo whatever input is
# already arriving so that it is out of the way.
# Stores the port object in $serialDevice. Dies with "Can't tie: ..." when the
# tie fails.
sub initialiseSerialPort
{
print "Initialising serial port $serialPort\n";
# NOTE(review): the Device::SerialPort documentation describes the argument of
# tie() as the name of a saved configuration file rather than a device path;
# confirm that passing the device path here behaves as intended.
$serialDevice = tie (*FH, 'Device::SerialPort',$serialPort)
|| die "Can't tie: $!";
$serialDevice->baudrate(115200);
$serialDevice->databits(8);
$serialDevice->parity("none");
$serialDevice->stopbits(1);
$serialDevice->write_settings;
# $serialDevice->write("Trying to write to the port");
# Phase 1: poll for at most two seconds until lookfor() returns something
# other than an empty string, echo it and stop polling.
my $tEnd = time()+2; # two seconds from now
while (time()< $tEnd) { # give up after two seconds at the latest
my $c = $serialDevice->lookfor(); # pending input, or an empty string
next if $c eq ""; # nothing yet: poll again
print $c; # echo the start-up output
last;
}
# Phase 2: keep echoing until lookfor() returns an empty string.
while (1) {
my $c = $serialDevice->lookfor(); # get the next piece
last if $c eq ""; # nothing left: done
print $c; # echo the start-up output
}
}
# Send one message to the gateway over the serial port.
#
# Arguments: $message - the text to write, including its line terminator.
# The number of bytes reported by write() is checked: a failed write is warned
# about and not logged as written; a short write is warned about as well.
sub sendToSerialGateway
{
    my $message = shift;
    debug ("Wanting to write to serial ports");

    my $written = $serialDevice->write($message);
    if (!$written) {
        warn "Write to serial port failed\n";
        return;
    }
    if ($written != length($message)) {
        warn "Write to serial port incomplete: $written of "
            . length($message) . " bytes\n";
    }

    debug("Wrote to serial port: ".$message);
}
# Wrap the serial port's file handle in an AnyEvent::Handle.
#
# Every complete line read from the port is passed to onSerialRead. On a
# handle error the handle is destroyed and "<fatal>: <message>" is sent to the
# condition variable $cv. Returns the new handle object.
sub create_handle {
    my $forward_line = sub {
        my (undef, $line) = @_;
        onSerialRead($line);
    };

    return AnyEvent::Handle->new(
        fh       => $serialDevice->{'HANDLE'},
        on_error => sub {
            my ($failed, $fatal, $message) = @_;
            $failed->destroy;
            $cv->send("$fatal: $message");
        },
        on_read  => sub {
            my ($reader) = @_;
            # Queue a line-oriented read each time data becomes available.
            $reader->push_read(line => $forward_line);
        },
    );
}
# Split one gateway line "radioId;childId;cmd;ack;subType;payload" into a
# hash reference with the keys radioId, childId, cmd, ack, subType, payload.
#
# Arguments: $txt - the raw line; a trailing LF or CR LF is removed.
# Returns:   hash ref; fields missing from the line are undef. Everything
#            after the fifth ';' belongs to the payload, so payloads may
#            contain ';' themselves. An empty payload is reported as undef.
sub parseMsg
{
    my $txt = shift;
    $txt = '' unless defined $txt;
    $txt =~ s/\r?\n\z//;

    # Limit of 6: never split inside the payload.
    my @fields = split(/;/, $txt, 6);
    my $payload = $fields[5];
    $payload = undef if defined $payload && $payload eq '';

    my $msgRef = { radioId => $fields[0],
                   childId => $fields[1],
                   cmd     => $fields[2],
                   ack     => $fields[3],
                   subType => $fields[4],
                   payload => $payload };
    return $msgRef;
}
# Convert an MQTT topic "<topicRoot>/<radioId>/<childId>/<variable type>" and
# its message into a gateway message hash (see createMsg for the consumer).
#
# Arguments: $topic   - the MQTT topic the message arrived on.
#            $message - the MQTT payload, used unchanged as message payload.
# Returns:   hash ref with cmd C_SET and ack 0, or undef when the topic does
#            not start with the topic root as a complete path level.
#            subType is undef when the variable type name is unknown.
sub parseTopicMessage
{
    my ($topic, $message) = @_;
    my $root = topicRoot;

    # Match the root literally (\Q..\E) and require a path separator or the
    # end of the topic after it, so "<root>abc/..." is not taken for ours.
    if ($topic !~ s{\A\Q$root\E(?:/|\z)}{})
    {
        # Topic root doesn't match
        return;
    }

    my @fields = split(/\//,$topic);
    my $msgRef = { radioId => int($fields[0]),
                   childId => int($fields[1]),
                   cmd     => C_SET,
                   ack     => 0, # No way to tell ack from topic
                   subType => variableTypeToIdx( $fields[2] ),
                   payload => $message };
    return $msgRef;
}
# Answer a node's id request with a free node id.
#
# Arguments: $msgRef - the parsed request; its radioId and childId are echoed.
# Builds "radioId;childId;C_INTERNAL;0;I_ID_RESPONSE;<id>" with the id chosen
# by findFreeID and sends it over the TCP socket or the serial port,
# depending on $useSerial.
sub assignID
{
    print "ID request received\n";
    my ($msgRef) = @_;

    my $message = join(';',
        $msgRef->{'radioId'},
        $msgRef->{'childId'},
        C_INTERNAL,
        0,
        I_ID_RESPONSE,
        findFreeID(),
    );
    debug($message);

    if ($useSerial != 0) {
        sendToSerialGateway("$message\n");
    } else {
        print $socktx "$message\n";
    }
}
# Serialise a message hash into the gateway's line format
# "radioId;childId;cmd;ack;subType;payload" (without a line terminator).
#
# Arguments: $msgRef - hash ref with the keys named above.
# Returns:   the joined string; an undefined payload becomes an empty field.
sub createMsg
{
    my ($msgRef) = @_;

    my @parts = map { $msgRef->{$_} } qw(radioId childId cmd ack subType);
    my $payload = $msgRef->{'payload'};
    push @parts, (defined $payload ? $payload : "");

    return join(';', @parts);
}
# Convert a numeric sub-type into its symbolic name; which name table applies
# depends on the command the sub-type belongs to.
#
# Arguments: $cmd     - numeric command code (C_PRESENTATION .. C_STREAM).
#            $subType - numeric sub-type code.
# Returns:   the name from the matching table, or "<UNKNOWN_$subType>" when
#            the command is not recognised or the sub-type is not a valid
#            index into that command's table.
#
# Plain numeric comparisons are used instead of given/when, which relies on
# the deprecated smartmatch feature.
sub subTypeToStr
{
    my $cmd = shift;
    my $subType = shift;

    my $unknown = "<UNKNOWN_" . (defined $subType ? $subType : '') . ">";
    return $unknown unless defined $cmd && $cmd =~ /\A\d+\z/;

    my @names;
    if ($cmd == C_PRESENTATION) {
        @names = (sensorTypesToStr);
    }
    elsif ($cmd == C_SET || $cmd == C_REQ) {
        @names = (variableTypesToStr);
    }
    elsif ($cmd == C_INTERNAL) {
        @names = (internalMessageTypesToStr);
    }
    elsif ($cmd == C_STREAM) {
        @names = (datastreamTypesToStr);
    }
    else {
        return $unknown;
    }

    # Reject anything that is not an in-range, non-negative integer index.
    return $unknown
        unless defined $subType && $subType =~ /\A\d+\z/ && $subType < @names;
    return $names[$subType];
}
# Print a one-line, human-readable summary of a received message: sender,
# child id, command and sub-type (numeric and by name), ack flag and, when
# defined, the payload in single quotes.
#
# Arguments: $msgRef - parsed message hash ref as produced by parseMsg.
sub dumpMsg
{
    my ($msgRef) = @_;

    my $cmdName     = (commandToStr)[$msgRef->{'cmd'}];
    my $subTypeName = subTypeToStr( $msgRef->{'cmd'}, $msgRef->{'subType'} );

    my $payloadText = "";
    if (defined($msgRef->{'payload'})) {
        $payloadText = "'" . $msgRef->{'payload'} . "'";
    }

    printf("Rx: fr=%03d ci=%03d c=%03d(%-14s) st=%03d(%-16s) ack=%d %s\n",
        $msgRef->{'radioId'},
        $msgRef->{'childId'},
        $msgRef->{'cmd'},
        $cmdName,
        $msgRef->{'subType'},
        $subTypeName,
        $msgRef->{'ack'},
        $payloadText);
}
# Build the MQTT topic for a message:
# "<topicRoot>/<radioId>/<childId>/<sub-type name>".
#
# Arguments: $msgRef - parsed message hash ref.
# Returns:   the topic string. No trailing slash is appended; a topic ending
#            in a slash is treated as equivalent to the one without it.
sub createTopic
{
    my ($msgRef) = @_;

    my $typeName = subTypeToStr( $msgRef->{'cmd'}, $msgRef->{'subType'} );
    my $node     = sprintf("%d", $msgRef->{'radioId'});
    my $child    = sprintf("%d", $msgRef->{'childId'});

    return join("/", topicRoot, $node, $child, $typeName);
}
# MQTT client object; (re)created by the main loop.
my $mqtt;

# In TCP mode a separate blocking socket is opened for sending to the gateway.
# TODO: I guess this should not be required.... Figure out how...
if ($useSerial == 0) {
    my %peer = (
        PeerHost => $mysnsHost,
        PeerPort => $mysnsPort,
        Proto    => 'tcp',
    );
    $socktx = IO::Socket::INET->new(%peer)
        or die "ERROR in Socket Creation : $!\n";
}
# Error callback for the MQTT client.
#
# Arguments: $fatal   - true when the error cannot be recovered from.
#            $message - error description.
# Fatal errors die with "MQTT: <message>"; other errors are only warned about.
sub onMqttError
{
    my ($fatal, $message) = @_;

    die "MQTT: $message\n" if $fatal;
    warn $message, "\n";
}
# Callback invoked when the MQTT broker delivers a message on a subscribed
# topic: translate it into a gateway message and send it to the gateway.
#
# Arguments: $topic   - topic the message was published on.
#            $message - payload of the message.
# Topics that cannot be mapped to a gateway message (wrong root, unknown
# variable type) are reported with a warning and skipped instead of sending a
# malformed line to the gateway.
sub onMqttPublish
{
    my($topic, $message) = @_;
    debug("$topic:$message");

    my $msgRef = parseTopicMessage($topic, $message);
    unless (defined $msgRef && defined $msgRef->{'subType'}) {
        warn "Ignoring MQTT message on unsupported topic '$topic'\n";
        return;
    }

    my $mySnsMsg = createMsg($msgRef);
    debug($mySnsMsg);
    print "Publish $topic: $message => $mySnsMsg\n";
    if ($useSerial== 0) {
        print $socktx "$mySnsMsg\n";
    } else {
        sendToSerialGateway("$mySnsMsg\n");
    }
}
# Error callback for the gateway socket handle.
#
# Arguments: $handle  - the handle reporting the error (not used here).
#            $fatal   - true when the error cannot be recovered from.
#            $message - error description.
# Fatal errors die with the message; other errors are only warned about.
sub onSocketError
{
    my (undef, $fatal, $message) = @_;

    die $message, "\n" if $fatal;
    warn $message, "\n";
}
# Disconnect callback: destroy the handle that was passed in.
#
# Arguments: $handle - object providing a destroy() method.
sub onSocketDisconnect
{
    my $handle = shift;
    $handle->destroy();
}
# Subscribe to an MQTT topic and remember it in @subscriptions.
#
# Arguments: $topic - the topic to subscribe to.
# Messages on the topic are delivered to onMqttPublish. The topic is added to
# @subscriptions only once; membership is tested by exact string comparison
# instead of the deprecated smartmatch operator.
sub subscribe
{
    my $topic = shift;
    print "Subscribe '$topic'\n";
    $mqtt->subscribe(
        topic    => $topic,
        callback => \&onMqttPublish,
        qos      => $qos );

    # Add topic to list of subscribed topics, if not already present.
    my $known = grep { $_ eq $topic } @subscriptions;
    push(@subscriptions, $topic) unless $known;
}
# Unsubscribe from an MQTT topic and forget it in @subscriptions.
#
# Arguments: $topic - the topic to unsubscribe from.
# The topic is removed by exact string comparison. It is deliberately not used
# as a regular expression, because topics may contain characters that are
# special in a pattern (for example '+').
sub unsubscribe
{
    my $topic = shift;
    print "Unsubscribe '$topic'\n";
    $mqtt->unsubscribe(
        topic => $topic
    );

    # Remove topic from list of subscribed topics, if present.
    @subscriptions = grep { $_ ne $topic } @subscriptions;
}
# Return the current local time formatted as "YYYYMMDD-HH:MM:SS".
sub gettime
{
    # localtime() yields (sec, min, hour, mday, mon, year, ...); the month is
    # zero-based and the year is counted from 1900.
    my @now = localtime(time);
    return sprintf("%04d%02d%02d-%02d:%02d:%02d",
        $now[5] + 1900, $now[4] + 1, @now[3, 2, 1, 0]);
}
# Publish a value on an MQTT topic, plus a timestamp of the update on
# "<topic>/lastupdate".
#
# Arguments: $topic   - topic to publish on.
#            $message - payload; undef is published as an empty string.
# Both messages use the configured $qos and $retain settings (the retain flag
# is no longer hard-coded, so changing $retain takes effect).
sub publish
{
    my ($topic, $message) = @_;
    $message = "" if !defined($message);
    print "Publish '$topic':'$message'\n";
    $mqtt->publish(
        topic   => $topic,
        message => $message,
        retain  => $retain,
        qos     => $qos,
    );
    # Publish timestamp of last update
    $mqtt->publish(
        topic   => "$topic/lastupdate",
        message => gettime(),
        retain  => $retain,
        qos     => $qos,
    );
}
# Called when a node announces itself on the network: drop every subscription
# that belongs to that node.
#
# Arguments: $msgRef - parsed presentation message; its radioId selects the
#                      topics "<topicRoot>/<radioId>/...".
sub onNodePresenation
{
    my $msgRef = shift;
    my $radioId = $msgRef->{'radioId'};

    # Literal prefix comparison: nothing from the topic is treated as a regex.
    my $prefix = join('/', topicRoot, $radioId, '');

    # Copy the matching topics before unsubscribing. unsubscribe() reassigns
    # @subscriptions, so looping over aliases into that array is not safe.
    my @stale = grep { index($_, $prefix) == 0 } @subscriptions;
    foreach my $topic (@stale)
    {
        unsubscribe($topic);
    }
}
# Read callback for the gateway socket: hand every complete line in the read
# buffer to handleMessage.
#
# Arguments: $handle - the AnyEvent::Handle whose read buffer has new data.
# A TCP read may deliver several lines at once, or only part of one. Complete
# lines are therefore removed from the buffer one at a time, and an unfinished
# trailing line stays in the buffer until the rest of it arrives.
sub onSocketRead
{
    my ($handle) = @_;

    while ($handle->{rbuf} =~ s/\A([^\n]*)\n//) {
        my $line = $1;          # copy before any other regex can reset $1
        $line =~ s/\r\z//;      # tolerate CR LF line endings
        next if $line eq '';
        debug($line);
        handleMessage($line);
    }
}
# Called with each line received from the serial port: log it and process it.
#
# Arguments: the received line as first argument.
sub onSerialRead
{
    my $received = $_[0];
    debug($received);
    return handleMessage($received);
}
# Process one line received from the gateway.
#
# Arguments: $msg - raw gateway line.
# The line is parsed and dumped, then handled according to its command:
#   * C_REQ: subscribe to the topic for the requested value.
#   * C_PRESENTATION, C_SET, C_INTERNAL: publish the payload on the message's
#     topic. Before that, a node-level presentation (child id 255) clears the
#     node's subscriptions, an internal I_SKETCH_NAME message stores the node
#     name, and a node-level internal I_ID_REQUEST is answered with a free id.
#   * anything else, including a missing or non-numeric command: ignored.
#
# The node name is only stored for C_INTERNAL messages. Sub-type numbers are
# reused between commands, so testing the sub-type alone would also fire for
# unrelated C_SET and C_PRESENTATION messages with the same number.
sub handleMessage
{
    my $msg=shift;
    my $msgRef = parseMsg($msg);
    dumpMsg($msgRef);

    my $isNumber = sub { defined $_[0] && $_[0] =~ /\A\d+\z/ };

    my $cmd = $msgRef->{'cmd'};
    return unless $isNumber->($cmd);

    if ($cmd == C_REQ) {
        subscribe( createTopic($msgRef) );
        return;
    }
    return unless $cmd == C_PRESENTATION || $cmd == C_SET || $cmd == C_INTERNAL;

    my $childId = $msgRef->{'childId'};
    my $subType = $msgRef->{'subType'};
    my $isNodeLevel = $isNumber->($childId) && $childId == 255;
    my $isInternal  = $cmd == C_INTERNAL && $isNumber->($subType);

    onNodePresenation($msgRef) if $isNodeLevel && $cmd == C_PRESENTATION;
    saveNode($msgRef) if $isInternal && $subType == I_SKETCH_NAME;
    assignID($msgRef) if $isNodeLevel && $isInternal && $subType == I_ID_REQUEST;
    publish( createTopic($msgRef), $msgRef->{'payload'} );
}
# Persist the list of active subscriptions to $subscriptionStorageFile so the
# next start can restore them.
sub doShutdown
{
    my $snapshot = \@subscriptions;
    return store($snapshot, $subscriptionStorageFile);
}
# SIGINT handler: announce the shutdown, save state via doShutdown and exit.
sub onCtrlC
{
    my $notice = "\nShutdown requested...\n";
    print $notice;
    doShutdown();
    exit(0);
}
# Install Ctrl-C handler
$SIG{INT} = \&onCtrlC;
# Open the serial port only in serial mode. In TCP mode there may be no serial
# device at all, and a failing tie would abort the script.
initialiseSerialPort() if $useSerial == 1;
# Main loop: set up the MQTT client and the gateway connection, restore saved
# subscriptions and run the event loop. Whenever the event loop ends, whether
# through an exception or because something signalled $cv, the reason is
# printed and everything is set up again after a five second pause.
while (1)
{
    # Run code in eval loop, so die's are caught with error message stored in $@
    # See: https://sites.google.com/site/oleberperlrecipes/recipes/06-error-handling/00---simple-exception
    local $@ = undef;
    eval
    {
        print "use $useSerial";
        $mqtt = AnyEvent::MQTT->new(
            host => $mqttHost,
            port => $mqttPort,
            keep_alive_timer => $keep_alive_timer,
            on_error => \&onMqttError,
            # clean_session => 0, # Retain client subscriptions etc
            client_id => 'MySensors',
        );
        if ($useSerial==0) {
            # TCP mode: read gateway messages from the Ethernet gateway.
            tcp_connect($mysnsHost, $mysnsPort, sub {
                my $sock = shift or die "FAIL";
                my $handle;
                $handle = AnyEvent::Handle->new(
                    fh => $sock,
                    on_error => \&onSocketError,
                    on_eof => sub {
                        print "DISCONNECT!\n";
                        $handle->destroy();
                    },
                    on_connect => sub {
                        my ($handle, $host, $port) = @_;
                        print "Connected to MySensors gateway at $host:$port\n";
                    },
                    on_read => \&onSocketRead );
            } );
        }
        $cv = AnyEvent->condvar;
        if ($useSerial==1) {
            # Serial mode: read gateway messages from the serial port.
            $serialHandle = create_handle();
        }
        # On shutdown active subscriptions will be saved to file. Read this file here and restore subscriptions.
        if (-e $subscriptionStorageFile)
        {
            my $subscrRef = retrieve($subscriptionStorageFile);
            print "Restoring previous subscriptions:\n" if @$subscrRef;
            foreach my $topic(@$subscrRef)
            {
                subscribe($topic);
            }
        }
        # Block until something signals the condition variable. Turn that into
        # an exception so every exit from the event loop takes the same
        # log-and-pause path below. Using recv's value as the eval result
        # would skip that path whenever the value is true.
        my $reason = $cv->recv;
        $reason = "event loop ended" unless defined $reason && length $reason;
        die "$reason\n";
    }
    or do
    {
        print "Exception: ".$@."\n";
        # doShutdown();
        print "Restarting...\n";
        sleep(5);
        # exit (0);
    };
}
Created: 2015-03-26 20:15:50 - Hits: 6097