
Sunday, 20 September 2015

Twitter Kafka Integration

Add these Maven dependencies to your project:


<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>hbc-core</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.0</version>
</dependency>

Create the topic to which the producer will send the tweets:


kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter_topic

The following producer sends tweets to this topic:
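import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TwitterKafkaProducer { // class name is arbitrary

    private static final String topic = "twitter_topic";

    public static void main(String[] args) {
        // Kafka producer configuration
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:6667"); // broker host:port; adjust to your broker (stock Kafka listens on 9092)
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));

        // hbc writes each raw tweet into this queue
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Lists.newArrayList("Twitter-API", "#TOI")); // put search keywords here

        // Twitter authentication (get the key and token values from https://dev.twitter.com/)
        String consumerKey = "<consumer key>", consumerSecret = "<consumer secret>";
        String token = "<access token>", secret = "<access token secret>";
        Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);

        Client client = new ClientBuilder()
                .hosts(Constants.STREAM_HOST)
                .endpoint(endpoint)
                .authentication(auth)
                .processor(new StringDelimitedProcessor(queue))
                .build();

        // Open the connection to the Twitter streaming API
        client.connect();
        try {
            // Forward the first 1000 tweets from the queue to Kafka
            for (int msgRead = 0; msgRead < 1000; msgRead++) {
                KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, queue.take());
                System.out.println("message : \n" + message);
                producer.send(message);
            }
        } catch (InterruptedException e) {
            // Stop reading instead of sending a null message; keep the interrupt flag set
            Thread.currentThread().interrupt();
        } finally {
            producer.close();
            client.stop();
        }
    }
}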
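
The loop above blocks on queue.take(), so it waits indefinitely if fewer than 1000 matching tweets arrive. As a minimal sketch of one alternative, the helper below reads from the queue with a timeout and stops when the stream goes quiet. QueueDrainer and drain are hypothetical names, not part of hbc or Kafka, and the sink parameter only stands in for a call to producer.send(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of hbc or Kafka.
class QueueDrainer {

    /**
     * Forwards up to maxMessages from the queue to the sink. Stops early if the
     * queue stays empty for timeoutMillis or the thread is interrupted.
     * Returns the number of messages forwarded.
     */
    static int drain(BlockingQueue<String> queue, int maxMessages,
                     long timeoutMillis, Consumer<String> sink) {
        int forwarded = 0;
        while (forwarded < maxMessages) {
            String msg;
            try {
                // poll() returns null on timeout instead of blocking forever like take()
                msg = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                break;
            }
            if (msg == null) {
                break; // nothing arrived within the timeout
            }
            sink.accept(msg);
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        queue.add("tweet 1");
        queue.add("tweet 2");
        List<String> sent = new ArrayList<String>();
        // In the real program the sink would wrap producer.send(...)
        int n = drain(queue, 1000, 100, sent::add);
        System.out.println(n + " messages forwarded: " + sent);
    }
}
```

In the producer, the sink could build a KeyedMessage for the topic and pass it to producer.send. The timeout value is a judgment call that depends on how often the tracked terms appear.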


Run this program as a Java application. It acts as a Twitter producer for Kafka.
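
The producer needs four OAuth values from the Twitter developer site before it can run. One possible way to keep them out of the source code is to read them from environment variables, sketched below. The class TwitterCredentials and the variable names (TWITTER_CONSUMER_KEY and so on) are assumptions made for this example, not an hbc convention.

```java
import java.util.Map;

// Hypothetical helper for illustration; the environment variable names are assumptions.
class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String token;
    final String secret;

    TwitterCredentials(String consumerKey, String consumerSecret, String token, String secret) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.token = token;
        this.secret = secret;
    }

    // Builds the credentials from a map such as System.getenv(); fails fast on a missing value.
    static TwitterCredentials fromEnv(Map<String, String> env) {
        return new TwitterCredentials(
                require(env, "TWITTER_CONSUMER_KEY"),
                require(env, "TWITTER_CONSUMER_SECRET"),
                require(env, "TWITTER_TOKEN"),
                require(env, "TWITTER_TOKEN_SECRET"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // Throws IllegalStateException if any of the four variables is not set
        TwitterCredentials c = fromEnv(System.getenv());
        // The values would then be passed on, e.g. new OAuth1(c.consumerKey, c.consumerSecret, c.token, c.secret)
        System.out.println("Loaded consumer key of length " + c.consumerKey.length());
    }
}
```

Taking the map as a parameter rather than calling System.getenv() inside fromEnv keeps the helper easy to exercise with made-up values.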


To consume the tweets from the topic, run:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter_topic --from-beginning