Building a Simple Publisher-Subscriber Messaging Service in Python

We will start by creating a PubSub class, which will be responsible for managing the subscribers and handling the publishing and subscribing of messages. Here is the code for the PubSub class:

The PubSub Class

import threading

class PubSub:
    def __init__(self):
        self.subscribers = {}
        self.lock = threading.Lock()

    def publish(self, topic, message):
        with self.lock:
            if topic in self.subscribers:
                for subscriber in self.subscribers[topic]:
                    subscriber.receive(topic, message)

    def subscribe(self, topic, subscriber):
        with self.lock:
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(subscriber)

    def unsubscribe(self, topic, subscriber):
        with self.lock:
            if topic in self.subscribers:
                if subscriber in self.subscribers[topic]:
                    self.subscribers[topic].remove(subscriber)
                    if len(self.subscribers[topic]) == 0:
                        del self.subscribers[topic]

The PubSub class has three methods: publish, subscribe, and unsubscribe. The publish method takes a topic and a message as arguments and sends the message to all subscribers that have subscribed to the topic.

The subscribe method takes a topic and a subscriber as arguments and adds the subscriber to the list of subscribers for that topic.

The unsubscribe method takes a topic and a subscriber as arguments and removes the subscriber from the list of subscribers for that topic.

The subscribers dictionary is used to keep track of the subscribers for each topic. Each key in the dictionary represents a topic, and the value associated with the key is a list of subscribers that have subscribed to that topic.

Why Thread Lock is needed?

The lock here is used to synchronize access to the subscribers dictionary. Since the publish, subscribe, and unsubscribe methods can be called from different threads, we need to ensure that they do not modify the dictionary at the same time, which could result in unpredictable behavior.

Without the lock, two threads could potentially modify the dictionary at the same time, causing data corruption or race conditions. For example, if one thread is adding a new topic to the subscribers dictionary while another thread is removing a topic, the subscribers dictionary could become inconsistent and lead to errors.

By using the lock, we ensure that only one thread can modify the subscribers dictionary at a time, which prevents such conflicts and ensures the integrity of the data structure.

Next, we will create a Subscriber class that will represent a subscriber to the messaging service. Here is the code for the Subscriber class:

Subscriber

class Subscriber:
    def receive(self, topic, message):
        print(f"Received message '{message}' on topic '{topic}'")

The Subscriber class has a receive method that takes a topic and a message as arguments and prints out the message received on the specified topic.

Finally, we will create some sample code to demonstrate how to use the pub-sub messaging service. Here is the code for the sample:

# create a pub-sub instance
pubsub = PubSub()

# create some subscribers
sub1 = Subscriber()
sub2 = Subscriber()
sub3 = Subscriber()

# subscribe subscribers to topics
pubsub.subscribe("topic1", sub1)
pubsub.subscribe("topic1", sub2)
pubsub.subscribe("topic2", sub2)
pubsub.subscribe("topic3", sub3)

# publish messages to topics
pubsub.publish("topic1", "hello, topic1!")
pubsub.publish("topic2", "hello, topic2!")
pubsub.publish("topic3", "hello, topic3!")

# unsubscribe a subscriber from a topic
pubsub.unsubscribe("topic1", sub1)

# publish messages again to topics
pubsub.publish("topic1", "goodbye, topic1!")
pubsub.publish("topic2", "goodbye, topic2!")
pubsub.publish("topic3", "goodbye, topic3!")

In this example, we create a PubSub instance and some Subscriber instances. We then subscribe the subscribers to various topics using the subscribe method of the PubSub instance. We publish messages to the topics using the publish method of the PubSub instance. We then unsubscribe a subscriber from a topic using the unsubscribe method of the PubSub instance. Finally, we publish messages to the topics again to demonstrate that the unsubscribed subscriber no longer receives messages for that topic.

When you run this code, you should see the following output:

Received message 'hello, topic1!' on topic 'topic1'
Received message 'hello, topic1!' on topic 'topic1'
Received message 'hello, topic2!' on topic 'topic2'
Received message 'hello, topic3!' on topic 'topic3'
Received message 'goodbye, topic1!' on topic 'topic1'
Received message 'hello, topic2!' on topic 'topic2'
Received message 'goodbye, topic2!' on topic 'topic2'
Received message 'goodbye, topic3!' on topic 'topic3'

As you can see when we publish messages to a topic, all subscribers that have subscribed to that topic receive the message. When we unsubscribe a subscriber from a topic, that subscriber no longer receives messages for that topic.

That's it! You now have a simple publisher-subscriber messaging service using Python.

Your engagements are welcome. Subscribe for more such low-level design questions frequently asked in interviews