
Step 1 - Create Consumer (Subscriber)

Learning Objectives

Step: 1
Title: Create our consumer before the Provider API even exists
Concept covered: Consumer-driven design with Hexagonal Architecture
Learning objectives:
  • Understand use case

Hexagonal Architecture

We recommend separating the code that handles protocol-specific concerns (for example, an AWS Lambda handler and the AWS SNS input body) from the code that actually processes the payload.

You may be familiar with layered architectures such as Ports and Adapters (also referred to as Hexagonal Architecture). If not, we recommend reading Alistair Cockburn's guide.

Following a modular architecture will allow you to do this much more easily:

[Figure: Ports and Adapters architecture]

Our Scenario

Let's walk through an example using a product event published through a message broker, in this case Kafka.

The consumer expects to receive a message of the following shape:

{
  "id": "some-uuid-1234-5678",
  "type": "spare",
  "name": "3mm hex bolt",
  "version": "v1",
  "event": "UPDATED"
}
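To make the expected shape concrete, here is a minimal sketch of a check the consumer could run on an incoming message. The helper name `isProductEvent` and the two constant lists are assumptions for illustration and are not part of the workshop code; the allowed `event` values are the three the handler in this step branches on.

```javascript
// Hypothetical helper: checks that a message has the fields the consumer relies on.
const REQUIRED_STRING_FIELDS = ["id", "type", "name", "version", "event"];
const KNOWN_EVENTS = ["CREATED", "UPDATED", "DELETED"];

const isProductEvent = (message) =>
  message !== null &&
  typeof message === "object" &&
  REQUIRED_STRING_FIELDS.every((field) => typeof message[field] === "string") &&
  KNOWN_EVENTS.includes(message.event);

// The example message from above.
const example = {
  id: "some-uuid-1234-5678",
  type: "spare",
  name: "3mm hex bolt",
  version: "v1",
  event: "UPDATED",
};

console.log(isProductEvent(example)); // true
console.log(isProductEvent({ id: "some-uuid-1234-5678" })); // false
```

A check like this belongs on the domain side of the split, since it describes the payload rather than the transport.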

Adapter

With this view, the "Adapter" is the code that deals with the specific messaging implementation. For example, it might be the Lambda handler that receives the SNS message that wraps this payload, or the function that reads the message from a Kafka topic (wrapped in a Kafka-specific container). Here is the Kafka version:

In consumer-js-kafka/src/service/kafka.js:

// `consumer` is assumed to be a Kafka client consumer created earlier in this file (setup not shown).
const consumeProductStream = async (handler) => {
  await consumer.connect();
  await consumer.subscribe({ topic: "products", fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log("received message");
      try {
        // Unwrap the Kafka message and pass only the domain payload to the port.
        await handler(JSON.parse(message.value.toString()));
      } catch (e) {
        console.error("unable to handle incoming message", e);
      }
    },
  });
};
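For comparison, the same port could sit behind a different adapter. The sketch below assumes an AWS Lambda function subscribed to an SNS topic, where each record carries the payload as a JSON string in `Records[].Sns.Message`. The name `makeSnsAdapter` and the stand-in handler are hypothetical; this code is not part of the workshop repository.

```javascript
// Hypothetical SNS adapter: unwraps the SNS envelope and passes the
// domain payload to the same kind of handler the Kafka adapter uses.
const makeSnsAdapter = (handler) => async (event) => {
  for (const record of event.Records) {
    try {
      await handler(JSON.parse(record.Sns.Message));
    } catch (e) {
      console.error("unable to handle incoming message", e);
    }
  }
};

// Usage with a stand-in handler that just records what it receives.
const received = [];
const lambdaHandler = makeSnsAdapter(async (product) => {
  received.push(product);
});

// A trimmed-down SNS event containing a single record.
const done = lambdaHandler({
  Records: [
    {
      Sns: {
        Message: JSON.stringify({ id: "some-uuid-1234-5678", event: "UPDATED" }),
      },
    },
  ],
});
```

Only the unwrapping differs between the two adapters; the handler they call never sees the Kafka or SNS envelope.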

Port

The "Port" is the code that is unaware of the fact it's talking to SNS or Kafka, and only deals in the domain itself - in this case the product event.

This will be the target of our test.

In consumer-js-kafka/src/product/product.handler.js:

// `repository` and `Product` are defined elsewhere in the project (not shown here).
const handler = (product) => {
  console.log("received product:", product);
  console.log("received product event:", product.event);

  if (product.event === "CREATED" || product.event === "UPDATED") {
    // Creates and updates are both stored via insert.
    return Promise.resolve(
      repository.insert(
        new Product(product.id, product.type, product.name, product.version)
      )
    );
  } else if (product.event === "DELETED") {
    return Promise.resolve(
      repository.delete(
        new Product(product.id, product.type, product.name, product.version)
      )
    );
  }
  // Any other event type is treated as an error.
  throw new Error("Unable to process event");
};
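Because the port is a plain function of the product event, it can be exercised without a broker. The sketch below is self-contained, so it relies on stand-ins: the minimal `Product` class, the in-memory `makeRepository`, and the `makeHandler` factory are all assumptions for illustration, since the workshop's own `Product` and `repository` modules are not shown in this step. Unlike the handler above, this variant is declared `async`, so an unknown event surfaces as a rejected promise rather than a synchronous throw.

```javascript
// Stand-in domain class (hypothetical; the real Product is not shown here).
class Product {
  constructor(id, type, name, version) {
    Object.assign(this, { id, type, name, version });
  }
}

// Stand-in in-memory repository keyed by product id (hypothetical).
const makeRepository = () => {
  const products = new Map();
  return {
    insert: (product) => products.set(product.id, product),
    delete: (product) => products.delete(product.id),
    getById: (id) => products.get(id),
  };
};

// Same branching as the handler above, with the repository passed in.
const makeHandler = (repository) => async (product) => {
  const entity = new Product(product.id, product.type, product.name, product.version);
  if (product.event === "CREATED" || product.event === "UPDATED") {
    return repository.insert(entity);
  }
  if (product.event === "DELETED") {
    return repository.delete(entity);
  }
  throw new Error("Unable to process event");
};

const repository = makeRepository();
const handler = makeHandler(repository);

// No Kafka or SNS involved: the port only sees the domain payload.
const run = handler({
  id: "some-uuid-1234-5678",
  type: "spare",
  name: "3mm hex bolt",
  version: "v1",
  event: "UPDATED",
}).then(() => repository.getById("some-uuid-1234-5678").name);
```

Passing the repository in is one way to keep the port free of infrastructure; the workshop code may wire it differently.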

Step 2

We can now move on to step 2, where we will write a test for our event processor.

Move on to step 2