Sunday 20 September 2015

Setting up pyspark in Eclipse

Install the PyDev plugin in Eclipse.

Then navigate to

Project -> Properties -> PyDev - PYTHONPATH -> External Libraries

and add the source folders.


Set SPARK_HOME in Eclipse

At the beginning of your program, set it:

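import os
os.environ["SPARK_HOME"] = "/path/to/spark"  # placeholder: replace with your Spark installation's root directory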

Set it to the root directory of your Spark installation.

Multiple Output Format in Hadoop

The MultipleOutputs class simplifies writing data to multiple outputs.

  • Configure a named output with a name and an OutputFormat.
  • When writing out a <key, value> pair, specify the named output to send it to.

This is done by assigning a name to each output with the static addNamedOutput method of MultipleOutputs. For example, add the following to the driver class:

MultipleOutputs.addNamedOutput(job, "QuantityData", TextOutputFormat.class, NullWritable.class, Text.class);

MultipleOutputs.addNamedOutput(job, "registerValueData", TextOutputFormat.class, NullWritable.class, Text.class);
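The two addNamedOutput calls register the outputs before the job starts, so that tasks can later write to them by name. The snippet below is only an in-memory analogy in plain Java, not Hadoop code: NamedOutputs is a hypothetical class, and each name maps to a list of lines rather than to a set of output files. It mirrors the expectation that a task writes only to names the driver registered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for MultipleOutputs (not Hadoop code):
// each registered name gets a list of lines instead of its own output files.
class NamedOutputs {
    private final Map<String, List<String>> outputs = new LinkedHashMap<>();

    // Plays the role of MultipleOutputs.addNamedOutput(...) in the driver.
    void addNamedOutput(String name) {
        outputs.putIfAbsent(name, new ArrayList<>());
    }

    // Plays the role of outs.write(name, key, value); the NullWritable key is dropped.
    void write(String name, String value) {
        List<String> sink = outputs.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("Undefined named output: " + name);
        }
        sink.add(value);
    }

    // Returns everything written to a name so far (empty if nothing was).
    List<String> get(String name) {
        List<String> sink = outputs.get(name);
        return sink == null ? Collections.<String>emptyList() : sink;
    }

    public static void main(String[] args) {
        NamedOutputs outs = new NamedOutputs();
        outs.addNamedOutput("QuantityData");
        outs.addNamedOutput("registerValueData");
        outs.write("QuantityData", "q1");
        outs.write("registerValueData", "r1");
        System.out.println(outs.get("QuantityData"));      // prints [q1]
        System.out.println(outs.get("registerValueData")); // prints [r1]
    }
}
```

Keeping one sink per registered name is the whole idea: the writer decides the destination per record, while the set of destinations is fixed up front.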

In the mapper class, add the following:

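public class MyParserMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> outs;
    public void setup(Context context) throws IOException, InterruptedException {
        outs = new MultipleOutputs<NullWritable, Text>(context); // one writer for every named output
    }
    // in map(), send each record to a named output, e.g. outs.write("QuantityData", NullWritable.get(), value);
    public void cleanup(Context context) throws IOException, InterruptedException {
        outs.close(); // close all named outputs so their files are flushed
    }
}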
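How map() chooses a named output depends on the input data, and the post does not show the parser's rule. A hypothetical rule, assuming comma-separated lines whose first field is a record type ("Q" for quantity records), could be sketched in plain Java as follows; in the real mapper, the returned name would be passed to outs.write(name, NullWritable.get(), value).

```java
// Hypothetical routing rule for the mapper (the post does not show the real one):
// lines look like "type,field,...", and type "Q" marks a quantity record.
class RecordRouter {
    static final String QUANTITY = "QuantityData";
    static final String REGISTER_VALUE = "registerValueData";

    // Returns the named output a line should be written to.
    static String namedOutputFor(String line) {
        String type = line.split(",", 2)[0].trim();
        return type.equals("Q") ? QUANTITY : REGISTER_VALUE;
    }

    public static void main(String[] args) {
        String[] lines = {"Q,a,42", "R,b,7.5"};
        for (String line : lines) {
            // In map() this would become: outs.write(namedOutputFor(line), NullWritable.get(), value);
            System.out.println(namedOutputFor(line) + " <- " + line);
        }
    }
}
```

Keeping the rule in a small static method makes it testable without a Hadoop cluster. With MultipleOutputs, each named output typically ends up in its own files in the job output directory, named after the output (for example QuantityData-m-00000 when written by a map task).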

Twitter Kafka Integration

Add these Maven dependencies to your project:



Create a topic to which the producer will send the tweets:

--create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter_topic

The producer sends the tweets to this topic:
private static final String topic = "twitter_topic";

Properties properties = new Properties();
properties.put("", "localhost:6667");
properties.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);

BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();

endpoint.trackTerms(Lists.newArrayList("Twitter-API","#TOI")); //put search keyword here

Authentication auth = new OAuth1(consumerKey, consumerSecret, token,
secret); // Twitter authentication (get the key and token values from )

Client client = new ClientBuilder()
        .hosts(Constants.STREAM_HOST)
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new StringDelimitedProcessor(queue))
        .build();

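client.connect(); // open the connection to the Twitter streaming API
for (int msgRead = 0; msgRead < 1000; msgRead++) {
    try {
        // block until the next tweet arrives, then publish it to the topic
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, queue.take());
        System.out.println("message : \n" + message);
        producer.send(message);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
}
producer.close();
client.stop();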

Run this program as a Java application.

It acts as a Twitter producer for Kafka.
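The producer is a hand-off between two threads: the hbc client thread pushes each tweet into the bounded LinkedBlockingQueue through StringDelimitedProcessor, and the main loop takes messages out and forwards them to Kafka. A stdlib-only sketch of that hand-off is below; the feed thread is a hypothetical stand-in for the Twitter client, the fake JSON strings are placeholders, and returning a list stands in for producer.send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandOff {
    // Takes up to maxMessages items from the queue, in arrival order,
    // the way the producer loop takes tweets before sending them to Kafka.
    static List<String> drain(BlockingQueue<String> queue, int maxMessages) {
        List<String> forwarded = new ArrayList<>();
        for (int msgRead = 0; msgRead < maxMessages; msgRead++) {
            try {
                forwarded.add(queue.take()); // blocks until the feed thread supplies an item
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop early
                break;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        // Hypothetical feed thread standing in for the hbc Client.
        Thread feed = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    queue.put("{\"id\":" + i + "}");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        feed.start();
        List<String> forwarded = drain(queue, 5);
        feed.join();
        System.out.println(forwarded.size() + " messages forwarded"); // prints 5 messages forwarded
    }
}
```

The bounded queue (capacity 10000, as in the post) means a slow Kafka side eventually makes the feed thread block on put instead of growing memory without limit.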

To consume the tweets from the topic:

--zookeeper localhost:2181 --topic twitter_topic --from-beginning