Recently, I gave a lecture on stream processing and planned to use the Twitter Streaming API as an example. To give the attendees a framework to start from, I created the skeleton below. It has been tested with Python 3.4, but since it has no tests or other safeguards, it should only be used as an example.
    from threading import Thread
    from collections import deque
    from twython import TwythonStreamer
    from requests.exceptions import ChunkedEncodingError
We need these imports for the following reasons:
Thread, because the consumer of the Twitter stream will run in a separate thread.
deque, a double-ended queue, as a simple way to move messages from one thread to another. Appends and pops on a deque are thread-safe, so the stream thread can call append() and the main thread can call popleft() without an explicit lock.
TwythonStreamer, the Twitter Streaming API class from the twython library (pip install twython if you don't have it).
ChunkedEncodingError, the exception raised when the API responds with the wrong number of bytes, which it is known to do. When that happens, we just call the stream consumer again with the same deque.
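To make the deque hand-off concrete, here is a minimal sketch that does not touch Twitter at all. The `produce` and `consume` functions and the fake message dictionaries are assumptions made up for illustration; only `deque.append()` and `deque.popleft()` come from the skeleton itself.

```python
from collections import deque
from threading import Thread


def produce(queue, n):
    # Hypothetical stand-in for the Twitter stream: append n fake messages.
    for i in range(n):
        queue.append({'text': 'message %d' % i})


def consume(queue, n):
    # Pop from the left until n messages have been received.
    received = []
    while len(received) < n:
        try:
            received.append(queue.popleft())
        except IndexError:
            # The deque was empty; the producer has not caught up yet.
            continue
    return received


messages = deque()
producer = Thread(target=produce, args=(messages, 100))
producer.start()
received = consume(messages, 100)
producer.join()
```

With a single producer and a single consumer, messages come out in the order they were appended, which is the behaviour the skeleton relies on.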
    class TwitterStream(TwythonStreamer):

        def __init__(self, consumer_key, consumer_secret, token, token_secret, tqueue):
            self.tweet_queue = tqueue
            super(TwitterStream, self).__init__(consumer_key, consumer_secret, token, token_secret)

        def on_success(self, data):
            if 'text' in data:
                self.tweet_queue.append(data)

        def on_error(self, status_code, data):
            print(status_code)
            # Want to stop trying to get data because of the error?
            # Uncomment the next line!
            # self.disconnect()
The TwitterStream class extends the TwythonStreamer class and defines what to do when a tweet is received and when an error occurs. In this case, when a message containing a 'text' field arrives we append it to the deque, and on errors we simply print the status code.
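The effect of the `'text' in data` check can be tried without a network connection. The sketch below uses a hypothetical `FakeStreamHandler` class that copies only the `on_success` logic from above; the payloads are invented examples, on the assumption that some stream messages are not tweets and therefore carry no `'text'` field.

```python
from collections import deque


class FakeStreamHandler:
    """Hypothetical stand-in for TwitterStream; it never opens a connection."""

    def __init__(self, tqueue):
        self.tweet_queue = tqueue

    def on_success(self, data):
        # Same filter as in TwitterStream: keep only messages with a 'text' field.
        if 'text' in data:
            self.tweet_queue.append(data)


tweets = deque()
handler = FakeStreamHandler(tweets)

# Invented payloads: two tweet-like messages and one non-tweet notice.
handler.on_success({'text': 'hello'})
handler.on_success({'delete': {'status': {'id': 1}}})
handler.on_success({'text': 'world'})

texts = [tweet['text'] for tweet in tweets]
```

Only the two messages with a `'text'` key end up in the deque; the other one is silently dropped.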
    def stream_tweets(tweets_queue):
        # Input your credentials below
        consumer_key = ''
        consumer_secret = ''
        token = ''
        token_secret = ''
        try:
            stream = TwitterStream(consumer_key, consumer_secret, token, token_secret, tweets_queue)
            # You can filter on keywords, or simply draw from the sample stream
            # stream.statuses.filter(track='twitter', language='en')
            stream.statuses.sample(language='en')
        except ChunkedEncodingError:
            # Sometimes the API sends back one byte less than expected, which results
            # in an exception in the current version of the requests library.
            # Reconnect, passing on the same deque we were given.
            stream_tweets(tweets_queue)
This is the function that does most of the work: connecting to the streaming API, starting to sample from it, and handling the ChunkedEncodingError exceptions mentioned earlier.
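Reconnecting by calling the function again from the exception handler adds one stack frame per failure. As an alternative sketch, and not what the skeleton does, the same idea can be written as a loop. Everything here is hypothetical: `TransientStreamError` stands in for ChunkedEncodingError, `run_with_reconnect` and its `max_retries` parameter are made-up names, and `flaky_connect` simulates a connection that fails twice before succeeding.

```python
class TransientStreamError(Exception):
    """Hypothetical stand-in for requests' ChunkedEncodingError."""


def run_with_reconnect(connect, max_retries=5):
    # Call connect() until it returns normally, retrying on transient errors.
    attempts = 0
    while True:
        try:
            return connect()
        except TransientStreamError:
            attempts += 1
            if attempts > max_retries:
                # Give up and let the caller see the error.
                raise


calls = []


def flaky_connect():
    # Simulated stream: fails on the first two calls, then completes.
    calls.append(1)
    if len(calls) < 3:
        raise TransientStreamError('short read')
    return 'stream finished'


result = run_with_reconnect(flaky_connect)
```

The retry cap is a design choice for the sketch; the skeleton retries without any limit.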
    def process_tweets(tweets_queue):
        while True:
            if len(tweets_queue) > 0:
                # Do something with the tweets
                print(tweets_queue.popleft())
The process_tweets function is a small helper that loops over the deque and processes its contents one element at a time. It is inefficient because it keeps spinning even when the deque is empty.
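If the polling loop bothers you, one possible alternative is the standard library's `queue.Queue`, whose `get()` blocks until an item is available. This is a sketch of that swapped-in technique, not part of the skeleton; `process_messages`, `SENTINEL`, and the fake messages are names invented for the example.

```python
import queue
from threading import Thread

# Hypothetical marker telling the consumer to stop.
SENTINEL = None


def process_messages(message_queue, handle):
    while True:
        # get() blocks until a message arrives, so no CPU is spent polling.
        message = message_queue.get()
        if message is SENTINEL:
            break
        handle(message)


message_queue = queue.Queue()
seen = []
worker = Thread(target=process_messages, args=(message_queue, seen.append))
worker.start()

# Fake producer standing in for the Twitter stream.
for i in range(3):
    message_queue.put({'text': 'tweet %d' % i})
message_queue.put(SENTINEL)
worker.join()
```

The trade-off is that `queue.Queue` uses locks internally, whereas the skeleton deliberately sticks to a plain deque.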
    if __name__ == '__main__':
        tweet_queue = deque()
        tweet_stream = Thread(target=stream_tweets, args=(tweet_queue,), daemon=True)
        tweet_stream.start()
        process_tweets(tweet_queue)
Here we tie everything together: we construct the deque, start the daemon thread that runs the stream and appends to the deque, and then call process_tweets in the main thread to handle the tweets as they arrive.
So that was an easy skeleton for playing around with streams from Twitter. You can find the code in the twitterstreamtemplate repository on GitHub.