Sunday, March 3, 2013

Real-Time Data Analysis using Twitter Stream

Experiment

Twitter processes half a billion tweets per day, each at most 140 characters long. For experimentation, Twitter makes these tweets available to the public. Access was initially offered through the REST API and later moved to OAuth. The Twitter Streaming API makes this possible and delivers tweets in real time at about 1% of Twitter's total volume. On average, then, Twitter receives about 5,787 tweets per second (500,000,000 tweets / (24 hours * 60 minutes * 60 seconds)), and the Streaming API sends us about 58 tweets per second. Because this data arrives constantly, we need a mechanism to store 58 tweets every second and run real-time analytics on them. Even though the text of each tweet is at most 140 characters, or 280 bytes at 2 bytes per character, the Streaming API sends a lot of additional information with each tweet (1 tweet = about 4 KB). This information is sent in JSON format.
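
The arithmetic above can be checked with a few lines of Java. This is only a back-of-the-envelope sketch: the class and method names are made up for illustration, and the constants are the figures quoted in this post, not measurements.

```java
// Back-of-the-envelope volume estimate using the figures quoted in the post.
class TweetVolume {
    static final long TWEETS_PER_DAY = 500_000_000L; // half a billion tweets per day
    static final long SECONDS_PER_DAY = 24L * 60 * 60;
    static final double SAMPLE_FRACTION = 0.01;       // the stream carries about 1%
    static final long BYTES_PER_TWEET = 4L * 1024;    // about 4 KB of JSON per tweet

    // Average tweets per second across all of Twitter.
    static double totalRate() {
        return (double) TWEETS_PER_DAY / SECONDS_PER_DAY;
    }

    // Average tweets per second delivered by the sample stream.
    static double sampleRate() {
        return totalRate() * SAMPLE_FRACTION;
    }

    // Raw JSON bytes per day if every sampled tweet were stored unmodified.
    static long rawBytesPerDay() {
        return Math.round(sampleRate() * SECONDS_PER_DAY) * BYTES_PER_TWEET;
    }

    public static void main(String[] args) {
        System.out.printf("total:  %.0f tweets/s%n", totalRate());
        System.out.printf("sample: %.0f tweets/s%n", sampleRate());
        System.out.printf("raw:    %.1f GB/day%n", rawBytesPerDay() / 1e9);
    }
}
```

Under these assumptions the raw JSON comes to roughly 20 GB per day, which is why only a few fields of each tweet are stored in this experiment.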

Twitter data is a very valuable tool in the field of marketing. Vendors can run sentiment analysis on the tweets for a specific hashtag. If Samsung wants to know how satisfied people are with its products, it can find out from the tweet data. As a result, a lot of research in NLP (Natural Language Processing) has started. Apart from this, many other machine learning tasks can be run on these tweets.

As part of this experiment I implemented a consumer that reads the stream of JSON tweets and persists them in MongoDB. Since it's a write-heavy application, I load-balanced (sharded) my MongoDB. The application keeps running until it is stopped, so the data steadily fills up my MongoDB cluster. To keep storage to a minimum, I extracted and stored only the tweet ID, the tweeter's name, the actual tweet, and the source of the tweet (e.g. twitter.com, facebook.com, blackberry). Then I set up a MongoDB incremental map-reduce job that runs every 10 minutes and computes the unique sources and their counts. From this I generated the top-10 statistics and created a chart using JFreeChart.
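
To make the counting step concrete, here is an in-memory analogue of what the map-reduce job computes: one (source, 1) pair per tweet, summed per source. It is a sketch in plain Java rather than the MongoDB job itself, and the class name and sample data are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory analogue of the map-reduce job: count tweets per source.
class SourceCount {
    static Map<String, Integer> countBySource(List<String> sources) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String source : sources) {
            // The "map" step emits (source, 1); the "reduce" step adds it to the sum.
            Integer current = counts.get(source);
            counts.put(source, current == null ? 1 : current + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, not taken from the real stream.
        List<String> sources = Arrays.asList("web", "twitter.com", "web", "blackberry", "web");
        System.out.println(countBySource(sources)); // prints {blackberry=1, twitter.com=1, web=3}
    }
}
```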

Architectural Overview


Execution

Setup Twitter Account

Go to https://dev.twitter.com/apps and click "Create a new application".
Fill in all mandatory information and submit the application.
Go to the "OAuth Tool" tab and note down the Consumer Key and Consumer Secret.
Run the following program after replacing the consumer key and consumer secret.
package com.raj.tweets;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.logging.Level;
import java.util.logging.Logger;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.User;
import twitter4j.auth.AccessToken;
import twitter4j.auth.RequestToken;
public class TwitterOAuth {
public static void main(String args[])
{
try
{
// The factory instance is re-useable and thread safe.
Twitter twitter = new TwitterFactory().getInstance();
twitter.setOAuthConsumer("<consumer-key>", "<consumer-secret>");
RequestToken requestToken = twitter.getOAuthRequestToken();
AccessToken accessToken = null;
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
while (null == accessToken)
{
System.out.println("Open the following URL and grant access to your account:");
System.out.println(requestToken.getAuthorizationURL());
System.out.print("Enter the PIN (if available) or just hit enter. [PIN]:");
String pin = null;
try
{
pin = br.readLine();
} catch (IOException ex)
{
Logger.getLogger(TwitterOAuth.class.getName()).log(Level.SEVERE, null, ex);
}
try
{
// pin stays null when reading from the console failed, so check it first.
if (pin != null && pin.length() > 0)
{
accessToken = twitter.getOAuthAccessToken(requestToken, pin);
} else
{
accessToken = twitter.getOAuthAccessToken();
}
} catch (TwitterException te)
{
if (401 == te.getStatusCode())
{
System.out.println("Unable to get the access token.");
} else
{
te.printStackTrace();
}
}
}
// Persist the accessToken for future reference (this sample only prints the screen name).
storeAccessToken(twitter.verifyCredentials(), accessToken);
Status status = twitter.updateStatus("Done setup..");
System.out.println("Successfully updated the status to [" + status.getText() + "].");
System.exit(0);
} catch (TwitterException ex)
{
Logger.getLogger(TwitterOAuth.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void storeAccessToken(User user, AccessToken accessToken)
{
System.out.println(user.getScreenName());
}
}

Follow the program's prompts to generate the Access Token and Access Token Secret.
They can also be retrieved later from the "OAuth Tool" tab.
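
The storeAccessToken method in the program above only prints the screen name. If you would rather keep the token pair on disk, one option is a properties file. The sketch below uses only the JDK; the class name, file name and property keys are hypothetical choices, and in the OAuth program the two values would come from accessToken.getToken() and accessToken.getTokenSecret().

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

// Saves and reloads an OAuth token pair using a plain properties file.
class TokenStore {
    static void save(File file, String token, String tokenSecret) throws IOException {
        Properties props = new Properties();
        props.setProperty("oauth.accessToken", token);
        props.setProperty("oauth.accessTokenSecret", tokenSecret);
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, "Twitter OAuth access token");
        }
    }

    static Properties load(File file) throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        File file = new File("twitter-token.properties"); // hypothetical file name
        save(file, "<accessToken>", "<accessTokenSecret>");
        System.out.println(load(file).getProperty("oauth.accessToken"));
    }
}
```

The file holds credentials in clear text, so keep it out of version control.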

Setup MongoDB

MongoDB provides sharding at the database/collection level, so I built a simple sharded setup on 2 Toshiba laptops, each with a 1 TB disk and 4 GB RAM, running Windows 7.

System-1 : 192.168.1.100
System-2 : 192.168.1.101
MongoDB 2.2.3 is installed on both laptops at c:\apps\mongodb.
Create directories on System-1
c:\apps\mongodb\data1
c:\apps\mongodb\data2
c:\apps\mongodb\data3
c:\apps\mongodb\conf
Create a directory on System-2
c:\apps\mongodb\data1

On System1
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data1 --port 27020
On System2
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data1 --port 27020
On System1
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data2 --port 27021

c:\apps\mongodb\bin> mongod --configsvr --dbpath c:\apps\mongodb\conf --port 27022

c:\apps\mongodb\bin> mongos --configdb 192.168.1.100:27022 --port 27017

c:\apps\mongodb\bin> mongo 192.168.1.100:27017

mongos> use admin
switched to db admin
mongos> db.runCommand({addShard:"192.168.1.100:27020"});
{"shardAdded": "shard0000", "ok": 1}
mongos> db.runCommand({addShard:"192.168.1.100:27021"});
{"shardAdded": "shard0001", "ok": 1}
mongos> db.runCommand({addShard:"192.168.1.101:27020"});
{"shardAdded": "shard0002", "ok": 1}
mongos> db.runCommand({listShards: 1})
{
 "shards" : [
   {
     "_id"  : "shard0000",
     "host" : "192.168.1.100:27020"
   },
   {
     "_id"  : "shard0001",
     "host" : "192.168.1.100:27021"
   },
   {
     "_id"  : "shard0002",
     "host" : "192.168.1.101:27020"
   }
 ],
 "ok" : 1
}

mongos> use twitterdb
switched to db twitterdb

mongos> db.createCollection("tweets")
{"ok" : 1}

mongos> use admin
switched to db admin

mongos> db.runCommand({enableSharding: "twitterdb"})
{ "ok" : 1}

mongos> db.runCommand({shardCollection: "twitterdb.tweets", key: {id_str: 1}})
{"collectionSharded" : "twitterdb.tweets", "ok" : 1}

mongos> use twitterdb
switched to db twitterdb

mongos> db.tweets.find().count()
0
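
The shardCollection command above partitions twitterdb.tweets by ranges of id_str. As a rough mental model, each range of key values (a chunk) belongs to one shard, and mongos routes a document to the shard that owns the range its key falls into. The sketch below illustrates that lookup in plain Java; it is a simplification, not MongoDB's implementation, and the class name and split points are invented.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified picture of range-based routing on a shard key.
class ChunkRouter {
    // Inclusive lower bound of each chunk -> shard that owns the chunk.
    private final TreeMap<Long, String> chunks = new TreeMap<Long, String>();

    void addChunk(long lowerBound, String shard) {
        chunks.put(lowerBound, shard);
    }

    // Returns the shard whose chunk has the greatest lower bound <= key.
    String route(long key) {
        Map.Entry<Long, String> chunk = chunks.floorEntry(key);
        if (chunk == null) {
            throw new IllegalStateException("no chunk covers key " + key);
        }
        return chunk.getValue();
    }

    public static void main(String[] args) {
        ChunkRouter router = new ChunkRouter();
        // Hypothetical split points; MongoDB picks and rebalances the real ones.
        router.addChunk(Long.MIN_VALUE, "shard0000");
        router.addChunk(1000L, "shard0001");
        router.addChunk(2000L, "shard0002");
        System.out.println(router.route(1500L)); // prints shard0001
    }
}
```

One consequence worth checking in a real deployment: keys that only grow over time all land in the highest range until MongoDB splits and migrates chunks.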

Running the Application

The shards and the database are now set up.

Run the Twitter stream application below (change the values to match your settings). Data starts flowing into MongoDB. Don't forget to stop the application when you are done; otherwise the Twitter stream keeps consuming network bandwidth and the MongoDB storage keeps growing.
package com.raj.tweets;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.util.JSON;
public class TwitterStreaming {
public static void main(String[] args) {
try
{
Mongo m = new Mongo("192.168.1.100", 27017);
DB db = m.getDB("twitterdb");
final DBCollection coll = db.getCollection("tweets");
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey("<consumerKey>")
.setOAuthConsumerSecret("<consumerSecret>")
.setOAuthAccessToken("<accessToken>")
.setOAuthAccessTokenSecret("<accessTokenSecret>");
StatusListener listener = new StatusListener(){
int count = 0;
public void onStatus(Status status) {
DBObject dbObj = new BasicDBObject();
dbObj.put("id_str", status.getId());
dbObj.put("name", status.getUser().getName());
dbObj.put("text", status.getText());
dbObj.put("source", status.getSource());
coll.insert(dbObj);
System.out.println(++count);
}
// Deletion, rate-limit, geo-scrub and stall notices are ignored in this experiment.
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
public void onException(Exception ex) {
ex.printStackTrace();
}
@Override
public void onScrubGeo(long arg0, long arg1) {}
@Override
public void onStallWarning(StallWarning arg0) {}
};
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
twitterStream.addListener(listener);
twitterStream.sample();
}catch(Exception e) {
e.printStackTrace();
}
}
}
Keep an eye on the data directories on the file system:
  System-1
c:\apps\mongodb\data1
c:\apps\mongodb\data2

  System-2
c:\apps\mongodb\data1

 The data files grow continuously. Verify the count of tweets in the database.

  mongos> db.tweets.find().count()
25043

 So about 25,000 tweets accumulated in 10 minutes, or roughly 42 tweets per second. Find out how many of the tweets in these 10 minutes were posted from the web:

  mongos> db.tweets.find({"source" : "web"}).count()
4365
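
The two counts above translate into a rate and a share as follows. This is a small sketch with made-up class and method names; the inputs are the counts shown in the shell output.

```java
// Turns the observed counts into a per-second rate and a percentage share.
class ObservedStats {
    // Average items per second over a window given in minutes.
    static double perSecond(long count, long minutes) {
        return count / (minutes * 60.0);
    }

    // Share of `part` in `total`, as a percentage.
    static double sharePercent(long part, long total) {
        return 100.0 * part / total;
    }

    public static void main(String[] args) {
        System.out.printf("rate:      %.1f tweets/s%n", perSecond(25043, 10));
        System.out.printf("web share: %.1f %%%n", sharePercent(4365, 25043));
    }
}
```

That works out to just under 42 tweets per second, with a little over 17% of them coming from the web client.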

Now run the MapReduce job below. It runs every 10 minutes, aggregates the results, and generates the reports as pie charts, which are stored on your local file system.
package com.raj.tweets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.bson.types.ObjectId;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MapReduceCommand;
import com.mongodb.MapReduceOutput;
import com.mongodb.Mongo;
import com.mongodb.QueryBuilder;
public class TwitterMapReduce {
private Mongo mongo;
private DB db;
private MapReduceCommand cmd1;
private MapReduceCommand cmd2;
private DBCollection inputCollection;
private String map;
private String reduce;
private String outputCollection;
private Map<String, Double> result = new TreeMap<String, Double>();
private Map<Double, List<String>> top10 = new TreeMap<Double, List<String>>();
private String lastKey = null;
public TwitterMapReduce() throws Exception {
mongo = new Mongo("192.168.1.100", 27017);
db = mongo.getDB("twitterdb");
inputCollection = db.getCollection("tweets");
outputCollection = "tweetstat";
map = " function() { " +
" emit(this.source, {count: 1}) " +
" }";
reduce = "function(key, values) { " +
"var sum = 0; " +
"values.forEach(function(v) { " +
"sum = sum + v['count'] "+
"}); " +
"return {count: sum}} ";
cmd1 = new MapReduceCommand(inputCollection, map, reduce, outputCollection, MapReduceCommand.OutputType.INLINE, null);
}
public void run() {
System.out.println("Running map Reduce: Last Key = " +lastKey);
// Sort descending on _id so that the first document is the newest tweet.
DBObject obj = new BasicDBObject();
obj.put("_id", -1);
// Read the newest _id before the job starts, so tweets that arrive while
// it runs are counted in the next window instead of being skipped.
String newestKey = inputCollection.find().sort(obj).next().get("_id").toString();
QueryBuilder qb = new QueryBuilder();
if( lastKey == null ) {
qb.put("_id").lessThanEquals(new ObjectId(newestKey));
} else {
qb.put("_id").greaterThan(new ObjectId(lastKey)).lessThanEquals(new ObjectId(newestKey));
}
cmd2 = new MapReduceCommand(inputCollection, map, reduce,
outputCollection, MapReduceCommand.OutputType.INLINE, qb.get());
MapReduceOutput out = inputCollection.mapReduce( cmd2 );
lastKey = newestKey;
result = new TreeMap<String, Double>();
double total = 0;
double count = 0;
DBObject value = null;
String name = null;
String trimmed = null;
for (DBObject o : out.results()) {
value = (DBObject)o.get("value");
count = (Double)value.get("count");
name = (String)o.get("_id");
trimmed = trim( name );
total += count;
if( result.containsKey(trimmed)) {
result.put(trimmed, count + result.get(trimmed));
} else {
result.put(trimmed, count);
}
}
top10 = new TreeMap<Double, List<String>>(Collections.reverseOrder());
for( String s: result.keySet() ) {
double val = result.get( s );
if( top10.containsKey(val) ) {
top10.get(val).add(s);
} else {
List<String> list = new ArrayList<String>();
list.add(s);
top10.put(val, list);
}
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
TwitterMapReduce app = new TwitterMapReduce();
try {
while( true ) {
// HH is the 24-hour clock; hh would produce the same file name again 12 hours later.
DateFormat formatter = new SimpleDateFormat("dd-MMM-yyyy-HH_mm_ss");
String today = formatter.format(new Date());
app.run();
if( !app.top10.isEmpty() ) {
new TwitterChart("Twitter Stream", today, app.top10);
}
Thread.sleep(1000*60*10); //10 minutes
}
}catch(Exception e) {
e.printStackTrace();
}
}
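public static String trim(String s) {
// The source is usually an HTML anchor; reduce it to the host name of its href.
int start = s.indexOf("href");
int end = s.indexOf("rel", start);
// Leave the value untouched when there is no rel attribute after the href.
if( start != -1 && end - 2 > start ) {
s = s.substring(start, end - 2);
if( s.indexOf("http://") != -1 ) {
s = s.substring(s.indexOf("http://") + 7);
} else if( s.indexOf("https://") != -1) {
s = s.substring(s.indexOf("https://") + 8);
}
if( s.indexOf("/") != -1 ) {
s = s.substring(0, s.indexOf("/"));
}
}
return s;
}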
}
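
The trim method above reduces the source field, which typically arrives as an HTML anchor, to a host name so that different clients from the same site are counted together. As an alternative sketch of the same idea, a regular expression can pull the host out of the href. The class name and the sample anchor string are illustrative assumptions, not values captured from the stream.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Extracts the host name from a tweet's source anchor.
class SourceHost {
    // Captures the host part of an http(s) URL inside an href attribute.
    private static final Pattern HREF_HOST = Pattern.compile("href=\"https?://([^/\"]+)");

    // Returns the host when the source is an anchor, otherwise the source unchanged.
    static String hostOf(String source) {
        Matcher m = HREF_HOST.matcher(source);
        return m.find() ? m.group(1) : source;
    }

    public static void main(String[] args) {
        String anchor = "<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>";
        System.out.println(hostOf(anchor)); // prints twitter.com
        System.out.println(hostOf("web"));  // prints web
    }
}
```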
Chart Utility
package com.raj.tweets;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PiePlot3D;
import org.jfree.data.general.DefaultPieDataset;
import org.jfree.data.general.PieDataset;
import org.jfree.util.Rotation;
public class TwitterChart {
private static final long serialVersionUID = 1L;
public TwitterChart(String applicationTitle, String chartTitle, Map<Double, List<String>> map) throws Exception {
// This will create the dataset
PieDataset dataset = createDataset( map );
// based on the dataset we create the chart
JFreeChart chart = createChart(dataset, chartTitle);
ChartUtilities.saveChartAsJPEG(new File("c:\\project\\Twitter\\" +chartTitle+ ".jpg"), chart, 500, 500);
}
/**
* Builds the pie dataset from the counts, largest first
*/
private PieDataset createDataset( Map<Double, List<String>> top10 ) {
DefaultPieDataset result = new DefaultPieDataset();
System.out.println("****RESULTS***");
int count = 0;
for( double val: top10.keySet() ) {
result.setValue(top10.get(val).toString(), val);
count++;
// Only the 7 largest counts are charted.
if( count == 7 ) {
break;
}
}
return result;
}
/**
* Creates a chart
*/
private JFreeChart createChart(PieDataset dataset, String title) {
JFreeChart chart = ChartFactory.createPieChart3D(title, // chart title
dataset, // data
true, // include legend
true,
false);
PiePlot3D plot = (PiePlot3D) chart.getPlot();
plot.setStartAngle(290);
plot.setDirection(Rotation.CLOCKWISE);
plot.setForegroundAlpha(0.5f);
return chart;
}
}
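
The chart is fed a map from each count to the sources that share it, ordered from the highest count down. A stand-alone sketch of that grouping is shown here, using integer counts and a hypothetical limit parameter in place of the fixed cut-off; the class name and sample data are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Groups sources by count, highest count first, keeping at most `limit` groups.
class TopSources {
    static Map<Integer, List<String>> topGroups(Map<String, Integer> counts, int limit) {
        // Reverse-ordered TreeMap iterates from the largest count to the smallest.
        TreeMap<Integer, List<String>> byCount =
                new TreeMap<Integer, List<String>>(Collections.<Integer>reverseOrder());
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            List<String> names = byCount.get(e.getValue());
            if (names == null) {
                names = new ArrayList<String>();
                byCount.put(e.getValue(), names);
            }
            names.add(e.getKey()); // sources with equal counts share one group
        }
        Map<Integer, List<String>> top = new LinkedHashMap<Integer, List<String>>();
        for (Map.Entry<Integer, List<String>> e : byCount.entrySet()) {
            if (top.size() == limit) {
                break;
            }
            top.put(e.getKey(), e.getValue());
        }
        return top;
    }

    public static void main(String[] args) {
        // Hypothetical counts, not real results.
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        counts.put("web", 3);
        counts.put("facebook.com", 2);
        counts.put("twitter.com", 1);
        counts.put("blackberry", 1);
        System.out.println(topGroups(counts, 3));
        // prints {3=[web], 2=[facebook.com], 1=[blackberry, twitter.com]}
    }
}
```

Note that the limit applies to groups, so a tie can put more than `limit` sources on the chart.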


Download

The entire project can be downloaded here.

