Thursday 24 December 2015

Create Custom Sink For Flume Sources

Steps to write an Azure sink
1. Create a plugins.d directory in FLUME_HOME (/usr/lib/flume).
2. In plugins.d, create a directory for the sink (any name, for example azuresink).
3. In the azuresink directory, create three directories:
a) lib: holds the jar with the custom sink class you have written
b) libext: holds the dependencies of the sink class (all jars needed to run it)
c) native: holds native dependencies, such as .so files
4. Copy the sink jar into /usr/hdp/2.2.6.0-2800/flume/lib (on HDP 2.2).

On Cloudera there is no need to copy it there.
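
The directory layout from the steps above can be scripted. Here is a minimal Python sketch, assuming you only need the empty skeleton; the helper name create_plugin_layout is made up for illustration, and a temporary directory stands in for FLUME_HOME so nothing on a real installation is touched.

```python
import tempfile
from pathlib import Path


def create_plugin_layout(flume_home, plugin_name):
    """Create plugins.d/<plugin_name>/{lib,libext,native} under flume_home."""
    plugin_dir = Path(flume_home) / "plugins.d" / plugin_name
    for sub in ("lib", "libext", "native"):
        # parents=True also creates plugins.d and the plugin directory itself
        (plugin_dir / sub).mkdir(parents=True, exist_ok=True)
    return plugin_dir


# Demo: a temporary directory is used instead of the real FLUME_HOME (assumption)
demo_home = tempfile.mkdtemp()
demo_plugin = create_plugin_layout(demo_home, "azuresink")
print(sorted(p.name for p in demo_plugin.iterdir()))
```

On a real node you would pass the actual Flume home instead of the temporary directory, then copy the sink jar into lib and its dependency jars into libext.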

Wednesday 2 December 2015

Bulk Loading Data into HBase Using MapReduce

If you use BulkLoads with HBase, your workflow is similar to the following:
  1. Extract your data from its existing source. For instance, if your data is in a MySQL database, you might run the mysqldump command. The process you use depends on your data. If your data is already in TSV or CSV format, skip this step and use the included ImportTsv utility to process your data into HFiles. See the ImportTsv documentation for details.
  2. Process your data into HFile format. See http://hbase.apache.org/book/hfile_format.html for details about HFile format. Usually you use a MapReduce job for the conversion, and you often need to write the Mapper yourself because your data is unique. The job must emit the row key as the Key, and either a KeyValue, a Put, or a Delete as the Value. The Reducer is handled by HBase; configure it using HFileOutputFormat.configureIncrementalLoad() and it does the following:
    • Inspects the table to configure a total order partitioner
    • Uploads the partitions file to the cluster and adds it to the DistributedCache
    • Sets the number of reduce tasks to match the current number of regions
    • Sets the output key/value class to match HFileOutputFormat requirements
    • Sets the Reducer to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
  3. One HFile is created per region in the output folder. Input data is almost completely re-written, so you need available disk space at least twice the size of the original data set. For example, for a 100 GB output from mysqldump, you should have at least 200 GB of available disk space in HDFS. You can delete the original input file at the end of the process.
  4. Load the files into HBase. Use the LoadIncrementalHFiles command (more commonly known as the completebulkload tool), passing it a URL that locates the files in HDFS. Each file is loaded into the relevant region on the RegionServer for the region. You can limit the number of versions that are loaded by passing the --versions=N option, where N is the maximum number of versions to include, from newest to oldest (largest timestamp to smallest timestamp).
    If a region was split after the files were created, the tool automatically splits the HFile according to the new boundaries. This process is inefficient, so if your table is being written to by other processes, you should load as soon as the transform step is done.
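
To see why the job ends up with one sorted file per region, it helps to look at what a total order partitioner does: each row key goes to the reducer whose region key range contains it. The following is only a conceptual Python sketch of that routing, not HBase's implementation; the region start keys and row keys are invented for the example.

```python
from bisect import bisect_right


def region_index(row_key, region_start_keys):
    """Return the index of the region whose key range contains row_key.

    region_start_keys must be sorted, and the first region starts at the
    empty key, so every row key falls into some region.
    """
    return bisect_right(region_start_keys, row_key) - 1


def partition_rows(row_keys, region_start_keys):
    """Group row keys by region and sort them within each region."""
    partitions = [[] for _ in region_start_keys]
    for key in row_keys:
        partitions[region_index(key, region_start_keys)].append(key)
    return [sorted(p) for p in partitions]


# Hypothetical table with three regions: ["", "g"), ["g", "p"), ["p", end)
starts = [b"", b"g", b"p"]
rows = [b"zebra", b"apple", b"mango", b"grape", b"pear"]
print(partition_rows(rows, starts))
```

Each inner list corresponds to the sorted content of one reducer's output. If a region splits after the files are written, the boundaries in starts no longer match the table, which is the case the completebulkload tool has to repair by splitting HFiles.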
Source: Cloudera
You can get the complete code on my GitHub repository:

Sunday 20 September 2015

Setting up pyspark in Eclipse

Install the PyDev plugin in Eclipse.

Then navigate to

Project -> Properties -> PyDev - PYTHONPATH -> External libraries

and add these source folders:

/path/to/spark/spark-0.9.1/python
/path/to/pyspark-src.zip

Set SPARK_HOME in Eclipse

At the beginning of your program, set this:

import os

os.environ["SPARK_HOME"]="/usr/hdp/spark"

Use the root directory of your Spark installation as the value.
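
If you would rather not depend on the Eclipse PYTHONPATH setting, roughly the same effect can be achieved from inside the program. This is a sketch under assumptions: the helper name configure_pyspark is made up, and it assumes the Py4J sources sit as a zip under python/lib, which is the usual layout but may differ in your Spark version.

```python
import os
import sys
from glob import glob


def configure_pyspark(spark_home):
    """Set SPARK_HOME and put Spark's Python sources on sys.path."""
    os.environ["SPARK_HOME"] = spark_home
    python_dir = os.path.join(spark_home, "python")
    # Assumption: Py4J is shipped as python/lib/py4j-*.zip in this distribution
    py4j_zips = glob(os.path.join(python_dir, "lib", "py4j-*.zip"))
    added = []
    for path in [python_dir] + py4j_zips:
        if path not in sys.path:
            sys.path.insert(0, path)
            added.append(path)
    return added


# Same example path as above; replace it with your own Spark root directory
configure_pyspark("/usr/hdp/spark")
```

Call it before the first pyspark import, since the import only resolves once the paths are on sys.path.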

Multiple Output Format in Hadoop

The MultipleOutputs class simplifies writing data to multiple outputs.

  • Configure a named output with a name and an OutputFormat.
  • When writing out a <key, value> pair, specify the named output to send it to.


This is done by assigning a name to each output with the static addNamedOutput method of MultipleOutputs.
For example, in the driver class add the following:


MultipleOutputs.addNamedOutput(job, "QuantityData", TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "loadProfileChannelData", TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "registerValueData", TextOutputFormat.class, NullWritable.class, Text.class);

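In the mapper class, add the following:

public class MyParserMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> outs;
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        outs = new MultipleOutputs<NullWritable, Text>(context);
    }
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        outs.write("QuantityData", NullWritable.get(), value); // pick the named output for each record
    }
    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        outs.close(); // flush and close all named outputs
    }
}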

Twitter Kafka Integration

Add these Maven dependencies to your project:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>hbc-core</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.0</version>
</dependency>

Create a topic to which the producer will send the tweets:


kafka-topics.sh  --create  --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter_topic

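The producer sends tweets to this topic:

// Imports used by this snippet
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

private static final String topic = "twitter_topic";

Properties properties = new Properties();
properties.put("metadata.broker.list", "localhost:6667");
properties.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);

// Tweets received by the client are buffered in this queue
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();

endpoint.trackTerms(Lists.newArrayList("Twitter-API", "#TOI")); // put search keywords here

// Twitter authentication (get the key and token values from https://dev.twitter.com/)
Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);

Client client = new ClientBuilder().hosts(Constants.STREAM_HOST).endpoint(endpoint).authentication(auth).processor(new StringDelimitedProcessor(queue)).build();

// Connect the client to the streaming endpoint
client.connect();
for (int msgRead = 0; msgRead < 1000; msgRead++) {
    try {
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, queue.take());
        System.out.println("message : \n" + message);
        producer.send(message);
    } catch (InterruptedException e) {
        // Stop reading instead of sending a null message
        Thread.currentThread().interrupt();
        break;
    }
}
producer.close();
client.stop();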


Run this program as a Java application.

It acts as a Twitter producer for Kafka.
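
The hand-off between the streaming client and the Kafka producer in the program is a bounded blocking queue: the client fills it, and the send loop drains it. Below is a rough Python sketch of only that pattern. It makes no real Twitter or Kafka calls; a thread stands in for the streaming client, a plain list stands in for the producer, and the names stream_source and forward are made up for illustration.

```python
import queue
import threading


def stream_source(out_queue, messages):
    """Stand-in for the streaming client: pushes raw messages into the queue."""
    for msg in messages:
        out_queue.put(msg)  # blocks while the queue is full


def forward(in_queue, send, limit):
    """Take `limit` messages from the queue and pass each one to `send`."""
    for _ in range(limit):
        send(in_queue.get())  # blocks until a message is available


buffer = queue.Queue(maxsize=3)  # small bound so the source has to wait
sent = []  # stands in for the Kafka producer
tweets = ["t1", "t2", "t3", "t4", "t5"]

source = threading.Thread(target=stream_source, args=(buffer, tweets))
source.start()
forward(buffer, sent.append, limit=len(tweets))
source.join()
print(sent)
```

The bound on the queue is what keeps memory in check when tweets arrive faster than they can be forwarded: the source simply waits until the forwarding loop catches up.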


To consume tweets from the topic:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter_topic --from-beginning