Skip to content

Commit 1a3e34e

Browse files
authored
Merge pull request #32 from asteurer/wasip2-mqtt
feat(mqtt): implement mqtt in wasip2
2 parents a65f13a + ea81e64 commit 1a3e34e

File tree

8 files changed

+186
-0
lines changed

8 files changed

+186
-0
lines changed

v3/examples/mqtt-outbound/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
main.wasm
2+
.spin/

v3/examples/mqtt-outbound/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Requirements
2+
- Latest version of [TinyGo](https://tinygo.org/getting-started/)
3+
- Latest version of [Docker](https://docs.docker.com/get-started/get-docker/)
4+
5+
# Usage
6+
7+
In one terminal window, run:
8+
```sh
9+
# Note that the `-d` flag is intentionally omitted
10+
docker compose up
11+
```
12+
13+
In another terminal, you'll run your Spin app:
14+
```sh
15+
spin up --build
16+
```
17+
18+
In yet another terminal, you'll interact with the Spin app:
19+
```sh
20+
curl localhost:3000/publish
21+
```
22+
23+
You will see logs appear in the `docker compose` window that look something like this:
24+
```sh
25+
$ docker compose up
26+
...
27+
broker | 1754324646: New connection from 172.18.0.1:36970 on port 1883.
28+
broker | 1754324646: New client connected from 172.18.0.1:36970 as client001 (p2, c1, k30, u'user').
29+
subscriber | telemetry Eureka!
30+
broker | 1754324646: Client client001 closed its connection.
31+
```
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
services:
2+
mosquitto:
3+
image: eclipse-mosquitto:2.0.22
4+
container_name: broker
5+
ports:
6+
- "1883:1883"
7+
command: mosquitto -c /mosquitto-no-auth.conf
8+
9+
subscriber:
10+
image: eclipse-mosquitto:2.0.22
11+
container_name: subscriber
12+
depends_on:
13+
- mosquitto
14+
command: mosquitto_sub -h mosquitto -t '#' -v
15+
restart: "no" # Clean up container when stopped

v3/examples/mqtt-outbound/go.mod

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
module github.com/http_go
2+
3+
go 1.24
4+
5+
require github.com/spinframework/spin-go-sdk/v3 v3.0.0
6+
7+
require (
8+
github.com/julienschmidt/httprouter v1.3.0 // indirect
9+
go.bytecodealliance.org/cm v0.2.2 // indirect
10+
)
11+
12+
replace github.com/spinframework/spin-go-sdk/v3 => ../../

v3/examples/mqtt-outbound/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
2+
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
3+
go.bytecodealliance.org/cm v0.2.2 h1:M9iHS6qs884mbQbIjtLX1OifgyPG9DuMs2iwz8G4WQA=
4+
go.bytecodealliance.org/cm v0.2.2/go.mod h1:JD5vtVNZv7sBoQQkvBvAAVKJPhR/bqBH7yYXTItMfZI=

v3/examples/mqtt-outbound/main.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package main
2+
3+
import (
4+
"net/http"
5+
"os"
6+
"strconv"
7+
8+
spinhttp "github.com/spinframework/spin-go-sdk/v3/http"
9+
"github.com/spinframework/spin-go-sdk/v3/mqtt"
10+
)
11+
12+
func main() {}
13+
14+
func init() {
15+
spinhttp.Handle(func(w http.ResponseWriter, r *http.Request) {
16+
addr := os.Getenv("MQTT_ADDRESS")
17+
usr := os.Getenv("MQTT_USERNAME")
18+
pass := os.Getenv("MQTT_PASSWORD")
19+
keepAliveStr := os.Getenv("MQTT_KEEP_ALIVE_INTERVAL")
20+
topic := os.Getenv("MQTT_TOPIC")
21+
22+
keepAlive, err := strconv.Atoi(keepAliveStr)
23+
if err != nil {
24+
w.WriteHeader(http.StatusInternalServerError)
25+
w.Write([]byte("MQTT_KEEP_ALIVE_INTERVAL is not valid: must be an integer"))
26+
}
27+
28+
conn, err := mqtt.OpenConnection(addr, usr, pass, uint64(keepAlive))
29+
if err != nil {
30+
w.WriteHeader(http.StatusInternalServerError)
31+
w.Write([]byte(err.Error()))
32+
}
33+
34+
message := []byte("Eureka!")
35+
36+
if err := conn.Publish(topic, message, mqtt.QosAtMostOnce); err != nil {
37+
w.WriteHeader(http.StatusInternalServerError)
38+
w.Write([]byte(err.Error()))
39+
}
40+
41+
w.WriteHeader(200)
42+
w.Write([]byte("Message successfully published!\n"))
43+
})
44+
}

v3/examples/mqtt-outbound/spin.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
spin_manifest_version = 2
2+
3+
[application]
4+
name = "go-mqtt-outbound-example"
5+
version = "0.1.0"
6+
authors = ["Andrew Steurer <[email protected]>"]
7+
description = "Using Spin with MQTT"
8+
9+
[[trigger.http]]
10+
route = "/publish"
11+
component = "mqtt-outbound"
12+
13+
[component.mqtt-outbound]
14+
source = "main.wasm"
15+
# To test anonymous MQTT authentication, remove the values from MQTT_USERNAME and MQTT_PASSWORD env variables.
16+
environment = { MQTT_ADDRESS = "mqtt://127.0.0.1:1883?client_id=client001", MQTT_USERNAME = "user", MQTT_PASSWORD = "password", MQTT_KEEP_ALIVE_INTERVAL = "30", MQTT_TOPIC = "telemetry" }
17+
allowed_outbound_hosts = ["mqtt://127.0.0.1:1883"]
18+
[component.mqtt-outbound.build]
19+
command = "tinygo build -target=wasip2 --wit-package $(go list -mod=readonly -m -f '{{.Dir}}' github.com/spinframework/spin-go-sdk/v3)/wit --wit-world http-trigger -gc=leaking -o main.wasm main.go"
20+
watch = ["**/*.go", "go.mod"]

v3/mqtt/mqtt.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package mqtt
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/spinframework/spin-go-sdk/v3/internal/fermyon/spin/v2.0.0/mqtt"
8+
"go.bytecodealliance.org/cm"
9+
)
10+
11+
type Connection struct {
12+
conn mqtt.Connection
13+
}
14+
15+
// OpenConnection initializes an MQTT connection
16+
func OpenConnection(address, username, password string, keepAliveIntervalInSecs uint64) (Connection, error) {
17+
conn, err, isErr := mqtt.ConnectionOpen(address, username, password, keepAliveIntervalInSecs).Result()
18+
if isErr {
19+
return Connection{}, toError(&err)
20+
}
21+
22+
return Connection{conn: conn}, nil
23+
}
24+
25+
// Publish publishes an MQTT message
26+
func (c *Connection) Publish(topic string, payload []byte, qos QoS) error {
27+
_, err, isErr := c.conn.Publish(topic, mqtt.Payload(cm.ToList(payload)), mqtt.Qos(qos)).Result()
28+
if isErr {
29+
return toError(&err)
30+
}
31+
32+
return nil
33+
}
34+
35+
// QoS for publishing Mqtt messages
36+
type QoS = mqtt.Qos
37+
38+
const (
39+
QosAtMostOnce = mqtt.QosAtMostOnce
40+
QosAtLeastOnce = mqtt.QosAtLeastOnce
41+
QosExactlyOnce = mqtt.QosExactlyOnce
42+
)
43+
44+
func toError(err *mqtt.Error) error {
45+
if err == nil {
46+
return nil
47+
}
48+
49+
if err.String() == "connection-failed" {
50+
return fmt.Errorf("connection-failed: %s", *err.ConnectionFailed())
51+
}
52+
53+
if err.String() == "other" {
54+
return fmt.Errorf(*err.Other())
55+
}
56+
57+
return errors.New(err.String())
58+
}

0 commit comments

Comments
 (0)