JetStream

6 minutes read
Edit on GitHub

By default, plgd hub services use NATS as an EventBus and MongoDB as an EventStore. Some use-cases require subscription directly to the internal messaging system instead of communicating with the plgd using its gateways. To simplify the data reconciliation and scale consumers easier, plgd supports JetStream technology as an alternative EventBus. JetStream is built on top of NATS, persisting all published events. Using JetStream as an EventBus allows you to access older, not yet processed messages without accessing the EventStore.

Note

There are still some edge-cases when the plgd event couldn’t be published to the JetStream but it was stored to the EventStore. In such a case you need to identify that one event was lost and if needed, retrieve it using plgd gRPC Gateway.

Note

More information about the JetStream can be found here.

  • ownerID is the owner of the device. It is calculated as uuid.NewV5(uuid.NamespaceURL, value of JWT ownerClaim).
  • deviceID is the UUID of the device.
  • hrefID is the identifier of resource. It is calculated as uuid.NewV5(uuid.NamespaceURL, href), where the href is a resource path. (eg “/oic/d”).

Each event is compressed by snappy and encoded in protobuf devices event envelope. The event envelope consist of Event.data containing the event and Event.event_type describing the type of the event.

  • plgd.owners.{ownerID}.devices.{deviceID}.resource-links.{eventType} publishes resource-links events of types resourcelinkspublished, resourcelinksunpublished, resourcelinkssnapshottaken for device deviceID and ownerID
  • plgd.owners.{ownerID}.devices.{deviceID}.metadata.{eventType} publishes metadata events of types devicemetadataupdatepending,devicemetadataupdated, devicemetadatasnapshottaken for device deviceID and ownerID
  • plgd.owners.{ownerID}.devices.{deviceID}.resources.{hrefID}.{eventType} publishes resources events of types resourcechanged, resourcecreated, resourcecreatepending, resourcedeleted resourcedeletepending, resourceretrieved, resourceretrievepending, resourcestatesnapshottaken, resourceupdated, resourceupdatepending for resource hrefID, deviceID and ownerID.

Each event is encoded in protobuf event envelope and then compressed by snappy.

  • plgd.owners.{ownerID}.registrations.{eventType} publishes owner events of types devicesregistered,devicesunregistered for ownerID.

For the consumers of events you can subscribe to:

  • plgd.owners.> retrieves all events of the hub.
  • plgd.owners.{ownerId}.> retrieves all events of the owner with the ownerId.
  • plgd.owners.*.devices.{deviceId}.> retrieves all events of the device with the deviceId.
  • plgd.owners.*.devices.{deviceId}.resource-links.> retrieves all resource link events of the device with the deviceId.
  • plgd.owners.*.devices.{deviceId}.resource-links.resourcelinkspublished retrieves the resourcelinkspublished event of the device with the deviceId.
  • plgd.owners.*.devices.*.resource-links.> retrieves all resource link events of all devices.
  • plgd.owners.{ownerId}.devices.*.resource-links.> retrieves all resource link events of all devices belonging to the owner with the ownerId.
  • plgd.owners.*.devices.{deviceId}.metadata.> retrieves all metadata events of the device with the deviceId.
  • plgd.owners.*.devices.{deviceId}.metadata.devicemetadataupdated retrieves the devicemetadataupdated event of the device with the deviceId.
  • plgd.owners.*.devices.*.metadata.> retrieves all metadata events of all devices.
  • plgd.owners.{ownerId}.devices.*.metadata.> retrieves all metadata events of all devices belonging to the owner with the ownerId.
  • plgd.owners.*.devices.{deviceId}.resources.> retrieves all resource events of the device with the deviceId.
  • plgd.owners.*.devices.{deviceId}.resources.{hrefId}.> retrieves all events of the resource with the hrefId for the device with the deviceId.
  • plgd.owners.*.devices.{deviceId}.resources.{hrefId}.resourcechanged retrieves the resourcechanged events of the resource with the hrefId for the device with the deviceId.
  • plgd.owners.*.devices.{deviceId}.resources.*.resourcechanged retrieves the resourcechanged events of all resources for the device with the deviceId.
  • plgd.owners.*.devices.*.resources.*.resourcechanged retrieves the resourcechanged events of all resources for all devices.
  • plgd.owners.*.devices.*.resources.{hrefId}.resourcechanged retrieves the resourcechanged events of the resource with the hrefId for all devices.
  • plgd.owners.{ownerId}.devices.*.resources.*.resourcechanged retrieves the resourcechanged events of all resources for all devices belonging to the owner with the ownerId.
  • plgd.owners.{ownerId}.devices.*.resources.{hrefId}.resourcechanged retrieves the resourcechanged events of the resource with the hrefId for all devices belonging to the owner with the ownerId.
