How to write Junit test cases for the start method...???
package com.serendio.code;
import java.util.ArrayList; import java.util.HashMap; import java.util.Map;
import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import twitter4j.json.DataObjectFactory;
import com.cybozu.labs.langdetect.Detector; import com.cybozu.labs.langdetect.DetectorFactory; import com.cybozu.labs.langdetect.LangDetectException; /** * A Flume Source, which pulls data from Twitter's streaming API. Currently, * this only supports pulling from the sample API, and only gets new status * updates. */ public class TwitterSourceXML extends AbstractSource implements EventDrivenSource, Configurable {
private static final Logger logger =
LoggerFactory.getLogger(TwitterSourceXML.class);
/** Information necessary for accessing the Twitter API */
private String consumerKey;
private String consumerSecret;
private String accessToken;
private String accessTokenSecret;
private String lang_profile_path;
private String keywords_path;
private Integer tweet_counter;
private String[] keywords;
/** The actual Twitter stream. It's set up to collect raw JSON data */
private final TwitterStream twitterStream = new TwitterStreamFactory(
new ConfigurationBuilder()
.setJSONStoreEnabled(true)
.build()).getInstance();
/**
* The initialization method for the Source. The context contains all the
* Flume configuration info, and can be used to retrieve any configuration
* values necessary to set up the Source.
*/
public void configure(Context context) {
consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);
// workspace_id = context.getString(TwitterSourceConstants.WORKSPACE_ID);
lang_profile_path = context.getString(TwitterSourceConstants.LANGUAGE_PROFILE_PATH,"/home/gpadmin/new_flume/lang_profiles/profiles/");
tweet_counter = context.getInteger(TwitterSourceConstants.TWEET_COUNTER,10);
System.out.println("tweet counter ***" +tweet_counter);
String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
keywords = keywordString.split(",");
for (int i = 0; i < keywords.length; i++) {
keywords[i] = keywords[i].trim();
}
System.out.println("keywordss*****" + keywords);
}
/**
* Start processing events. This uses the Twitter Streaming API to sample
* Twitter, and process tweets.
*/
@Override
public void start() {
// The channel is the piece of Flume that sits between the Source and Sink,
// and is used to process events.
final ChannelProcessor channel = getChannelProcessor();
final Map<String, String> headers = new HashMap<String, String>();
// The StatusListener is a twitter4j API, which can be added to a Twitter
// stream, and will execute methods every time a message comes in through
// the stream.
final ArrayList<String> tweetList = new ArrayList<String>();
try {
//DetectorFactory.loadProfile("/home/gpadmin/new_flume/lang_profiles/profiles/");
DetectorFactory.loadProfile(lang_profile_path);
System.out.println("i am in loading profile");
} catch (LangDetectException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
StatusListener listener = new StatusListener() {
// The onStatus method is executed every time a new tweet comes in.
public void onStatus(Status status) {
String text = status.getText();
if (!text.isEmpty()){
Detector detector = null;
try {
detector = DetectorFactory.create();
} catch (LangDetectException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
detector.append(text);
String textLang = null;
//For checking language of tweet
try {
textLang = detector.detect();
if (textLang.equals ...
Comments