JetStream

4 minutes read
Edit on GitHub

By default, plgd hub services use NATS as an EventBus and MongoDB as an EventStore. Some use-cases require subscription directly to the internal messaging system instead of communicating with the plgd using its gateways. To simplify the data reconciliation and scale consumers easier, plgd supports JetStream technology as an alternative EventBus. JetStream is built on top of NATS, persisting all published events. Using JetStream as an EventBus allows you to access older, not yet processed messages without accessing the EventStore.

Note

There are still some edge-cases when the plgd event couldn’t be published to the JetStream but it was stored to the EventStore. In such a case you need to identify that one event was lost and if needed, retrieve it using plgd gRPC Gateway.

Note

More information about the JetStream can be found here.

Note

Deployment of the JetStream as an EventBus will be controlled by a single configuration option available in the plgd HELM chart. This is currently WIP.

Important

It’s required from you to create event streams before the JetStream can be used as the plgd EventBus. If streams are not created, plgd services won’t work.

Set env variable JETSTREAM=true of bundle

SOURCE Copy
Copied
        docker run -it --rm -e JETSTREAM=true --network=host -v `pwd`/.tmp/data:/data ghcr.io/plgd-dev/hub/bundle:latest)
    
Important

Required nats-server 2.3+ Required nats client

Append jetstream configuration to nats.config of nats-server:

SOURCE Copy
Copied
        ...
jetstream: {
  store_dir: "$JETSTREAM_PATH"
  // 1GB of memory
  max_memory_store: 1073741824

  // 10GB of memory
  max_file_store: 10737418240
}
    
Note

More information about nats-server configuration.

and start it:

SOURCE Copy
Copied
        nats-server -c nats.config
    

Setup events stream stream.json where all events of hub will be stored:

SOURCE Copy
Copied
        {
  "name": "EVENTS", // A name for the Stream that may not have spaces, tabs, period (.), greater than (>), or asterisk (*).
  "subjects": [ // A list of subjects to consume, supports wildcards.
    "plgd.>"
  ],
  "retention": "limits",  // How message retention is considered, limits (default), interest or workQueue.
  "max_consumers": -1, // How many Consumers can be defined for a given Stream, -1 for unlimited.
  "max_msgs_per_subject": 0, // / How many messages may be in a subject of Stream, -1 for unlimited.
  "max_msgs": -1,  // How many messages may be in a Stream. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this size, -1 for unlimited.
  "max_bytes": -1, // How many bytes the Stream may contain. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this number of messages, -1 for unlimited.
  "max_age": 0,  // Maximum age of any message in the Stream, expressed in nanoseconds, -1 for unlimited.
  "max_msg_size": -1, // The largest message that will be accepted by the Stream. -1 for unlimited
  "storage": "file", // The type of storage backend, file and memory
  "discard": "old", // discard old messages when stream is full
  "num_replicas": 1, // How many replicas to keep for each message in a clustered JetStream, maximum 5
  "duplicate_window": 600000000000 // The window within which to track duplicate messages, expressed in nanoseconds.
}
    
Note

More information about stream configuration.

And then apply the configuration to the nats-server via:

SOURCE Copy
Copied
        nats str add EVENTS --config /configs/jetstream.json
    

Creates NATS Server with JetStream enabled as a leaf node connection.

SOURCE Copy
Copied
        kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-server/nats-js-leaf.yml
    

Now install the JetStream CRDs and Controller:

SOURCE Copy
Copied
        helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats --set nats.image=synadia/nats-server:nightly --set=nats.jetstream.enabled=true
helm install nack nats/nack --set=jetstream.nats.url=nats://nats:4222
    

Create one events stream

SOURCE Copy
Copied
        ---
apiVersion: jetstream.nats.io/v1beta1
kind: Stream
metadata:
  name: events
spec:
  name: events
  subjects: ["plgd.>"]
    

Set clients.eventBus.nats.jetstream to true value.

SOURCE Copy
Copied
        ...
clients:
  eventBus:
    nats:
      jetstream: true
...
    
Jul 1, 2021

Get started

plgd makes it simpler to build a successful IoT initiative – to create a proof of concept, evaluate, optimize, and scale.

Get Started Illustration Get Started Illustration