Using Gateways with MQTT Bridge
This page explains how gateways can use the MQTT bridge to communicate with OmniCore and publish telemetry events on behalf of bound devices. Before you begin, read Using the MQTT bridge for general information on using the MQTT bridge with OmniCore.
Using gateways with the MQTT bridge
- After you've created and configured the gateway, connect it to OmniCore over the MQTT bridge.
- Create devices if you haven't already.
- Optional: Bind the devices to the gateway.
- When using the MQTT bridge, you only need to bind the devices if they can't generate their own JWTs.
- Optional: Subscribe to the system error topic to get feedback on whether device operations are successful or not.
- Attach the devices to the gateway.
- Use the gateway to relay telemetry, device state, and configuration messages on behalf of its devices. Try the end-to-end demo to learn how.
Gateway messages
After the gateway connects to OmniCore over the MQTT bridge, it can send or receive three types of messages:
- Control messages: Attaches a device to the gateway, or detaches a device from the gateway. These messages are sent between the gateway and OmniCore. OmniCore accepts control messages only from gateways; if another type of device attempts to send a control message, OmniCore closes the connection.
- Messages from gateways and devices: Can be relayed by the gateway on behalf of a device, or sent directly from the gateway itself.
- System error messages: When the gateway is subscribed to the MQTT system error topic on behalf of the device, OmniCore sends error messages to the gateway whenever the device encounters an error.
Attaching devices to a gateway
To enable the gateway to proxy device communications with OmniCore, have the gateway publish a QoS 1 /REGISTRY_ID/{device_ID_to_attach}/attach control message over the MQTT bridge.
If you configured the gateway to authenticate devices using the devices' JWTs, the payload of the attach message must include the token in JSON format: { "authorization" : "{JWT_token}" }. Otherwise, OmniCore authenticates the device by checking its association with the gateway.
Detaching devices from the gateway
To detach a device from the gateway, have the gateway publish a QoS 1 /REGISTRY_ID/{device_ID}/detach control message over the MQTT bridge. If the device isn't attached at the time the message is sent, OmniCore ignores the detach control message and sends a PUBACK message.
Troubleshooting
To be notified when a device encounters an error, subscribe the gateway to the MQTT /REGISTRY_ID/$[gateway_ID]/errors topic using QoS level 0:
- Go
import (
"fmt"
"io"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, subscriptionId string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {
const (
mqttBrokerURL = "tls://hostprefix.mqtt.korewireless.com:8883"
protocolVersion = 4 // corresponds to MQTT 3.1.1
)
// onConnect defines the on connect handler which resets backoff variables.
var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
}
// onMessage defines the message handler for the mqtt client.
var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
fmt.Fprintf(w, "Message: %s\n", msg.Payload())
}
// onDisconnect defines the connection lost handler for the mqtt client.
var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Println("Client disconnected")
}
jwt, _ := createJWT( privateKeyPath, algorithm, 60)
clientID := fmt.Sprintf("subscriptions/%s/registries/%s/devices/%s", subscriptionID, registryID, gatewayID)
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttBrokerURL)
opts.SetClientID(clientID)
opts.SetUsername("unused")
opts.SetPassword(jwt)
opts.SetProtocolVersion(protocolVersion)
opts.SetOnConnectHandler(onConnect)
opts.SetDefaultPublishHandler(onMessage)
opts.SetConnectionLostHandler(onDisconnect)
// Create and connect a client using the above options.
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to connect client")
return token.Error()
}
if err := attachDevice(deviceID, client, ""); err != nil {
fmt.Fprintf(w, "AttachDevice error: %v\n", err)
return err
}
// Sleep for 5 seconds to allow attachDevice message to propagate.
time.Sleep(5 * time.Second)
// Subscribe to the config topic of the current gateway and a device bound to the gateway.
gatewayTopic := fmt.Sprintf("/%s/%s/%s", registryID,gatewayID, topic)
if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, token.Error())
return token.Error()
}
deviceTopic := fmt.Sprintf("/%s/%s/%s", registryID,deviceID, topic)
if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, token.Error())
return token.Error()
}
time.Sleep(time.Duration(clientDuration) * time.Second)
if err := detachDevice(deviceID, client, ""); err != nil {
fmt.Fprintf(w, "DetachDevice error: %v\n", err)
return err
}
if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, token.Error())
return token.Error()
}
client.Disconnect(10)
return nil
}