
Building Distributed Systems with NATS Jetstream and Golang

David Daniel July 12, 2023 8 min read

Distributed system architectures are becoming increasingly common. They rely on interconnected components that work together while staying decoupled from one another, which supports scalability, fault tolerance, high performance, and better availability.

To build a distributed application, you need to choose its components carefully. This blog post guides you through building a simple weather application that uses NATS JetStream as the message-oriented middleware (broker) and is implemented in the Go programming language.


Why Golang

Go, also called Golang, is an open source, compiled, statically typed programming language developed at Google.

Developers increasingly choose Go for several reasons:

  • It boasts a straightforward and clean syntax, prioritizing simplicity and readability. This ease of use extends to its robust standard library, providing developers with a wealth of packages for various tasks.
  • Go’s compiled nature ensures high performance, while its built-in concurrency support with goroutines and channels allows for efficient concurrent programming.
  • Goroutines are lightweight, so Go handles large numbers of concurrent connections well, which makes it suitable for building scalable systems.
  • Go’s cross-platform compatibility enables code portability across different operating systems and architectures.
  • Its statically typed nature enhances code reliability and catches errors at compile-time.
  • Go benefits from a vibrant open-source community, offering extensive resources and support to developers.

And for NATS Jetstream users:

  • NATS provides an official Go client, which offers an easy-to-use API for interacting with a NATS server and using JetStream features.

Setting up NATS Server and Jetstream

NATS JetStream is the NATS persistence engine. It provides streaming, message queues, and work queues with at-least-once delivery semantics, and it stores messages in streams. It is designed for building distributed systems, with key features such as durable message storage, stream-based messaging, and publish-subscribe capabilities.

To create your Go NATS distributed application, you first need to install a NATS server. The NATS docs describe multiple ways to install it depending on your environment. In this guide, we will use Docker.

  • Make sure Docker is installed in your environment.
  • Navigate to your working directory and pull the latest NATS Docker image:

docker pull nats:latest

  • Run the server in Docker with JetStream enabled:

docker run -p 4222:4222 -ti nats:latest -js

This Docker command spins up a NATS server container and maps port 4222, the default port on which the NATS server listens for client connections. The -js flag enables JetStream, so the server starts with JetStream support.

Alternatively, you can install NATS with your favorite package manager.

  • For example, on macOS with Homebrew, run the following command:

brew install nats-server

  • To start the server with JetStream enabled, run:

nats-server -js

  • The server should log messages indicating that JetStream is enabled and the NATS server is running.

NATS server is running


Running NATS Jetstream Go Client

Now your NATS server is running, and JetStream is ready to manage streams. To use NATS as a publish-subscribe messaging system, you need a client. In this case, you are going to use Go. The NATS Go client provides everything a Go program needs to communicate with the NATS messaging system.

Before installing the NATS Go client, ensure you have Go installed on your machine and a Go module initialized in your working directory. To initialize one, run the following command (the module path example.com/weather is only a placeholder; use your own):

go mod init example.com/weather

To install the NATS Go client, run the following command within your working directory:

go get github.com/nats-io/nats.go

Within your working directory create a main.go file. The code samples used in the coming steps will be saved in this file. Ensure you have the following imports ready, as you will use them later:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/nats-io/nats.go"
)


Connecting to NATS Server using Go

To publish messages, you first need to establish a connection to the NATS server at its default URL (nats://127.0.0.1:4222).

In your main.go file, create the connect function as follows:

// connect opens a connection to the NATS server at the default URL.
func connect() *nats.Conn {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    return nc
}

nats.Connect opens a connection to the server using the constant nats.DefaultURL. At this point, you should have the NATS server running and accessible at the default URL.

Using the connection created above, initialize JetStream. This lets you perform JetStream operations such as creating streams, publishing messages, and subscribing to messages.
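If the connection fails, it helps to rule out the simplest cause first: nothing listening on the port. The following is a minimal sketch that uses only the standard library, not the NATS client; serverReachable is a hypothetical helper name, and the check only confirms that a TCP connection can be opened, not that JetStream is enabled.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// serverReachable reports whether a TCP connection to addr can be opened
// within the given timeout. It only checks that something is listening;
// it does not speak the NATS protocol.
func serverReachable(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	// 127.0.0.1:4222 is the host and port behind the default URL.
	addr := "127.0.0.1:4222"
	if serverReachable(addr, 2*time.Second) {
		fmt.Println("NATS port is reachable at", addr)
	} else {
		fmt.Println("nothing is listening at", addr, "(is the server running?)")
	}
}
```

If this reports that nothing is listening, check that the container or the nats-server process is still running and that port 4222 is mapped.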

// initializeJetStream returns a JetStream context bound to the connection.
func initializeJetStream(nc *nats.Conn) nats.JetStreamContext {
    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }
    return js
}


Creating Stream

A stream is a message store: it defines how messages are stored and which subjects it captures. Here you need to name the stream that will store the application messages and list the subjects it listens on, because NATS uses subject-based messaging.

A subject is just a string of characters that form a name the publisher and subscriber can use to find each other.

Example of Subject-Based Messaging
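Subjects are split into tokens by dots, and subscriptions may use wildcards: "*" matches exactly one token and ">" matches one or more trailing tokens. To make those rules concrete, here is a small stand-alone sketch of the matching logic. It is an illustration written for this post, not the server's implementation; subjectMatches is a hypothetical helper and it does not validate subjects (for example, it does not reject empty tokens).

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern under the NATS
// wildcard rules: "*" matches exactly one token, ">" matches one or more
// trailing tokens. Tokens are separated by ".".
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least
			// one subject token left to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("weatherdata.*", "weatherdata.2023-07-12"))  // true
	fmt.Println(subjectMatches("weatherdata.*", "weatherdata.eu.berlin"))   // false: "*" is one token only
	fmt.Println(subjectMatches("weatherdata.>", "weatherdata.eu.berlin"))   // true
	fmt.Println(subjectMatches("weather-data.*", "weatherdata.2023-07-12")) // false: different first token
}
```

The last line shows why the stream's subject pattern and the subjects used by the publisher must share exactly the same prefix.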

In this example, you will build a simple weather publish-subscribe model. First, create the following constants describing your stream:

const (
    StreamName = "weather"
    // The wildcard must share its prefix with the subjects the publisher uses (weatherdata.<date>).
    Subjects   = "weatherdata.*"
)

Use AddStream to create a Jetstream stream with the above-specified name and subjects:

// createStream creates the stream that captures the weather subjects.
func createStream(js nats.JetStreamContext) {
    streamConfig := &nats.StreamConfig{
        Name:     StreamName,
        Subjects: []string{Subjects},
    }
    _, err := js.AddStream(streamConfig)
    if err != nil {
        log.Fatal(err)
    }
}


Publishing Messages

A message contains the payload that will be saved in your stream. Since we don’t have real weather data, we will simulate it. The publisher below generates a new reading every second and publishes it to the stream:

// WeatherData is the payload of each published message.
type WeatherData struct {
    Timestamp   time.Time
    Temperature float64
    Humidity    float64
}

// publishWeatherData publishes one simulated reading per second.
func publishWeatherData(js nats.JetStreamContext) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        // Simulate data
        temperature := 25.0 + 50*(rand.Float64())
        humidity := 50.0 + 40*(rand.Float64())
        data := WeatherData{
            Timestamp:   time.Now(),
            Temperature: temperature,
            Humidity:    humidity,
        }

        // Encode the reading as JSON
        dataBytes, err := json.Marshal(data)
        if err != nil {
            log.Println(err)
            continue
        }

        // The subject is weatherdata.<date>, which matches the stream's weatherdata.* pattern
        msg := &nats.Msg{
            Subject: fmt.Sprintf("weatherdata.%s", data.Timestamp.Format("2006-01-02")),
            Data:    dataBytes,
        }

        log.Printf("Publish => Timestamp=%s, Temperature=%.2f, Humidity=%.2f", data.Timestamp, data.Temperature, data.Humidity)

        // Publish
        _, err = js.PublishMsg(msg)
        if err != nil {
            log.Println(err)
        }
    }
}

The NATS server should now receive the published messages and store them in the stream. Each message carries the timestamp of when it was created, along with the simulated temperature and humidity values.
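To see what actually travels over the wire, the sketch below builds one reading with fixed values and prints the subject and JSON payload the publisher would produce for it. It uses only the standard library; encodeReading and sampleReading are hypothetical helpers introduced for this illustration, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"
)

// WeatherData mirrors the struct used by the publisher.
type WeatherData struct {
	Timestamp   time.Time
	Temperature float64
	Humidity    float64
}

// encodeReading returns the subject and JSON payload for one reading,
// built the same way as in the publisher loop.
func encodeReading(d WeatherData) (string, []byte, error) {
	payload, err := json.Marshal(d)
	if err != nil {
		return "", nil, err
	}
	subject := fmt.Sprintf("weatherdata.%s", d.Timestamp.Format("2006-01-02"))
	return subject, payload, nil
}

// sampleReading returns a fixed reading so the output is reproducible.
func sampleReading() WeatherData {
	return WeatherData{
		Timestamp:   time.Date(2023, 7, 12, 8, 30, 0, 0, time.UTC),
		Temperature: 31.5,
		Humidity:    62.25,
	}
}

func main() {
	subject, payload, err := encodeReading(sampleReading())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(subject)         // weatherdata.2023-07-12
	fmt.Println(string(payload)) // {"Timestamp":"2023-07-12T08:30:00Z","Temperature":31.5,"Humidity":62.25}
}
```

Because the struct has no JSON tags, the keys in the payload are the Go field names. Any consumer written in another language would need to expect those exact keys.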

Let’s check if this works as expected. Ensure you have a main function to execute all the above-created functions as follows:

func main() {
    nc := connect()
    defer nc.Close()

    js := initializeJetStream(nc)
    createStream(js)

    // Run the publisher in the background.
    go publishWeatherData(js)

    // Block forever so the publisher keeps running.
    select {}
}

Then within the working directory, execute go run main.go. The application should publish the messages to your stream:

execute go run main.go


Consuming Messages

To complete the publish-subscribe model, you need consumers who subscribe to the published messages.

NATS’s publish-subscribe model uses one-to-many (fan-out) communication. The publisher sends messages on a subject, and any active subscriber listening on that subject receives them:

Example of NATS Publish-Subscribe Model
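The fan-out idea can be illustrated without a server at all. The following sketch is an in-process analogy built with goroutines and channels, not NATS code: fanOut is a hypothetical function that hands every message to every subscriber, which is the behavior you should expect from independent subscribers on the same subject.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut delivers every message to every subscriber and returns what each
// subscriber received, indexed by subscriber number.
func fanOut(messages []string, subscribers int) [][]string {
	chans := make([]chan string, subscribers)
	received := make([][]string, subscribers)

	var wg sync.WaitGroup
	for i := range chans {
		ch := make(chan string)
		chans[i] = ch
		wg.Add(1)
		// Each subscriber runs in its own goroutine and records its messages.
		go func(id int, in <-chan string) {
			defer wg.Done()
			for m := range in {
				received[id] = append(received[id], m)
			}
		}(i, ch)
	}

	// The "publisher": one copy of each message per subscriber.
	for _, m := range messages {
		for _, ch := range chans {
			ch <- m
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return received
}

func main() {
	got := fanOut([]string{"reading-1", "reading-2"}, 2)
	for id, msgs := range got {
		fmt.Printf("subscriber %d received %v\n", id+1, msgs)
	}
}
```

Both subscribers end up with the full list of messages, in publication order.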

To allow subscribers to consume the messages, handle each received weather message as follows:

// handleIncomingMessage decodes a weather message and logs its content.
func handleIncomingMessage(msg *nats.Msg) {
    var data WeatherData
    err := json.Unmarshal(msg.Data, &data)
    if err != nil {
        log.Println(err)
        return
    }
    log.Printf("Subscriber => Timestamp=%s, Temperature=%.2f, Humidity=%.2f", data.Timestamp, data.Temperature, data.Humidity)
}

Create a Jetstream subscription for the subjects and subscribe to the Jetstream stream:

// consumeMessages subscribes to the weather subjects and handles each message.
func consumeMessages(js nats.JetStreamContext) *nats.Subscription {
    sub, err := js.Subscribe(Subjects, func(msg *nats.Msg) {
        handleIncomingMessage(msg)
    })
    if err != nil {
        log.Fatal(err)
    }
    return sub
}

Since NATS uses a one-to-many model, this example runs two subscribers that each consume the messages published to the weatherdata subjects. Update the const block and add SubscriberCount as follows:

const (
    StreamName      = "weather"
    Subjects        = "weatherdata.*"
    SubscriberCount = 2
)

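To run the subscribers, update the main() function as follows:

func main() {
    nc := connect()
    defer nc.Close()

    js := initializeJetStream(nc)
    createStream(js)

    // Run the publisher in the background.
    go publishWeatherData(js)

    // Start the subscribers; each one receives every published message.
    for i := 1; i <= SubscriberCount; i++ {
        sub := consumeMessages(js)
        defer sub.Unsubscribe()
    }

    // Block forever so the publisher and subscribers keep running.
    select {}
}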

Rerun your application (go run main.go), and you should now have the two subscribers consuming each published message:

go run main.go
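One detail worth knowing about the main() function: because select {} blocks forever, the program only ends when it is killed, so the deferred nc.Close() and sub.Unsubscribe() calls never run. A possible alternative, sketched here with the standard library only, is to wait for Ctrl+C and then clean up. waitThenCleanup is a hypothetical helper, the two printing functions stand in for the real cleanup calls, and the 3-second timeout exists only so this demo exits by itself.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// waitThenCleanup blocks until done is closed, then runs the cleanup
// functions in reverse order, the same order deferred calls would run in.
func waitThenCleanup(done <-chan struct{}, cleanups ...func()) {
	<-done
	for i := len(cleanups) - 1; i >= 0; i-- {
		cleanups[i]()
	}
}

func main() {
	// Cancel the context on Ctrl+C (SIGINT) or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Assumption for this sketch only: a short timeout so the demo also
	// exits by itself. A long-running service would drop these two lines.
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	fmt.Println("running; press Ctrl+C to stop")
	waitThenCleanup(ctx.Done(),
		func() { fmt.Println("closing connection") }, // stands in for nc.Close()
		func() { fmt.Println("unsubscribing") },      // stands in for sub.Unsubscribe()
	)
}
```

In the weather application, the same pattern would replace select {} at the end of main(), with the real Unsubscribe and Close calls passed in as cleanup functions.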