Friday, 9 September 2016

Yarn Questions

Q. What is YARN ?

Ans .
YARN (Yet Another Resource Negotiator) is Hadoop's cluster resource management and job scheduling layer. It consists of the following main components:
  • ResourceManager
  • NodeManager
  • ApplicationMaster

1. ResourceManager :

The ResourceManager typically runs on its own machine and is responsible for scheduling and allocating resources. The two main components of the ResourceManager are:

  1. Scheduler
  2. Applications Manager (AsM)

If you are familiar with Hadoop 1.x, note that YARN splits the functionality of the JobTracker into two separate processes: resource management and per-application lifecycle management.

The ResourceManager allocates resources for applications but does not manage the lifecycle of applications. Instead, each application is managed by an ApplicationMaster that runs on a node in the cluster. Every application running in the cluster requires its own ApplicationMaster.

2. NodeManager :

The NodeManager is a daemon process that runs on each worker node in the cluster (typically the same nodes that run DataNodes). Its responsibilities include:


  1. Communicating its status with the RM
  2. Tracking the health of the node
  3. Overseeing the lifecycle management of containers
  4. Monitoring resource usage of each container (i.e. memory and CPUs)
  5. Managing resource localization (for JAR files, libraries, and any other application-specific files used by containers)
  6. Managing the logs generated by containers



3. ApplicationMaster :

The per-application ApplicationMaster is the process that bootstraps and drives a YARN application after the application has been submitted and the AM itself has been launched.
The responsibilities of the AM include:

  1. Negotiating appropriate containers from the ResourceManager
  2. Working with the NodeManagers to execute and monitor the containers and their resource consumption
  3. Providing fault tolerance.

The benefits of the AM include:
  1. Extensibility: Hadoop computing can now be more than Java MapReduce applications.
  2. Scalability: Hadoop clusters can now be considerably larger, because the ResourceManager no longer manages per-application fault tolerance (a problem with the old JobTracker that caused bottlenecks and limited the size of a Hadoop cluster). YARN applications have been executed on clusters of over 10,000 nodes.
YARN LIFE CYCLE :

  1. The client submits an application to the ResourceManager.
  2. The ResourceManager allocates a container and asks a NodeManager to launch the ApplicationMaster in it.
  3. The ApplicationMaster registers with the ResourceManager and negotiates further containers for the application's tasks.
  4. The ApplicationMaster works with the NodeManagers to launch and monitor those containers.
  5. When the application finishes, the ApplicationMaster unregisters from the ResourceManager and its containers are released.

Q. What is a YARN container ? How does it work ?

Ans .
Containers :

A container in YARN represents a unit of work in an application. A container has the following behaviors:
  1. Runs on a node, managed by a NodeManager
  2. Makes use of a share of the node's resources, specifically the memory and CPU currently allocated to it
  3. Depends on libraries and other files that are represented as local resources, which are provided by the NodeManager using a LocalResource
  4. Performs the needed work
The container does the actual work of the specific YARN application. This is where the custom code appears that lets you do whatever you need to do to your big data on Hadoop. See the sketch below for how an ApplicationMaster requests and launches a container.
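
To make the moving parts concrete, here is a minimal, hedged sketch of how an ApplicationMaster might request a container from the ResourceManager and launch a command in it, using the YARN client API (AMRMClient and NMClient). This code would run inside the AM's own container; registration details, error handling, heartbeating, and completion tracking are simplified, and the launched command is just a placeholder.

import java.util.Collections;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

/** A bare-bones sketch, not a production ApplicationMaster. */
public class ContainerSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();

    // Talk to the ResourceManager on behalf of the ApplicationMaster.
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    rmClient.registerApplicationMaster("", 0, "");

    // Ask for one container with 1 GB of memory and 1 virtual core.
    Resource capability = Resource.newInstance(1024, 1);
    ContainerRequest request =
        new ContainerRequest(capability, null, null, Priority.newInstance(0));
    rmClient.addContainerRequest(request);

    // Talk to NodeManagers to launch containers that the RM allocates.
    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Heartbeat until the RM hands back a container, then launch a command in it.
    while (true) {
      AllocateResponse response = rmClient.allocate(0.1f);
      for (Container container : response.getAllocatedContainers()) {
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList("echo hello from a YARN container"));
        nmClient.startContainer(container, ctx);
        return; // one container is enough for this sketch
      }
      Thread.sleep(1000);
    }
  }
}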



Q. What is the difference between MR1 and MR2 (YARN) ?

Ans .


MRv1 uses the JobTracker to create and assign tasks to TaskTrackers, which can become a resource bottleneck when the cluster scales out far enough (typically at around 4,000 nodes).

MRv2 (aka YARN, "Yet Another Resource Negotiator") has one ResourceManager per cluster, and each worker node runs a NodeManager. In MRv2, the functions of the JobTracker have been split between three services.
The ResourceManager is a persistent YARN service that receives and runs applications (a MapReduce job is an application) on the cluster. It contains the scheduler, which, as before, is pluggable.
The MapReduce-specific capabilities of the JobTracker have been moved into the MapReduce ApplicationMaster, one of which is started to manage each MapReduce job and terminated when the job completes.
The JobTracker function of serving information about completed jobs has been moved to the JobHistory Server.
The TaskTracker has been replaced with the NodeManager, a YARN service that manages resources and deployment on a host. It is responsible for launching containers, each of which can house a map or reduce task.

Q. How does read/write happen in HDFS ?
Ans .
The following steps describe the anatomy of a file write in HDFS:

Step 1: The client creates the file by calling create() method on DistributedFileSystem.

Step 2: DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it. The namenode performs various checks to make sure the file doesn’t already exist and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException. The DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to.

Step 3: As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. The data queue is consumed by the DataStreamer, which is responsible for asking the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. The list of datanodes forms a pipeline; here we’ll assume the replication level is three, so there are three nodes in the pipeline. The DataStreamer streams the packets to the first datanode in the pipeline, which stores each packet and forwards it to the second datanode in the pipeline.

Step 4: Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline.

Step 5: DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.

Step 6: When the client has finished writing data, it calls close() on the stream.

Step 7: This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete. The namenode already knows which blocks the file is made up of, so it only has to wait for blocks to be minimally replicated before returning successfully. The client-side view of this whole sequence is sketched below.
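
From the client's point of view, all of this pipelining is hidden behind the FileSystem API. Here is a minimal sketch of a client-side write; the namenode URI, path, and file contents are placeholders:

import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder namenode address; in practice this comes from core-site.xml (fs.defaultFS).
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);

    // create() triggers steps 1 and 2 above; the writes and close() drive steps 3 to 7.
    try (FSDataOutputStream out = fs.create(new Path("/user/demo/sample.txt"))) {
      out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8));
    }
    fs.close();
  }
}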




Wednesday, 24 August 2016

Hive Questions

Q. What is SerDe in Hive ?
Ans. 
SerDe (Serializer/Deserializer) instructs Hive on how to process a record (row). It also enables Hive to process semi-structured records (XML, email, etc.) or unstructured records (audio, video, etc.). For example, if you have 1000 GB worth of RSS feeds (RSS XMLs), you can ingest them to a location in HDFS. You would then need to write a custom SerDe based on your XML structure so that Hive knows how to load the XML files into Hive tables, or the other way around.

When we let Hive (like any other database) work in its own internal formats, we do not care about SerDes.
When we want Hive to process our own files as tables (external tables), we have to tell it how to translate the data in those files into records. This is exactly the role of a SerDe. You can see it as a plug-in that enables Hive to read and write your data.

The SerDe interface allows you to instruct Hive as to how a record should be processed. A SerDe is a combination of a Serializer and a Deserializer (hence, Ser-De). The Deserializer interface takes a string or binary representation of a record and translates it into a Java object that Hive can manipulate. The Serializer, conversely, takes a Java object that Hive has been working with and turns it into something that Hive can write to HDFS or another supported system. Commonly, Deserializers are used at query time to execute SELECT statements, and Serializers are used when writing data, such as through an INSERT-SELECT statement.
As an example, consider a SerDe for processing JSON data, which can be used to transform a JSON record into something that Hive can process.
Developing a SerDe
To start, we can write a basic template for a SerDe, which uses the Hive serde2 API (org.apache.hadoop.hive.serde2). This API should be used in preference to the older serde API, which has been deprecated:
package com.cloudera.hive.serde;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* A template for a custom Hive SerDe
*/
public class BoilerplateSerDe implements SerDe {
 private StructTypeInfo rowTypeInfo;
 private ObjectInspector rowOI;
 private List<String> colNames;
 private List<Object> row = new ArrayList<Object>();
 /**
  * An initialization function used to gather information about the table.
  * Typically, a SerDe implementation will be interested in the list of
  * column names and their types. That information will be used to help 
  * perform actual serialization and deserialization of data.
  */
 @Override
 public void initialize(Configuration conf, Properties tbl)
     throws SerDeException {
   // Get a list of the table's column names.
   String colNamesStr = tbl.getProperty(Constants.LIST_COLUMNS);
   colNames = Arrays.asList(colNamesStr.split(","));
  
   // Get a list of TypeInfos for the columns. This list lines up with
   // the list of column names.
   String colTypesStr = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
   List<TypeInfo> colTypes =
       TypeInfoUtils.getTypeInfosFromTypeString(colTypesStr);
  
   rowTypeInfo =
       (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(colNames, colTypes);
   rowOI =
       TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
 }
 /**
  * This method does the work of deserializing a record into Java objects
  * that Hive can work with via the ObjectInspector interface.
  */
 @Override
 public Object deserialize(Writable blob) throws SerDeException {
   row.clear();
   // Do work to turn the fields in the blob into a set of row fields
   return row;
 }
 /**
  * Return an ObjectInspector for the row of data
  */
 @Override
 public ObjectInspector getObjectInspector() throws SerDeException {
   return rowOI;
 }
 /**
  * Unimplemented
  */
 @Override
 public SerDeStats getSerDeStats() {
   return null;
 }
 /**
  * Return the class that stores the serialized data representation.
  */
 @Override
 public Class<? extends Writable> getSerializedClass() {
   return Text.class;
 }
 /**
  * This method takes an object representing a row of data from Hive, and
  * uses the ObjectInspector to get the data for each column and serialize
  * it.
  */
 @Override
 public Writable serialize(Object obj, ObjectInspector oi)
     throws SerDeException {
   // Take the object and transform it into a serialized representation
   return new Text();
 }
}


Breaking this down a bit, the initialize() method is called only once and gathers some commonly used pieces of information from the table properties, such as the column names and types. Using the type info of the row, you can instantiate an ObjectInspector for the row (ObjectInspectors are Hive objects that are used to describe and examine complex type hierarchies). The two other important methods are serialize() and deserialize(), which do the namesake work of the SerDe.

In a SerDe, the serialize() method takes a Java object representing a row of data and converts that object into a serialized representation of the row. The serialized class is determined by the return type of getSerializedClass(). In the JSONSerDe, the serialize() method converts the object into a JSON string represented by a Text object. To do the serialization from Java into JSON, I’ve opted to use the Jackson JSON library, which allows me to convert a Java object to a JSON string with just a small amount of code, as sketched below.
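
Below is a highly simplified, illustrative sketch of that Jackson step. It assumes the row has already been flattened into a Map of column name to value; the real JSONSerDe walks the ObjectInspector to build that map, which is omitted here, and the class and method names are made up for this example.

import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.Text;

public class JsonRowSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Turn a flattened row (column name -> value) into the Text that serialize() would return.
  public static Text toJsonText(Map<String, Object> row) throws Exception {
    return new Text(MAPPER.writeValueAsString(row));
  }

  public static void main(String[] args) throws Exception {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 42);
    row.put("name", "alice");
    System.out.println(toJsonText(row));  // {"id":42,"name":"alice"}
  }
}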

Q. What is the Avro data format ?

Ans.

Avro is a data serialization system.
Avro provides:
  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Remote procedure call (RPC).


Data Serialization

Avro data is always serialized with its schema. Files that store Avro data should always also include the schema for that data in the same file. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data.
Because the schema used to write data is always available when the data is read, Avro data itself is not tagged with type information. The schema is required to parse data.

Encodings

Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster. But, for debugging and web-based applications, the JSON encoding may sometimes be appropriate.

Object Container Files

Avro includes a simple object container file format. A file has a schema, and all objects stored in the file must be written according to that schema. Objects are stored in blocks that may be compressed. Synchronization markers are used between blocks to permit efficient splitting of files for MapReduce processing.
Files may include arbitrary user-specified metadata.
A file consists of:
  • header, followed by
  • one or more blocks.

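As a concrete illustration, here is a small sketch of writing and reading an object container file with the Avro Java API (generic records, no code generation). The schema, field values, file name, and codec choice are placeholders:

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroContainerFileSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder schema for a simple "User" record.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"age\",\"type\":\"int\"}]}");

    // Write: the schema is stored in the file header, and blocks can be compressed.
    File file = new File("users.avro");
    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    writer.setCodec(CodecFactory.snappyCodec());
    writer.create(schema, file);

    GenericRecord user = new GenericData.Record(schema);
    user.put("name", "Alice");
    user.put("age", 30);
    writer.append(user);
    writer.close();

    // Read: the schema travels with the data, so no external schema is needed.
    DataFileReader<GenericRecord> reader =
        new DataFileReader<>(file, new GenericDatumReader<GenericRecord>());
    for (GenericRecord record : reader) {
      System.out.println(record);  // {"name": "Alice", "age": 30}
    }
    reader.close();
  }
}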

Overview – Working with Avro from Hive

The AvroSerde allows users to read or write Avro data as Hive tables. Key features of the AvroSerde (a usage sketch follows this list):
  • Infers the schema of the Hive table from the Avro schema. Starting in Hive 0.14, the Avro schema can be inferred from the Hive table schema.
  • Reads all Avro files within a table against a specified schema, taking advantage of Avro's backwards compatibility abilities
  • Supports arbitrarily nested schemas.
  • Translates all Avro data types into equivalent Hive types. Most types map exactly, but some Avro types don't exist in Hive and are automatically converted by the AvroSerde.
  • Understands compressed Avro files.
  • Transparently converts the Avro idiom of handling nullable types as Union[T, null] into just T and returns null when appropriate.
  • Writes any Hive table to Avro files.
  • Has worked reliably against our most convoluted Avro schemas in our ETL process.
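
A hedged sketch of declaring an Avro-backed table from Java via the Hive JDBC driver (the connection URL and table name are placeholders). STORED AS AVRO lets the AvroSerde infer the Avro schema from the Hive columns in Hive 0.14+; the commented alternative supplies an explicit Avro schema instead:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class AvroHiveTableSketch {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Placeholder HiveServer2 URL and credentials.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = conn.createStatement()) {

      // Hive 0.14+: the AvroSerde derives the Avro schema from the Hive columns.
      stmt.execute("CREATE TABLE users_avro (name STRING, age INT) STORED AS AVRO");

      // Alternative for older versions: name the SerDe and point to an explicit schema, e.g.
      //   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
      //   STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      //   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
      //   TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/user.avsc')
    }
  }
}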


Q. What is Schema evolution in Hive ?
Ans .
Schema evolution means that the schema of a table can change over time (for example, columns can be added) while data written with the older schema remains readable. With Avro-backed Hive tables this is handled by Avro's schema resolution: the schema used to write each file is stored in the file itself, and when the data is read it is resolved against the current (reader) schema of the table. New fields must declare a default value so that records written before the field existed can still be read, and fields removed from the reader schema are simply ignored. A sketch of this resolution at the Avro level follows.
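
Here is a minimal sketch of Avro schema resolution, the mechanism behind this kind of schema evolution. The writer (old) and reader (new) schemas are made-up placeholders; the new schema adds an email field with a default value so old records remain readable:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionSketch {
  public static void main(String[] args) throws Exception {
    // Old schema the data was written with.
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"}]}");
    // New schema the table now uses; the added field has a default.
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"unknown\"}]}");

    // Serialize a record with the old schema.
    GenericRecord oldRecord = new GenericData.Record(writerSchema);
    oldRecord.put("name", "Alice");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(oldRecord, encoder);
    encoder.flush();

    // Read it back with the new schema; Avro fills in the default for the missing field.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord newRecord =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
    System.out.println(newRecord.get("email"));  // prints "unknown"
  }
}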

Q. What are the different compression codecs in Hadoop ?

Ans .

GZIP is not splittable. It uses more CPU resources than Snappy or LZO, but provides a higher compression ratio. GZip is often a good choice for cold data, which is accessed infrequently. Snappy or LZO are a better choice for hot data, which is accessed frequently.

BZIP2 is splittable in Hadoop. It provides a very good compression ratio, but in terms of CPU time and performance it is not optimal, as compression is very CPU consuming. BZip2 can also produce more compression than GZip for some types of files, at the cost of some speed when compressing and decompressing. HBase does not support BZip2 compression.
LZO is splittable in Hadoop: leveraging hadoop-lzo you have splittable compressed LZO files. You need external .lzo.index files to be able to process the files in parallel; the library provides the means of generating these indexes in a local or distributed manner.
LZ4 is splittable in Hadoop: leveraging hadoop-4mc you have splittable compressed 4mc files. You don't need any external indexing, and you can generate archives with the provided command-line tool or from Java/C code, inside or outside Hadoop. 4mc makes LZ4 available on Hadoop at any level of speed/compression ratio: from fast mode reaching 500 MB/s compression speed up to high/ultra modes providing an increased compression ratio, almost comparable with GZIP's.
Snappy often performs better than LZO. It is worth running tests to see if you detect a significant difference.
Note : 
1. Compression is not recommended if your data is already compressed (such as images in JPEG format). In fact, the resulting file can actually be larger than the original.
2. For MapReduce, if you need your compressed data to be splittable, BZip2 and indexed LZO are splittable, but GZip is not. Raw Snappy files are not splittable on their own; Snappy is typically used inside splittable container formats such as SequenceFile or Avro. Splittability is not relevant to HBase data.

A sketch of enabling compression for a MapReduce job follows.
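
As an illustration, a hedged sketch of turning on compression for a MapReduce job: Snappy for the intermediate map output and Gzip for the final job output. The property names are the Hadoop 2.x names, and the job name is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionConfigSketch {
  public static Job configure() throws Exception {
    Configuration conf = new Configuration();

    // Compress the intermediate map output with Snappy (cheap on CPU; splittability doesn't matter here).
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf, "compression-example");  // placeholder job name

    // Compress the final job output with Gzip (better ratio, but the output won't be splittable).
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    return job;
  }
}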



Q. What is an SMB join in Hive ?
Ans .

In an SMB join in Hive, each mapper reads a bucket from the first table and the corresponding bucket from the second table, and then a merge-sort join is performed. Sort-Merge-Bucket (SMB) join is mainly used because it places no limit on file, partition, or table size in the join, so it is best used when the tables are large. In an SMB join the tables are bucketed and sorted on the join columns, and all tables must have the same number of buckets. A sketch of the table layout and settings this requires is shown below.
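
A hedged sketch, via the Hive JDBC driver, of the setup an SMB join needs: both tables bucketed and sorted on the join key into the same number of buckets, plus session settings that let Hive choose the sort-merge-bucket path. The connection URL, table names, and bucket count are placeholders, and setting names/defaults can vary by Hive version:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SmbJoinSketch {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Placeholder HiveServer2 URL and credentials.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = conn.createStatement()) {

      // Both tables bucketed and sorted on the join key, with the same bucket count.
      stmt.execute("CREATE TABLE orders (order_id INT, cust_id INT) "
          + "CLUSTERED BY (cust_id) SORTED BY (cust_id) INTO 32 BUCKETS");
      stmt.execute("CREATE TABLE customers (cust_id INT, name STRING) "
          + "CLUSTERED BY (cust_id) SORTED BY (cust_id) INTO 32 BUCKETS");

      // Session settings that allow Hive to pick the SMB join.
      stmt.execute("SET hive.auto.convert.sortmerge.join=true");
      stmt.execute("SET hive.optimize.bucketmapjoin=true");
      stmt.execute("SET hive.optimize.bucketmapjoin.sortedmerge=true");

      // The join itself is written as a normal equi-join on the bucketed/sorted key.
      stmt.execute("SELECT o.order_id, c.name "
          + "FROM orders o JOIN customers c ON o.cust_id = c.cust_id");
    }
  }
}
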
Q. What is the difference between SORT BY, DISTRIBUTE BY, and ORDER BY in Hive ?

Ans .

ORDER BY x: guarantees global ordering, but does this by pushing all data through just one reducer. This is basically unacceptable for large datasets. You end up with one sorted file as output.

SORT BY x: orders data at each of N reducers, but each reducer can receive overlapping ranges of data. You end up with N or more sorted files with overlapping ranges.

DISTRIBUTE BY x: ensures each of N reducers gets non-overlapping ranges of x, but doesn't sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.

CLUSTER BY x: ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers. This gives you global ordering, and is the same as doing (DISTRIBUTE BY x and SORT BY x). You end up with N or more sorted files with non-overlapping ranges. So CLUSTER BY is basically the more scalable version of ORDER BY.