Add these Maven dependencies to your project:
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>hbc-core</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.0</version>
</dependency>
Create a topic to which the producer will send the tweets:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter_topic
The producer code needs these imports:
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
The producer sends tweets to this topic:
private static final String topic = "twitter_topic";
Properties properties = new Properties();
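// Kafka broker host:port; change 6667 to match your broker (a stock Kafka install listens on 9092)
properties.put("metadata.broker.list", "localhost:6667");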
properties.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
// Put your search keywords here
endpoint.trackTerms(Lists.newArrayList("Twitter-API", "#TOI"));
// Twitter authentication (get the key and token values from https://dev.twitter.com/)
Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
// Build the streaming client; incoming tweets are placed on the queue as raw strings
Client client = new ClientBuilder()
        .hosts(Constants.STREAM_HOST)
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new StringDelimitedProcessor(queue))
        .build();
// Open the connection to the streaming API
client.connect();
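// Forward up to 1000 tweets from the queue to Kafka
for (int msgRead = 0; msgRead < 1000; msgRead++) {
    try {
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, queue.take());
        System.out.println("message : \n" + message);
        // Send inside the try block so a null message is never sent after an interrupt
        producer.send(message);
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop reading
        Thread.currentThread().interrupt();
        break;
    }
}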
producer.close();
client.stop();
Run this program as a Java application. It acts as a Twitter producer for Kafka.
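The producer code refers to consumerKey, consumerSecret, token and secret without defining them. One possible way to supply them, rather than hardcoding them in the source, is to read them from a properties file. The following is only a sketch using the JDK: the class name TwitterCredentials and the property key names are assumptions made for illustration and are not part of hbc or Kafka.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: holds the four OAuth values the producer needs.
class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String token;
    final String secret;

    TwitterCredentials(Properties p) {
        consumerKey = require(p, "consumerKey");
        consumerSecret = require(p, "consumerSecret");
        token = require(p, "token");
        secret = require(p, "secret");
    }

    // Fail fast with a clear message if a value is missing or blank.
    private static String require(Properties p, String name) {
        String value = p.getProperty(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + name);
        }
        return value.trim();
    }

    // Reads key=value lines from any Reader (a FileReader in real use).
    static TwitterCredentials load(Reader in) {
        Properties p = new Properties();
        try {
            p.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new TwitterCredentials(p);
    }

    public static void main(String[] args) {
        // Placeholder values; real ones come from your Twitter developer account.
        String sample = "consumerKey=ck\nconsumerSecret=cs\ntoken=t\nsecret=s\n";
        TwitterCredentials c = load(new StringReader(sample));
        System.out.println("Loaded key: " + c.consumerKey);
    }
}
```

With this in place, the OAuth1 constructor could be called as new OAuth1(c.consumerKey, c.consumerSecret, c.token, c.secret), and the properties file can be kept out of version control.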
To consume the tweets from the topic:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter_topic --from-beginning
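One caveat about the producer loop: queue.take() blocks indefinitely, so if fewer than 1000 matching tweets arrive the program never reaches producer.close(). A possible alternative is BlockingQueue.poll with a timeout. The sketch below uses only the JDK; the class QueueDrainSketch, the method drain and the feeder thread are stand-ins invented for illustration, and the place where the real code would call producer.send is marked with a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueDrainSketch {
    // Reads up to maxMessages from the queue and stops early once no message
    // arrives within timeoutMillis, or if the thread is interrupted.
    static List<String> drain(BlockingQueue<String> queue, int maxMessages, long timeoutMillis) {
        List<String> read = new ArrayList<String>();
        while (read.size() < maxMessages) {
            String msg;
            try {
                msg = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                break;
            }
            if (msg == null) {
                break; // timed out: the stream is idle or has stopped
            }
            read.add(msg); // the real loop would call producer.send(...) here
        }
        return read;
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        // Stand-in for the hbc client thread, which fills the queue in the real program.
        Thread feeder = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 3; i++) {
                    queue.offer("tweet-" + i);
                }
            }
        });
        feeder.start();
        feeder.join();
        // Asks for up to 1000 messages but returns after the queue stays empty for 200 ms.
        System.out.println(drain(queue, 1000, 200));
    }
}
```

The timeout value is a trade-off: too short and a quiet search term ends the run early, too long and shutdown is delayed by that amount.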