Updates & Upserts in Hadoop Ecosystem with Apache Kudu

Apache Kudu, a new open source Apache Hadoop ecosystem project, completes Hadoop's storage layer to enable fast analytics on fast data.



By Sandish Kumar HN, phData.

Many phData customers have faced the issue that the Hadoop Distributed File System (HDFS) does not support updating existing records/data sets. To overcome this, Cloudera introduced a new open source Apache project, Apache Kudu, which not only allows inserts and upserts but does so in real time, making it fast.

Kudu is a new addition to the Hadoop ecosystem that enables fast inserts/updates together with fast columnar scans, and it allows multiple real-time analytic queries across a single storage layer; internally, Kudu organizes its data in a columnar format rather than a row format. Kudu is specially designed for rapidly changing data such as time-series, predictive modeling, and reporting applications where end users require immediate access to newly-arrived data.

In this blog, we start with the Kudu architecture and then cover Kudu high availability, Kudu storage, the Kudu query system, Kudu's Hadoop ecosystem integration, and the limitations of Kudu.

Impala + Kudu Architecture:

[Figure: Impala + Kudu architecture]

Architecture:

Kudu works in a master/worker architecture. The Kudu master node acts as the catalog server and takes care of cluster coordination and maintenance of the tablet directory (note: Kudu can have multiple master nodes to support fast failover). The Kudu tablet server, also called the tserver, runs on each node; it is the storage engine that hosts data and handles read/write operations.

High Availability:

Kudu uses the Raft consensus algorithm to replicate operations across the tablet replicas in the cluster.

The Raft consensus algorithm allows a collection of machines to work as a united group that can survive the failure of some of its members. Raft achieves this via an elected leader. A server or node in a group (cluster) is either a leader or a follower, and can become a candidate at election time (when the leader is unavailable). The leader is responsible for log replication to its followers and sends heartbeat messages to announce its existence. Every follower in the group (cluster) has a timeout (in milliseconds) within which it expects a heartbeat message from its leader. If no heartbeat is received, the follower changes its status to candidate and starts an election to elect a new leader. You can read more about the Raft consensus algorithm here.
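
To make the heartbeat/timeout behavior concrete, here is a minimal, illustrative sketch of a follower (this is not Kudu's internal implementation; the class, timeouts, and method names are assumptions for illustration only):

import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of a Raft follower: it waits for leader heartbeats and
// becomes a candidate when none arrive within its election timeout.
class RaftFollowerSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private Role role = Role.FOLLOWER;
    private volatile long lastHeartbeatMillis = System.currentTimeMillis();
    // Election timeouts are randomized so followers do not all start elections at once.
    private final long electionTimeoutMillis = ThreadLocalRandom.current().nextLong(150, 300);

    void onHeartbeat() {                       // called when the leader's heartbeat arrives
        lastHeartbeatMillis = System.currentTimeMillis();
    }

    void tick() {                              // called periodically by a timer
        boolean timedOut = System.currentTimeMillis() - lastHeartbeatMillis > electionTimeoutMillis;
        if (role == Role.FOLLOWER && timedOut) {
            role = Role.CANDIDATE;             // no heartbeat received: stand for election
            startElection();
        }
    }

    private void startElection() {
        // Ask the other nodes for votes; the candidate that wins a majority becomes the new leader.
    }
}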

Kudu Storage:

When storing data in its file system, Kudu uses the techniques listed below to speed up reads while remaining space-efficient at the storage level.

  • Differential encoding
  • Run-length encoding: for example, if the input is [a,a,a,a,a,a,a,a,b,b,b,b,b,b,c,c], the stored output is [a,8,b,6,c,2]. This is a form of lossless data compression (see the sketch after this list).
  • Vectorized bit-packing, which helps compression and scan performance, and allows random access in constant time with a small overhead.
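
As a concrete illustration of the run-length encoding bullet above, here is a toy encoder (an illustrative sketch only, not Kudu's actual on-disk encoder) that reproduces the [a,8,b,6,c,2] example:

// Toy run-length encoder: turns [a,a,a,a,a,a,a,a,b,b,b,b,b,b,c,c] into "a,8 b,6 c,2".
public class RunLengthEncodingSketch {
    static String encode(char[] input) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < input.length) {
            int runStart = i;
            while (i < input.length && input[i] == input[runStart]) {
                i++;                                           // extend the run of identical values
            }
            out.append(input[runStart]).append(',').append(i - runStart).append(' ');
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(encode("aaaaaaaabbbbbbcc".toCharArray()));  // prints "a,8 b,6 c,2"
    }
}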

Kudu uses the Raft consensus algorithm to replicate data across the cluster, where the leader is responsible for accepting writes and replicating them to the follower nodes. Once a write is persisted according to the configured replication factor N, an acknowledgment is sent to the client.
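
In the Java client (covered in more detail below), the replication factor is set per table at creation time. A minimal sketch, assuming a local master at localhost:7051 and a hypothetical two-column schema:

import java.util.Arrays;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

public class CreateReplicatedTable {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("localhost:7051").build();
        // Hypothetical schema: a BIGINT primary key plus one string column.
        Schema schema = new Schema(Arrays.asList(
            new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
            new ColumnSchema.ColumnSchemaBuilder("fname", Type.STRING).build()));
        CreateTableOptions options = new CreateTableOptions()
            .addHashPartitions(Arrays.asList("id"), 2)  // 2 hash buckets on "id"
            .setNumReplicas(3);                         // each tablet forms a Raft group of 3 replicas
        client.createTable("kudu_employee_table", schema, options);
        client.shutdown();
    }
}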

Kudu Query System:

Kudu supports an SQL-style query system via impala-shell, and because Kudu uses columnar storage, it reduces the amount of data I/O required for analytic queries.

Let’s go over Kudu table schema design:

The PRIMARY KEY comes first in the table creation schema, and you can have multiple columns in the primary key section, e.g., PRIMARY KEY (id, fname). Kudu uses the PARTITION BY clause with HASH and/or RANGE to distribute the data among its tablet servers. Kudu tables create N tablets based on the partition schema specified at table creation. The partition schema can specify a HASH or RANGE partition with N buckets, or a combination of RANGE and HASH partitions. Once the table is created, the tablets are fixed and cannot be added or dropped.

CREATE TABLE kudu_employee_table (id BIGINT PRIMARY KEY, fname STRING, lname STRING) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;


Hash Partition:

Hash partitioning distributes rows by hash value into N buckets. With the Kudu SQL below, 1 million rows across 100 partitions gives approximately 10,000 rows per partition. Rows are partitioned based on the hash value rather than sequentially, so ids 1, 99999, and 123455 can land in different partitions.

CREATE TABLE kudu_employee_table (id STRING PRIMARY KEY, fname STRING, lname STRING) PARTITION BY HASH(id) PARTITIONS 100 STORED AS KUDU;


Range Partition:

Tables with range partitioning are required to have tablets that cover the entire range of possible keys. The partitioning below creates 100 hash buckets for all ids that begin with a lowercase letter (the range 'a' <= VALUES < '{' covers the lowercase ASCII letters), and no other parallelism will be enabled. You can have multiple range partitions at the same time.

CREATE TABLE kudu_employee_table (id STRING PRIMARY KEY, fname STRING, lname STRING) PARTITION BY HASH(id) PARTITIONS 100, RANGE(id) (PARTITION 'a' <= VALUES < '{') STORED AS KUDU;


Upsert:

UPSERT acts as a combination of INSERT and UPDATE:

INSERT INTO kudu_employee_table VALUES (1, "robert", "john");
UPSERT INTO kudu_employee_table VALUES (1, "robert", "jh");


After the UPSERT, the value of lname for the row with id 1 is "jh".

Update:

UPDATE kudu_employee_table SET fname="robert reese" WHERE id = 1;


Hadoop Ecosystem Integration:

Kudu provides C++, Java, and Python APIs for access to individual rows. These APIs can be used to build REST APIs for web-based analytics dashboards, to provide a Kudu connection (JDBC) to BI tools (Tableau, Qlik), and for machine learning projects. Kudu can be integrated with MapReduce, Spark, and other Hadoop ecosystem components.

Kudu-Java Client APIs:

  • Connect to Kudu Cluster:
KuduClient client = new KuduClient.KuduClientBuilder("localhost").build();


  • Create a table at Kudu:
client.createTable(tableName, new Schema(columns), new CreateTableOptions().setRangePartitionColumns(rangeKeys));


  • Open table for insert/upsert rows (a complete insert and scan sketch follows this list):
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();


  • Scan a Kudu table:
KuduScanner scanner = client.newScannerBuilder(table).setProjectedColumnNames(projectColumns).build();
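
Putting these snippets together (reusing the client, table, session, and projectColumns variables from the bullets above; Insert, PartialRow, RowResult, and RowResultIterator are classes in org.apache.kudu.client), a minimal sketch that inserts one row into kudu_employee_table and scans it back could look like this:

Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addLong("id", 1L);                 // assumes the BIGINT id column from the earlier schema
row.addString("fname", "robert");
row.addString("lname", "john");
session.apply(insert);                 // the session buffers the operation
session.flush();                       // push the write to the tablet servers

KuduScanner scanner = client.newScannerBuilder(table)
    .setProjectedColumnNames(projectColumns)
    .build();
while (scanner.hasMoreRows()) {
    RowResultIterator results = scanner.nextRows();
    while (results.hasNext()) {
        RowResult result = results.next();
        System.out.println(result.getString("fname"));
    }
}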


Kudu-Python Client APIs:

  • Connect to Kudu Cluster:
import kudu
client = kudu.connect("127.0.0.1", 7051)


  • Create a table at Kudu:
client.create_table(table, schema, partitioning)


  • Open table for insert/upsert rows:
session = client.new_session()


  • Scan a Kudu table:
scanner = table.scanner()
scanner.add_predicate(table['id'] == 1)  # optional: filter rows with a column predicate
result = scanner.open().read_all_tuples()


Let's take a look at some Kudu-MapReduce and Kudu-Spark tools for bulk import/export of CSV, Avro, and Parquet formatted files.

Kudu-MapReduce Utils:

  • Import CSV:
hadoop jar kudu-client-tools-1.5.0-SNAPSHOT-jar-with-dependencies.jar org.apache.kudu.mapreduce.tools.ImportCsv   


  • Export CSV:
hadoop jar kudu-client-tools-1.5.0-SNAPSHOT-jar-with-dependencies.jar org.apache.kudu.mapreduce.tools.ExportCsv   


Kudu-Spark Utils:

  • Import/Export CSV, AVRO, PARQUET:
spark-submit  --master <master-url> --deploy-mode <deploy-mode> 
  --class org.apache.kudu.spark.tools.ImportExportFiles kudu-1.6.0-SNAPSHOT-tools_2.11.jar 
  --operation=import/export --format=<data-format(csv,parquet,avro)> --master-addrs=<master-addrs> 
  --path=<input/output path> --table-name=<table-name> --<columns while export>


You can find all of the required jars in the kudu/lib folder.

Limitations of Kudu:

  • Maximum cell size (an individual value) is 64 KB prior to compression.
  • A single row should be no larger than a few hundred KB.
  • Table and column names must be valid UTF-8 and no more than 256 bytes in size.
  • Column type, compression, and encoding attributes can only be set through the Kudu API; they are not supported via Impala.

Bio: Sandish Kumar HN, a Big Data Solutions Engineer at phData, builds and manages Big Data solutions for phData customers. He is particularly interested in Big Data open source projects, machine learning, algorithm design and optimization, and distributed approaches to data processing and analysis. Sandish Kumar HN holds a Bachelor's degree in CSE and a Diploma in CS. Email him at sanysandish@gmail.com.
