Interacting with the Message Bus¶
The message bus is a lightweight and fast way to send messages between different parts (services) of your application. It is a publish/subscribe system: you send a message to a specific subject, and every listener on that subject receives it.
Running NATS for development
To interact with the message bus, we need to have NATS running. If you’re using the --dev mode for testing your service, it’s sufficient to run the nats-server executable in a separate terminal window. This will start a local NATS server on port 4222, which is the default port for NATS.
If you have a production-like setup, you’ll need to pass the NATS_USER and NATS_PASSWORD corresponding to your NATS configuration as configuration parameters to your service for authentication.
Connecting to the Message Bus¶
Connecting to the message bus is automatically handled by the startService function. It will connect to the message bus using the NATS_URL environment variable. If you’re running the service in --dev mode, this will be nats://localhost:4222 by default.
All you need to do compared to the previous examples is to omit the { nats: false } parameter from the startService function call:
import {
startService
} from "./lib.ts";
const {nc/* (1)! */} = await startService(/* (2)! */);
1. Store the NATS connection in a variable called nc for later use.
2. Omit the { nats: false } parameter from the startService function call since we want to connect to the message bus.
Note
startService actually returns an object containing the NATS connection (nc) and a few other things. In our example, we use destructuring to only get the nc variable. This is equivalent to storing the returned object in a variable and then reading its nc property.
Publishing Messages¶
Publishing messages is as simple as calling the publish function on the NATS connection. It takes two arguments, in this order:
1. The subject (sometimes also called channel) to which the message gets published.
2. The message data (also called payload or body).
However, Telestion Hub uses a specific message format for all messages sent over the message bus. A message can be either JSON or a binary message. The binary message is used for sending large amounts of data, e.g., images or video streams. The JSON message is used for all other messages.
JSON Messages¶
To send a JSON message, you need to create a JSON-compatible object, encode it using a JSONCodec, and pass the result to the publish function:
import {
JSONCodec,
startService
} from "./lib.ts";
// or: import { JSONCodec } from "https://deno.land/x/nats/src/mod.ts";
const {nc} = await startService();
const jsonCodec = JSONCodec();//(2)!
await nc.publish("subject", jsonCodec.encode/*(3)!*/({
foo: "some arbitrary JSON-compatible data",
bar: 42
}));
1. Import the JSONCodec (for convenience, this gets re-exported by the lib.ts, but you can also import it directly from the NATS library).
2. Create a new JSONCodec instance.
3. Encode the JSON object using the JSONCodec instance.
Binary Messages¶
To send a binary message, you need to create a Uint8Array containing the bytes and pass it to the publish function:
import {
startService
} from "./lib.ts";
const {nc} = await startService();
await nc.publish("subject", new Uint8Array([0x01, 0x02, 0x03]));
Uint8Arrays
You can learn more about how you can use Uint8Array on MDN.
Subscribing to Messages¶
There are multiple ways to subscribe to messages on a subject. The most common way is to use the subscribe function in combination with a for await loop:
import {
startService
} from "./lib.ts";
const {nc} = await startService();
const subjectSubscription/*(1)!*/ = await nc.subscribe("subject"/*(2)!*/);
for await (const message of subjectSubscription) {//(3)!
console.log(message.data);//(4)!
}
1. Store the subscription in a variable called subjectSubscription for later use.
2. Subscribe to the subject named "subject".
3. For each message received on the subject, …
4. … print the message data to the console.
Unfortunately, this won’t decode our JSON messages automatically. We need to do this ourselves:
import {
JSONCodec,
startService
} from "./lib.ts";
const {nc} = await startService();
const jsonCodec = JSONCodec();
const subjectSubscription = await nc.subscribe("subject");
for await (const message of subjectSubscription) {
const jsonMessage = jsonCodec.decode(message.data);//(1)!
console.log(jsonMessage.foo);//(2)!
}
1. Decode the message data using the JSONCodec instance.
2. Print the foo property of the decoded JSON message to the console.
Danger
Can you spot the problem with this code? What happens if the message data doesn’t contain a foo property? Or if it’s not a JSON message at all? This would lead to our service crashing!
Never assume a message’s structure!
You should always validate the message data before using it. We’ll cover this in the next section.
Validating Messages¶
A Telestion service must validate all messages it receives. This is to ensure that the service doesn’t crash when it receives invalid messages.
Validating the message type¶
The first “layer” of validation is the message type. A message can either be a JSON message or a binary message. The jsonCodec.decode function will throw an error if the message data is not a valid JSON message. Therefore, we can use a try/catch block to catch the error and handle it accordingly:
// ...
for await (const message of subjectSubscription) {
try/*(3)!*/{
const jsonMessage = jsonCodec.decode(message.data);
console.log(jsonMessage.foo);
} catch (_e) {
console.error/*(2)!*/("Received invalid message:", message);
}
}
1. Catch the error thrown by jsonCodec.decode.
2. Print the error message to the console (or do whatever else you want to do when you receive an invalid message).
3. Wrap the code that decodes the message in a try/catch block.
Binary Messages
Since all messages are sent as binary data (in fact, the JSONCodec does nothing more than convert the JSON message to a Uint8Array and back), there’s no way to validate that a message is supposed to be a binary message. This makes the next section even more important.
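To make this more tangible, here is a minimal sketch of what a JSON codec boils down to, assuming it is nothing more than JSON text converted to and from UTF-8 bytes. The helpers encodeJson and tryDecodeJson are hypothetical names for this illustration and are not part of lib.ts or the NATS library.

```typescript
// Hypothetical stand-ins for a JSON codec: JSON text <-> UTF-8 bytes.
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

function encodeJson(value: unknown): Uint8Array {
  return textEncoder.encode(JSON.stringify(value));
}

type DecodeResult =
  | { ok: true; value: unknown }
  | { ok: false; reason: string };

// Never throws: invalid payloads are reported through the result instead.
function tryDecodeJson(data: Uint8Array): DecodeResult {
  try {
    return { ok: true, value: JSON.parse(textDecoder.decode(data)) };
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    return { ok: false, reason };
  }
}

// A JSON payload decodes fine, arbitrary bytes do not.
const validResult = tryDecodeJson(encodeJson({ foo: "bar" }));
const invalidResult = tryDecodeJson(new Uint8Array([0x01, 0x02, 0x03]));
console.log(validResult.ok, invalidResult.ok);
```

Keep in mind that the reverse does not hold: a binary payload such as the two bytes 0x34 0x32 is also the valid JSON text 42, so a successful decode alone doesn’t prove that the sender meant to send JSON.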
Validating the message structure¶
The second “layer” of validation is the message structure. This is where you validate that the message data contains all the properties you expect it to contain. For example, if you expect a message to contain a foo property, you must verify its existence before using it.
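As a rough illustration of such a check without any library, the following sketch uses a TypeScript type guard. The names FooLike and hasStringFoo are hypothetical and only exist for this example.

```typescript
// Hypothetical message shape used only for this sketch.
interface FooLike {
  foo: string;
}

// Type guard: true only if `value` is a non-null object with a string `foo`.
function hasStringFoo(value: unknown): value is FooLike {
  return (
    typeof value === "object" &&
    value !== null &&
    "foo" in value &&
    typeof (value as Record<string, unknown>).foo === "string"
  );
}

const candidates: unknown[] = [{ foo: "hello" }, { foo: 42 }, "not an object", null];
for (const candidate of candidates) {
  if (hasStringFoo(candidate)) {
    // Inside this branch, TypeScript treats `candidate` as FooLike.
    console.log("valid:", candidate.foo);
  } else {
    console.log("rejected:", candidate);
  }
}
```

Hand-written checks like this work, but they grow quickly once a message has more than a couple of properties.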
For structured JSON data, we recommend that you use the zod library for validation. This is also used in our lib.ts file to validate the configuration. You can find more information about zod in the library’s GitHub repository.
Let’s create a zod schema for our JSON message in a new file called foo-message.ts:
import {
z
} from "https://deno.land/x/zod@v3.16.1/mod.ts";
export const fooMessageSchema = z.object/*(1)!*/({
foo: z.string()/*(2)!*/,
bar: z.number().min(-10)/*(3)!*/
});
export type FooMessage = z.infer<typeof fooMessageSchema>;//(4)!
1. A FooMessage must be an object.
2. A FooMessage must have a foo property that is a string.
3. A FooMessage must have a bar property that is a number and is greater than or equal to -10.
4. This is a TypeScript type that represents the FooMessage type. While we won’t use it in this example, it’s good practice to create a type for each schema you create. This allows you to use the type anywhere in your code, for example as the type of a function parameter.
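As a sketch of what using the type could look like, assume FooMessage resolves to an object with a string foo and a numeric bar, which is what the schema describes. The type is restated by hand here so that the snippet runs on its own; in a real service you would import it from foo-message.ts instead. The function name describeFooMessage is hypothetical.

```typescript
// Restated by hand for a self-contained sketch; in a service you would write:
// import type { FooMessage } from "./foo-message.ts";
type FooMessage = { foo: string; bar: number };

// Hypothetical helper that only accepts already-validated messages.
function describeFooMessage(message: FooMessage): string {
  return `foo=${message.foo}, bar=${message.bar}`;
}

console.log(describeFooMessage({ foo: "hello", bar: 42 })); // prints "foo=hello, bar=42"
```

Typing the parameter as FooMessage means the compiler rejects callers that pass unvalidated data of type unknown, which nudges validation to the edge of your service.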
Now we can use this schema to validate the message data:
import {
fooMessageSchema
} from "./foo-message.ts";
// ...
for await (const message of subjectSubscription) {
try {
const jsonMessage = fooMessageSchema.parse/*(1)!*/(
jsonCodec.decode(message.data)
);
console.log(jsonMessage/*(2)!*/.foo);
} catch (_e) {
console.error("Received invalid message:", message);
}
}
1. Validate the message data using the fooMessageSchema schema. This will throw an error if the message data doesn’t match the schema.
2. TypeScript now knows that jsonMessage is a valid FooMessage object. Therefore, we can access the foo property without any problems.
Success
If your editor has great TypeScript support and has shown you warnings/errors before, they are now gone! This is because TypeScript now knows that the jsonMessage variable is a valid FooMessage object. In other words, your code is now safe from invalid messages!
Binary Messages
For binary messages, you can’t use zod to validate the message structure. Instead, you should use the Uint8Array methods to validate the message structure. For example, you can check the length of the message data using the length property of the Uint8Array and reject messages that are shorter or longer than expected.
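A minimal sketch of such a check could look like the following. The layout (one version byte followed by exactly four payload bytes) is purely hypothetical and only serves to illustrate the idea; your own format will differ.

```typescript
// Hypothetical binary layout for this sketch:
// byte 0 is a version marker, followed by exactly 4 payload bytes.
const EXPECTED_VERSION = 0x01;
const EXPECTED_LENGTH = 5;

function isValidBinaryMessage(data: Uint8Array): boolean {
  // Check the length first so that reading data[0] is always meaningful.
  if (data.length !== EXPECTED_LENGTH) return false;
  return data[0] === EXPECTED_VERSION;
}

console.log(isValidBinaryMessage(new Uint8Array([0x01, 0, 0, 0, 42]))); // true
console.log(isValidBinaryMessage(new Uint8Array([0x01, 0x02, 0x03]))); // false (too short)
```

In a subscription loop, you would call a check like this on message.data and skip or log the message if it returns false.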
However, the exact validation required completely depends on your use case. Just make sure that your code doesn’t crash when it receives an invalid message.
Subscribing to Multiple Topics¶
So far, we’ve used the for await loop. This is a convenient way to subscribe to a single subject. However, if you want to do more than just react to messages from a specific subject, we get into trouble. Since the for await loop suspends the surrounding code until the subscription ends, nothing after the loop runs while we’re waiting for messages.
We can solve this by wrapping the for await loop in an async function and calling it without awaiting it. This doesn’t start a separate thread; the loop simply runs concurrently on the event loop, which allows us to do other things while we’re waiting for messages:
// ...
const subjectMessages = nc.subscribe("foo");
(async () => {//(1)!
for await (const message of subjectMessages) {
// Handle messages from the "foo" subject
}
})();
// ... (2)
1. Wrap the for await loop in an async function and call it immediately. This will start the subscription in parallel to the rest of the code.
2. Do other things while we’re waiting for messages.
Note that we’re storing the return value of nc.subscribe in a variable outside the async function. This is important so that we can close the subscription or check its status later.
Closing the Subscription
You can close the subscription by calling the unsubscribe method on the subscription object, for example subjectMessages.unsubscribe().
You must call unsubscribe on the subscription object. Calling nc.unsubscribe will unsubscribe from all subscriptions!
This now allows us to subscribe to multiple topics:
// ...
const fooMessages = nc.subscribe("foo");//(1)!
(async () => {
for await (const message of fooMessages) {
// Handle messages from the "foo" subject
}
})();
const barMessages = nc.subscribe("bar");//(2)!
(async () => {
for await (const message of barMessages) {
// Handle messages from the "bar" subject
if (shouldUnsubscribeFoo(message))
fooMessages.unsubscribe/*(3)!*/();
if (shouldUnsubscribeBar(message))
barMessages.unsubscribe/*(4)!*/();
}
})();
await Promise.all/*(5)!*/([
fooMessages.closed,
barMessages.closed
]);
console.log("All subscriptions closed!");//(6)!
1. Subscribe to the foo subject.
2. Subscribe to the bar subject (in parallel to the foo subscription).
3. Unsubscribe from the foo subject if the shouldUnsubscribeFoo function returns true.
4. Unsubscribe from the bar subject if the shouldUnsubscribeBar function returns true.
5. Wait for both subscriptions to close. This will happen when the unsubscribe method is called on the subscription object. The closed property is a Promise that resolves when the subscription is closed. Promise.all is a convenient way to wait for multiple promises to resolve. It returns a Promise that resolves when all promises passed to it have resolved.
6. Log a message when both subscriptions are closed.
Queue Groups¶
Info
Queue groups are a way to distribute messages between multiple subscribers: each message on the subject is delivered to only one subscriber in the group instead of to all of them. This is useful if you want to distribute messages between multiple instances of a service (for example, if you want to scale your service horizontally because processing a message takes too long).
All you have to do to use queue groups is to pass a queue option to the subscribe method. You can use any string as the queue name, but by its definition, the SERVICE_NAME configuration parameter works perfectly for this. For convenience, this gets exposed as serviceName on the object returned by startService:
// ...
const {
nc,
serviceName/*(1)!*/
} = await startService();
const fooMessages = nc.subscribe(
"foo",
{queue: serviceName/*(2)!*/}
);
(async () => {
for await (const message of fooMessages) {
// Handle messages from the "foo" subject
}
})();
// ...
1. Get the serviceName from the object returned by startService.
2. Pass the serviceName as the queue option to the subscribe method.
If you now run multiple instances of your service, you’ll see that messages are distributed between them. This is because the queue option tells the message bus to distribute messages between all subscribers with the same queue name.
Service names in development mode
When you run your service in development mode, the serviceName will be generated. This means that you’ll get a different service name every time you start your service. To avoid this, you can either set the SERVICE_NAME environment variable or pass a service name via the CLI.
Wildcards¶
Wildcards are a way to subscribe to multiple subjects at once. This is useful if you want to subscribe to multiple subjects that have a common prefix. For example, you could have a service that handles all requests to the /api endpoint. You could then subscribe to all requests to the /api endpoint by subscribing to the api.> subject.
There are two types of wildcards: * and >. The * wildcard matches a single token. The > wildcard matches one or more tokens. For example, the api.* subject matches api.foo and api.bar, but not api.foo.bar. The api.> subject matches api.foo, api.bar, and api.foo.bar.
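To make the matching rules concrete, here is a small sketch that re-implements them on plain strings. It is only an illustration of the rules described above, not the code NATS uses, and the function name subjectMatches is hypothetical. It assumes well-formed subjects without empty tokens.

```typescript
// Sketch of NATS-style subject matching; not the NATS implementation.
function subjectMatches(pattern: string, subject: string): boolean {
  const patternTokens = pattern.split(".");
  const subjectTokens = subject.split(".");

  for (let i = 0; i < patternTokens.length; i++) {
    const token = patternTokens[i];
    if (token === ">") {
      // ">" must be the last pattern token and needs at least one subject token left.
      return i === patternTokens.length - 1 && subjectTokens.length > i;
    }
    // The subject ran out of tokens before the pattern did.
    if (i >= subjectTokens.length) return false;
    // "*" matches exactly one token; anything else must match literally.
    if (token !== "*" && token !== subjectTokens[i]) return false;
  }
  // Without ">", pattern and subject must have the same number of tokens.
  return patternTokens.length === subjectTokens.length;
}

console.log(subjectMatches("api.*", "api.foo")); // true
console.log(subjectMatches("api.*", "api.foo.bar")); // false
console.log(subjectMatches("api.>", "api.foo.bar")); // true
console.log(subjectMatches("api.>", "api")); // false
```

You don’t need such a function in a service, since the message bus does the matching for you. It can still be handy in unit tests to check which subjects a subscription would receive.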
You can use wildcards in the subscribe method and then use the subject property of the message to check which subject the message was sent to:
// ...
/**
* A simple key-value store.
*/
const store: Record<string, unknown> = {};
const kvMessages = nc.subscribe/*(1)!*/("kv.>");
(async () => {
for await (const message of kvMessages) {
try {
const [_kv, action, ...keyParts] =
message.subject.split/*(2)!*/(".");
const key = keyParts.join(".");
if (action === "get") {
// retrieve the value from the store
message.respond(
jsonCodec.encode(store[key])
);
} else if (action === "set") {
// set the value in the store
store[key] = jsonCodec.decode(message.data);
message.respond(jsonCodec.encode({ok: true}));
}
} catch (error) {
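// the caught value has type unknown, so narrow it before reading .message
const reason = error instanceof Error ? error.message : String(error);
message.respond(
jsonCodec.encode({error: reason})
);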
}
}
})();
1. Subscribe to the kv.> subject. This matches all subjects that start with the kv. prefix.
2. Split the subject into tokens. The first token is kv, the second token is the action, and the rest of the tokens are the key. We store these into variables using array destructuring.
In this example, we subscribe to the kv.> subject. We then use the subject property of the message to check which action was requested. If the action is get, we get the value from the store object and respond with it. If the action is set, we set the value in the store object and respond with a confirmation.
For example, if we send a message to the kv.get.bar subject, we’ll get the value of the bar key in the store object. If we send a message to the kv.set.bar subject with the value 42, we’ll set the value of the bar key in the store object to 42.
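Note that the handler above silently ignores actions other than get and set. One possible way to make this explicit is to move the subject parsing into a small function that rejects anything unexpected. The following is only a sketch; the names KvRequest and parseKvSubject are hypothetical.

```typescript
// Hypothetical result type for a parsed key-value subject.
type KvRequest = { action: "get" | "set"; key: string };

// Returns null for any subject that is not kv.get.<key> or kv.set.<key>.
function parseKvSubject(subject: string): KvRequest | null {
  const [prefix, action, ...keyParts] = subject.split(".");
  if (prefix !== "kv") return null;
  if (action !== "get" && action !== "set") return null;
  if (keyParts.length === 0) return null;
  // Keys may contain dots themselves, so join the remaining tokens again.
  return { action, key: keyParts.join(".") };
}

console.log(parseKvSubject("kv.get.bar")); // { action: "get", key: "bar" }
console.log(parseKvSubject("kv.delete.bar")); // null
```

Inside the for await loop, a null result could then be answered with an error response instead of leaving the requester without a reply.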
Success
Woohoo! You’ve just re-implemented a key-value store using the message bus, which (with a few convenience features on top) is an essential part of Telestion’s standard services!
Request/Reply¶
So far, we’ve looked at publishing messages and subscribing to messages. However, there’s one more thing we can do with the message bus: request/reply.
Request/reply is a pattern where one service sends a request to another service and waits for a response. This is useful if you want to get data from another service. For example, you could have a service that stores data in a database. Other services can then request data from this service.
Sending a Request¶
Let’s start by looking at how we can send a request. We can use the request method on the NatsConnection object to send a request. This makes it incredibly easy to send a request:
// ...
const response = await nc.request/*(1)!*/(
"fooRequest"/*(2)!*/,
jsonCodec.encode({foo: "bar"})/*(3)!*/
);
console.log(response.data);
1. Call the request method on the NatsConnection object. This method returns a Promise that resolves when the response is received. The response has the same form as the messages we’ve already seen in our for await loops.
2. Specify the subject to send the request to.
3. Encode the request message data using the jsonCodec codec. This is the same as we’ve done before.
Tip: Specifying a timeout
As it is, our code will wait forever for a response. This is probably not what we want. We can specify a timeout by passing an options object such as { timeout: 1000 } to the request method as a third argument, after the subject and the message data.
This will cause the request method to reject the Promise if no response is received within 1000 milliseconds. Make sure to handle the rejection, for example with a try/catch block around the await.
Handling a Request¶
Now that we know how to send a request, let’s look at how we can handle a request. We can use the subscribe method on the NatsConnection object to subscribe to a subject. This allows us to handle requests:
// ...
const requestMessages = nc.subscribe/*(1)!*/("fooRequest");
(async () => {
for await (const message of requestMessages) {//(2)!
message.respond/*(3)!*/(jsonCodec.encode({bar: "baz"}));
}
})();
1. Subscribe to the fooRequest subject as usual.
2. Iterate over the messages received from the fooRequest subject as usual.
3. Respond to the request by calling the respond method on the message object. This method takes a single argument: the response message data. This is the same as we’ve done before.
Tip
The message received from the fooRequest subject is the same as the message received from the foo subject. This means that we can use the same steps to handle the message as we’ve done before if we need the data to handle the request.
Related Links¶
While we’ve covered the basics of interacting with the message bus in this article, there are a few more things you can do with it. The connection to the message bus is handled by the startService function, but topics like receiving and sending messages (including useful concepts like request/reply, queue groups, etc.) are covered more extensively in the NATS documentation.