Monthly Archive: January 2016

Spark Memory Management

Spark Memory Management 1.6.0+

Starting with Apache Spark 1.6.0, the memory management model has changed. The old model is implemented by the StaticMemoryManager class and is now called “legacy”. Legacy mode is disabled by default, which means that the same code can behave differently on Spark 1.5.x and 1.6.0, so be careful with that. For compatibility, you can re-enable the legacy model with the spark.memory.useLegacyMode parameter.
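As a minimal sketch of switching legacy mode back on, the snippet below renders Spark properties as `--conf key=value` flags for spark-submit. The helper `conf_flags` and the job file name `my_job.py` are hypothetical names used only for illustration; the property name is the one mentioned above.

```python
def conf_flags(props):
    """Render a dict of Spark properties as spark-submit --conf flags.

    Hypothetical helper for illustration; not part of Spark.
    """
    flags = []
    for key, value in sorted(props.items()):
        flags += ["--conf", f"{key}={value}"]
    return flags


# Re-enable the pre-1.6.0 ("legacy") memory manager for one job.
legacy_props = {"spark.memory.useLegacyMode": "true"}

# my_job.py is a placeholder for your own application.
command = ["spark-submit"] + conf_flags(legacy_props) + ["my_job.py"]
print(" ".join(command))
```

The same property could equally be set in your cluster's default configuration; passing it per job makes it easier to compare behavior between the two models.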

I described the “legacy” memory management model almost a year ago in this article about Spark Architecture. I have also written an article on Spark Shuffle implementations that briefly touches on memory management.

This article describes the new memory management model used in Apache Spark starting with version 1.6.0, which is implemented as UnifiedMemoryManager.

Long story short, the new memory management model looks like this:

Apache Spark Unified Memory Manager introduced in v1.6.0+

You can see 3 main memory regions on the diagram:

Reserved Memory. This is memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0 it is 300MB, which means that these 300MB of RAM do not participate in Spark memory region size calculations. The size cannot be changed without recompiling Spark or setting spark.testing.reservedMemory, which is not recommended because it is a testing parameter not intended for production use. Be aware that this memory is only called “reserved”: Spark does not allocate from it as one of its pools, but it sets the limit on what you can allocate for Spark usage. Even if you want to give the whole Java heap to Spark for caching your data, you won’t be able to, because this “reserved” part stays out of reach (it is not really spare, since it holds lots of Spark internal objects). For your information, if you give a Spark executor less than 1.5 * Reserved Memory = 450MB of heap, it will fail with a “please use larger heap size” error message.
User Memory. This is the memory pool that remains after Spark Memory is allocated, and it is completely up to you how to use it. You can store your own data structures there for use in RDD transformations. For example, you can rewrite a Spark aggregation as a mapPartitions transformation that maintains its own hash table for the aggregation; that hash table consumes this so-called User Memory. In Spark 1.6.0 the size of this pool can be calculated as (“Java Heap” – “Reserved Memory”) * (1.0 – spark.memory.fraction), which by default equals (“Java Heap” – 300MB) * 0.25. For example, with a 4GB heap you would have 949MB of User Memory. Again, this is User Memory, and it is completely up to you what is stored in this RAM and how: Spark does no accounting of what you do there or whether you respect this boundary. Not respecting this boundary in your code might cause an OOM error.
Spark Memory. Finally, this is the memory pool managed by Apache Spark. Its size can be calculated as (“Java Heap” – “Reserved Memory”) * spark.memory.fraction, and with Spark 1.6.0 defaults this gives (“Java Heap” – 300MB) * 0.75. For example, with a 4GB heap this pool would be 2847MB in size. The whole pool is split into 2 regions, Storage Memory and Execution Memory, and the boundary between them is set by the spark.memory.storageFraction parameter, which defaults to 0.5. The advantage of this new memory management scheme is that the boundary is not static: under memory pressure it moves, i.e. one region grows by borrowing space from the other. I will discuss how this boundary moves a bit later; for now, let’s focus on how this memory is used:
Storage Memory. This pool is used both for storing Apache Spark cached data and as temporary space for “unrolling” serialized data. All “broadcast” variables are also stored there as cached blocks. In case you’re curious, here’s the code of unroll. As you can see, unrolling does not require enough memory for the whole unrolled block to be available: if there is not enough memory to fit the whole unrolled partition, it is put directly on disk, provided the desired persistence level allows this. As for “broadcast”, all broadcast variables are stored in the cache with the MEMORY_AND_DISK persistence level.
Execution Memory. This pool is used for storing the objects required during the execution of Spark tasks. For example, it holds the intermediate shuffle buffer on the map side, and it also holds the hash table for the hash aggregation step. This pool also supports spilling to disk if not enough memory is available, but blocks from this pool cannot be forcefully evicted by other threads (tasks).
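The region sizes described above can be reproduced with a few lines of arithmetic. This is a minimal sketch, assuming the Spark 1.6.0 defaults quoted in the text (300MB reserved, spark.memory.fraction = 0.75, spark.memory.storageFraction = 0.5) and treating the heap size as an exact number of megabytes; the function name `memory_regions` is hypothetical.

```python
RESERVED_MB = 300  # hardcoded Reserved Memory size quoted for Spark 1.6.0


def memory_regions(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return the size in MB of each memory region for a given heap size.

    Hypothetical helper that only mirrors the formulas in the text.
    """
    min_heap_mb = 1.5 * RESERVED_MB  # 450MB with the default reservation
    if heap_mb < min_heap_mb:
        raise ValueError(
            f"heap of {heap_mb}MB is below the {min_heap_mb:.0f}MB minimum"
        )

    usable_mb = heap_mb - RESERVED_MB
    spark_mb = usable_mb * memory_fraction
    storage_mb = spark_mb * storage_fraction
    return {
        "reserved": RESERVED_MB,
        "user": usable_mb * (1.0 - memory_fraction),
        "spark": spark_mb,
        # Initial split only: the boundary between these two can move.
        "storage": storage_mb,
        "execution": spark_mb - storage_mb,
    }


regions = memory_regions(4 * 1024)  # the 4GB heap used as the example above
for name, size_mb in regions.items():
    print(f"{name:>9}: {size_mb:.1f} MB")
```

For the 4GB heap this yields 949MB of User Memory and 2847MB of Spark Memory, matching the figures above. The storage and execution values are only the starting position of the boundary, not a fixed limit.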

OK, so now let’s focus on the moving boundary between Storage Memory and Execution Memory. Due to the nature of Execution Memory, you cannot forcefully evict blocks from this pool: this is data used in intermediate computations, and the process requiring it would simply fail if the block it refers to could not be found. This is not so for Storage Memory: it is just a cache of blocks stored in RAM, and if we evict a block from there we can just update the block metadata to reflect the fact that the block was evicted to HDD (or simply removed). When Spark later tries to access this block, it reads it from HDD (or recalculates it, if your persistence level does not allow spilling to HDD).
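To make the difference concrete, here is a toy model of why evicting a cached block is safe. It is only an illustrative sketch, not Spark's actual block manager: the class `ToyBlockCache`, its methods, and the `allow_disk` flag (standing in for a persistence level that permits disk) are all hypothetical.

```python
class ToyBlockCache:
    """Toy model of Storage Memory eviction; not Spark's real implementation."""

    def __init__(self):
        self.memory = {}      # block_id -> data currently held in RAM
        self.disk = {}        # block_id -> data evicted to disk
        self.allow_disk = {}  # block_id -> may this block be written to disk?
        self.recompute = {}   # block_id -> function that rebuilds the block

    def put(self, block_id, compute, allow_disk):
        """Compute a block, cache it in memory, and remember how to rebuild it."""
        self.recompute[block_id] = compute
        self.allow_disk[block_id] = allow_disk
        self.memory[block_id] = compute()

    def evict(self, block_id):
        """Drop a block from memory, keeping a disk copy if that is allowed."""
        data = self.memory.pop(block_id)
        if self.allow_disk[block_id]:
            self.disk[block_id] = data
        # Otherwise the block is simply removed; it can be recomputed later.

    def get(self, block_id):
        """Return (data, source); eviction never makes a block unreadable."""
        if block_id in self.memory:
            return self.memory[block_id], "memory"
        if block_id in self.disk:
            return self.disk[block_id], "disk"
        return self.recompute[block_id](), "recomputed"


cache = ToyBlockCache()
cache.put("rdd_0_0", lambda: [1, 2, 3], allow_disk=True)
cache.put("rdd_0_1", lambda: [4, 5, 6], allow_disk=False)

cache.evict("rdd_0_0")
cache.evict("rdd_0_1")

print(cache.get("rdd_0_0"))  # served from the disk copy
print(cache.get("rdd_0_1"))  # rebuilt by rerunning its compute function
```

Nothing comparable exists for execution data in this model: an in-flight shuffle buffer or aggregation hash table has no recompute function to fall back on, which is why the text treats that pool as non-evictable.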

So, we can forcefully evict a block from Storage Memory, but we cannot do so from Execution Memory. When can the Execution Memory pool borrow some space from Storage Memory? It happens when either:

There is free space available in the Storage Memory pool, i.e. cached blocks don’t use all the memory available there. In that case Spark just reduces the Storage Memory pool size, increasing the Execution Memory pool.
Storage Memory pool size …read more

Community Choice Winner Blog: Machine Learning in Big Data – Look Forward or Be Left Behind

Machine Learning in Big Data – Look Forward or Be Left Behind
Bill Porto, Senior Engineering Analyst, RedPoint Global Inc.

Computers? Not so much. One of the biggest developments – and challenges – in technology has been the advent of machine learning. But even as we make major strides in the age of Big Data, applying machine learning to our data is something that few have effectively achieved. Creating models to predict customer response or to segment customer data into set categories are “predictable” use cases. It’s a start – taking data, discovering what it can tell you, and creating a model and use for it. But that’s not enough…

If we want to deliver a real cost advantage for the enterprise with machine learning, there are larger, mission-critical issues to address. These issues focus on model choice, viability horizon, practical design alternatives, learning from on-the-fence model factors, and opportunities for automating access to changing data and netting-out error and noise.

This April at the Hadoop Summit in Dublin, I am delighted to have the opportunity to share with you how continual, adaptive optimization is the key to maintaining a leadership position in satisfying customer demand. As senior analytics engineer at RedPoint Global, I’m focused on developing automated business optimization software that incorporates evolutionary optimization, neural networks, and a host of other non-traditional machine learning techniques.

Hadoop Summit Europe 2016 provides a great opportunity to share my experience in machine learning and Big Data with attendees who want to really move ahead with their own machine learning mastery.

We’ll explore:

What the applicability of machine learning tools is
How these processes should and can be optimized to predict, segment, and ultimately drive more predictable outcomes from business decisions
Approaches for populating and tuning your models
The tools and processes you will need to truly impact sustainable revenue for your enterprise

I’m looking forward to showing how to apply predictive modelling and optimization to harness the full power and potential of your data at the Hadoop Summit Europe 2016.

The post Community Choice Winner Blog: Machine Learning in Big Data – Look Forward or Be Left Behind appeared first on Hortonworks.


Community Choice Winner Blog: Overview of Apache Flink – The 4G of Big Data Analytics Frameworks

Overview of Apache Flink: the 4G of Big Data Analytics Frameworks
Author: Slim Baltagi, Director of Big Data engineering, Capital One

I want to thank those of you who voted for my proposal, and I look forward to meeting many of you in Dublin. I’ll be around for the conference and would gladly welcome any follow-on conversations.

About me

I am currently a Director of Big Data engineering at Capital One. Capital One is a leading consumer and commercial banking institution conducting business in the US, Canada and the U.K.

I have over 18 years of IT and business experience and I spent the last 5 years of my life Hadooping and more recently Sparking and Flinking! I enjoy evangelizing Big Data technologies by speaking at Big Data events and maintaining a blog and a Knowledge Base on many Apache projects: Hadoop, Spark, Flink… With some fellow squirrels, I also run Apache Flink Meetups in New York City, Chicago, Washington DC, Dallas/Fort Worth, Boston and Paris.

My session is an introductory-level talk about Apache Flink: a multi-purpose Big Data analytics framework leading a movement in open source towards the unification of batch and stream processing, or stream-processing-first. With the many technical innovations Apache Flink brings, along with its unique vision and philosophy, it is considered the 4G (4th Generation) of Big Data Analytics frameworks, providing the only hybrid (Real-Time Streaming + Batch) open source distributed data processing engine supporting many use cases: real-time streaming, batch, machine learning and graph processing.

After attending my talk, you will know more about:

What is the Apache Flink stack? Its streaming dataflow execution engine, APIs and domain-specific libraries for batch, streaming, machine learning and graph processing.

How does Apache Flink integrate with Hadoop and other open source tools for data input and output as well as deployment?

Why is Apache Flink an alternative to Apache Hadoop MapReduce, Apache Storm and Apache Spark?

How is Apache Flink used at Capital One, and who else has adopted Apache Flink?

Where can you learn more about Apache Flink?

To get a preview of my session at the 2016 Hadoop Summit in Dublin, I would like to suggest a couple of related talks that I gave in 2015:

2015 Big Data Scala By the Bay, San Francisco, US: Why Apache Flink is the 4G of Big Data Analytics Frameworks?

2015 Flink Forward, Berlin, Germany: Flink and Spark Similarities and Differences

I would like also to suggest slide decks of a few talks, which I gave about Apache Flink, at

My talk is an introductory talk open to technical and non-technical people alike. I look forward to meeting you at the Hadoop Summit that will take place in Dublin, Ireland on April 13-14, 2016.

The post Community Choice Winner Blog: Overview of Apache Flink – The 4G of Big Data Analytics Frameworks appeared first on Hortonworks.
