Learn By Building A Data Pipeline – Part 0

I wanted to build something that would force me to explore and learn new things. I didn't really know where to start, so I went back to one thing I have always thought had the potential for building interesting things: Reddit's PRAW library, the Python wrapper for the Reddit API.

Motivation and Initial Idea.

Reddit.com is controversial for many reasons, but a lot of content gets posted there every day, and I really appreciate that they have opened up a pretty significant part of their data through PRAW. Some people have used it to build mobile clients for the website, others to build interesting bots. I don't intend to build anything production ready, but I am going to use Reddit as a data source and build a real-time data pipeline for performing analytics.

To start off, I had the following rough sketch in mind.

[Diagram: Producer → Consumer → Datastore]

The Producer produces a stream of data, continuously posting it to the Consumer, which is continuously listening for data to come in. The Consumer then performs any necessary processing and eventually stores the data in the Datastore. This is a pretty basic data pipeline that I have seen referenced in various books and blog posts.

The PRAW library has a nifty feature to generate a stream of submissions or posts.

We can access the stream once we instantiate a reddit instance with the necessary authentication information.
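
Creating that instance looks roughly like this; the client ID, secret and user agent below are placeholders for the credentials of a Reddit app registered under your account:

import praw

# Placeholder credentials -- the real values come from a Reddit app
# registered in your account preferences.
reddit = praw.Reddit(
    client_id='YOUR_CLIENT_ID',
    client_secret='YOUR_CLIENT_SECRET',
    user_agent='reddit-data-pipeline/0.1',
)

With the instance in hand, the stream itself is accessed as below.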

all_ = reddit.subreddit('all')
for post in all_.stream.submissions():
    process(post)

The process function here is a placeholder for any processing or extraction we'd like to do on the data. Each post contains information like the creation time, URL, title text, the user who posted it and the subreddit it belongs to. In the spirit of approaching the task in small iterations, I have decided to limit the data being extracted to the subreddit name. This will allow me to implement a counter that tracks the number of posts being posted on reddit under each subreddit name. As one of the outputs of the task, we can hope to view some kind of tabular data showing subreddit names with their respective number of posts. Of course, the number of posts here is merely the number of posts from the time we start listening on the stream, not the overall number of posts in a subreddit's lifetime. Visualising this before implementing any code was really helpful in keeping a clear goal in mind.
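
To make that concrete, here is one way the process placeholder could look for this first iteration; the attributes used are standard PRAW submission fields, and only the subreddit name is kept:

def process(post):
    # A submission exposes fields such as post.created_utc (creation time),
    # post.url, post.title and post.author. For now we only keep the
    # subreddit name.
    return {'subreddit': post.subreddit.display_name}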

First Steps in Implementation.

The very next question that came to my mind was how I should architect this system. I could very simply write a function containing the code above, and another function that consumes the data and populates a dictionary of subreddit_name:count. But that would be just too trivial, and I was in the mood for exploring. So instead of treating each box as a function, I decided to treat each one as a service, something I could place inside a docker container. Similarly, instead of maintaining a dictionary, I decided to make use of the redis key-value store, which can also be put inside a docker container.
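
Just to illustrate, the "too trivial" in-process version would have been little more than a dictionary acting as a counter, roughly like this:

from collections import defaultdict

# In-process alternative: a plain dictionary keyed by subreddit name.
counts = defaultdict(int)

def consume(subreddit):
    counts[subreddit] += 1

Swapping the dictionary for redis and the direct function call for an HTTP request is what turns these pieces into separate services.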

As I had never done anything complex with docker before, I had to spend some time familiarizing myself with its features. Thankfully, since I had a clear goal in mind and a diagram on paper, I was able to quickly narrow down the specific docker commands and syntax that were needed.

I decided to use Flask to build the Consumer API: a POST endpoint that will accept a message payload containing the subreddit name.

from flask import Flask, request, jsonify
import redis

app = Flask(__name__)


cache = redis.Redis(host='redis', port=6379)  # 'redis' is the service name in docker-compose


def store_count(subreddit):
    """
    Store number of posts for each subreddit.
    """
    return cache.incr(f'subreddit-{subreddit}')


@app.route('/posts', methods=['POST'])
def consume_posts():
    data = request.json
    subreddit = data['subreddit']
    print(f'Received a post from {subreddit}')
    store_count(subreddit)
    return jsonify({'success': True})
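
For a quick sanity check of the endpoint, assuming the app is reachable on Flask's default port 5000, a request like the following should come back with the success response (the subreddit name is just an example):

import requests

# Hypothetical smoke test against the Consumer's /posts endpoint.
response = requests.post(
    'http://localhost:5000/posts',
    json={'subreddit': 'worldnews'},
)
print(response.json())  # expected: {'success': True}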

I configured the Consumer flask app with the help of this dev.to post.

For the Producer, I wrote a simple Python script that calls the POST API on the Consumer.

import json
import time

import requests

# publish_url (the Consumer's /posts endpoint) and all_ (the r/all
# subreddit handle) are assumed to be defined elsewhere in the script.

def publish(data):
    """
    Post data to the Consumer API.
    """
    response = requests.post(
        publish_url,
        data=json.dumps(data),
        headers={'content-type': 'application/json'}
    )
    return response


def read():
    """
    Read reddit's stream of submissions on r/all.
    """
    count = 0
    for post in all_.stream.submissions():
        # Let's start with publishing just the subreddit name.
        if count > 100:
            break
        data = {'subreddit': post.subreddit.display_name}
        publish(data)
        print(f'Published {count + 1} posts.')
        count += 1
        time.sleep(2)  # add a bit of delay

I dockerized the script with the help of the dev.to post below.

Eventually I figured out that I needed to use docker-compose in order to operate multiple containers, so after a brief exploration I ended up with the following configuration in docker-compose.yml:

version: '3'
services:
  consumer:
    build: consumer-api/
    ports:
      - "5000:5000"
  redis:
    image: "redis:alpine"
  producer:
    build: producer/

I realized I needed my Producer to wait until my Consumer was ready, so I added a bit of delay before starting the stream.

if __name__ == '__main__':
    time.sleep(20)
    print("---- Starting the reddit post stream ----")
    read()

Seeing The Initial Result.

After some trials I eventually managed to process 101 posts from reddit. Opening up redis-cli via the docker exec command, I saw the keys that were created.

docker exec -it e0407395d363 'redis-cli'
127.0.0.1:6379> keys subreddit-*
  1) "subreddit-Advice"
  2) "subreddit-worldnews"
  3) "subreddit-houstonr4r"
  4) "subreddit-buildapc"
  5) "subreddit-FootFetish"
  6) "subreddit-MrLove"
  7) "subreddit-bootyshorts"
  8) "subreddit-u_AujasCybersecurity"
  9) "subreddit-u_KeyGur2"
 10) "subreddit-Fire_Emblem_R34"
 11) "subreddit-RaidShadowLegends"
 12) "subreddit-youngpeopleyoutube"
 13) "subreddit-germany"

127.0.0.1:6379> get subreddit-Coronavirus
"3"
127.0.0.1:6379> get subreddit-CoronavirusUK
"1"

Observations and Next Steps.

I initially reckoned that my approach of using a Flask API might not be scalable and that I might see delays in processing the HTTP requests. But I realised that the way I was looping over each post from the reddit stream and synchronously calling publish meant that I am not really achieving true “real time” capabilities here. So for the next steps I will explore minimizing the overhead in publishing the stream data to the Consumer, and at the same time try to refactor the Consumer to be reactive and event driven.

I will also need to write a “Viewer” service that reads the data present in redis and displays it in a readable way.
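
A minimal sketch of what that Viewer might look like, assuming it runs alongside the other services and reuses the Consumer's key naming scheme:

import redis

cache = redis.Redis(host='redis', port=6379)

def view_counts():
    """
    Print each subreddit and its post count, highest first.
    """
    counts = {}
    for key in cache.scan_iter('subreddit-*'):
        name = key.decode().replace('subreddit-', '', 1)
        counts[name] = int(cache.get(key))
    for name, count in sorted(counts.items(), key=lambda kv: kv[1], reverse=True):
        print(f'{name}: {count}')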

Having a clear goal in mind helped me make quick progress without getting bogged down in details. The handy stream feature of Reddit's PRAW library minimized the time it took to start putting code in the editor.

Conclusion.

I have completed the very basic first step in my learning and understanding of real-time data streaming systems. Although what I have built cannot be called entirely trivial, it is far from complete and has a lot of scope for improvement. In the coming days and weeks I would like to explore various ways to enhance this data pipeline.

Thanks for reading. Please feel free to give me any feedback.

Cheers.