Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

How to write Junit test cases for the start method...???

package com.serendio.code;

import java.util.ArrayList; import java.util.HashMap; import java.util.Map;

import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import twitter4j.FilterQuery; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import twitter4j.json.DataObjectFactory;

import com.cybozu.labs.langdetect.Detector; import com.cybozu.labs.langdetect.DetectorFactory; import com.cybozu.labs.langdetect.LangDetectException; /** * A Flume Source, which pulls data from Twitter's streaming API. Currently, * this only supports pulling from the sample API, and only gets new status * updates. */ public class TwitterSourceXML extends AbstractSource implements EventDrivenSource, Configurable {

private static final Logger logger =
        LoggerFactory.getLogger(TwitterSourceXML.class);

/** Information necessary for accessing the Twitter API */
private String consumerKey;
private String consumerSecret;
private String accessToken;
private String accessTokenSecret;
private String lang_profile_path;
private String keywords_path;
private Integer tweet_counter;
private String[] keywords;

/** The actual Twitter stream. It's set up to collect raw JSON data */
private final TwitterStream twitterStream = new TwitterStreamFactory(
        new ConfigurationBuilder()
        .setJSONStoreEnabled(true)
        .build()).getInstance();

/**
 * The initialization method for the Source. The context contains all the
 * Flume configuration info, and can be used to retrieve any configuration
 * values necessary to set up the Source.
 */
public void configure(Context context) {
    consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
    consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
    accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
    accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);
    // workspace_id = context.getString(TwitterSourceConstants.WORKSPACE_ID);
    lang_profile_path = context.getString(TwitterSourceConstants.LANGUAGE_PROFILE_PATH,"/home/gpadmin/new_flume/lang_profiles/profiles/");
    tweet_counter =  context.getInteger(TwitterSourceConstants.TWEET_COUNTER,10);
    System.out.println("tweet counter ***" +tweet_counter);
    String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
    keywords = keywordString.split(",");
    for (int i = 0; i < keywords.length; i++) {
        keywords[i] = keywords[i].trim();
    }
    System.out.println("keywordss*****" + keywords);
}   

/**
 * Start processing events. This uses the Twitter Streaming API to sample
 * Twitter, and process tweets.
 */
@Override
public void start() {
    // The channel is the piece of Flume that sits between the Source and Sink,
    // and is used to process events.
    final ChannelProcessor channel = getChannelProcessor();

    final Map<String, String> headers = new HashMap<String, String>();

    // The StatusListener is a twitter4j API, which can be added to a Twitter
    // stream, and will execute methods every time a message comes in through
    // the stream.
    final ArrayList<String> tweetList = new ArrayList<String>();
    try {
        //DetectorFactory.loadProfile("/home/gpadmin/new_flume/lang_profiles/profiles/");
        DetectorFactory.loadProfile(lang_profile_path);
        System.out.println("i am in loading profile");
    } catch (LangDetectException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    StatusListener listener = new StatusListener() {
        // The onStatus method is executed every time a new tweet comes in.
        public void onStatus(Status status) {

            String text = status.getText();
            if (!text.isEmpty()){
                Detector detector = null;
                try {
                    detector = DetectorFactory.create();
                } catch (LangDetectException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                detector.append(text);
                String textLang = null;
                //For checking language of tweet
                try {
                    textLang = detector.detect();
                    if (textLang.equals("en")){
                        String tweet_json = DataObjectFactory.getRawJSON(status);
                        //Adding all the tweets in list
                        tweetList.add(tweet_json);
                        if (tweetList.size()>=tweet_counter){
                            String Eventstring = StringUtils.join(tweetList,"&--&");
                            System.out.println("list siz ***" + tweetList.size());
                            //System.out.println("event ***" + Eventstring);
                            tweetList.clear();
                            headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
                            Event event = EventBuilder.withBody(Eventstring.getBytes(), headers);
                            if (!(new String(event.getBody())).isEmpty())
                            {
                                System.out.println("###in process###" + event);
                                channel.processEvent(event);
                            }
                        }
                    }
                } catch (LangDetectException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }   

        // This listener will ignore everything except for new tweets
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
        public void onScrubGeo(long userId, long upToStatusId) {}
        public void onException(Exception ex) {}
    };

    logger.debug("Setting up Twitter sample stream using consumer key {} and" +
            " access token {}", new String[] { consumerKey, accessToken });
    // Set up the stream's listener (defined above), and set any necessary
    // security information.
    twitterStream.addListener(listener);
    twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
    AccessToken token = new AccessToken(accessToken, accessTokenSecret);
    twitterStream.setOAuthAccessToken(token);

    // Set up a filter to pull out industry-relevant tweets
    if (keywords.length == 0) {
        logger.debug("Starting up Twitter sampling...");
        twitterStream.sample();
    } else {
        logger.debug("Starting up Twitter filtering...");
        FilterQuery query = new FilterQuery()
        .track(keywords)
        .setIncludeEntities(true);
        twitterStream.filter(query);
    }
    super.start();
}

/**
 * Stops the Source's event processing and shuts down the Twitter stream.
 */
@Override
public void stop() {
    logger.debug("Shutting down Twitter sample stream...");
    twitterStream.shutdown();
    super.stop();
}

}