Note

Deployment of the JetStream as an EventBus will be controlled by a single configuration option available in the plgd HELM chart. This is currently WIP.

Important

It’s required from you to create event streams before the JetStream can be used as the plgd EventBus. If streams are not created, plgd services won’t work.

Set env variable JETSTREAM=true of bundle

SOURCE Copy
Copied
        docker run -it --rm -e JETSTREAM=true --network=host -v `pwd`/.tmp/data:/data ghcr.io/plgd-dev/hub/bundle:latest)
    
Important

Required nats-server 2.3+ Required nats client

Append jetstream configuration to nats.config of nats-server:

SOURCE Copy
Copied
        ...
jetstream: {
  store_dir: "$JETSTREAM_PATH"
  // 1GB of memory
  max_memory_store: 1073741824

  // 10GB of memory
  max_file_store: 10737418240
}
    
Note

More information about nats-server configuration.

and start it:

SOURCE Copy
Copied
        nats-server -c nats.config
    

Setup events stream stream.json where all events of hub will be stored:

SOURCE Copy
Copied
        {
  "name": "EVENTS", // A name for the Stream that may not have spaces, tabs, period (.), greater than (>), or asterisk (*).
  "subjects": [ // A list of subjects to consume, supports wildcards.
    "plgd.>"
  ],
  "retention": "limits",  // How message retention is considered, limits (default), interest or workQueue.
  "max_consumers": -1, // How many Consumers can be defined for a given Stream, -1 for unlimited.
  "max_msgs_per_subject": 0, // / How many messages may be in a subject of Stream, -1 for unlimited.
  "max_msgs": -1,  // How many messages may be in a Stream. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this size, -1 for unlimited.
  "max_bytes": -1, // How many bytes the Stream may contain. Adheres to Discard Policy, removing oldest or refusing new messages if the Stream exceeds this number of messages, -1 for unlimited.
  "max_age": 0,  // Maximum age of any message in the Stream, expressed in nanoseconds, -1 for unlimited.
  "max_msg_size": -1, // The largest message that will be accepted by the Stream. -1 for unlimited
  "storage": "file", // The type of storage backend, file and memory
  "discard": "old", // discard old messages when stream is full
  "num_replicas": 1, // How many replicas to keep for each message in a clustered JetStream, maximum 5
  "duplicate_window": 600000000000 // The window within which to track duplicate messages, expressed in nanoseconds.
}
    
Note

More information about stream configuration.

And then apply the configuration to the nats-server via:

SOURCE Copy
Copied
        nats str add EVENTS --config /configs/jetstream.json
    

Creates NATS Server with JetStream enabled as a leaf node connection.

SOURCE Copy
Copied
        kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-server/nats-js-leaf.yml
    

Now install the JetStream CRDs and Controller:

SOURCE Copy
Copied
        helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats --set nats.image=synadia/nats-server:nightly --set=nats.jetstream.enabled=true
helm install nack nats/nack --set=jetstream.nats.url=nats://nats:4222
    

Create one events stream

SOURCE Copy
Copied
        ---
apiVersion: jetstream.nats.io/v1beta1
kind: Stream
metadata:
  name: events
spec:
  name: events
  subjects: ["plgd.>"]
    

Set clients.eventBus.nats.jetstream to true value.

SOURCE Copy
Copied
        ...
clients:
  eventBus:
    nats:
      jetstream: true
...
    
    Jul 1, 2021

    Get started

    plgd makes it simpler to build a successful IoT initiative – to create a proof of concept, evaluate, optimize, and scale.

    Get Started Illustration Get Started Illustration