Latest Posts

Spark Memory Management

Spark Memory Management 1.6.0+

Starting with Apache Spark 1.6.0, the memory management model has changed. The old model is implemented by the StaticMemoryManager class and is now called “legacy”. “Legacy” mode is disabled by default, which means that running the same code on Spark 1.5.x and 1.6.0 can result in different behavior, so be careful. For compatibility, you can enable the “legacy” model with the spark.memory.useLegacyMode parameter.
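As a minimal sketch of how this setting could be passed to a job, the helper below builds a spark-submit command line that uses the standard `--conf key=value` form. The helper name `legacy_mode_args` and the application file `my_job.py` are hypothetical and only serve the illustration.

```python
def legacy_mode_args(app_path, use_legacy=True):
    """Build a spark-submit argument list that toggles the legacy memory model.

    `app_path` is a placeholder for your own application file.
    """
    value = "true" if use_legacy else "false"
    return [
        "spark-submit",
        "--conf", f"spark.memory.useLegacyMode={value}",
        app_path,
    ]


if __name__ == "__main__":
    # Print the command instead of running it, so the sketch has no side effects.
    print(" ".join(legacy_mode_args("my_job.py")))
```

The same key could equally be placed in spark-defaults.conf; the command-line form is shown only because it is easy to try on a single job.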

I described the “legacy” model of memory management in this article about Spark Architecture almost a year ago. I have also written an article on Spark Shuffle implementations that briefly touches on memory management as well.

This article describes the new memory management model used in Apache Spark starting with version 1.6.0, which is implemented as UnifiedMemoryManager.

Long story short, the new memory management model looks like this:

Apache Spark Unified Memory Manager introduced in v1.6.0+

You can see 3 main memory regions on the diagram:

Reserved Memory. This is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB, which means that this 300MB of RAM does not participate in Spark memory region size calculations. Its size cannot be changed without recompiling Spark or setting spark.testing.reservedMemory, which is not recommended because it is a testing parameter not intended for production use. Be aware that this memory is only called “reserved”; in fact it is not used by Spark in any way, but it sets the limit on what you can allocate for Spark usage. Even if you want to give all of the Java heap to Spark to cache your data, you won’t be able to do so, as this “reserved” part would remain spare (not really spare, as it would store lots of Spark internal objects). For your information, if you don’t give a Spark executor at least 1.5 * Reserved Memory = 450MB of heap, it will fail with a “please use larger heap size” error message.
User Memory. This is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you how to use it. You can store your own data structures there for use in RDD transformations. For example, you can rewrite a Spark aggregation using the mapPartitions transformation and maintain a hash table for this aggregation, which would consume so-called User Memory. In Spark 1.6.0 the size of this memory pool can be calculated as (“Java Heap” – “Reserved Memory”) * (1.0 – spark.memory.fraction), which by default is equal to (“Java Heap” – 300MB) * 0.25. For example, with a 4GB heap you would have 949MB of User Memory. Again, this is User Memory, and it’s completely up to you what is stored in this RAM and how; Spark does no accounting of what you do there or of whether you respect this boundary. Not respecting this boundary in your code might cause an OOM error.
Spark Memory. Finally, this is the memory pool managed by Apache Spark. Its size can be calculated as (“Java Heap” – “Reserved Memory”) * spark.memory.fraction, and with Spark 1.6.0 defaults it gives us (“Java Heap” – 300MB) * 0.75. For example, with a 4GB heap this pool would be 2847MB in size. This whole pool is split into 2 regions, Storage Memory and Execution Memory, and the boundary between them is set by the spark.memory.storageFraction parameter, which defaults to 0.5. The advantage of this new memory management scheme is that this boundary is not static: under memory pressure the boundary moves, i.e. one region grows by borrowing space from the other. I will discuss the moving of this boundary a bit later; for now let’s focus on how this memory is used:
Storage Memory. This pool is used both for storing Apache Spark cached data and as temporary space for “unrolling” serialized data. All the “broadcast” variables are also stored there as cached blocks. In case you’re curious, here’s the code of unroll. As you may see, it does not require enough memory to be available for the whole unrolled block: if there is not enough memory to fit the whole unrolled partition, it puts it directly on the drive, provided the desired persistence level allows this. As for “broadcast”, all the broadcast variables are stored in the cache with the MEMORY_AND_DISK persistence level.
Execution Memory. This pool is used for storing the objects required during the execution of Spark tasks. For example, it is used to store the shuffle intermediate buffer on the map side in memory, and it is also used to store the hash table for the hash aggregation step. This pool also supports spilling to disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).
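The formulas above can be checked with a short calculation. This is a sketch only: the function name `memory_regions` and the dictionary keys are made up for the illustration, while the 300MB reserved size, the 450MB minimum heap and the default fractions (0.75 and 0.5) are the Spark 1.6.0 values quoted in the text.

```python
RESERVED_MB = 300  # hardcoded Reserved Memory in Spark 1.6.0, per the text above


def memory_regions(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return the initial region sizes (in MB) for a given executor heap size."""
    if heap_mb < 1.5 * RESERVED_MB:
        # Mirrors the minimum heap requirement described for Reserved Memory.
        raise ValueError("please use larger heap size")
    usable = heap_mb - RESERVED_MB
    spark = usable * memory_fraction           # Spark Memory
    user = usable - spark                      # User Memory = usable * (1 - fraction)
    storage = spark * storage_fraction         # initial Storage Memory
    execution = spark - storage                # initial Execution Memory
    return {
        "reserved_mb": RESERVED_MB,
        "user_mb": user,
        "spark_mb": spark,
        "storage_mb": storage,
        "execution_mb": execution,
    }


if __name__ == "__main__":
    for name, size in memory_regions(4096).items():
        print(f"{name}: {size}")
```

For a 4GB (4096MB) heap this reproduces the 949MB of User Memory and 2847MB of Spark Memory mentioned above, with the Spark Memory initially split evenly between Storage and Execution.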

Ok, so now let’s focus on the moving boundary between Storage Memory and Execution Memory. Due to the nature of Execution Memory, you cannot forcefully evict blocks from this pool, because this is data used in intermediate computations, and the process requiring this memory would simply fail if the block it refers to cannot be found. This is not so for Storage Memory: it is just a cache of blocks stored in RAM, and if we evict a block from there we can just update the block metadata to reflect the fact that this block was evicted to HDD (or simply removed). When Spark later tries to access this block, it reads it from HDD (or recalculates it if your persistence level does not allow spilling to HDD).

So, we can forcefully evict a block from Storage Memory, but cannot do so from Execution Memory. When can the Execution Memory pool borrow some space from Storage Memory? It happens when either:

There is free space available in the Storage Memory pool, i.e. cached blocks don’t use all the memory available there. Then it just reduces the Storage Memory pool size, increasing the Execution Memory pool.
Storage Memory pool size …read more
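The first case above, where Execution Memory borrows free space from Storage Memory, can be illustrated with a toy model. This is not Spark's UnifiedMemoryManager code: the class `ToyUnifiedPool` and its methods are hypothetical, sizes are plain integers in MB, and only the "free storage space" case described above is modeled.

```python
class ToyUnifiedPool:
    """Toy model of the movable Storage/Execution boundary (illustration only)."""

    def __init__(self, storage_pool_mb, execution_pool_mb):
        self.storage_pool_mb = storage_pool_mb
        self.execution_pool_mb = execution_pool_mb
        self.storage_used_mb = 0
        self.execution_used_mb = 0

    def cache_block(self, size_mb):
        """Cache a block if it fits into the current Storage Memory pool."""
        if self.storage_used_mb + size_mb > self.storage_pool_mb:
            return False
        self.storage_used_mb += size_mb
        return True

    def acquire_execution(self, size_mb):
        """Grant execution memory, borrowing only FREE storage space if needed."""
        free_execution = self.execution_pool_mb - self.execution_used_mb
        shortfall = size_mb - free_execution
        if shortfall > 0:
            free_storage = self.storage_pool_mb - self.storage_used_mb
            borrowed = min(shortfall, free_storage)
            # Move the boundary: shrink the storage pool, grow the execution pool.
            self.storage_pool_mb -= borrowed
            self.execution_pool_mb += borrowed
        granted = min(size_mb, self.execution_pool_mb - self.execution_used_mb)
        self.execution_used_mb += granted
        return granted


if __name__ == "__main__":
    pool = ToyUnifiedPool(storage_pool_mb=1000, execution_pool_mb=1000)
    pool.cache_block(400)
    print(pool.acquire_execution(1300), pool.storage_pool_mb, pool.execution_pool_mb)
```

In this sketch cached blocks are never evicted, so a request larger than the free space in both pools is only partially granted.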

Community Choice Winner Blog: Machine Learning in Big Data – Look Forward or Be Left Behind

Machine Learning in Big Data – Look Forward or Be Left Behind
Bill Porto, Senior Engineering Analyst, RedPoint Global Inc.

Computers? Not so much. One of the biggest developments – and challenges – in technology has been the advent of machine learning. But even as we make major strides in the age of Big Data, applying machine learning to our data is something that few have effectively achieved. Creating models to predict customer response or to segment customer data into set categories are “predictable” use cases. It’s a start – taking data, discovering what it can tell you, and creating a model and use for it. But that’s not enough…

If we want to deliver a real cost advantage for the enterprise with machine learning, there are larger, mission-critical issues to address. These issues focus on model choice, viability horizon, practical design alternatives, learning from on-the-fence model factors, and opportunities for automating access to changing data and netting-out error and noise.

This April at the Hadoop Summit in Dublin, I am delighted to have the opportunity to share with you how continual, adaptive optimization is the key to maintaining a leadership position in satisfying customer demand. As senior analytics engineer at RedPoint Global, I’m focused on developing automated business optimization software that incorporates evolutionary optimization, neural networks, and a host of other non-traditional machine learning techniques.

Hadoop Summit Europe 2016 provides a great opportunity to share my experience in machine learning and Big Data with attendees who want to really move ahead with their own machine learning mastery.

We’ll explore:

What the applicability of machine learning tools is
How these processes should and can be optimized to predict, segment, and ultimately drive more predictable outcomes from business decisions
Approaches for populating and tuning your models
The tools and processes you will need to truly impact sustainable revenue for your enterprise

I’m looking forward to showing how to apply predictive modelling and optimization to harness the full power and potential of your data at the Hadoop Summit Europe 2016.

The post Community Choice Winner Blog: Machine Learning in Big Data – Look Forward or Be Left Behind appeared first on Hortonworks.


Community Choice Winner Blog: Overview of Apache Flink – The 4G of Big Data Analytics Frameworks

Overview of Apache Flink: the 4G of Big Data Analytics Frameworks
Author: Slim Baltagi, Director of Big Data engineering, Capital One

I want to thank those of you who voted for my proposal and I look forward to meeting many of you in Dublin. I’ll be around for the conference and would gladly welcome any follow on conversations.

About me

I am currently a Director of Big Data engineering at Capital One. Capital One is a leading consumer and commercial banking institution conducting business in the US, Canada and the U.K.

I have over 18 years of IT and business experience and I spent the last 5 years of my life Hadooping and more recently Sparking and Flinking! I enjoy evangelizing Big Data technologies by speaking at Big Data events and maintaining a blog and a Knowledge Base on many Apache projects: Hadoop, Spark, Flink… With some fellow squirrels, I also run Apache Flink Meetups in New York City, Chicago, Washington DC, Dallas/Fort Worth, Boston and Paris.

My session is an introductory-level talk about Apache Flink: a multi-purpose Big Data analytics framework leading a movement towards the unification of batch and stream processing, or stream-processing-first, in open source. With the many technical innovations Apache Flink brings, along with its unique vision and philosophy, it is considered the 4G (4th Generation) of Big Data Analytics frameworks, providing the only hybrid (Real-Time Streaming + Batch) open source distributed data processing engine supporting many use cases: Real-Time streaming, batch, machine learning and graph processing.

After attending my talk, you will know more about:

What the Apache Flink stack is: its streaming dataflow execution engine, APIs and domain-specific libraries for batch, streaming, machine learning and graph processing.

How Apache Flink integrates with Hadoop and other open source tools for data input and output as well as deployment.

Why Apache Flink is an alternative to Apache Hadoop MapReduce, Apache Storm and Apache Spark.

How Apache Flink is used at Capital One and who else has adopted Apache Flink.

Where to learn more about Apache Flink.

To get a preview of my session at the 2016 Hadoop Summit in Dublin, I would like to suggest a couple of related talks that I gave in 2015:

2015 Big Data Scala By the Bay, San Francisco, US: Why Apache Flink is the 4G of Big Data Analytics Frameworks?

2015 Flink Forward, Berlin, Germany: Flink and Spark Similarities and Differences

I would like also to suggest slide decks of a few talks, which I gave about Apache Flink, at

My talk is an introductory talk open to technical and non-technical people alike. I look forward to meeting you at the Hadoop Summit that will take place in Dublin, Ireland on April 13-14, 2016.

The post Community Choice Winner Blog: Overview of Apache Flink – The 4G of Big Data Analytics Frameworks appeared first on Hortonworks.


Hadoop Capacity Planning

Welcome to 2016! As Hadoop races into prime time computing systems, some of the issues such as how to do capacity planning, assessment and adoption of new tools, backup and recovery, and disaster recovery/continuity planning are becoming serious questions with…
Read more

The Object rEvolution


It’s our pleasure to host Ryan Peterson, Chief Solution Strategist at EMC, as a guest blogger to expand upon another great step in our partnership to deliver compelling customer solutions through joint engineering efforts. Follow Ryan @BigDataRyan.

Object storage isn’t a new concept and EMC’s been innovating around it since the beginning. Take our Centera and Atmos products as key examples. The first Centera was created around the idea that objects could store much higher quantities of data than a file system in a single store, while the other aspect of Centera was a rich set of security and compliance features that file systems had not been able to achieve. Data shredding, for example, was a feature required by governments and law firms. We all know some politicians who need a Centera system. Atmos, on the other hand, was designed with a completely different base requirement. The goal was to support a geo-parity environment, mostly seen with large enterprise customers and service providers. In the Atmos design, data written to one location would be protected by other locations and yet share a common namespace. The design inspired many large internet-scale companies you likely use today, and some of them are even backed by an Atmos system.

But when you are innovating from scratch, you make design decisions that leave things out, and you learn from the 25,000 current EMC object storage customers. So we started with a new baseline of code and added in many of the components of Centera and Atmos to create something new, exciting, and dare I say revolutionary. Enter Elastic Cloud Storage (ECS), which can scale from one rack in one data center to many racks in many data centers, thus encompassing the design requirements of both Atmos and Centera, with new data protection features that increase performance over the original design, such as a local replica process, erasure coding for high performance, and geo-protection using XOR to reduce overhead. ECS changes the game!

But with the advent of new technologies in the world, such as one near and dear to my heart, Hadoop, the design needed to include the capability to analyze the data in the entire global namespace and do it efficiently.

ECS includes a mapping to Hadoop using the Hadoop Compatible File System (HCFS) guidelines, the same way you might see Lustre or Gluster connect. The metadata controllers in ECS provide the namespace context and allow Hadoop to see the data on that system the same way it would if it were looking at HDFS. In fact, it’s as simple as using a different URI string to connect, and you don’t have to remove your HDFS DAS if you don’t want to. Simply take your existing Hadoop cluster and point to viprfs:// as shown below. Hadoop will automatically open a series of connections to access the data at the fastest possible rate.

Now, before going out to tell the world about this solution, we really wanted to enlist the support of the Hadoop distributions and to test it thoroughly. The picture below is a setup of 10 racks of ECS running the Hortonworks and Pivotal distributions of Hadoop. This is one of several setups like it that seek to simplify the implementation process, validate that everything is functional, and provide us with a place to test scenarios our customers bring to us.

Our friends at Hortonworks really did an amazing job going through all of the features of Hadoop and validating each and every line of Apache code works on ECS. Click here to see all of the certifications that have already been completed with our geo-scale object platform and Hortonworks.

So what? What does this mean to you? Let’s get serious and clear. Never before has there been an opportunity to purchase your own Analytics-Ready-Cloud-in-a-Box. So who are the customers that might care?

If you have a need for data to be spread across geographies such as Americas, Europe, and Asia; or even New York, Chicago, and Los Angeles, then relying on a single name space to support that environment while keeping the data in a state that can be quickly accessed and analyzed should be top of mind. Thus far, we’ve seen customers in the following segments (to name a few and not exhaustive):

Internet of Things (IoT) such as Connected Cars, Home Automation, Turbines, and Smartphone Backups
Geo-scale Archive – data that you might have sent to tape or moved offsite stays inexpensive and accessible to analytics
Service Providers, Telcos, and Web 2.0 companies that need to service the application generation

Let’s compare this with the existing technologies used in Public Cloud providers not using ECS. Data is collected in multi-tenant object systems, is copied to another platform for analysis (a Cloud Data Lake so to speak) and the results pushed back into your primary system. Amazon’s S3 and EMR are a good example of that type of legacy cloud architecture. With ECS, we remove the need to move data by allowing analysis to happen against the data set where it sits. Now that’s Revolutionary!

If you have requirements that you believe are met with ECS, whether you want to host the equipment yourself or are looking for an ECS-enabled Public Cloud Service Provider, reach out to your EMC representative or discuss with our friends at Hortonworks. We can meet your needs with this rEvolutionary architecture.

For more information, you can watch this video of my colleagues Nikhil & Priya discussing the internals of the platform and how it works with Hadoop.

You can also download our Hadoop on ECS White Paper.

The post The Object rEvolution appeared first on Hortonworks.


Hadoop on Remote Storage


The question of running Hadoop on remote storage is raised again and again by many independent developers, enterprise users and vendors. And there are still many discussions in the community, with completely opposite opinions. I’d like to state here my personal view on this complex problem.

In this article I will call remote storage “NAS” for simplicity. I will also take it as a given that remote storage is not the same as HDFS, but something completely different, from standard storage arrays with LUNs mounted to the servers to various distributed storage systems. For all these systems I assume that they are remote, because unlike HDFS they don’t allow you to run your custom code on the storage nodes. And they are mostly “storages”, so they use some kind of erasure coding to save space and make the solution more competitive.

If you have been reading my blog for a long time, you might notice that this is the second version of this article. During the last year I was constantly thinking about this problem, and my position has shifted a bit, mostly based on real-world practice and experience.

Read IO Performance. For most Hadoop clusters the limiting factor in performance is IO. The more IO bandwidth you have, the faster your cluster works. You won’t be surprised if I tell you that the IO bandwidth mostly depends on the number of disks you have and their type. For example, a single SATA HDD can deliver roughly 50MB/sec in sequential scans, a SAS HDD can give you 90MB/sec and an SSD might achieve 300MB/sec. It is simple math to calculate the total platform bandwidth given these numbers. Comparing DAS with NAS does not make much sense in this context, because both a NAS and a cluster with DAS might have the same number of disks and thus would deliver comparable bandwidth. So again, assuming infinite network bandwidth with zero latency, the same RAID controllers and the same number and type of drives, DAS and NAS solutions would deliver the same read IO performance.
Write IO Performance. Here things get a bit more complicated, and you should understand exactly how your NAS solution works to be able to compare it with Hadoop on DAS. HDFS stores a number of exact copies of the data, 3 by default. So if you write X GB of data, it in fact occupies 3*X GB of disk space. And of course, the process of writing 3 copies of the data is 3 times slower than the process of writing a single copy. How do most NAS storage systems work? NAS is an old industry, and vendors clearly understood that storing many exact copies of the data is very wasteful, so most of them use some kind of erasure coding (like Reed-Solomon). This allows you to achieve redundancy similar to storing 3 exact copies of the data with only 40% overhead when using RS(10,4). But everything comes at a cost, and the cost here is performance. To write a single block in HDFS you just have to write it 3 times. With RS(10,4), to write a single block you have to calculate erasure codes for it, either by reading the other 9 blocks and writing out 4 of them, or by having some kind of caching layer with replication and a background compaction process. In short, writing to it will always be slower than writing to a cluster with replication; this is like comparing RAID10 with RAID5, the same logic of replication vs erasure coding.
Read IO performance (degraded). If you lose a single machine or a single drive in a Hadoop cluster with DAS, your read performance is not affected: you read the same data from a different node that is still alive. But what happens in a NAS with RS(10,4)? Right, to restore a single block with RS(10,4) you have to read up to 13 blocks, which would make your system up to 13 times slower! Of course, in most cases you encode sequential blocks and then read sequential blocks, so you can restore the missing one more easily. But still, your performance would degrade 2x in the best scenario and up to 13x in the worst:
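The arithmetic behind the bandwidth and write-overhead points above can be sketched in a few lines of Python. This is only a back-of-the-envelope illustration: the function names and the 120-disk cluster are hypothetical, while the per-disk throughput figures, the 3 HDFS replicas and RS(10,4) come from the text.

```python
# Approximate sequential-scan throughput per drive, as quoted in the text (MB/sec).
DISK_MB_PER_SEC = {"sata_hdd": 50, "sas_hdd": 90, "ssd": 300}


def aggregate_bandwidth_mb_s(disk_count, disk_type):
    """Total sequential read bandwidth, assuming the disks are the only bottleneck."""
    return disk_count * DISK_MB_PER_SEC[disk_type]


def raw_space_replicated_gb(data_gb, copies=3):
    """Raw disk space used when every block is stored as `copies` exact copies."""
    return data_gb * copies


def raw_space_erasure_coded_gb(data_gb, data_blocks=10, parity_blocks=4):
    """Raw disk space used with Reed-Solomon RS(data_blocks, parity_blocks)."""
    return data_gb * (data_blocks + parity_blocks) / data_blocks


if __name__ == "__main__":
    # Hypothetical cluster of 120 SATA drives.
    print(aggregate_bandwidth_mb_s(120, "sata_hdd"), "MB/sec")
    # 100 GB of user data: 3x replication vs RS(10,4).
    print(raw_space_replicated_gb(100), "GB with 3 replicas")
    print(raw_space_erasure_coded_gb(100), "GB with RS(10,4)")
```

With these inputs, replication needs 300 GB of raw space for 100 GB of data (200% overhead), while RS(10,4) needs 140 GB (the 40% overhead mentioned above).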

And if you think that the degraded case is not very relevant for you, here are the statistics from a Facebook Hadoop cluster:

Data Recovery. When you lose a node and replace it, how long does it take to recover the redundancy of your system? For HDFS with DAS you just copy the data for under-replicated blocks to the new node. For RS(10,4) you have to restore the missing blocks by reading all the other blocks in the group and performing computations on top of them. Usually it is 5x-10x slower:

Network. When you run a Hadoop cluster with DAS, the Hadoop framework itself tries to schedule executors as close to the data as possible, usually giving preference to local IO. In a cluster with NAS, your IO is always remote, with no exceptions. So the network becomes a big pain point: you should plan it very carefully, with no oversubscription both between the compute nodes and between compute and storage. The network rarely becomes a bottleneck if you have enough 10GbE interfaces, but the switches should be good, and you need many more of them than in a solution with DAS. Here’s the slide from Cisco’s presentation regarding this subject:

Local Storage. Having remote HDFS might look like a good option, but what about the local storage on the “compute” nodes? People usually forget that MapReduce stores intermediate data on local storage, and that Spark puts all the shuffle intermediate data on local storage too. Plus, Hive and Pig are translated into MR, Tez or Spark jobs, which store their intermediate results on local storage as well. Thus even “compute” nodes should have enough local storage, and the safest option is to have the same amount of raw …read more

MPP vs Hadoop Talk

Today I had a great talk at the Hadoop User Group Ireland meetup in Dublin, and it was an adapted and refactored version of the article on the same subject, MPP vs Hadoop. Here are the slides:

Feel free to comment and share your opinion on this subject.

…read more