Wednesday, 3 August 2016

Kafka Questions

Q.1 Benchmarking Kafka

Source: LinkedIn Engineering blogs:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying

Q.2 What is the difference between the Kafka receiver-based approach and the direct approach in Spark Streaming?
Ans.


Receiver-based Approach:

This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and jobs launched by Spark Streaming then process the data.
However, under the default configuration, this approach can lose data under failures (see "receiver reliability" in the Spark Streaming guide). To ensure zero data loss, you additionally have to enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write-ahead logs on a distributed file system (e.g., HDFS), so that all the data can be recovered on failure.
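
For reference, a minimal sketch of the receiver-based approach in Scala (the ZooKeeper quorum, group id and topic below are placeholder values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverBasedExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReceiverBasedExample")
    // For zero data loss, also set spark.streaming.receiver.writeAheadLog.enable=true
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receiver-based stream: uses the Kafka high-level consumer and ZooKeeper.
    val stream = KafkaUtils.createStream(
      ssc,
      "localhost:2181",        // ZooKeeper quorum (placeholder)
      "my-group",              // consumer group id (placeholder)
      Map("my-topic" -> 1))    // topic -> number of receiver threads

    stream.map(_._2).count().print()   // use only the message value here

    ssc.start()
    ssc.awaitTermination()
  }
}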

Direct Approach (No Receivers):

Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch.
  • Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
  • Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
  • Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints.
Source: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
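
For comparison, a minimal sketch of the direct (receiver-less) approach (broker list, topic and checkpoint directory are placeholder values):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///tmp/checkpoints")   // offsets are tracked in Spark checkpoints

    // Direct stream: no receiver, Spark queries Kafka for offsets itself.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}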

Q.3 In Kafka, how do messages get distributed to partitions?
Ans. The primary objective of partitions is to achieve parallelism, so you can have as many consumers reading from the same topic as you have partitions.
When you create partitions, you also decide how messages get routed to them: either supply a "key" while sending messages to the topic (the default partitioner hashes the key to pick a partition), or write your own custom partitioning logic. For example, you can have 3 partitions and divide your messages into High, Medium and Low priority, and implement a partitioner such that High goes to partition 0, Medium to partition 1 and Low to partition 2, as in the sketch below.
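
A rough illustration using the new Java producer's Partitioner interface (the class name and priority keys are made up for this example):

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// Hypothetical partitioner: routes "high" keys to partition 0,
// "medium" keys to partition 1 and everything else to partition 2.
class PriorityPartitioner extends Partitioner {
  override def configure(configs: util.Map[String, _]): Unit = {}

  override def partition(topic: String, key: AnyRef, keyBytes: Array[Byte],
                         value: AnyRef, valueBytes: Array[Byte],
                         cluster: Cluster): Int =
    String.valueOf(key).toLowerCase match {
      case "high"   => 0
      case "medium" => 1
      case _        => 2
    }

  override def close(): Unit = {}
}

// Registered on the producer with:
//   props.put("partitioner.class", classOf[PriorityPartitioner].getName)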

Q.4 How do you purge a Kafka topic?
Ans.
Temporarily update the retention time on the topic to one second:

Command:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000


Then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms value.
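
For example, if the topic previously used the default 7-day retention (604800000 ms), you could restore and verify it with something like:

kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=604800000
kafka-topics.sh --zookeeper localhost:13003 --describe --topic MyTopic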


Q.5 What is the role of ZooKeeper in Kafka?
Ans. Kafka is built to use ZooKeeper.
Kafka is a distributed system and uses ZooKeeper to track the status of Kafka cluster nodes. It also keeps track of Kafka topics, partitions, etc.
ZooKeeper is basically used to maintain coordination between the different nodes in a cluster. One of the most important things for Kafka is that it uses ZooKeeper to periodically commit offsets, so that in case of node failure a consumer can resume from the previously committed offset (imagine taking care of all this on your own). ZooKeeper also plays a vital role for many other purposes, such as leader detection, configuration management, synchronization, and detecting when a node joins or leaves the cluster.
How does Kafka use ZooKeeper?
As of v0.8, Kafka uses ZooKeeper for storing a variety of configurations as key/value pairs in the ZK data tree and uses them across the cluster in a distributed fashion. Let's take two simple use cases for which Kafka maintains values in ZooKeeper:

1. Topics under a broker - /brokers/topics/[topic]

2. Next offset for a consumer/topic/partition combination - /consumers/[groupId]/offsets/[topic]/[partitionId]

Now think about the "distributed-ness": configurations like these are replicated and distributed throughout the ZooKeeper ensemble (leader node and follower nodes). So I don't think Kafka will work without ZooKeeper (at least for pre-0.8.2 versions of Kafka).

Kafka uses Zookeeper for the following:


  1. Electing a controller. The controller is one of the brokers and is responsible for maintaining the leader/follower relationship for all the partitions. When a node shuts down, it is the controller that tells other replicas to become partition leaders to replace the partition leaders on the node that is going away. ZooKeeper is used to elect a controller, make sure there is only one, and elect a new one if it crashes.
  2. Cluster membership - which brokers are alive and part of the cluster? This is also managed through ZooKeeper.
  3. Topic configuration - which topics exist, how many partitions each has, where the replicas are, who the preferred leader is, and what configuration overrides are set for each topic.
  4. (0.9.0) Quotas - how much data each client is allowed to read and write.
  5. (0.9.0) ACLs - who is allowed to read from and write to which topic.
  6. (old high-level consumer) Which consumer groups exist, who their members are, and what the latest offset each group got from each partition is.
In short, ZooKeeper is mainly used to track the status of nodes in the Kafka cluster and to keep track of Kafka topics, partitions, and so on.
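
You can inspect these paths yourself with the zookeeper-shell.sh tool that ships with Kafka (assuming ZooKeeper listens on localhost:2181). The first command opens an interactive prompt; the ls/get commands below are typed at that prompt - /brokers/ids lists the live broker ids and /brokers/topics lists the known topics:

zookeeper-shell.sh localhost:2181
ls /brokers/ids
ls /brokers/topics
get /brokers/topics/MyTopic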

Q.6 How does the producer identify the leader in Kafka?
Ans. The producer sends a metadata request with a list of topics to one of the brokers in the broker list you supplied when configuring the producer.
The broker responds with a list of partitions in those topics and the leader for each partition. The producer caches this information and knows where to direct its produce requests.
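
This happens automatically inside the producer, but the same metadata is visible through the API; a small sketch (broker list and topic name are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.JavaConverters._

object LeaderLookupExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092,broker2:9092")   // placeholder broker list
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // partitionsFor() triggers a metadata request and returns, per partition,
    // the current leader and the replica list.
    producer.partitionsFor("my-topic").asScala.foreach { p =>
      println(s"partition ${p.partition()} leader ${p.leader()}")
    }
    producer.close()
  }
}
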
Q.7 Is Kafka good for a scenario involving hundreds of thousands of topics (where tens of topics, or a topic with tens of partitions, represent the data flow for one customer)?
Ans. Technically, Kafka will work nicely with 1000-3000 partitions per broker, so with enough brokers such a plan will work.
However, in my experience, whenever people come up with a plan that ends up with 100K topics or more, they are not designing their data model correctly. This is especially true for people who used JMS queues before and are applying the same type of design to Kafka.

Q.8 What's the point of partitions in Kafka? How do I know how many partitions are best for a data set of size X?
Ans. Partitions are Kafka's unit of parallelism (see Q.3 and Q.9): the number of partitions caps how many consumers in a group can read a topic in parallel, and the per-broker guideline from Q.7 (roughly 1000-3000 partitions per broker) bounds how many partitions a cluster can comfortably host.

Q.9 What happens when the number of consumers is greater than the number of partitions, and vice versa?
Ans.

Consumers read from any single partition, allowing you to scale throughput of message consumption in a similar fashion to message production. Consumers can also be organized into consumer groups for a given topic - each consumer within the group reads from a unique partition, and the group as a whole consumes all messages from the entire topic.

If you have more consumers than partitions, then some consumers will be idle because they have no partitions to read from.

If you have more partitions than consumers, then some consumers will receive messages from multiple partitions.

If you have equal numbers of consumers and partitions, each consumer reads messages in order from exactly one partition.
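
A minimal sketch of a consumer that joins a group, using the new consumer API (Kafka 0.9+); the broker list, group id and topic are placeholders - start several copies of this process with the same group.id and Kafka assigns each one a share of the partitions:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object GroupConsumerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")   // placeholder
    props.put("group.id", "my-group")                // members of one group share the topic's partitions
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("my-topic"))
    while (true) {
      // Each consumer only sees records from the partitions assigned to it.
      for (record <- consumer.poll(1000).asScala)
        println(s"partition ${record.partition()} offset ${record.offset()}: ${record.value()}")
    }
  }
}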

(The original post included a diagram illustrating these consumer/partition scenarios; it is not reproduced here.)
Q.10 How does Kafka guarantee consistency and availability?

Ans. Before beginning the discussion on consistency and availability, keep in mind that these guarantees hold as long as you are producing to one partition and consuming from one partition. All guarantees are off if you are reading from the same partition using two consumers or writing to the same partition using two producers.
Kafka makes the following guarantees about data consistency and availability: (1) messages sent to a topic partition will be appended to the commit log in the order they are sent, (2) a single consumer instance will see messages in the order they appear in the log, (3) a message is 'committed' when all in-sync replicas have applied it to their log, and (4) any committed message will not be lost as long as at least one in-sync replica is alive.
The first and second guarantees ensure that message ordering is preserved for each partition. Note that message ordering for the entire topic is not guaranteed. The third and fourth guarantees ensure that committed messages can be retrieved. In Kafka, the replica that is elected leader for a partition is responsible for syncing any messages received to the other replicas. Once a replica has acknowledged a message, that replica is considered to be in sync.
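
On the producer side, the 'committed' guarantee above is what acks=all ties into; a hedged sketch of the relevant settings (broker list and topic are placeholders, and the broker/topic-level min.insync.replicas setting controls how many replicas must acknowledge):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ReliableProducerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092,broker2:9092")   // placeholder broker list
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Wait for all in-sync replicas to acknowledge before a write counts as committed.
    props.put("acks", "all")
    props.put("retries", "3")

    val producer = new KafkaProducer[String, String](props)
    // send() returns a Future; get() blocks until the record is acknowledged or fails.
    producer.send(new ProducerRecord[String, String]("my-topic", "key", "value")).get()
    producer.close()
  }
}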

Installing Multiple Brokers on a Single Kafka Cluster

Steps to create multiple brokers on a single node:

1. cp /etc/kafka/conf/server.properties /etc/kafka/conf/server1.properties
This creates a new server1.properties file.
Make the following changes to server1.properties:

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

2. cp /etc/kafka/conf/server.properties /etc/kafka/conf/server2.properties
This creates a new server2.properties file.
Make the following changes to server2.properties:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

Then start all the brokers:

nohup /usr/hdp/2.2.9.1-11/kafka/bin/kafka-server-start.sh /etc/kafka/conf/server.properties &
nohup /usr/hdp/2.2.9.1-11/kafka/bin/kafka-server-start.sh /etc/kafka/conf/server1.properties &
nohup /usr/hdp/2.2.9.1-11/kafka/bin/kafka-server-start.sh /etc/kafka/conf/server2.properties &

Now you can see three brokers running on the node.

Note: make sure these properties are set in each server.properties file (the listener port must be unique per broker):
zookeeper.connect=host1:2181,host2:2181,...
listeners=PLAINTEXT://host:9092
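
To verify that the three brokers form one cluster, you can create a topic with replication factor 3 and describe it (assuming ZooKeeper on localhost:2181; the topic name is just an example):

kafka-topics.sh --zookeeper localhost:2181 --create --topic replicated-test --partitions 3 --replication-factor 3
kafka-topics.sh --zookeeper localhost:2181 --describe --topic replicated-test

The describe output should show leaders and replicas spread across broker ids 0, 1 and 2.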

Thursday, 24 December 2015

Create Custom Sink For Flume Sources

Steps to write the Azure sink:
1. Create a plugins.d directory in FLUME_HOME (e.g., /usr/lib/flume).
2. In the plugins.d directory, create a directory for the sink (any name, e.g., azuresink).
3. In the azuresink directory, create three subdirectories:
a) lib - contains the jar with the sink class you have written for the custom sink
b) libext - contains the dependencies of the sink class (all jars needed to run the sink class)
c) native - contains any required native libraries (e.g., .so files)
4. Copy the sink jar into /usr/hdp/2.2.6.0-2800/flume/lib (in HDP 2.2).

In Cloudera (CDH) there is no need to copy it there.
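
The sink class itself extends Flume's AbstractSink and implements Configurable. Below is a minimal skeleton in Scala; the class name, the "container" property and the uploadToAzure helper are made up for this sketch and stand in for the real upload logic:

import org.apache.flume.{Context, Event, EventDeliveryException}
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Sink.Status

class AzureSink extends AbstractSink with Configurable {
  private var containerName: String = _

  // Reads sink settings from the agent configuration,
  // e.g. agent.sinks.azure.container = mycontainer
  override def configure(context: Context): Unit = {
    containerName = context.getString("container", "default-container")
  }

  override def process(): Status = {
    val channel = getChannel
    val txn = channel.getTransaction
    txn.begin()
    try {
      val event: Event = channel.take()
      if (event == null) {
        txn.commit()
        Status.BACKOFF                                  // nothing to deliver right now
      } else {
        uploadToAzure(containerName, event.getBody)     // placeholder for the real upload
        txn.commit()
        Status.READY
      }
    } catch {
      case e: Exception =>
        txn.rollback()
        throw new EventDeliveryException("Failed to deliver event", e)
    } finally {
      txn.close()
    }
  }

  // Hypothetical helper - replace with the Azure storage client of your choice.
  private def uploadToAzure(container: String, body: Array[Byte]): Unit = ()
}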

Wednesday, 2 December 2015

Bulk Loading Data into HBase Using Mapreduce

If you use BulkLoads with HBase, your workflow is similar to the following:
  1. Extract your data from its existing source. For instance, if your data is in a MySQL database, you might run the mysqldump command. The process you use depends on your data. If your data is already in TSV or CSV format, skip this step and use the included ImportTsv utility to process your data into HFiles. See the ImportTsv documentation for details.
  2. Process your data into HFile format. See http://hbase.apache.org/book/hfile_format.html for details about the HFile format. Usually you use a MapReduce job for the conversion, and you often need to write the Mapper yourself because your data is unique. The job must emit the row key as the Key, and either a KeyValue, a Put, or a Delete as the Value. The Reducer is handled by HBase; configure it using HFileOutputFormat.configureIncrementalLoad() and it does the following:
    • Inspects the table to configure a total order partitioner
    • Uploads the partitions file to the cluster and adds it to the DistributedCache
    • Sets the number of reduce tasks to match the current number of regions
    • Sets the output key/value class to match HFileOutputFormat requirements
    • Sets the Reducer to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
  3. One HFile is created per region in the output folder. Input data is almost completely re-written, so you need available disk space at least twice the size of the original data set. For example, for a 100 GB output from mysqldump, you should have at least 200 GB of available disk space in HDFS. You can delete the original input file at the end of the process.
  4. Load the files into HBase. Use the LoadIncrementalHFiles command (more commonly known as the completebulkload tool), passing it a URL that locates the files in HDFS. Each file is loaded into the relevant region on the RegionServer for the region. You can limit the number of versions that are loaded by passing the --versions= N option, where N is the maximum number of versions to include, from newest to oldest (largest timestamp to smallest timestamp).
    If a region was split after the files were created, the tool automatically splits the HFile according to the new boundaries. This process is inefficient, so if your table is being written to by other processes, you should load as soon as the transform step is done.
Source: Cloudera documentation
You can get the complete code on my github repository :
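
Not the code from the repository referred to above, but a rough sketch of what such a bulk-load driver and mapper can look like (HBase 0.98-era APIs; the table name "my_table", column family "cf" and tab-separated input are assumptions for the example):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{Job, Mapper}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

// Hypothetical mapper: each input line is "rowkey<TAB>value",
// emitted as a Put into column family "cf", qualifier "val".
class TsvToPutMapper extends Mapper[LongWritable, Text, ImmutableBytesWritable, Put] {
  override def map(key: LongWritable, line: Text,
                   ctx: Mapper[LongWritable, Text, ImmutableBytesWritable, Put]#Context): Unit = {
    val fields = line.toString.split("\t")
    val rowKey = Bytes.toBytes(fields(0))
    val put = new Put(rowKey)
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("val"), Bytes.toBytes(fields(1)))
    ctx.write(new ImmutableBytesWritable(rowKey), put)
  }
}

object BulkLoadDriver {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val job = Job.getInstance(conf, "hbase-bulk-load")
    job.setJarByClass(classOf[TsvToPutMapper])
    job.setMapperClass(classOf[TsvToPutMapper])
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[Put])
    FileInputFormat.addInputPath(job, new Path(args(0)))     // raw TSV input
    FileOutputFormat.setOutputPath(job, new Path(args(1)))   // HFile output directory

    // Step 2 above: sets the total order partitioner, reducer and sort order.
    val table = new HTable(conf, "my_table")                  // assumed table name
    HFileOutputFormat.configureIncrementalLoad(job, table)

    if (job.waitForCompletion(true)) {
      // Step 4 above: hand the generated HFiles to the region servers (completebulkload).
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path(args(1)), table)
    }
  }
}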