Friday, April 19, 2013

My NoSQL - Big Data Presentation in Wellington, NZ

Raju Rama Krishna
Wellington, NZ
17th May
Topics:- High Scalability, MongoDB, Redis

WJUG 2013-04-17: Raju Ramakrishna - NoSQL Family from John Hurst on Vimeo.


Sunday, March 31, 2013

Neo4j Graph Case Study - Facebook & Trip Planner

NoSQL Graph Database - Neo4j

Graphs are everywhere - social networks, routing algorithms, payment flows, and many more. Facebook friends, Twitter followers, and the LinkedIn connection graph are a few natural graph scenarios. Quite recently Facebook released Graph Search. Neo4j is an open-source NoSQL graph database, immensely popular and used by companies like Adobe and Intuit. Twitter uses FlockDB for its graphs. 

Download and extract the Neo4j Community edition from http://www.neo4j.org/download. Start the server by running bin\Neo4j.bat. The Neo4j monitoring tool can be accessed at http://localhost:7474/webadmin/. The monitoring tool provides several utilities, as shown below. 


Gremlin - A Groovy tool for querying Graph data


Neo4j Monitoring console


Neo4j Data Browser

Apart from the above utilities, Neo4j provides support for a graph query language called Cypher. Since Neo4j is built in Java, it runs on the JVM and can be used as an embedded database, or as a standalone server accessed through a REST API. There is another interesting tool called Neoclipse which is very rich in visualizing graphs. The following graphs for Facebook friends and railway routes were generated with Neoclipse.
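For a first taste of the embedded API and Cypher before the case studies, here is a minimal sketch of my own (the database path, index name and properties are made up), using the same Neo4j 1.8 APIs as the programs that follow:

package com.raj.neo4j;
import org.neo4j.cypher.ExecutionEngine;
import org.neo4j.cypher.ExecutionResult;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.kernel.impl.util.StringLogger;
public class HelloGraph {
    public static enum RelTypes implements RelationshipType { FRIEND }
    public static void main(String[] args) {
        // Embedded mode: the database lives in a local directory (path is illustrative)
        GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabase("c://apps//neo4j-1.8.2//hello");
        Transaction tx = graphDb.beginTx();
        Node alice = graphDb.createNode();
        alice.setProperty("name", "Alice");
        graphDb.index().forNodes("people").add(alice, "name", "Alice");
        Node bob = graphDb.createNode();
        bob.setProperty("name", "Bob");
        alice.createRelationshipTo(bob, RelTypes.FRIEND);
        tx.success();
        tx.finish();
        // Cypher, 1.8-era syntax: who is Alice a FRIEND of?
        ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
        ExecutionResult result = engine.execute("START a = node:people(name='Alice') MATCH a-[:FRIEND]->b RETURN b.name");
        System.out.println(result.dumpToString());
        graphDb.shutdown();
    }
}

The same Cypher query can also be typed into the webadmin console shown above.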



Applications of Neo4j

1. Facebook friend connect

  • Log in to Facebook and follow the instructions from here to download the Facebook graph data.
  • Next, run the following program to download the profile photos from Facebook to your local machine. The photos are used only for visualizing the graph in Neoclipse. This step uses the data downloaded in the previous step; change the "file" variable to point to it.
  • package com.raj.facebook.photos;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.FileReader;
    import java.net.URL;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;
    public class PhotoFetcher {
    public static void main(String[] args) throws Exception {
    String dir = "C:\\Project\\workspace\\FacebookGraph\\resources\\images\\";
    File file = new File("C:\\Project\\workspace\\FacebookGraph\\resources\\Raju Ramakrishna_1364522831.graphml");
    BufferedReader br = new BufferedReader(new FileReader(file));
    String line = br.readLine();
    String url = null;
    String name = null;
    while( line != null ) {
    if( line.indexOf("<data key=\"pic_big\">") != -1 ) {
    if(line.indexOf(".jpg") != -1)
    url = line.substring(line.indexOf("http:"), line.indexOf(".jpg")+4);
    else
    url = line.substring(line.indexOf("http:"), line.indexOf(".gif")+4);
    URL picUrl = new URL(url);
    File outFile = new File(dir+ name + ".jpg");
    ReadableByteChannel rbc = Channels.newChannel(picUrl.openStream());
    FileOutputStream fos = new FileOutputStream(outFile);
    fos.getChannel().transferFrom(rbc, 0, 1 << 24);
    fos.close();
    }
    if( line.indexOf("node id") != -1) {
    name = line.substring(10, line.length()-2);
    }
    line = br.readLine();
    if( line != null && line.indexOf("edge id") != -1) { // guard: readLine() can return null at end of file
    break;
    }
    }
    br.close();
    }
    }
  • The next step is to import the graph data into Neo4j. Run the program below. I used the data for two Facebook profiles and loaded all the friends and their interconnections.
    package com.raj.facebook.graph;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import org.neo4j.graphalgo.GraphAlgoFactory;
    import org.neo4j.graphalgo.PathFinder;
    import org.neo4j.graphdb.GraphDatabaseService;
    import org.neo4j.graphdb.Node;
    import org.neo4j.graphdb.Path;
    import org.neo4j.graphdb.Relationship;
    import org.neo4j.graphdb.RelationshipType;
    import org.neo4j.graphdb.Transaction;
    import org.neo4j.graphdb.factory.GraphDatabaseFactory;
    import org.neo4j.graphdb.index.IndexHits;
    import org.neo4j.kernel.Traversal;
    public class GraphUtils {
    public static final String DB_PATH="c://apps//neo4j-1.8.2//facebook";
    public static GraphDatabaseService graphDb = null;
    public static final String ID = "id";
    public static final String USER = "user:";
    public static void main(String[] args) throws Exception {
    new GraphUtils().createGraph();
    }
    public void createGraph() throws Exception {
    graphDb = new GraphDatabaseFactory().newEmbeddedDatabase( DB_PATH );
    registerShutdownHook( graphDb );
    File file = new File("C:\\Project\\workspace\\FacebookGraph\\resources\\Raju Ramakrishna_1364522831.graphml");
    BufferedReader br = new BufferedReader(new FileReader(file));
    String line = br.readLine();
    Node node = null;
    String key = null;
    String val = null;
    String source = null;
    String target = null;
    Transaction tx = graphDb.beginTx();
    Node myself = graphDb.createNode();
    myself.setProperty("fb_id", "Raju_Ramakrishna_100000243684316");
    myself.setProperty("uid", "100000243684316");
    myself.setProperty("name", "Raju Ramakrishna");
    myself.setProperty("sex", "male");
    myself.setProperty("mutual_friends", 27);
    myself.setProperty("likes", 16);
    myself.setProperty("friend_count", 180);
    graphDb.index().forNodes("myfriends").add(myself, "fb_id", "Raju_Ramakrishna_100000243684316");
    while( line != null ) {
    if(line.indexOf("node id") != -1) {
    val = line.substring(line.indexOf("node id")+9, line.length()-2);
    System.out.println("ID=" +val);
    node = graphDb.createNode();
    node.setProperty("fb_id", val);
    graphDb.index().forNodes("myfriends").add(node, "fb_id", val);
    Relationship relationship = myself.createRelationshipTo(node, RelTypes.FRIEND);
    relationship.setProperty("link", myself.getProperty("fb_id") + " <==> " +node.getProperty("fb_id"));
    } else if(line.indexOf("</node>") != -1) {
    System.out.println("----------");
    } else if(line.indexOf("data key") != -1){
    if(line.indexOf("uid") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Label = " +val);
    node.setProperty("uid", val);
    } else if(line.indexOf("name") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Name = " +val);
    node.setProperty("name", val);
    } else if(line.indexOf("sex") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Sex = " +val);
    node.setProperty("sex", val);
    } else if(line.indexOf("mutual_friend_count") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Mutual Friends = " +val);
    node.setProperty("mutual_friends", val);
    } else if(line.indexOf("likes_count") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Likes = " +val);
    node.setProperty("likes", val);
    } else if(line.indexOf("friend_count") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Friends = " +val);
    node.setProperty("friend_count", val);
    } else if(line.indexOf("pic_big") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("picture = " +val);
    // node.setProperty("picture", val);
    }
    } else if(line.indexOf("edge id") != -1) {
    source = line.substring(line.indexOf("source")+8, line.indexOf("target")-2);
    target = line.substring(line.indexOf("target")+8, line.indexOf("</edge>")-2);
    System.out.println(source+ " ---- " +target);
    Node sourceNode = graphDb.index().forNodes("myfriends").get("fb_id", source).next();
    Node targetNode = graphDb.index().forNodes("myfriends").get("fb_id", target).next();
    Relationship relationship = sourceNode.createRelationshipTo(targetNode, RelTypes.FRIEND);
    relationship.setProperty("link", sourceNode.getProperty("fb_id") + " <==> " +targetNode.getProperty("fb_id"));
    }
    line = br.readLine();
    }
    br.close();
    file = new File("C:\\Project\\workspace\\FacebookGraph\\resources\\Geeta Raj_1364532420.graphml");
    br = new BufferedReader(new FileReader(file));
    line = br.readLine();
    boolean nodeFound = false;
    myself = graphDb.index().forNodes("myfriends").get("fb_id", "Geeta_Raj_556485423").next();
    while( line != null ) {
    if(line.indexOf("node id") != -1) {
    val = line.substring(line.indexOf("node id")+9, line.length()-2);
    System.out.println("ID=" +val);
    IndexHits<Node> nodes = graphDb.index().forNodes("myfriends").get("fb_id", val);
    if( nodes.hasNext() ) {
    node = nodes.next();
    nodeFound = true;
    } else {
    node = graphDb.createNode();
    node.setProperty("fb_id", val);
    graphDb.index().forNodes("myfriends").add(node, "fb_id", val);
    Relationship relationship = myself.createRelationshipTo(node, RelTypes.FRIEND);
    relationship.setProperty("link", myself.getProperty("fb_id") + " <==> " +node.getProperty("fb_id"));
    }
    } else if(line.indexOf("</node>") != -1) {
    nodeFound = false;
    System.out.println("----------");
    } else if( !nodeFound && line.indexOf("data key") != -1){
    if(line.indexOf("uid") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Label = " +val);
    node.setProperty("uid", val);
    } else if(line.indexOf("name") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Name = " +val);
    node.setProperty("name", val);
    } else if(line.indexOf("sex") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Sex = " +val);
    node.setProperty("sex", val);
    } else if(line.indexOf("mutual_friend_count") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Mutual Friends = " +val);
    node.setProperty("mutual_friends", val);
    } else if(line.indexOf("likes_count") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Likes = " +val);
    node.setProperty("likes", val);
    } else if(line.indexOf("friend_count") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("Friends = " +val);
    node.setProperty("friend_count", val);
    } else if(line.indexOf("pic_big") != -1) {
    key = "CDATA";
    val = line.substring(line.indexOf(key)+key.length()+1, line.indexOf("</data>")-3);
    System.out.println("picture = " +val);
    // node.setProperty("picture", val);
    }
    } else if(line.indexOf("edge id") != -1) {
    source = line.substring(line.indexOf("source")+8, line.indexOf("target")-2);
    target = line.substring(line.indexOf("target")+8, line.indexOf("</edge>")-2);
    System.out.println(source+ " ---- " +target);
    Node sourceNode = graphDb.index().forNodes("myfriends").get("fb_id", source).next();
    Node targetNode = graphDb.index().forNodes("myfriends").get("fb_id", target).next();
    //Only if there is no link already
    PathFinder<Path> finder = GraphAlgoFactory.shortestPath(
    Traversal.expanderForTypes( RelTypes.FRIEND ), 1 );
    Iterable<Path> paths = finder.findAllPaths( sourceNode, targetNode );
    if(!paths.iterator().hasNext()) {
    Relationship relationship = sourceNode.createRelationshipTo(targetNode, RelTypes.FRIEND);
    relationship.setProperty("link", sourceNode.getProperty("fb_id") + " <==> " +targetNode.getProperty("fb_id"));
    }
    }
    line = br.readLine();
    }
    br.close();
    tx.success();
    tx.finish();
    }
    public static enum RelTypes implements RelationshipType {
    FRIEND
    }
    private static void registerShutdownHook( final GraphDatabaseService graphDb )
    {
    // Registers a shutdown hook for the Neo4j instance so that it
    // shuts down nicely when the VM exits (even if you "Ctrl-C" the
    // running example before it's completed)
    Runtime.getRuntime().addShutdownHook( new Thread()
    {
    @Override
    public void run()
    {
    graphDb.shutdown();
    }
    } );
    }
    }
  • Now that the data is ready, I implemented a search utility which, given 2 Facebook IDs, finds all possible ways for the 2 people to get connected. 
    package com.raj.facebook.graph;
    import org.neo4j.graphalgo.GraphAlgoFactory;
    import org.neo4j.graphalgo.PathFinder;
    import org.neo4j.graphdb.GraphDatabaseService;
    import org.neo4j.graphdb.Node;
    import org.neo4j.graphdb.Path;
    import org.neo4j.graphdb.Relationship;
    import org.neo4j.graphdb.RelationshipType;
    import org.neo4j.graphdb.factory.GraphDatabaseFactory;
    import org.neo4j.kernel.Traversal;
    public class SearchFriends {
    public static final String DB_PATH="c://apps//neo4j-1.8.2//facebook";
    public static GraphDatabaseService graphDb = null;
    public static final String ID = "id";
    public static final String USER = "user:";
    public static void main(String[] args) throws Exception {
    new SearchFriends().search();
    }
    public void search() throws Exception {
    graphDb = new GraphDatabaseFactory().newEmbeddedDatabase( DB_PATH );
    registerShutdownHook( graphDb );
    Node you = graphDb.index().forNodes("myfriends").get("fb_id", "Raju_Ramakrishna_100000243684316").next();
    Node me = graphDb.index().forNodes("myfriends").get("fb_id", "Geeta_Madye_1178898406").next();
    PathFinder<Path> finder = GraphAlgoFactory.shortestPath(
    Traversal.expanderForTypes( RelTypes.FRIEND ), 2 );
    Iterable<Path> paths = finder.findAllPaths( you, me );
    for( Path path: paths ) {
    for( Relationship relation: path.relationships()) {
    System.out.println(relation.getProperty("link"));
    }
    System.out.println();
    }
    }
    public static enum RelTypes implements RelationshipType {
    FRIEND
    }
    private static void registerShutdownHook( final GraphDatabaseService graphDb )
    {
    // Registers a shutdown hook for the Neo4j instance so that it
    // shuts down nicely when the VM exits (even if you "Ctrl-C" the
    // running example before it's completed)
    Runtime.getRuntime().addShutdownHook( new Thread()
    {
    @Override
    public void run()
    {
    graphDb.shutdown();
    }
    } );
    }
    }

2. Railway Trip Planner

  • For this application, I used data from the Indian Railways site. A few trains were chosen at random and their routes and information were stored on the local file system as tab-delimited text. The route information contains station codes, names, distances, time taken, etc. I created one directory per train, named after the train number; each directory contains info.csv and schedule.csv. Download the directories and their contents and save them locally. They can be downloaded from here
  • Next, run the program below to import the graph data into Neo4j.
    package com.raj.railways;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import org.neo4j.graphdb.GraphDatabaseService;
    import org.neo4j.graphdb.Node;
    import org.neo4j.graphdb.Relationship;
    import org.neo4j.graphdb.RelationshipType;
    import org.neo4j.graphdb.Transaction;
    import org.neo4j.graphdb.factory.GraphDatabaseFactory;
    import org.neo4j.graphdb.index.IndexHits;
    public class DataLoader {
    public static final String DB_PATH="c://apps//neo4j-1.8.2//indianrail";
    public static GraphDatabaseService graphDb = null;
    /**
    * @param args
    */
    public static void main(String[] args) throws Exception {
    graphDb = new GraphDatabaseFactory().newEmbeddedDatabase( DB_PATH );
    registerShutdownHook( graphDb );
    Transaction tx = graphDb.beginTx();
    String loc = "C:\\Project\\dataset\\indian-railways\\";
    File mainDir = new File(loc);
    String[] dirs = mainDir.list();
    for( String s: dirs ) {
    File dir = new File(loc+"\\"+s);
    // System.out.println(dir.getName());
    File file = new File(loc + "\\" +s+ "\\info.csv");
    BufferedReader br = new BufferedReader(new FileReader(file));
    String line = br.readLine();
    String trainNumber = null;
    String trainName = null;
    while(line != null) {
    line = br.readLine();
    if( line != null ) {
    String[] sArr = line.split("\t");
    trainNumber = sArr[0];
    trainName = sArr[1];
    System.out.println(trainNumber+":"+trainName);
    }
    }
    br.close();
    Train train = new Train( trainNumber, trainName );
    file = new File(loc + "\\" +s+ "\\schedule.csv");
    br = new BufferedReader(new FileReader(file));
    line = br.readLine();
    int dist = -1;
    Node previousNode = null;
    while(line != null) {
    line = br.readLine();
    if( line != null ) {
    String[] sArr = line.split("\t");
    String stationCode = sArr[1];
    String stationName = sArr[2];
    int totalDist = Integer.parseInt(sArr[7]);
    if( dist == -1 ) {
    dist =0;
    }
    System.out.println(stationCode+":"+stationName+ " ~ " +(totalDist-dist));
    IndexHits<Node> nodes = graphDb.index().forNodes("station").get("code", stationCode);
    Node node = null;
    if( nodes.hasNext() ) {
    node = nodes.next();
    String trains = (String)node.getProperty("trains");
    trains = trains + "|" +train.toString();
    node.setProperty("trains", trains);
    } else {
    node = graphDb.createNode();
    node.setProperty("code", stationCode);
    node.setProperty("name", stationName);
    node.setProperty("trains", train.toString());
    graphDb.index().forNodes("station").add( node, "code", stationCode);
    }
    if( previousNode != null ) {
    String prevCode = (String)previousNode.getProperty("code");
    String relation = prevCode + "-" +stationCode;
    if(!graphDb.index().forRelationships("route").get("path", relation).hasNext()) {
    Relationship relationship = previousNode.createRelationshipTo(node, RelTypes.ROUTE);
    relationship.setProperty("distance", (totalDist-dist));
    graphDb.index().forRelationships("route").add(relationship, "path", relation);
    }
    }
    dist = totalDist;
    previousNode = node;
    }
    }
    br.close();
    }
    tx.success();
    tx.finish();
    }
    public static enum RelTypes implements RelationshipType {
    ROUTE
    }
    private static void registerShutdownHook( final GraphDatabaseService graphDb )
    {
    // Registers a shutdown hook for the Neo4j instance so that it
    // shuts down nicely when the VM exits (even if you "Ctrl-C" the
    // running example before it's completed)
    Runtime.getRuntime().addShutdownHook( new Thread()
    {
    @Override
    public void run()
    {
    graphDb.shutdown();
    }
    } );
    }
    }
    class Train {
    String trainNumber;
    String trainName;
    public Train(String trainNumber, String trainName) {
    super();
    this.trainNumber = trainNumber;
    this.trainName = trainName;
    }
    public String toString() {
    return trainNumber + "~" +trainName;
    }
    }
  • Now that the data is available, I created a trip planner. It provides 2 options - search for the path between 2 stations with the least distance, or search for the path between 2 stations with the least number of stops. In each case it prints the total distance. 
    package com.raj.railways;
    import java.util.HashSet;
    import java.util.Set;
    import org.neo4j.graphalgo.GraphAlgoFactory;
    import org.neo4j.graphalgo.PathFinder;
    import org.neo4j.graphalgo.WeightedPath;
    import org.neo4j.graphdb.Direction;
    import org.neo4j.graphdb.GraphDatabaseService;
    import org.neo4j.graphdb.Node;
    import org.neo4j.graphdb.Path;
    import org.neo4j.graphdb.Relationship;
    import org.neo4j.graphdb.RelationshipType;
    import org.neo4j.graphdb.factory.GraphDatabaseFactory;
    import org.neo4j.kernel.Traversal;
    public class TrainSearch {
    public static final String DB_PATH="c://apps//neo4j-1.8.2//indianrail";
    public static GraphDatabaseService graphDb = null;
    /**
    * @param args
    */
    public static void main(String[] args) throws Exception {
    graphDb = new GraphDatabaseFactory().newEmbeddedDatabase( DB_PATH );
    registerShutdownHook( graphDb );
    new TrainSearch().search("SBC", "UBL");
    System.out.println();
    new TrainSearch().search("UBL", "BGM");
    }
    public void search( String code1, String code2 ) {
    System.out.println("**************************************");
    Node source = graphDb.index().forNodes("station").get("code", code1).next();
    Node target = graphDb.index().forNodes("station").get("code", code2).next();
    System.out.println("SOURCE: " +source.getProperty("name")+ " DESTINATION: " +target.getProperty("name"));
    //Get Least number of stops
    System.out.println();
    System.out.println("Route for least number of Stops");
    PathFinder<Path> finder = GraphAlgoFactory.shortestPath(
    Traversal.expanderForTypes( RelTypes.ROUTE ), 200 );
    Path path = finder.findAllPaths( source, target ).iterator().next();
    int total = 0;
    for(Node node : path.nodes()) {
    System.out.print(node.getProperty("name") + " -> ");
    }
    System.out.println();
    for( Relationship relation: path.relationships()) {
    total += (int)relation.getProperty("distance");
    }
    System.out.println();
    System.out.println("Distance = " +total+ " KM");
    PathFinder<WeightedPath> weightedFinder = GraphAlgoFactory.dijkstra(
    Traversal.expanderForTypes( RelTypes.ROUTE, Direction.BOTH ), "distance" );
    WeightedPath weightedPath = weightedFinder.findSinglePath( source, target );
    // Get the weight for the found path
    double weight = weightedPath.weight();
    System.out.println();
    System.out.println("Route for minimum distance");
    for(Node node : weightedPath.nodes()) {
    System.out.print(node.getProperty("name") + " -> ");
    }
    System.out.println();
    System.out.println("Distance = " +(int)weight+ " KM");
    }
    public static enum RelTypes implements RelationshipType {
    ROUTE
    }
    private static void registerShutdownHook( final GraphDatabaseService graphDb )
    {
    // Registers a shutdown hook for the Neo4j instance so that it
    // shuts down nicely when the VM exits (even if you "Ctrl-C" the
    // running example before it's completed)
    Runtime.getRuntime().addShutdownHook( new Thread()
    {
    @Override
    public void run()
    {
    graphDb.shutdown();
    }
    } );
    }
    }
  • The output of the above search program is-
    SOURCE: BANGALORE CY JN DESTINATION: HUBLI JN
    Route for least number of Stops
    BANGALORE CY JN -> YESVANTPUR JN -> DHARMAVARAM JN -> ANANTAPUR -> GUNTAKAL JN -> BELLARY JN -> HOSPET JN -> KOPPAL -> GADAG JN -> HUBLI JN ->
    Distance = 541 KM
    Route for minimum distance
    BANGALORE CY JN -> YESVANTPUR JN -> TUMKUR -> AMMASANDRA -> TIPTUR -> ARSIKERE JN -> KADUR -> BIRUR JN -> DAVANGERE -> HARIHAR -> RANIBENNUR -> HAVERI -> HUBLI JN ->
    Distance = 469 KM

3. Facebook Graph Search

  • Facebook has released Graph Search. It provides search based on your friends' connections, likes and recommendations. So I could ask it to "Suggest Indian vegetarian restaurants in and around South London". There is a wonderful article on how to build this with Neo4j at http://jimwebber.org/2013/02/neo4j-facebook-graphsearch-for-the-rest-of-us/
  • I implemented the code based on the above design. First, build the graph-
    package com.raj.socialgraph;
    import org.neo4j.graphdb.GraphDatabaseService;
    import org.neo4j.graphdb.Node;
    import org.neo4j.graphdb.RelationshipType;
    import org.neo4j.graphdb.Transaction;
    import org.neo4j.graphdb.factory.GraphDatabaseFactory;
    public class SocialGraph {
    public static final String DB_PATH="c://apps//neo4j-1.8.2//socialgraph";
    public static GraphDatabaseService graphDb = null;
    /**
    * @param args
    */
    public static void main(String[] args) {
    graphDb = new GraphDatabaseFactory().newEmbeddedDatabase( DB_PATH );
    registerShutdownHook( graphDb );
    Transaction tx = graphDb.beginTx();
    loadData();
    tx.success();
    tx.finish();
    System.out.println("Created Social Graph!!");
    }
    public static void loadData() {
    Node jim = graphDb.createNode();
    jim.setProperty("name", "Jim");
    graphDb.index().forNodes("name").add(jim, "name", "Jim");
    Node kath = graphDb.createNode();
    kath.setProperty("name", "Kath");
    graphDb.index().forNodes("name").add(kath, "name", "Kath");
    Node martin = graphDb.createNode();
    martin.setProperty("name", "Martin");
    graphDb.index().forNodes("name").add(martin, "name", "Martin");
    Node simon = graphDb.createNode();
    simon.setProperty("name", "Simon");
    graphDb.index().forNodes("name").add(simon, "name", "Simon");
    jim.createRelationshipTo(kath, RelTypes.FRIEND);
    jim.createRelationshipTo(martin, RelTypes.FRIEND);
    jim.createRelationshipTo(simon, RelTypes.FRIEND);
    martin.createRelationshipTo(simon, RelTypes.FRIEND);
    Node tandooriNights = graphDb.createNode();
    tandooriNights.setProperty("restaurant", "Tandoori Nights");
    graphDb.index().forNodes("restaurant").add(tandooriNights, "name", "Tandoori Nights");
    Node indianMischief = graphDb.createNode();
    indianMischief.setProperty("restaurant", "Indian Mischief");
    graphDb.index().forNodes("restaurant").add(indianMischief, "name", "Indian Mischief");
    Node babur = graphDb.createNode();
    babur.setProperty("restaurant", "Babur");
    graphDb.index().forNodes("restaurant").add(babur, "name", "Babur");
    Node indian = graphDb.createNode();
    indian.setProperty("category", "Indian");
    graphDb.index().forNodes("category").add(indian, "name", "Indian");
    Node vegetarian = graphDb.createNode();
    vegetarian.setProperty("category", "Vegetarian");
    graphDb.index().forNodes("category").add(vegetarian, "name", "Vegetarian");
    tandooriNights.createRelationshipTo(indian, RelTypes.CUISINE);
    indianMischief.createRelationshipTo(indian, RelTypes.CUISINE);
    babur.createRelationshipTo(indian, RelTypes.CUISINE);
    indianMischief.createRelationshipTo(vegetarian, RelTypes.CUISINE);
    Node eastDulwich = graphDb.createNode();
    eastDulwich.setProperty("neighbourhood", "East Dulwich");
    graphDb.index().forNodes("neighbourhood").add(eastDulwich, "name", "East Dulwich");
    Node forestHill = graphDb.createNode();
    forestHill.setProperty("neighbourhood", "Forest Hill");
    graphDb.index().forNodes("neighbourhood").add(forestHill, "name", "Forest Hill");
    tandooriNights.createRelationshipTo(eastDulwich, RelTypes.IN);
    indianMischief.createRelationshipTo(eastDulwich, RelTypes.IN);
    babur.createRelationshipTo(forestHill, RelTypes.IN);
    Node southwark = graphDb.createNode();
    southwark.setProperty("borough", "Southwark");
    graphDb.index().forNodes("borough").add(southwark, "name", "Southwark");
    Node southLondon = graphDb.createNode();
    southLondon.setProperty("area", "South London");
    graphDb.index().forNodes("area").add(southLondon, "name", "South London");
    Node london = graphDb.createNode();
    london.setProperty("city", "London");
    graphDb.index().forNodes("city").add(london, "name", "London");
    eastDulwich.createRelationshipTo(southwark, RelTypes.IN);
    forestHill.createRelationshipTo(southwark, RelTypes.IN);
    southwark.createRelationshipTo(southLondon, RelTypes.IN);
    southLondon.createRelationshipTo(london, RelTypes.IN);
    jim.createRelationshipTo(tandooriNights, RelTypes.LIKES);
    kath.createRelationshipTo(tandooriNights, RelTypes.LIKES);
    martin.createRelationshipTo(babur, RelTypes.LIKES);
    simon.createRelationshipTo(babur, RelTypes.LIKES);
    }
    public static enum RelTypes implements RelationshipType {
    FRIEND,
    CUISINE,
    LIKES,
    IN
    }
    private static void registerShutdownHook( final GraphDatabaseService graphDb )
    {
    Runtime.getRuntime().addShutdownHook( new Thread()
    {
    @Override
    public void run()
    {
    graphDb.shutdown();
    }
    } );
    }
    }
  • Next, run the search on restaurants-
    package com.raj.socialgraph;
    import org.neo4j.cypher.ExecutionEngine;
    import org.neo4j.cypher.ExecutionResult;
    import org.neo4j.graphdb.GraphDatabaseService;
    import org.neo4j.graphdb.factory.GraphDatabaseFactory;
    import org.neo4j.kernel.impl.util.StringLogger;
    import scala.collection.immutable.Map;
    public class Recommender {
    public static final String DB_PATH="c://apps//neo4j-1.8.2//socialgraph";
    public static GraphDatabaseService graphDb = null;
    /**
    * @param args
    */
    public static void main(String[] args) {
    graphDb = new GraphDatabaseFactory().newEmbeddedDatabase( DB_PATH );
    registerShutdownHook( graphDb );
    ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM );
    String query = new StringBuffer("START jim = node:name(name='Jim'),")
    .append(" southwark = node:borough(name='Southwark'),")
    .append(" indian = node:category(name='Indian')")
    .append(" MATCH jim-[:FRIEND]->friend-[:LIKES]->restaurant-[:IN]->()-[:IN]->southwark,")
    .append(" restaurant-[:CUISINE]->indian,")
    .append(" jim-[:LIKES]->myrestaurant")
    .append(" WHERE restaurant.restaurant <> myrestaurant.restaurant")
    .append(" RETURN distinct restaurant.restaurant;").toString();
    ExecutionResult result = engine.execute( query );
    Map<String, Object> map = result.next();
    System.out.println( map.get("restaurant.restaurant") );
    }
    private static void registerShutdownHook( final GraphDatabaseService graphDb )
    {
    Runtime.getRuntime().addShutdownHook( new Thread()
    {
    @Override
    public void run()
    {
    graphDb.shutdown();
    }
    } );
    }
    }

High Availability

The Neo4j enterprise edition provides high availability in the form of replication and cache sharding. I tried this on a graph of 63 million relationships with a cluster of 3 nodes. I plan to write that up in the next post....
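In the meantime, purely for reference: each instance in the cluster is configured through its conf/neo4j.properties. The sketch below is from memory of the ZooKeeper-coordinated HA configuration used in the 1.8 series; the key names, ports and hosts are assumptions, to be verified against the Neo4j HA documentation for your version.

# conf/neo4j.properties (assumed 1.8-era HA settings)
ha.server_id = 1
ha.server = 192.168.1.100:6001
ha.coordinators = 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181

# conf/neo4j-server.properties
org.neo4j.server.database.mode = HA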


Sunday, March 3, 2013

Real-Time Data Analysis using Twitter Stream

Experiment

Twitter processes half a billion tweets per day, each tweet at most 140 characters. For experimentation purposes, Twitter provides these tweets to the public. Initially they were provided through a plain REST API, and later through OAuth. The Twitter Streaming API makes this possible, providing a real-time stream at 1% of Twitter's load. That means Twitter receives on average 5787 tweets per second ( 500000000 Tweets / (24 Hours * 60 Mins * 60 Secs) ) and the Streaming API sends us about 58 tweets per second. This data arrives constantly, so we need a mechanism to store 58 tweets every second and run real-time analytics on them. Even though each tweet is 140 characters, or 280 bytes ( a char is 2 bytes ), the Streaming API sends a lot of information for each tweet (1 tweet = 4 KB). This information is sent in JSON format.

Twitter data is a very valuable tool in the field of marketing. Vendors can do sentiment analysis using tweets for a specific hashtag. So if Samsung wants to know how satisfied people are with their products, they can find out from the tweet data. As a result, a lot of research in the NLP (Natural Language Processing) field has taken off around tweets. Apart from this, we can run a lot of machine learning tasks on these tweets.

As part of this experiment I implemented a consumer to read the stream of JSON tweets and persist them in MongoDB. Since it is a write-heavy application, I load-balanced (sharded) my MongoDB. This application keeps running forever, and the data starts filling up my MongoDB clusters. To keep storage to a minimum, I extracted and stored only the tweet ID, the tweeter's name, the actual tweet, and the source of the tweet ( e.g. twitter.com, facebook.com, blackberry, etc. ). Then I set up an incremental MongoDB Map-Reduce job to run every 10 minutes. This job gets the statistics of unique sources and their counts. From this I generated the top-10 statistics and created charts using JFreeChart. 

Architectural Overview


Execution

Setup Twitter Account

Go to https://dev.twitter.com/apps and click "Create a new application".
Fill in all mandatory information and submit the application.
Go to the "OAuth Tool" tab and note down the Consumer Key and Consumer Secret.
Run the following program after changing the consumer key and consumer secret:
package com.raj.tweets;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.logging.Level;
import java.util.logging.Logger;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.User;
import twitter4j.auth.AccessToken;
import twitter4j.auth.RequestToken;
public class TwitterOAuth {
public static void main(String args[])
{
try
{
// The factory instance is re-useable and thread safe.
Twitter twitter = new TwitterFactory().getInstance();
twitter.setOAuthConsumer("<consumer-key>", "<consumer-secret>");
RequestToken requestToken = twitter.getOAuthRequestToken();
AccessToken accessToken = null;
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
while (null == accessToken)
{
System.out.println("Open the following URL and grant access to your account:");
System.out.println(requestToken.getAuthorizationURL());
System.out.print("Enter the PIN (if available) or just hit enter. [PIN]:");
String pin = null;
try
{
pin = br.readLine();
} catch (IOException ex)
{
Logger.getLogger(TwitterOAuth.class.getName()).log(Level.SEVERE, null, ex);
}
try
{
if (pin.length() > 0)
{
accessToken = twitter.getOAuthAccessToken(requestToken, pin);
} else
{
accessToken = twitter.getOAuthAccessToken();
}
} catch (TwitterException te)
{
if (401 == te.getStatusCode())
{
System.out.println("Unable to get the access token.");
} else
{
te.printStackTrace();
}
}
}
//persist to the accessToken for future reference.
storeAccessToken(twitter.verifyCredentials(), accessToken);
Status status = twitter.updateStatus("Done setup..");
System.out.println("Successfully updated the status to [" + status.getText() + "].");
System.exit(0);
} catch (TwitterException ex)
{
Logger.getLogger(TwitterOAuth.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void storeAccessToken(User user, AccessToken accessToken)
{
System.out.println(user.getScreenName());
}
}

Follow the instructions printed by the program to generate the Access Token and Access Token Secret.
They can later be obtained from the "OAuth Tool" tab.

Setup MongoDB

MongoDB provides sharding capability at the database/collection level. I set up a simple MongoDB sharded cluster with 2 laptops ( 1 TB disk, 4 GB RAM, Toshiba, Windows 7 ).

System-1 : 192.168.1.100
System-2 : 192.168.1.101
MongoDB 2.2.3 is installed on both laptops at c:\apps\mongodb.
Create the following directories on System-1 (System-2 needs only c:\apps\mongodb\data1):
c:\apps\mongodb\data1
c:\apps\mongodb\data2
c:\apps\mongodb\data3
c:\apps\mongodb\conf

On System1
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data1 --port 27020
On System2
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data1 --port 27020
On System1
c:\apps\mongodb\bin> mongod --shardsvr --dbpath c:\apps\mongodb\data2 --port 27021

c:\apps\mongodb\bin> mongod --configsvr --dbpath c:\apps\mongodb\conf --port 27022

c:\apps\mongodb\bin> mongos --configdb 192.168.1.100:27022 --port 27017

c:\apps\mongodb\bin> mongo 192.168.1.100:27017

mongos> use admin
switched to db admin
mongos> db.runCommand({addShard:"192.168.1.100:27020"});
{"shardAdded": "shard0000", "ok": 1}
mongos> db.runCommand({addShard:"192.168.1.100:27021"});
{"shardAdded": "shard0001", "ok": 1}
mongos> db.runCommand({addShard:"192.168.1.101:27020"});
{"shardAdded": "shard0002", "ok": 1}
mongos> db.runCommand({listShards: 1})
{
 "shards" : [
   {
     "_id"  : "shard0000",
     "host" : "192.168.1.100:27020"
   },
   {
     "_id"  : "shard0001",
     "host" : "192.168.1.100:27021"
   },
   {
     "_id"  : "shard0002",
     "host" : "192.168.1.101:27020"
   }
 ],
 "ok" : 1
}

mongos> use twitterdb
switched to db twitterdb

mongos> db.createCollection("tweets")
{"ok" : 1}

mongos> use admin
switched to db admin

mongos> db.runCommand({enableSharding: "twitterdb"})
{ "ok" : 1}

mongos> db.runCommand({shardCollection: "twitterdb.tweets", key: {id_str: 1}})
{"collectionSharded" : "twitterdb.tweets", "ok" : 1}

mongos> use twitterdb
switched to db twitterdb

mongos> db.tweets.find().count()
0

Running the Application

So we have just finished setting up the shards and the database. 

Run the Twitter stream application below (please change the appropriate values as per your settings). Data starts pumping into MongoDB. Don't forget to stop the application when you are done; otherwise the Twitter stream keeps consuming network bandwidth and the MongoDB storage will shoot up.
package com.raj.tweets;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.util.JSON;
public class TwitterStreaming {
public static void main(String[] args) {
try
{
Mongo m = new Mongo("192.168.1.100", 27017);
DB db = m.getDB("twitterdb");
final DBCollection coll = db.getCollection("tweets");
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey("<consumerKey>")
.setOAuthConsumerSecret("<consumerSecret>")
.setOAuthAccessToken("<accessToken>")
.setOAuthAccessTokenSecret("<accessTokenSecret>");
StatusListener listener = new StatusListener(){
int count = 0;
public void onStatus(Status status) {
DBObject dbObj = new BasicDBObject();
dbObj.put("id_str", status.getId());
dbObj.put("name", status.getUser().getName());
dbObj.put("text", status.getText());
dbObj.put("source", status.getSource());
coll.insert(dbObj);
System.out.println(++count);
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
public void onException(Exception ex) {
ex.printStackTrace();
}
@Override
public void onScrubGeo(long arg0, long arg1) {
// TODO Auto-generated method stub
}
@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
};
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
twitterStream.addListener(listener);
twitterStream.sample();
}catch(Exception e) {
e.printStackTrace();
}
}
}
Keep a tab on the filesystem data:-
  System:1
c:\apps\mongodb\data1
c:\apps\mongodb\data2

  System:2
c:\apps\mongodb\data1

 The data files grow continuously. Verify the count of tweets in the database.

  mongos> db.tweets.find().count()
25043

 So 25,000 tweets accumulated in 10 minutes, or about 40 tweets per second. Find out how many people tweeted via the web in these 10 minutes:

  mongos> db.tweets.find({"source" : "web"}).count()
4365

Now run the MapReduce job below; it runs every 10 minutes, aggregates the results, and generates the reports as pie charts. These charts will be stored on your local file system.
package com.raj.tweets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.bson.types.ObjectId;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MapReduceCommand;
import com.mongodb.MapReduceOutput;
import com.mongodb.Mongo;
import com.mongodb.QueryBuilder;
public class TwitterMapReduce {
private Mongo mongo;
private DB db;
private MapReduceCommand cmd1;
private MapReduceCommand cmd2;
private DBCollection inputCollection;
private String map;
private String reduce;
private String outputCollection;
private Map<String, Double> result = new TreeMap<String, Double>();
private Map<Double, List<String>> top10 = new TreeMap<Double, List<String>>();
private String lastKey = null;
public TwitterMapReduce() throws Exception {
mongo = new Mongo("192.168.1.100", 27017);
db = mongo.getDB("twitterdb");
inputCollection = db.getCollection("tweets");
outputCollection = "tweetstat";
map = " function() { " +
" emit(this.source, {count: 1}) " +
" }";
reduce = "function(key, values) { " +
"var sum = 0; " +
"values.forEach(function(v) { " +
"sum = sum + v['count'] "+
"}); " +
"return {count: sum}} ";
cmd1 = new MapReduceCommand(inputCollection, map, reduce, outputCollection, MapReduceCommand.OutputType.INLINE, null);
}
public void run() {
System.out.println("Running map Reduce: Last Key = " +lastKey);
DBObject obj = new BasicDBObject();
obj.put("_id", -1);
MapReduceOutput out = null;
if( lastKey == null ) {
out = inputCollection.mapReduce( cmd1 );
lastKey = inputCollection.find().sort(obj).next().get("_id").toString();
} else {
QueryBuilder qb = new QueryBuilder();
qb.put("_id").greaterThan(new ObjectId(lastKey));
cmd2 = new MapReduceCommand(inputCollection, map, reduce,
outputCollection, MapReduceCommand.OutputType.INLINE, qb.get());
out = inputCollection.mapReduce( cmd2 );
lastKey = inputCollection.find().sort(obj).next().get("_id").toString();
}
result = new TreeMap<String, Double>();
double total = 0;
double count = 0;
DBObject value = null;
String name = null;
String trimmed = null;
for (DBObject o : out.results()) {
value = (DBObject)o.get("value");
count = (Double)value.get("count");
name = (String)o.get("_id");
trimmed = trim( name );
total += count;
if( result.containsKey(trimmed)) {
result.put(trimmed, count + result.get(trimmed));
} else {
result.put(trimmed, count);
}
}
top10 = new TreeMap<Double, List<String>>(Collections.reverseOrder());
for( String s: result.keySet() ) {
double val = result.get( s );
if( top10.containsKey(val) ) {
top10.get(val).add(s);
} else {
List<String> list = new ArrayList<String>();
list.add(s);
top10.put(val, list);
}
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
TwitterMapReduce app = new TwitterMapReduce();
try {
while( true ) {
DateFormat formatter = new SimpleDateFormat("dd-MMM-yyyy-hh_mm_ss");
String today = formatter.format(new Date());
app.run();
if( !app.top10.isEmpty() ) {
new TwitterChart("Twitter Stream", today, app.top10);
}
Thread.sleep(1000*60*10); //10 minutes
}
}catch(Exception e) {
e.printStackTrace();
}
}
public static String trim(String s) {
if( s.indexOf("href") != -1) {
s = s.substring(s.indexOf("href"), s.indexOf("rel") - 2);
if( s.indexOf("http://") != -1 ) {
s = s.substring(s.indexOf("http://") + 7);
} else if( s.indexOf("https://") != -1) {
s = s.substring(s.indexOf("https://") + 8);
}
if( s.indexOf("/") != -1 ) {
s = s.substring(0, s.indexOf("/"));
}
}
return s;
}
}
Chart Utility
package com.raj.tweets;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PiePlot3D;
import org.jfree.data.general.DefaultPieDataset;
import org.jfree.data.general.PieDataset;
import org.jfree.util.Rotation;
public class TwitterChart {
private static final long serialVersionUID = 1L;
public TwitterChart(String applicationTitle, String chartTitle, Map<Double, List<String>> map) throws Exception {
// This will create the dataset
PieDataset dataset = createDataset( map );
// based on the dataset we create the chart
JFreeChart chart = createChart(dataset, chartTitle);
ChartUtilities.saveChartAsJPEG(new File("c:\\project\\Twitter\\" +chartTitle+ ".jpg"), chart, 500, 500);
}
/**
* Creates a sample dataset
*/
private PieDataset createDataset( Map<Double, List<String>> top10 ) {
DefaultPieDataset result = new DefaultPieDataset();
System.out.println("****RESULTS***");
int count = 0;
for( double val: top10.keySet() ) {
result.setValue(top10.get(val).toString(), val);
count++;
if( count == 7 ) {
break;
}
}
return result;
}
/**
* Creates a chart
*/
private JFreeChart createChart(PieDataset dataset, String title) {
JFreeChart chart = ChartFactory.createPieChart3D(title, // chart title
dataset, // data
true, // include legend
true,
false);
PiePlot3D plot = (PiePlot3D) chart.getPlot();
plot.setStartAngle(290);
plot.setDirection(Rotation.CLOCKWISE);
plot.setForegroundAlpha(0.5f);
return chart;
}
}


Download

The entire project can be downloaded here.


Saturday, February 23, 2013

Massive Movie Recommendation System using MongoDB

MongoDB Basics & Internals

Big Data is a buzzword of today. It means Velocity, Volume and Variety of data. Google processes 24 petabytes of data per day. Facebook users upload 300 million photos and post 3.2 billion likes/comments per day. These are some examples of velocity and volume. "Variety" refers to the different types of data handled by these applications; log files, for instance, contain unstructured/semi-structured data. This is not what an RDBMS is designed for: relational databases suit structured data and volumes not exceeding terabytes. Since they are disk-based, they are relatively slow (IO-bound). Indexing is not a solution either; when data grows, inserts get slow. Joins are big bottlenecks, and hence you have to go for de-normalization.

High-volume sites were mostly built on the LAMP (Linux, Apache, MySQL, PHP) stack. Load-balancing was done at the web-tier and application-tier levels; however, this was difficult at the data tier. So one solution people used was Memcached, acting as a caching layer in front of the data tier, with the data split across a cluster of MySQL instances. These clusters were replicated as well. Further optimizations were made on top of this - MySQL HandlerSocket was used to reach 500K requests per second. Another way to address high volumes was to use CDNs (Content Delivery Networks) like Akamai, and HTTP accelerators like Varnish to handle web-tier load. 

When Google published its papers on BigTable and MapReduce, it changed the game. Doug Cutting, who was working with Yahoo then, built Hadoop, an open-source implementation of MapReduce and the Google File System (BigTable in turn inspired HBase). Several NoSQL databases appeared then. We can broadly categorize them as Document-Oriented (MongoDB, CouchDB), Graph (Neo4j), and Columnar (HBase, Cassandra) databases. MongoDB is a document-oriented database from 10gen. FourSquare, Shutterfly and Codecademy are a few of the customers who use MongoDB. 

MongoDB works on JSON datatypes and each document is a JSON document, internally stored as BSON (a binary representation of JSON). It has no joins and no transactions. It supports geospatial queries - a USP that drew FourSquare to MongoDB. Indexing, auto-sharding and replication support are a few more MongoDB features. I could configure my 3 laptops to store millions of user records: laptop-1 and laptop-2 as shards, with users A-M stored on laptop-1 and N-Z on laptop-2, and laptop-3 as a replica of laptop-1. Replication was immediate (less than a second) and even offline replication was simply awesome (bring down the replica and start it again, and it automatically syncs up). Replica sets support voting for choosing a primary. MongoDB also has built-in Map-Reduce support (plus a separate connector for Hadoop). MongoDB provides several tools - the important ones are mongod, mongos, mongo and mongostat. MongoDB uses memory-mapped files to map its data files to virtual memory; hence 64-bit machines are preferred. Internally the data is stored as a 2-level linked list. When a database (e.g. movieLens) is created, 3 files are created automatically - movieLens.ns (16 MB), movieLens.0 (64 MB) and movieLens.1 (128 MB). The movieLens.ns file is a giant HashMap that stores the metadata of the database and its collections ( tables are called collections in MongoDB ). The movieLens.0 and movieLens.1 files are data files; the numbers increase sequentially and the sizes double until they reach 2 GB. The internal architecture of MongoDB is a very interesting and long story (for the next post). 
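As a quick, hypothetical illustration of the document model (using the same Java driver as the import program below; the database, collection and field values here are made up):

package com.raj.mongo;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
public class HelloMongo {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost"); // default port 27017
        DB db = mongo.getDB("movieLens");     // created lazily on first write
        DBCollection movies = db.getCollection("movies");
        // A document is schemaless JSON, stored internally as BSON
        DBObject doc = new BasicDBObject();
        doc.put("_id", 99999);                // hypothetical id
        doc.put("title", "Example Movie");
        doc.put("genres", new String[] { "Comedy", "Drama" });
        movies.insert(doc);
        System.out.println(movies.findOne(new BasicDBObject("_id", 99999)));
        mongo.close();
    }
}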

Movie Recommendation

We will build a movie recommendation system with MongoDB to handle a massive volume of data. I used the 10 million movie ratings provided by MovieLens. Download the data (MovieLens 10M) from http://www.grouplens.org/node/73 and extract it to a location like C:\Project\Recommender. It consists of 3 main files - movies.dat, ratings.dat, and tags.dat. We will load this data into MongoDB. For this, download MongoDB from http://www.mongodb.org/downloads, install it in some location like c:\mongodb, and create a directory c:\mongodb\data. Start mongod with this command -
c:\mongodb\bin>  mongod --dbpath c:\mongodb\data
Now we will run the following Java application to import the data into MongoDB. For this we need the MongoDB Java driver on the classpath, and the heap size set to 1 GB.
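For example, assuming the driver jar is on the classpath (the jar name and version here are illustrative):
c:\Project> java -Xmx1024m -cp .;mongo-java-driver-2.10.1.jar com.mongodb.mahout.MongoImporter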

package com.mongodb.mahout;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Date;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
public class MongoImporter {
/**
* @param args
*/
public static void main(String[] args) {
try {
long time1 = new Date().getTime();
BufferedReader br = new BufferedReader( new FileReader( new File( "C:\\Project\\Recommender\\ml-10M100K\\movies.dat" ) ));
Mongo mongo = new Mongo("localhost");
DB db = mongo.getDB("movieLens");
DBCollection collection = db.getCollection("movies");
String s = br.readLine();
int i=0;
while( s != null ) {
String[] sArr = s.split("::");
int movie_id = Integer.parseInt( sArr[0] );
String name = sArr[1];
String title = name.substring(0, name.lastIndexOf("(")-1);
int yearOfRel = Integer.parseInt( name.substring( name.lastIndexOf("(") + 1, name.length()-1));
String[] genres = sArr[2].split("\\|");
BasicDBObject object = new BasicDBObject();
object.put("_id", movie_id);
object.put("title", title);
object.put("year_of_release", yearOfRel);
BasicDBList list = new BasicDBList();
for( String genre: genres ) {
list.add( genre );
}
object.put("genres", genres);
collection.insert( object );
i++;
s = br.readLine();
}
br.close();
System.out.println("Inserted " +i+ " Movies");
br = new BufferedReader( new FileReader( new File( "C:\\Project\\Recommender\\ml-10M100K\\tags.dat" ) ));
collection = db.getCollection("tags");
s = br.readLine();
i=0;
while( s != null ) {
String[] sArr = s.split("::");
int user_id = Integer.parseInt( sArr[0] );
int movie_id = Integer.parseInt( sArr[1] );
String tag = sArr[2];
long timeStamp = Long.parseLong( sArr[3] );
BasicDBObject object = new BasicDBObject();
object.put("user_id", user_id);
object.put("item_id", movie_id);
object.put("tag", tag);
object.put("timestamp", timeStamp);
collection.insert( object );
i++;
s = br.readLine();
}
br.close();
System.out.println("Inserted " +i+ " Tags");
br = new BufferedReader( new FileReader( new File( "C:\\Project\\Recommender\\ml-10M100K\\ratings.dat" ) ));
collection = db.getCollection("ratings");
s = br.readLine();
i=0;
while( s != null ) {
String[] sArr = s.split("::");
int user_id = Integer.parseInt( sArr[0] );
int movie_id = Integer.parseInt( sArr[1] );
String rating = sArr[2];
long timeStamp = Long.parseLong( sArr[3] );
BasicDBObject object = new BasicDBObject();
object.put("user_id", user_id);
object.put("item_id", movie_id);
object.put("preference", rating);
object.put("timestamp", timeStamp);
collection.insert( object );
i++;
s = br.readLine();
}
br.close();
System.out.println("Inserted " +i+ " Ratings");
mongo.close();
long time2 = new Date().getTime();
long timeTaken = (time2-time1)/1000;
System.out.println("Completed in " +(int)timeTaken+ " Seconds");
} catch(Exception e) {
e.printStackTrace();
}
}
}
The output is as follows-

*********************
Inserted 10681 Movies
Inserted 95580 Tags
Inserted 10000054 Ratings
Completed in 685 Seconds
*********************

Run the mongo shell using the following command
c:\mongodb\bin>  mongo

Now verify that the movieLens database is created. Some of the commands you can run are as below-
show dbs
show collections
db.movies.findOne()
db.ratings.find().count()

Next, create Indexes:
db.ratings.ensureIndex({user_id:1, item_id:1})
db.movies.ensureIndex({genres:1});

Next, run a map-reduce task to create a new collection storing the rating count per user:

map = function() { emit({user_id: this.user_id}, {count:1}) }

reduce = function(k, v) { var count = 0; v.forEach( function(x) { count += x['count']; }); return {count: count}; }

db.ratings.mapReduce(map, reduce, "user_ratings");

{
        "result" : "user_ratings",
        "timeMillis" : 520420,
        "counts" : {
                "input" : 10000054,
                "emit" : 10000054,
                "reduce" : 169150,
                "output" : 69878
        },
        "ok" : 1
}

Now we are ready to create our recommendation engine. The one we are going to implement is a "Collaborative Filtering" application, similar to Amazon or Netflix. The algorithms used by these sites are typically Weighted Slope One or Singular Value Decomposition (SVD); Twitter, for instance, uses SVD. Apache Mahout (an open-source machine learning solution for Big Data) provides a recommendation system. I have used a similar strategy here. 

The idea used here is- 
Users rate movies on a scale of 1-5. To know which movies to recommend to a given user (user_1), we use the ratings he has provided. Find his k nearest neighbours (users similar to him in taste). From this list of users, find the movies they have rated, and build a weighted list of those movies based on the ratings, ignoring movies that user_1 has already rated. The basics behind this are Manhattan distance, Euclidean distance, Minkowski distance, Pearson's correlation coefficient and k-nearest neighbours. 
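To make the "similar taste" step concrete, here is a minimal self-contained sketch of Pearson's coefficient over two users' co-rated movies. This is my own illustration of the formula, not the program that produced the output below:

package com.raj.recommender;
import java.util.HashMap;
import java.util.Map;
public class PearsonSimilarity {
    // ratings: movieId -> rating (1-5) for each user
    public static double pearson(Map<Integer, Double> a, Map<Integer, Double> b) {
        double n = 0, sumA = 0, sumB = 0, sumA2 = 0, sumB2 = 0, sumAB = 0;
        for (Integer movie : a.keySet()) {
            if (!b.containsKey(movie)) continue; // only co-rated movies count
            double x = a.get(movie), y = b.get(movie);
            n++; sumA += x; sumB += y; sumA2 += x * x; sumB2 += y * y; sumAB += x * y;
        }
        if (n == 0) return 0;
        double num = sumAB - (sumA * sumB / n);
        double den = Math.sqrt((sumA2 - sumA * sumA / n) * (sumB2 - sumB * sumB / n));
        return den == 0 ? 0 : num / den; // -1 = opposite taste, +1 = identical taste
    }
    public static void main(String[] args) {
        Map<Integer, Double> u1 = new HashMap<Integer, Double>();
        Map<Integer, Double> u2 = new HashMap<Integer, Double>();
        u1.put(1, 5.0); u1.put(2, 3.0); u1.put(3, 4.0);
        u2.put(1, 4.0); u2.put(2, 2.0); u2.put(3, 5.0);
        System.out.println(pearson(u1, u2)); // ~0.65 for this toy data
    }
}

The k users with the highest score become the neighbours whose ratings are weighted into the recommendation list.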


The output of my recommendation program is-

*******************

Loaded 10000054 ratings
Loaded 10681 movies
Recommended for 16
Lord of the Rings: The Two Towers, The
Liar Liar
Rudy
Field of Dreams
Christmas Story, A
Gone in 60 Seconds
Remember the Titans
Striptease
Hoosiers
No Country for Old Men
Recommended for 127
Clockwork Orange, A
Fight Club
Reservoir Dogs
12 Monkeys (Twelve Monkeys)
Shawshank Redemption, The
Silence of the Lambs, The
Terminator 2: Judgment Day
One Flew Over the Cuckoo's Nest
Fargo
Shining, The

In the next post I will discuss more about how the recommendation system works, and the algorithm behind it.


Saturday, January 19, 2013

Building an online Movie Library

This is more of a PoC (Proof of Concept) for a movie library site. I have taken a couple of requirements and built working code -

Requirements
The site maintains a list of tons of movies, and each movie is identified by a title, price, release date and category. To assist users in searching movies easily, a search field is provided. So if we enter "English" in the search box, it returns all English movies sorted by title. It also provides "Faceted Navigation" - a concept which is widely used these days. So along with displaying the list of English movies, it categorizes the search results by "Category" in a left-hand section, each category showing its title and the list sorted by the total number of occurrences found (descending). Following is the result page for the application we are building (displaying Kannada movies)



The next requirement is to provide Pagination for the results as shown below. In the above page, if we click on "List All" we end up with the below page -
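Behind the scenes, the search, the facets and the pagination are served by Lucene through Hibernate Search. As a sketch of how such a query can be wired up with the Hibernate Search 4.2 API (the CD entity and the title/category field names are my assumptions based on the project, not the exact project code):

package com.raj.projects.dao;
import java.util.List;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.hibernate.Session;
import org.hibernate.search.FullTextQuery;
import org.hibernate.search.FullTextSession;
import org.hibernate.search.Search;
import org.hibernate.search.query.dsl.QueryBuilder;
import org.hibernate.search.query.facet.Facet;
import org.hibernate.search.query.facet.FacetSortOrder;
import org.hibernate.search.query.facet.FacetingRequest;
public class FacetedMovieSearch {
    // "CD" is the project's movie entity; "title" and "category" are assumed indexed fields
    public List<?> search(Session session, String text, int page, int pageSize) {
        FullTextSession fts = Search.getFullTextSession(session);
        QueryBuilder qb = fts.getSearchFactory().buildQueryBuilder().forEntity(CD.class).get();
        Query luceneQuery = qb.keyword().onField("title").matching(text).createQuery();
        FullTextQuery query = fts.createFullTextQuery(luceneQuery, CD.class);
        query.setSort(new Sort(new SortField("title", SortField.STRING))); // results sorted by title
        query.setFirstResult(page * pageSize); // pagination: offset
        query.setMaxResults(pageSize);         // pagination: page size
        // Facet on category, ordered by occurrence count (descending), for the left-hand section
        FacetingRequest categories = qb.facet().name("categories")
                .onField("category").discrete()
                .orderedBy(FacetSortOrder.COUNT_DESC)
                .createFacetingRequest();
        query.getFacetManager().enableFaceting(categories);
        for (Facet facet : query.getFacetManager().getFacets("categories")) {
            System.out.println(facet.getValue() + " (" + facet.getCount() + ")");
        }
        return query.list(); // the matching movies for the current page
    }
}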


The next functionality we want to build is for the suppliers. The suppliers want to add movies to our library, and hence we provide them with an FTP location. Suppliers create one or more CSV files (comma-separated values) in a predefined format and put them in the FTP location. We will create a scheduler application to poll the folder, pick up the files, extract the data and populate the database. On completion of this step, the movies become available on the site. The format of the CSV is as below-

In a real-world scenario they would be creating a zip file containing CSV files and images.
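The project implements this polling with a Spring Integration file inbound adapter (the wiring lives in ApplicationContext-File.xml, see step 14 below). Purely as a simplified plain-Java stand-in for that adapter, the idea looks like this (directory and interval are illustrative, and the CSV parsing is left out):

package com.raj.projects.batch;
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
public class CsvFolderPoller {
    public static void main(String[] args) {
        final File inbox = new File("C:\\FileServer\\input");
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                File[] files = inbox.listFiles();
                if (files == null) return;
                for (File f : files) {
                    if (!f.getName().endsWith(".csv")) continue;
                    // parse the CSV, write the movie rows to the database, then remove the file
                    System.out.println("Processing " + f.getName());
                    f.delete();
                }
            }
        }, 0, 5000); // poll every 5 seconds
    }
}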








Tools/Technologies Used-
JDK 1.6 (or above)
Maven 3.0.4
Eclipse Juno
Spring 3.1.1
Hibernate 4.1.4
Lucene/Hibernate Search 4.2.0
MySQL
Spring MVC 3.1.1
Jackson 1.9.4 (for JSON)
JQuery 1.7.2
Spring Integration 2.2.0 (Spring Batch support)
Mockito Test Framework for Test Driven Development (in progress...)

Implementation

1) Download and install JDK 1.6 from here
2) Download and install Eclipse IDE from here
3) Download and install Maven from here
4) Setup environment variables as follows-          
    JAVA_HOME=C:\Apps\Java\jdk1.7.0 
    M2_HOME=C:\Apps\apache-maven-3.0.4       
    Path=%JAVA_HOME%\bin;%M2_HOME%\bin
5) Download MYSQL and install from here 
6) Set the root username/password as root/root. Open the MySQL shell and create a database named "TPCH"
7) Create a table "CD" with initial data. Following SQL can be used-
https://gist.github.com/4572002
8) Next download the complete movie library project from here
9) Extract it to a folder, say C:\Projects. This will create the following directory structure-

It is a Maven project with 4 Maven modules (highlighted in red). Apart from that there is a lucene folder, which is used for indexing.
The main project is a POM.
Web depends on Service and DAO.
Service depends on DAO.
Batch is an independent stand-alone module that depends on DAO.

Maven has a standard directory structure-
src/main/resources used for configuration
src/main/java used for source code
src/test/java used for test cases


10) Open Eclipse and import as a Maven project (root should be C:\Projects\spring-mockito)
11) Configuration changes (if any)-

spring-mockito-dao/src/main/resources/hibernate.cfg.xml
Ensure the connection url, username and password are as specified by you.
The property "hibernate.search.default.indexBase" is used to specify where lucene has to store its index.

spring-mockito-dao/src/main/resources/spring-dao.xml
Ensure the values are right here

12) Run the following commands from a command prompt-
C:\Project\spring-mockito>mvn clean install
C:\Project\spring-mockito\spring-mockito-web>mvn jetty:run

The first command cleans the Maven modules and builds the artifacts.
The second command deploys the war file and starts up Jetty (a lightweight server). If you want to test on Tomcat, copy the war from the spring-mockito-web\target folder and deploy it to the Tomcat directory.

13) To access the application the url is http://localhost:8080/spring-mockito-web/index.jsp
14) To test the supplier functionality, first ensure that the directory C:\FileServer\input already exists. You can change the location in the file
spring-mockito-batch/src/main/resources/ApplicationContext-File.xml
Run the stand-alone Java class com.raj.projects.batch.client.Main from the spring-mockito-batch project. This starts up the poller and the integration environment.
15) Create files with .csv extension as shown below-

You can store as many files as you want. However ensure that the Ids are unique. 

Drop the files into the folder C:\FileServer\input

They will be picked up and processed, and the movie entries written to the database. The files are then deleted.



