The usage of distributed system architecture is growing day by day. It leverages interconnected components that work in synergy to achieve application decoupling, enabling scalability, fault tolerance, high performance, and enhanced availability for applications.
In order to build your distributed application you should carefully choose the different components of your application. This blog post will guide you through the process of building a simple weather application utilizing NATS JetStream as the message-oriented middleware (broker), with a focus on implementing the application using the Go programming language.
Go, also called Golang or Go language, is an open source, compiled, and statically typed programming language that Google developed.
The choice of Go as a programming language by developers is on the rise due to several reasons:
And for NATS Jetstream users:
NATS JetStream is the NATS persistence engine providing streaming, message, and worker queues with At-Least-Once semantics. JetStream stores messages in streams. It is designed to build distributed systems with key features such as persistence for durable messages storage with stream-based messaging and publish-subscribe capabilities.
To create your Golang NATS distributed application, you need first install a NATS Server. Its docs provide multiple ways to install a NATS server based on your environment. In this guide, we will use Docker to do so.
pull nats:latest
docker run -p 4222:4222 -ti nats:latest
The Docker compose code will spin a nats-server container server on port 4222. By default, the NATS server will start listening for NATS client connections on port 4222. While running the server, the command will run the flag -js to enable Jetstream. It launches the NATS server and configures it to support NATS Jetstream.
Alternatively, you can use your favorite package manager and still install NATS.
brew install nats-server
nats-server -js
Now your NATS server is running, and Jetstream is ready to manage streams. To use NATS as a publish-subscribe messaging system, you need a client. In this case, you are going to use Golang. NATS Go client provides all functionalities you need for a Go client to communicate with the NATS messaging system.
Before installing the NATS Go client, ensure you have Golang installed on your command and unitized in your working directory using the following command:
go init mod go_nats
To install the NATS Go client, run the following command within your working directory:
go get github.com/nats-io/nats.go
Within your working directory create a main.go file. The code samples used in the coming steps will be saved in this file. Ensure you have the following imports ready, as you will use them later:
package main
import (
“encoding/json”
“fmt”
“log”
“math/rand”
“time”
“github.com/nats-io/nats.go”
)
To publish messages to your servers, you need to establish a connection to the NATS server Default URL (nats://127.0.0.1:4222).
In your main.go file, create the connect function as follows:
func connect() *nats.Conn {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
return nc
}
nats.Connect will execute a connection to the server using constant DefaultURL. Up to now, you should have the NATS server running and accessible at the default URL.
Using the above-created connection, initialize Jetstream. This will allow you to have Jetstream operations such as creating streams, publishing messages, and subscribing to messages.
func initializeJetStream(nc *nats.Conn) nats.JetStreamContext {
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
return js
}
A stream creates a message store that defines how messages are stored. Here you need to create the stream name for storing the application messages. NATS also uses Subjects that create a subject-based messaging.
A subject is just a string of characters that form a name the publisher and subscriber can use to find each other.
In this example will build a simple weather, publish-subscribe model. First, create the following constants describing your stream. The stream constants will be as follows:
const (
StreamName = “weather”
Subjects = “weather-data.*”
)
Use AddStream to create a Jetstream stream with the above-specified name and subjects:
func createStream(js nats.JetStreamContext) {
streamConfig := &nats.StreamConfig{
Name: StreamName,
Subjects: []string{Subjects},
}
_, err := js.AddStream(streamConfig)
if err != nil {
log.Fatal(err)
}
}
A message contains the payload that will be saved in your stream. Since we don’t have ready weather data, we will simulate a data generation example. This will generate multiple messages the publisher will use to publish to the stream:
// data
type WeatherData struct {
Timestamp time.Time
Temperature float64
Humidity float64
}
func publishWeatherData(js nats.JetStreamContext) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Simulate data
temperature := 25.0 + 50*(rand.Float64())
humidity := 50.0 + 40*(rand.Float64())
data := WeatherData{
Timestamp: time.Now(),
Temperature: temperature,
Humidity: humidity,
}
// Publish
dataBytes, err := json.Marshal(data)
if err != nil {
log.Println(err)
continue
}
msg := &nats.Msg{
Subject: fmt.Sprintf(“weatherdata.%s”, data.Timestamp.Format(“2006-01-02”)),
Data: dataBytes,
}
log.Printf(“Publish => Timestamp=%s, Temperature=%.2f, Humidity=%.2f”, data.Timestamp, data.Temperature, data.Humidity)
_, err = js.PublishMsg(msg)
if err != nil {
log.Println(err)
}
}
}
NATS server should now be able to receive messages and publish them. Each message will have the timestamp of when the message was created, and computed temperature and humidity values.
Let’s check if this works as expected. Ensure you have a main function to execute all the above-created functions as follows:
func main() {
nc := connect()
defer nc.Close()
js := initializeJetStream(nc)
createStream(js)
go publishMessage(js)
select {}
}
Then within the working directory, execute go run main.go. The application should publish the messages to your stream:
To complete the publish-subscribe model, you need consumers who subscribe to the published messages.
NATS’s publish-subscribe model uses a one-to-many (fan-out) communication. The publisher will send messages on the subject created. Any active subscriber can listen on that subject and consume messages as such:
To allows subscribers to consume the messages, process the received weather messages as follows:
func handleIncomingMessage(msg *nats.Msg) {
var data WeatherData
err := json.Unmarshal(msg.Data, &data)
if err != nil {
log.Println(err)
return
}
log.Printf(“Subscriber => Timestamp=%s, Temperature=%.2f, Humidity=%.2f”, data.Timestamp, data.Temperature, data.Humidity)
}
Create a Jetstream subscription for the subjects and subscribe to the Jetstream stream:
func consumeMessages(js nats.JetStreamContext) *nats.Subscription {
sub, err := js.Subscribe(Subjects, func(msg *nats.Msg) {
handleIncomingMessage(msg)
})
if err != nil {
log.Fatal(err)
}
return sub
}
Since NATS uses a one-to-many model, this example will execute with two subscribers consuming the messages published to the weatherdata subjects. Update the const and add SubscriberCount as follows:
const (
StreamName = “weather”
Subjects = “weatherdata.*”
SubscriberCount = 2
)
To execute the subscribers update the main() functions as follows:
func main() {
nc := connect()
defer nc.Close()
js := initializeJetStream(nc)
createStream(js)
go publishMessage(js)
for i := 1; i <= SubscriberCount; i++ {
sub := consumeMessages(js)
defer sub.Unsubscribe()
}
select {}
}
Rerun your application (go run main.go), and you should now have the two subscribers consuming each published message: