Sunday, March 31, 2013

Neo4j Graph Case Study - Facebook & Trip Planner

NoSQL Graph Database - Neo4j

Graphs are everywhere - social networks, routing algorithms, payment flows, and many more. Facebook friends, Twitter followers, and the LinkedIn connection graph are a few examples. Quite recently Facebook released Graph Search. Neo4j is an open-source NoSQL graph database, immensely popular and used by companies like Adobe and Intuit. Twitter uses FlockDB for its graphs.

Download and extract the Neo4j Community edition from http://www.neo4j.org/download. Start the server by running bin/Neo4j.bat (on Windows). The Neo4j monitoring tool can then be accessed at http://localhost:7474/webadmin/. It provides several utilities, shown below.


  • Gremlin - a Groovy-based console for querying graph data
  • Neo4j monitoring console
  • Neo4j data browser

Apart from the above utilities, Neo4j supports a graph query language called Cypher. Since Neo4j is built on Java, it runs on the JVM and can be used as an embedded database, or as a standalone server accessed through a REST API. There is another interesting tool called Neoclipse, which provides rich graph visualization. The following graphs for Facebook friends and railway routes were generated with Neoclipse.
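As a taste of the embedded mode, here is a minimal sketch against the Neo4j 1.9-era Java API (the node names and the FRIEND relationship type are purely illustrative): it creates two connected nodes and queries them with Cypher.

import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;

public class EmbeddedExample {
    public static void main(String[] args) {
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("data/graph.db");
        Transaction tx = db.beginTx();
        try {
            Node alice = db.createNode();
            alice.setProperty("name", "Alice");
            Node bob = db.createNode();
            bob.setProperty("name", "Bob");
            // FRIEND is an ad-hoc relationship type used only for this example
            alice.createRelationshipTo(bob, DynamicRelationshipType.withName("FRIEND"));
            tx.success();
        } finally {
            tx.finish();
        }
        // Query the graph with Cypher
        ExecutionEngine engine = new ExecutionEngine(db);
        ExecutionResult result = engine.execute(
                "START n=node(*) MATCH n-[:FRIEND]->m RETURN n.name, m.name");
        System.out.println(result.dumpToString());
        db.shutdown();
    }
}

The same Cypher statement could equally be sent to a standalone server over its REST endpoint.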



Applications of Neo4j

1. Facebook friend connect

  • Log in to Facebook and follow the instructions from here to download the Facebook Graph data.
  • Next, run the following program to download the profile photos from Facebook to your local machine. The photos are used only for visualization in Neoclipse. This step uses the data downloaded in the previous step; change the "file" variable.
  • The next step is to import the graph data into Neo4j. Run the program below. I used the data for two Facebook profiles and loaded all the friends and their interconnections.
  • Now that the data is ready, I implemented a search utility which, given two Facebook IDs, finds all possible ways for the two people to get connected (a sketch of this path search follows the list).
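Here is a minimal sketch of that path search. It assumes the import step stored profiles in a legacy node index named "profiles" keyed by "fbId", gave each node a "name" property, and linked friends with FRIEND relationships; those names are my assumptions, not necessarily what the importer above uses.

import org.neo4j.graphalgo.GraphAlgoFactory;
import org.neo4j.graphalgo.PathFinder;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.index.Index;
import org.neo4j.kernel.Traversal;

public class FriendPathSearch {
    public static void main(String[] args) {
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("data/facebook.db");
        // Look up the two people by their Facebook IDs (passed as program arguments)
        Index<Node> profiles = db.index().forNodes("profiles");
        Node personA = profiles.get("fbId", args[0]).getSingle();
        Node personB = profiles.get("fbId", args[1]).getSingle();

        // Enumerate every simple path up to 4 hops along FRIEND relationships
        PathFinder<Path> finder = GraphAlgoFactory.allSimplePaths(
                Traversal.expanderForTypes(DynamicRelationshipType.withName("FRIEND"), Direction.BOTH), 4);
        for (Path path : finder.findAllPaths(personA, personB)) {
            StringBuilder route = new StringBuilder();
            for (Node n : path.nodes()) {
                if (route.length() > 0) route.append(" -> ");
                route.append(n.getProperty("name"));
            }
            System.out.println(route);
        }
        db.shutdown();
    }
}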

2. Railway Trip Planner

  • For this application, I used data from the Indian Railways site. A few trains were randomly chosen and their routes and details were stored on the local file system as tab-delimited text. The route information contains station codes, names, distances, times taken, etc. I created directories named after the train numbers; each directory contains info.csv and schedule.csv. Download the directories and their contents and save them locally. They can be downloaded from here.
  • Next, run the program below to import the graph data into Neo4j.
  • Now that the data is available, I created a trip planner. It provides two options: search for a path between two stations with the least distance, or search for a path between two stations with the fewest stops. In each case it prints the total distance (a sketch of both searches follows this list).
  • The output of the above search program is shown below.
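Both searches map directly onto Neo4j's built-in graph algorithms. A minimal sketch, assuming the importer indexed stations in a "stations" index keyed by station code and connected consecutive stations with CONNECTED_TO relationships carrying a numeric "distance" property (all of these names are my assumptions):

import org.neo4j.graphalgo.GraphAlgoFactory;
import org.neo4j.graphalgo.PathFinder;
import org.neo4j.graphalgo.WeightedPath;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.kernel.Traversal;

public class TripPlanner {
    public static void main(String[] args) {
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("data/railways.db");
        Node from = db.index().forNodes("stations").get("code", args[0]).getSingle();
        Node to = db.index().forNodes("stations").get("code", args[1]).getSingle();
        RelationshipType connected = DynamicRelationshipType.withName("CONNECTED_TO");

        // Option 1: least total distance - Dijkstra over the "distance" weight
        PathFinder<WeightedPath> byDistance = GraphAlgoFactory.dijkstra(
                Traversal.expanderForTypes(connected, Direction.BOTH), "distance");
        WeightedPath shortest = byDistance.findSinglePath(from, to);
        System.out.println("Least distance: " + shortest.weight());

        // Option 2: fewest stops - unweighted shortest path, up to 50 hops
        PathFinder<Path> byStops = GraphAlgoFactory.shortestPath(
                Traversal.expanderForTypes(connected, Direction.BOTH), 50);
        Path fewestStops = byStops.findSinglePath(from, to);
        System.out.println("Fewest stops: " + (fewestStops.length() - 1) + " intermediate stations");
        db.shutdown();
    }
}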

3. Facebook Graph Search

  • Facebook Graph Search was recently released. It provides search based on your friends' connections, likes, and recommendations, so I could ask it to "Suggest Indian vegetarian restaurants in and around South London". There is a wonderful article on how to build this with Neo4j at http://jimwebber.org/2013/02/neo4j-facebook-graphsearch-for-the-rest-of-us/
  • I implemented the code based on the above design. First, build the graph.
  • Next, run the search on restaurants (a Cypher sketch follows this list).
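A rough sketch of that restaurant search as a Cypher query run from embedded Java. The relationship types (FRIEND, LIKES, LOCATED_IN), the "people" index and the "cuisine" property loosely follow Jim Webber's post but are my own assumptions about the model:

import java.util.Collections;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;

public class RestaurantSearch {
    public static void main(String[] args) {
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase("data/graphsearch.db");
        ExecutionEngine engine = new ExecutionEngine(db);
        // Hypothetical model: me-[:FRIEND]-friend-[:LIKES]->restaurant-[:LOCATED_IN]->location
        String query =
            "START me=node:people(name={name}) " +
            "MATCH me-[:FRIEND]-friend-[:LIKES]->restaurant-[:LOCATED_IN]->location " +
            "WHERE restaurant.cuisine = 'Indian Vegetarian' AND location.name = 'South London' " +
            "RETURN restaurant.name, count(friend) AS recommendations " +
            "ORDER BY recommendations DESC";
        ExecutionResult result = engine.execute(query,
                Collections.<String, Object>singletonMap("name", args[0]));
        System.out.println(result.dumpToString());
        db.shutdown();
    }
}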

High Availability

The Neo4j Enterprise edition provides high availability in the form of replication and cache sharding. I tried this on 63 million node relationships with a cluster of 3 nodes. I plan to write it up in the next post in this series.


Sunday, March 3, 2013

Real-Time Data Analysis using Twitter Stream

Experiment

Twitter processes half a billion tweets per day, each at most 140 characters. For experimentation purposes, Twitter makes these tweets available to the public. Initially they were provided through the REST API, and access later moved to OAuth. The Twitter Streaming API makes this possible, providing real-time streaming tweets at about 1% of Twitter's load. On average Twitter receives 5,787 tweets per second (500,000,000 tweets / (24 hours * 60 mins * 60 secs)), so the Streaming API sends us roughly 58 tweets per second. This data arrives constantly, so we need a mechanism to store 58 tweets every second and run real-time analytics on them. Even though each tweet is 140 characters, or 280 bytes (a char is 2 bytes), the Streaming API sends a lot of additional information for each tweet (1 tweet is about 4 KB). This information is sent in JSON format.

Twitter data is a very valuable tool in the field of marketing. Vendors can do sentiment analysis on tweets for a specific hashtag; if Samsung wants to know how happy people are with its products, it can find out from the tweet data. As a result, a lot of research in the NLP (Natural Language Processing) field has started around it. Apart from this, we can run many machine-learning tasks on these tweets.

As part of this experiment I implemented a consumer that reads the stream of JSON tweets and persists them in MongoDB. Since it is a write-heavy application, I load-balanced (sharded) my MongoDB. The application keeps running indefinitely, and the data starts filling up the MongoDB cluster. To keep storage to a minimum, I extracted and stored only the tweet ID, the tweeter's name, the actual tweet text, and the source of the tweet (e.g. twitter.com, facebook.com, BlackBerry, etc.). Then I set up a MongoDB incremental map-reduce job to run every 10 minutes. This job computes the unique sources and their counts; from that I generate the top-10 statistics and create a chart using JFreeChart.

Architectural Overview


Execution

Setup Twitter Account

Go to https://dev.twitter.com/apps and click "Create a new application".
Fill in all mandatory information and submit the application.
Go to the "OAuth Tool" tab and note down the Consumer Key and Consumer Secret.
Run the program below, changing the consumer key and consumer secret.

Follow the program's instructions to generate the Access Token and Access Token Secret (a sketch of such a program follows these steps).
They can later also be obtained from the "OAuth Tool" tab.
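For reference, a minimal sketch of such a token-generating program using Twitter4j (the class name and the placeholder key/secret values are mine):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.auth.AccessToken;
import twitter4j.auth.RequestToken;

public class GenerateAccessToken {
    public static void main(String[] args) throws Exception {
        Twitter twitter = TwitterFactory.getSingleton();
        // Replace with the Consumer Key / Secret noted from the "OAuth Tool" tab
        twitter.setOAuthConsumer("YOUR_CONSUMER_KEY", "YOUR_CONSUMER_SECRET");
        RequestToken requestToken = twitter.getOAuthRequestToken();
        System.out.println("Open this URL and authorize the app: " + requestToken.getAuthorizationURL());
        System.out.print("Enter the PIN shown by Twitter: ");
        String pin = new BufferedReader(new InputStreamReader(System.in)).readLine();
        AccessToken accessToken = twitter.getOAuthAccessToken(requestToken, pin.trim());
        System.out.println("Access Token        : " + accessToken.getToken());
        System.out.println("Access Token Secret : " + accessToken.getTokenSecret());
    }
}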

Setup MongoDB

MongoDB provides sharding at the database/collection level, so I set up a simple MongoDB sharded cluster with two laptops (Toshiba, Windows 7, 1 TB disk, 4 GB RAM each):

System-1 : 192.168.1.100
System-2 : 192.168.1.101
MongoDB 2.2.3 is installed on both laptops at c:\apps\mongodb.
Create the following directories on System-1 (and c:\apps\mongodb\data1 on System-2):
c:\apps\mongodb\data1
c:\apps\mongodb\data2
c:\apps\mongodb\data3
c:\apps\mongodb\conf

On System1
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data1 --port 27020
On System2
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data1 --port 27020
On System1
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data2 --port 27021

c:\apps\mongodb\bin> mongod --configsvr --dbpath c:\apps\mongodb\conf --port 27022

c:\apps\mongodb\bin> mongos --configdb 192.168.1.100:27022 --port 27017

c:\apps\mongodb\bin> mongo 192.168.1.100:27017

mongos> use admin
switched to db admin
mongos> db.runCommand({addShard:"192.168.1.100:27020"});
{"shardAdded": "shard0000", "ok": 1}
mongos> db.runCommand({addShard:"192.168.1.100:27021"});
{"shardAdded": "shard0001", "ok": 1}
mongos> db.runCommand({addShard:"192.168.1.101:27020"});
{"shardAdded": "shard0002", "ok": 1}
mongos> db.runCommand({listShards: 1})
{
 "shards" : [
   {
     "_id"  : "shard0000",
     "host" : "192.168.1.100:27020"
   },
   {
     "_id"  : "shard0001",
     "host" : "192.168.1.100:27021"
   },
   {
     "_id"  : "shard0002",
     "host" : "192.168.1.101:27020"
   }
 ],
 "ok" : 1
}

mongos> use twitterdb
switched to db twitterdb

mongos> db.createCollection("tweets")
{"ok" : 1}

mongos> use admin
switched to db admin

mongos> db.runCommand({enableSharding: "twitterdb"})
{ "ok" : 1}

mongos> db.runCommand({shardCollection: "twitterdb.tweets", key: {id_str: 1}})
{"collectionSharded" : "twitterdb.tweets", "ok" : 1}

mongos> use twitterdb
switched to db twitterdb

mongos> db.tweets.find().count()
0

Running the Application

So we have just finished setting up the shards and the database.

Run the Twitter stream application below (change the appropriate values to match your settings), and data starts pumping into MongoDB. Don't forget to stop the application when you are done; otherwise the Twitter stream keeps consuming network bandwidth and the MongoDB storage keeps growing.
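A condensed sketch of such a consumer, using Twitter4j and the MongoDB Java driver. It keeps only the four fields mentioned earlier; the placeholder OAuth values must be replaced with yours, and the mongos address follows the setup above:

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterStreamConsumer {
    public static void main(String[] args) throws Exception {
        // Connect through the mongos router set up above
        MongoClient mongo = new MongoClient("192.168.1.100", 27017);
        final DBCollection tweets = mongo.getDB("twitterdb").getCollection("tweets");

        ConfigurationBuilder cb = new ConfigurationBuilder()
                .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
                .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
                .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
                .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET");
        TwitterStream stream = new TwitterStreamFactory(cb.build()).getInstance();

        stream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // Store only the fields we need; id_str doubles as the shard key
                BasicDBObject doc = new BasicDBObject("id_str", String.valueOf(status.getId()))
                        .append("user", status.getUser().getScreenName())
                        .append("text", status.getText())
                        .append("source", status.getSource());
                tweets.insert(doc);
            }
        });
        stream.sample();  // 1% random sample of the public stream
    }
}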
Keep tabs on the data files on disk:
  System:1
c:\apps\mongodb\data1
c:\apps\mongodb\data2

  System:2
c:\apps\mongodb\data1

 The data files grow continuously. Verify the count of tweets in the database.

  mongos> db.tweets.find().count()
25043

 So about 25,000 tweets accumulated in 10 minutes, roughly 42 tweets per second. Now find out how many of these tweets were posted from the web in those 10 minutes:

  mongos> db.tweets.find({"source" : "web"}).count()
4365

Now run the MapReduce job below every 10 minutes to aggregate the results and generate the report as a pie chart (a simplified sketch is shown under Chart Utility below). The charts are stored on your local file system.
Chart Utility
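A simplified sketch of the aggregation and charting step, using the MongoDB Java driver's map-reduce support and JFreeChart. For brevity it replaces the output collection on every run instead of doing a true incremental map-reduce, and the output collection and chart file names are mine:

import java.io.File;
import com.mongodb.*;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.data.general.DefaultPieDataset;

public class SourceReport {
    public static void main(String[] args) throws Exception {
        MongoClient mongo = new MongoClient("192.168.1.100", 27017);
        DBCollection tweets = mongo.getDB("twitterdb").getCollection("tweets");

        // Map-reduce: count tweets per source
        String map = "function() { emit(this.source, 1); }";
        String reduce = "function(key, values) { return Array.sum(values); }";
        MapReduceCommand cmd = new MapReduceCommand(tweets, map, reduce,
                "source_counts", MapReduceCommand.OutputType.REPLACE, null);
        MapReduceOutput out = tweets.mapReduce(cmd);

        // Build a pie chart of the per-source counts with JFreeChart
        DefaultPieDataset dataset = new DefaultPieDataset();
        for (DBObject row : out.results()) {
            dataset.setValue(String.valueOf(row.get("_id")),
                    ((Number) row.get("value")).doubleValue());
        }
        JFreeChart chart = ChartFactory.createPieChart("Tweet sources", dataset, true, true, false);
        ChartUtilities.saveChartAsPNG(new File("tweet-sources.png"), chart, 800, 600);
        mongo.close();
    }
}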


Download

The entire project can be downloaded here.

