Interacting with the Message Bus¶
The message bus is a simple, lightweight, and fast way to send messages between different parts (services) of your application. It is a simple publish/subscribe system that allows you to send messages to a specific subject and have any listeners on that subject receive the message.
Running NATS for development
Now that we want to interact with the message bus, we need to have NATS running. If you’re using the --dev
mode for testing your service, it’s sufficient to run the nats-server
executable in a separate terminal window. This will start a local NATS server on port 4222
which is the default port for NATS.
If you have a prdouction-like setup, you’ll need to pass the NATS_USER
and NATS_PASSWORD
corresponding to your NATS configuration as configuration parameters to your service for authentication.
Connecting to the Message Bus¶
Connecting to the event bus is automatically handled by the startService
function. It will connect to the message bus using the NATS_URL
environment variable. If you’re running the service in --dev
mode, this will be nats://localhost:4222
by default.
All you need to do compared to the previous examples is to omit the { nats: false }
parameter from the startService
function call:
import {
startService
} from "./lib.ts";
const {nc/* (1)! */} = await startService(/* (2)! */);
- Store the NATS connection in a variable called
nc
for later use. - Omit the
{ nats: false }
parameter from thestartService
function call since we want to connect to the message bus.
Note
startService
actually returns an object containing the NATS connection (nc
) and a few other things. In our example, we use destructuring to only get the nc
variable. This is equivalent to the following code:
Publishing Messages¶
Publishing messages is as simple as calling the publish
function on the NATS connection:
- The subject (sometimes also called channel) to which the message gets published.
- The message data (also called payload or body).
However, Telestion Hub uses a specific message format for all messages sent over the message bus. A message can be either JSON or a binary message. The binary message is used for sending large amounts of data, e.g., images or video streams. The JSON message is used for all other messages.
JSON Messages¶
To send a JSON message, you need to create a JSON object and pass it to the publish
function:
import {
JSONCodec,
startService
} from "./lib.ts";
// or: import { JSONCodec } from "https://deno.land/x/nats/src/mod.ts";
const {nc} = await startService();
const jsonCodec = JSONCodec();//(2)!
await nc.publish("subject", jsonCodec.encode/*(3)!*/({
foo: "some arbitrary JSON-compatible data",
bar: 42
}));
- Import the
JSONCodec
(for convenience, this gets re-exported by thelib.ts
, but you can also import it directly from the NATS library). - Create a new
JSONCodec
instance. - Encode the JSON object using the
JSONCodec
instance.
Binary Messages¶
To send a binary message, you need to create a Uint8Array
containing the bytes and pass it to the publish
function:
import {
startService
} from "./lib.ts";
const {nc} = await startService();
await nc.publish("subject", new Uint8Array([0x01, 0x02, 0x03]));
Uint8Arrays
You can learn more about how you can use Uint8Array
on MDN.
Subscribing to Messages¶
There are multiple ways to subscribe to messages on a subject. The most common way is to use the subscribe
function in combination with a for await
loop:
import {
startService
} from "./lib.ts";
const {nc} = await startService();
const subjectSubscription/*(1)!*/ = await nc.subscribe("subject"/*(2)!*/);
for await (const message of subjectSubscription) {//(3)!
console.log(message.data);//(4)!
}
- Store the subscription in a variable called
subjectSubscription
for later use. - Subscribe to the
subject
subject. - For each message received on the subject, …
- … print the message data to the console.
Unfortunately, this won’t decode our JSON messages automatically. We need to do this ourselves:
import {
JSONCodec,
startService
} from "./lib.ts";
const {nc} = await startService();
const jsonCodec = JSONCodec();
const subjectSubscription = await nc.subscribe("subject");
for await (const message of subjectSubscription) {
const jsonMessage = jsonCodec.decode(message.data);//(1)!
console.log(jsonMessage.foo);//(2)!
}
- Decode the message data using the
JSONCodec
instance. - Print the
foo
property of the decoded JSON message to the console.
Danger
Can you spot the problem with this code? What happens if the message data doesn’t contain a foo
property? Or if it’s not a JSON message at all? This would lead to our service crashing!
Never assume a message’s structure!
You should always validate the message data before using it. We’ll cover this in the next section.
Validating Messages¶
A Telestion service must validate all messages it receives. This is to ensure that the service doesn’t crash when it receives invalid messages.
Validating the message type¶
The first “layer” of validation is the message type. A message can either be a JSON message or a binary message. The jsonCodec.decode
function will throw an error if the message data is not a valid JSON message. Therefore, we can use a try
/catch
block to catch the error and handle it accordingly:
// ...
for await (const message of subjectSubscription) {
try/*(3)!*/{
const jsonMessage = jsonCodec.decode(message.data);
console.log(jsonMessage.foo);
} catch (_e) {
console.error/*(2)!*/("Received invalid message:", message);
}
}
- Catch the error thrown by
jsonCodec.decode
. - Print the error message to the console (or do whatever else you want to do when you receive an invalid message).
- Wrap the code that decodes the message in a
try
/catch
block.
Binary Messages
Since any messages get sent as binary messages (in fact, the JSONCodec
does nothing else than convert the JSON message to a Uint8Array
and back), there’s no way to validate that a message is supposed to be a binary message. This makes the next section even more important.
Validating the message structure¶
The second “layer” of validation is the message structure. This is where you validate that the message data contains all the properties you expect it to contain. For example, if you expect a message to contain a foo
property, you must verify its existence before using it.
For structured JSON data, we recommend that you use the zod
library for validation. This is also used in our lib.ts
file to validate the configuration. You can find more information about zod
in the library’s GitHub repository.
Let’s create a zod
schema for our JSON message in a new file called foo-message.ts
:
import {
z
} from "https://deno.land/x/zod@v3.16.1/mod.ts";
export const fooMessageSchema = z.object/*(1)!*/(({
foo: z.string()/*(2)!*/,
bar: z.number().min(-10)/*(3)!*/
});
export type FooMessage = z.infer<typeof fooMessageSchema>;//(4)!
- A
FooMessage
must be an object. - A
FooMessage
must have afoo
property that is a string. - A
FooMessage
must have abar
property that is a number and is greater than or equal to-10
. - This is a TypeScript type that represents the
FooMessage
type. While we won’t use it in this example, it’s good practice to create a type for each schema you create. This allows you to use the type anywhere in your code:
Now we can use this schema to validate the message data:
import {
fooMessageSchema
} from "./foo-message.ts";
// ...
for await (const message of subjectSubscription) {
try {
const jsonMessage = fooMessageSchema.parse/*(1)!*/(
jsonCodec.decode(message.data)
);
console.log(jsonMessage/*(2)!*/.foo);
} catch (_e) {
console.error("Received invalid message:", message);
}
}
- Validate the message data using the
fooMessageSchema
schema. This will throw an error if the message data doesn’t match the schema. - TypeScript now knows that
jsonMessage
is a validFooMessage
object. Therefore, we can access thefoo
property without any problems.
Success
If your editor has great TypeScript support and has shown you warnings/errors before, they are now gone! This is because TypeScript now knows that the jsonMessage
variable is a valid FooMessage
object. In other words, your code is now safe from invalid messages!
Binary Messages
For binary messages, you can’t use zod
to validate the message structure. Instead, you should use the Uint8Array
methods to validate the message structure. For example, you can check the length of the message data using the length
property of the Uint8Array
:
However, the exact validation required completely depends on your use case. Just make sure that your code doesn’t crash when it receives an invalid message.
Subscribing to Multiple Topics¶
So far, we’ve used the for await
loop. This is a convenient way to subscribe to a single topic. However, if you want to do more than just react to messages from a specific subject, we get into trouble. Since the for await
loop is blocking, we can’t do anything else while we’re waiting for messages.
We can solve this by wrapping the for await
loop in an async
function and calling it in a separate thread. This allows us to do other things while we’re waiting for messages:
// ...
const subjectMessages = nc.subscribe("foo");
(async () => {//(1)!
for await (const message of subjectMessages) {
// Handle messages from the "foo" subject
}
})();
// ... (2)
- Wrap the
for await
loop in anasync
function and call it immediately. This will start the subscription in parallel to the rest of the code. - Do other things while we’re waiting for messages.
Note that we’re storing the return value of nc.subscribe
in a variable outside the async
function. This is important so that we can close the subscription or check its status later.
Closing the Subscription
You can close the subscription by calling the unsubscribe
method on the subscription object:
You must call unsubscribe
on the subscription object. Calling nc.unsubscribe
will unsubscribe from all subscriptions!
This now allows us to subscribe to multiple topics:
// ...
const fooMessages = nc.subscribe("foo");//(1)!
(async () => {
for await (const message of fooMessages) {
// Handle messages from the "foo" subject
}
})();
const barMessages = nc.subscribe("bar");//(2)!
(async () => {
for await (const message of barMessages) {
// Handle messages from the "bar" subject
if (shouldUnsubscribeFoo(message))
fooMessages.unsubscribe/*(3)!*/();
if (shouldUnsubscribeBar(message))
barMessages.unsubscribe/*(4)!*/();
}
})();
await Promise.all/*(5)!*/([
fooMessages.closed,
barMessages.closed
]);
console.log("All subscriptions closed!");//(6)!
- Subscribe to the
foo
subject. - Subscribe to the
bar
subject (in parallel to thefoo
subscription). - Unsubscribe from the
foo
subject if theshouldUnsubscribeFoo
function returnstrue
. - Unsubscribe from the
bar
subject if theshouldUnsubscribeBar
function returnstrue
. - Wait for both subscriptions to close. This will happen when the
unsubscribe
method is called on the subscription object.The
closed
property is aPromise
that resolves when the subscription is closed.Promise.all
is a convenient way to wait for multiple promises to resolve. It returns aPromise
that resolves when all promises passed to it have resolved. - Log a message when both subscriptions are closed.
Queue Groups¶
Info
Queue groups are a way to distribute messages between multiple subscribers. If you have multiple subscribers to a subject, you can use queue groups to distribute messages between them. This is useful if you want to distribute messages between multiple instances of a service (for example, if you want to scale your service horizontally because processing a message takes too long).
All you have to do to use queue groups is to pass a queue
option to the subscribe
method. You can use any string as the queue name, but by its definition, the SERVICE_NAME
configuration parameter works perfect for this. For convenience, this gets exposed as serviceName
on the object returned by startService
:
// ...
const {
nc,
serviceName/*(1)!*/
} = await startService();
const fooMessages = nc.subscribe(
"foo",
{queue: serviceName/*(2)!*/}
);
(async () => {
for await (const message of fooMessages) {
// Handle messages from the "foo" subject
}
})();
// ...
- Get the
serviceName
from the object returned bystartService
. - Pass the
serviceName
as thequeue
option to thesubscribe
method.
If you now run multiple instances of your service, you’ll see that messages are distributed between them. This is because the queue
option tells the message bus to distribute messages between all subscribers with the same queue name.
Service names in development mode
When you run your service in development mode, the serviceName
will be generated. This means that you’ll get a different service name every time you start your service. To avoid this, you can either set the SERVICE_NAME
environment variable or pass a service name via the CLI:
Wildcards¶
Wildcards are a way to subscribe to multiple subjects at once. This is useful if you want to subscribe to multiple subjects that have a common prefix. For example, you could have a service that handles all requests to the /api
endpoint. You could then subscribe to all requests to the /api
endpoint by subscribing to the api.>
subject.
There are two types of wildcards: *
and >
. The *
wildcard matches a single token. The >
wildcard matches one or more tokens. For example, the api.*
subject matches api.foo
and api.bar
, but not api.foo.bar
. The api.>
subject matches api.foo
, api.bar
, and api.foo.bar
.
You can use wildcards in the subscribe
method and then use the subject
property of the message to check which subject the message was sent to:
// ...
/**
* A simple key-value store.
*/
const store: Record<string, unknown> = {};
const kvMessages = nc.subscribe/*(1)!*/("kv.>");
(async () => {
for await (const message of kvMessages) {
try {
const [_kv, action, ...keyParts] =
message.subject.split/*(2)!*/(".");
const key = keyParts.join(".");
if (action === "get") {
// retrieve the value from the store
message.respond(
jsonCodec.encode(store[key])
);
} else if (action === "set") {
// set the value in the store
store[key] = jsonCodec.decode(message.data);
message.respond(jsonCodec.encode({ok: true});
}
} catch (error) {
message.respond(
jsonCodec.encode({error: error.message})
);
}
}
})();
- Subscribe to the
kv.>
subject. This matches all subjects that start withkv.
. - Split the subject into tokens. The first token is
kv
, the second token is the action, and the rest of the tokens are the key. We store these into variables using array destructuring.
In this example, we subscribe to the foo.*
subject. We then use the subject
property of the message to check which action was requested. If the action is get
, we get the value from the store
object and respond with it. If the action is set
, we set the value in the store
object.
For example, if we send a message to the foo.get.bar
subject, we’ll get the value of the bar
key in the store
object. If we send a message to the foo.set.bar
subject with the value 42
, we’ll set the value of the bar
key in the store
object to 42
.
Success
Woohoo! You’ve just re-implemented a key-value store using the message bus, which (with a few convenience features on top) is an essential part of Telestion’s standard services!
Request/Reply¶
So far, we’ve looked at publishing messages and subscribing to messages. However, there’s one more thing we can do with the message bus: request/reply.
Request/reply is a pattern where one service sends a request to another service and waits for a response. This is useful if you want to get data from another service. For example, you could have a service that stores data in a database. Other services can then request data from this service.
Sending a Request¶
Let’s start by looking at how we can send a request. We can use the request
method on the NatsConnection
object to send a request. This makes it incredibly easy to send a request:
// ...
const response = await nc.request/*(1)!*/(
"fooRequest"/*(2)!*/,
jsonCodec.encode({foo: "bar"})/*(3)!*/
);
console.log(response.data);
- Call the
request
method on theNatsConnection
object. This method returns aPromise
that resolves when the response is received. The response has the same form as the messages we’ve already seen in ourfor await
loops. - Specify the subject to send the request to.
- Encode the request message data using the
jsonCodec
codec. This is the same as we’ve done before.
Tip: Specifying a timeout
As it is, our code will wait forever for a response. This is probably not what we want. We can specify a timeout by passing a second argument to the request
method:
This will cause the request
method to reject the Promise
if no response is received within 1000 milliseconds. Make sure to handle the rejection by handling it appropriately.
Handling a Request¶
Now that we know how to send a request, let’s look at how we can handle a request. We can use the subscribe
method on the NatsConnection
object to subscribe to a subject. This allows us to handle requests:
// ...
const requestMessages = nc.subscribe/*(1)!*/("fooRequest");
(async () => {
for await (const message of requestMessages) {//(2)!
message.respond/*(3)!*/(jsonCodec.encode({bar: "baz"}));
}
})();
- Subscribe to the
fooRequest
subject as usual. - Iterate over the messages received from the
fooRequest
subject as usual. - Respond to the request by calling the
respond
method on the message object. This method takes a single argument: the response message data. This is the same as we’ve done before.
Tip
The message
received from the fooRequest
subject is the same as the message
received from the foo
subject. This means that we can use the same steps to handle the message as we’ve done before if we need the data to handle the request.
Related Links¶
While we’ve covered the basics of interacting with the message bus in this article, there are a few more things you can do with the message bus. You can find more information about the message bus in the NATS documentation. While the connection to the message bus is handled by the startService
function, topics like receiving and sending messages are covered more extensively (including useful concepts like request/reply, queue groups, etc.) in the NATS documentation.