Adam Drake

    Jun 11, 2014

    Using Twitter as a Stream Processing Source

    Recently, I gave a lecture on stream processing and planned to use the Twitter streaming API as an example. To give the attendees a framework to start from, I created the one below. It has been tested with Python 3.4, but it has no automated tests or other hardening, so it should only be used as an example.

    from threading import Thread
    from collections import deque
    from twython import TwythonStreamer
    from requests.exceptions import ChunkedEncodingError
    

    We need each import for the following reasons:

    Thread, because the consumer of the Twitter stream will run in a separate thread.

    We use a double-ended queue (deque) to move messages from one thread to another without explicit locking. Its append() and popleft() operations are thread-safe, which is why those are the two we use here.

    TwythonStreamer is a Twitter Streaming API class from the twython library (pip install twython if you don’t have it).

    The ChunkedEncodingError exception is caught to handle the case where the API responds with the wrong number of bytes, which it is known to do. If that happens, we simply call the stream consumer again with the same deque.
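
    To make the deque handoff concrete, here is a minimal sketch that uses no Twitter connection at all. The produce and consume functions and the count of 1000 items are made up for illustration; one thread appends while the main thread pops from the other end.

```python
from collections import deque
from threading import Thread


def produce(queue, n):
    # Producer: append() adds each item to the right end of the deque.
    for i in range(n):
        queue.append(i)


def consume(queue, n):
    # Consumer: popleft() removes from the left end, so items come out in
    # the order they were appended. Polls until n items have been seen.
    seen = []
    while len(seen) < n:
        try:
            seen.append(queue.popleft())
        except IndexError:
            # The deque was empty; the producer has not caught up yet.
            continue
    return seen


handoff = deque()
producer = Thread(target=produce, args=(handoff, 1000))
producer.start()
received = consume(handoff, 1000)
producer.join()
print(len(received))
```

    With a single producer and a single consumer, the items arrive in the order they were appended, and neither side needs to take a lock itself.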

    class TwitterStream(TwythonStreamer):
    
        def __init__(self, consumer_key, consumer_secret, token, token_secret, tqueue):
            self.tweet_queue = tqueue
            super(TwitterStream, self).__init__(consumer_key, consumer_secret, token, token_secret)
    
        def on_success(self, data):
            if 'text' in data:
                self.tweet_queue.append(data)
    
        def on_error(self, status_code, data):
            print(status_code)
            # Want to stop trying to get data because of the error?
            # Uncomment the next line!
            # self.disconnect()
    

    The TwitterStream class extends the TwythonStreamer class and defines what to do when a message arrives and when an error occurs. When a message containing a 'text' field (that is, a tweet) is received, we append it to the deque. On errors, we simply print the status code.
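
    The 'text' check in on_success can be tried out without any credentials. The sketch below uses FakeStream, a hypothetical stand-in that copies only the on_success logic and never opens a connection. The three messages are invented, on the assumption that the stream can also carry messages that are not tweets, such as deletion notices.

```python
from collections import deque


class FakeStream:
    """Hypothetical stand-in for TwitterStream; it never opens a connection."""

    def __init__(self, tqueue):
        self.tweet_queue = tqueue

    def on_success(self, data):
        # Same filter as TwitterStream.on_success: keep only messages
        # that have a 'text' key.
        if 'text' in data:
            self.tweet_queue.append(data)


queue = deque()
stream = FakeStream(queue)

# Invented messages: two tweets and one non-tweet notice.
messages = [
    {'id': 1, 'text': 'first tweet'},
    {'delete': {'status': {'id': 99}}},
    {'id': 2, 'text': 'second tweet'},
]
for message in messages:
    stream.on_success(message)

kept = [tweet['text'] for tweet in queue]
print(kept)
```

    Only the two messages with a 'text' key end up in the deque, so the processing side never has to deal with the other message types.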

    def stream_tweets(tweets_queue):
        # Input your credentials below
        consumer_key = ''
        consumer_secret = ''
        token = ''
        token_secret = ''
        try:
            stream = TwitterStream(consumer_key, consumer_secret, token, token_secret, tweets_queue)
            # You can filter on keywords, or simply draw from the sample stream
            #stream.statuses.filter(track='twitter', language='en')
            stream.statuses.sample(language='en')
        except ChunkedEncodingError:
            # Sometimes the API sends back one byte less than expected, which results in an exception in the
            # current version of the requests library. Reconnect using the same deque.
            stream_tweets(tweets_queue)
    

    This is the function that does most of the work: it connects to the streaming API, starts sampling from it, and handles the ChunkedEncodingError exceptions mentioned earlier.
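
    Restarting by calling stream_tweets from inside its own except block adds a stack frame on every reconnect, so a very long-running process could eventually hit Python's recursion limit. One possible alternative is a loop with a retry cap and a short backoff. The sketch below is not part of the template: run_with_retries, flaky_connect, and FakeChunkedEncodingError are hypothetical names, and the fake exception stands in for requests.exceptions.ChunkedEncodingError so the example runs without a network.

```python
import time


class FakeChunkedEncodingError(Exception):
    """Hypothetical stand-in for requests.exceptions.ChunkedEncodingError."""


def run_with_retries(connect, max_retries=3, base_delay=0.01):
    # Call connect() until it returns normally or the retries run out.
    attempts = 0
    while True:
        attempts += 1
        try:
            connect()
            return attempts
        except FakeChunkedEncodingError:
            if attempts > max_retries:
                raise
            # Wait a little longer after each failure (exponential backoff).
            time.sleep(base_delay * 2 ** (attempts - 1))


calls = []


def flaky_connect():
    # Fails twice, then succeeds, to imitate a stream that drops mid-read.
    calls.append(1)
    if len(calls) < 3:
        raise FakeChunkedEncodingError()


attempts_needed = run_with_retries(flaky_connect)
print(attempts_needed)
```

    In the real template, connect would be a function that builds the TwitterStream and calls statuses.sample, and the except clause would catch the real ChunkedEncodingError.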

    def process_tweets(tweets_queue):
        while True:
            if len(tweets_queue) > 0:
                #  Do something with the tweets
                print(tweets_queue.popleft())
    

    The process_tweets function is a small helper function that polls the deque in a busy loop (which is inefficient, since it spins even when the deque is empty) and processes the contents one element at a time.
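
    If the busy loop is a concern, one option is to swap the deque for queue.Queue from the standard library, whose get() method blocks until an item is available. This is a different technique from the one in the template, sketched here with made-up data. SENTINEL, produce, and process are hypothetical names used only for this example.

```python
import queue
from threading import Thread

SENTINEL = None  # Hypothetical marker telling the consumer to stop.


def produce(q, items):
    # Producer: put() each item, then signal that no more are coming.
    for item in items:
        q.put(item)
    q.put(SENTINEL)


def process(q, handled):
    while True:
        # get() blocks until an item is available, so the loop does not spin.
        item = q.get()
        if item is SENTINEL:
            break
        # Do something with the item; here we just upper-case it.
        handled.append(item.upper())


tweets = queue.Queue()
handled = []
producer = Thread(target=produce, args=(tweets, ['a', 'b', 'c']))
producer.start()
process(tweets, handled)
producer.join()
print(handled)
```

    The trade-off is that queue.Queue uses locks internally, whereas the deque version avoids explicit locking at the cost of spinning while it waits.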

    if __name__ == '__main__':
    
        tweet_queue = deque()
    
        tweet_stream = Thread(target=stream_tweets, args=(tweet_queue,), daemon=True)
        tweet_stream.start()
    
        process_tweets(tweet_queue)
    

    Here we tie everything together: we construct the deque, start the daemon thread for the stream, which appends to the deque, and then call process_tweets in the main thread to handle the tweets.

    That is a simple skeleton for playing around with streams from Twitter. You can find the code in the twitterstreamtemplate repository on GitHub.


Adam Drake leads technical business transformations in global and multi-cultural environments. He has a passion for helping companies become more productive by improving internal leadership capabilities, and accelerating product development through technology and data architecture guidance. Adam has served as a White House Presidential Innovation Fellow and is an IEEE Senior Member.