Persist RDDs which are used repeatedly in your application
A dataset which is persisted, each node (executor) stores any partitions that it computes in memory and reuses them in other actions in the dataset or datasets derived from it. This allows future actions to be much faster (>10x). Caching is key for iterative algorithms and fast interactive use. Use rdd.persist()
- On RDDs which are used in loops.
- On RDDs which are used in more than one DAG
- When the upfront cost of creating the RDD is high
- ex: loading table scan data from Cassandra and filtering it on a non key column
Remarks:
- Intermediate RDDs are automatically persisted in shuffle operations.
- rdd.cache == rdd.persist(StorageLevel.MEMORY_ONLY)
StorageLevels:
Your choice of storage levels should be in following order:
- MEMORY_ONLY: Deserialized Java Objects in memory.
- MEMORY_ONLY_SER: Serialized objects in memory. More space efficient
- MEMORY_AND_DISK_SER: same as MEMORY_AND_DISK but data is in serialized format. Space efficient.
- MEMORY_AND_DISK: same as MEMORY_ONLY, RDD that does not fit in memory will be spilled to disk. Use this when you know your RDD is holding more data than your executor memory
- MEMORY_ONLY_2 & MEMORY_AND_DISK_2: same as above levels, but replicate each partition on 2 cluster nodes
- OFF_HEAP (experimental) | avoids gc. Ensure enough OffHeap is available for executor (overhead memory)
Task Parallelism
Parallelism in spark is directly tied to number of task, which in turn is tied to number of partitions in your RDD
- Too much Parallelism where each task is sub-second has detrimental effect due to the tax paid on creating and destroying tasks
- Too little Parallelism where each task is multi minute, suffers from not using the parallel computing efficiencies.
- Find a balance between number of tasks vs task-durations.
- Spark default partitioner is HashPartitioner . You can use your own partitioner for custom partitioning logic.
Use Repartition & Coalesce for tweaking Parallelism.
Repartition
Repartition is a wide transformation. Involves shuffles.
- Use it when you have partition skews (few partitions magnitude larger than rest)
- Pick repartition key which has high Cardinality
- If there no such key in your RDD with high cardinality, add prefix/suffix strings with high cardinality (Random UUIDs, timestamps etc)
Coalesce
Coalesce is a narrow transformation. Merges multiple partitions that exist on the same executor.
- Use this to reduce parallelism in your Stage.
- you could use Coalesce(numOfNewPartitions, shuffle = true ) to force shuffle if partitons need to be equally distributed
Avoid GroupByKey
UseReduceByKeyinstead.
GroupByKey involves shuffle. Where as ReduceByKey goes over records per partition minimizing shuffle data. Similarly, consider using
- combineByKey: can be used when you are combining elements but your return type differs from your input value type.
- foldByKey: merges the values for each key using an associative function and a neutral "zero value".
Map vs MapPartitions
if map operation takes a function which involves an initialization which is time consuming, considering using mapPartitions, where you could perform the init per partition not per record. example below.
def mLine(line:String)={
val parser=new CSVParser('\t')
parser.parseLine(line)
}
...
// this will create a new parser object for each element in myRDD
...myRDD.map(mLine(_))
def pLines(lines:Iterator[String])={
val parser=new CSVParser('\t')
lines.map(parser.parseLine(_).size)
}
// this will create only one parser object for each partition
myRDD.mapPartitions(pLines)
Note: You could also consider mapWith function, which takes an extra function to the map, which is called per every partition.
Avoid APIs which involve driver
Unless absolutely needed and the Dataset size is small.
Getting data to driver causes bottlenecks in your Application.
Example of apis which bring data to Driver:collect, countByKey, countByValue, collectAsMap, count
For finding RDD count, useAccumulators
Accumulators are distributed counters. Each executor can update the accumulator, but only driver can read the value. Accumulators are great for keeping track of counts in your Application RDDs and can be used effectively to replace count() api.
for finding if an RDD is empty or not, use RDD.take(1).length
RDD.count() > 0 vs RDD.take(1).length == 1
If your application requires a logic where you need to check if an RDD has non-zero records, use RDD.take(1).length instead of RDD.count(). This safe-guards against any perf impact due to large RDD.
Use_Broadcast_variables For data used across executors
If all executors need certain dataset which has to be loaded from storage and used across stages, load the dataset once in driver and broadcast it to each executor.
Broadcast variables are global immutable data available for all executors for their lifetime. This can improve the performance of the application as there is no need for each executor to load the data from source, separately.
Using CoGroup on paired RDDs
Avoid_flatMap-join-groupByKey_pattern.
When two datasets are already grouped by key and you want to join them and keep them grouped, you can just usecogroup. That avoids all the overhead associated with unpacking and repacking the groups.
val rdd1 = sc.parallelize(Seq(("k1", 1),("k2",2),("k3,3")))
val rdd2 = sc.parallelize(Seq(("k1", 11),("k2",12),("k3,13")))
val grouped = rdd1.cogroup(rdd2)
// ((k1,(1,11)), (k2, (2,12)), (k3, (3,13)))
Minimize shuffles
Reducing shuffles is easier said than done :). Shuffle metrics are available in Spark UI underApplication UI -> Stages -> Shuffle Read & Shuffle Write metrics
Analyzing your transformation DAG
caling rdd.toDebugString prints out the DAG, with widetransformations highlighed with a**
ex:
scala> val a = sc.parallelize(Array(1,2,3)).distinct
scala> a.toDebugString
MappedRDD[5] at distinct at <console>:12 (1 partitions)
MapPartitionsRDD[4] at distinct at <console>:12 (1 partitions)
**ShuffledRDD[3] at distinct at <console>:12 (1 partitions)** // widetransformation
MapPartitionsRDD[2] at distinct at <console>:12 (1 partitions)
MappedRDD[1] at distinct at <console>:12 (1 partitions)
ParallelCollectionRDD[0] at parallelize at <console>:12 (1 partitions)
List of wide transformations which_might_cause shuffles:
- cogroup, groupWith, groupByKey
- join, leftOuterJoin, rightOuterJoin,
- ReduceByKey, combineByKey, sortByKey
- distinct, intersection
- repartition, Coalesce(shuffle = true)
Avoid Skewed Partitions
Skewed partitions occur when data is not equally distributed among partitions.
- This could happen when the RDD partioned by keys which do not have high cardinality.
- Or the default HashPartitioner does not provide good distribution.
Partition Skews are usually associated with
- laggard tasks in stages UI tab
- If RDDs are persisted, Skews are visible in storage tab as well where certain executors have data size skew.
To resolve partition skews:
- Repartition your data appropriately, with high cardinality keys
- When such keys are not available, add a high cardinality prefix of suffix
- Write your own partitioner by extending Partitioner class. call rdd.partitionBy with then custom partitioner
class MyCustomPartitioner(numParts: Int) extends Partitioner {
override def getPartitions(key: Any ) : Int = ...
override def equals(other: Any): Boolean = ...
}
// use custom partitioner
rdd.partitionBy(new MyCustomPartitioner(100))
Handle Bad Input gracefully
Alwaysexpect bad records in your input data (Streaming or Batch)
- Try Catch block for every serialization/deserialized functions (ex: json, xml, csv)
- While iterating over each record, call helper functions with additional safe-guards (ex: rdd.map(safemap( _ )))
Spark + Cassandra Programming
Pushdown predicates
While fetching data from cassandra tables, ensure appropriate filter clauses are sent directly to cassandra using pushdown predicates. This helps in reducing the amount of data fetched from cassandra, hence reducing network I/O and vastly improving your Application performance.
It must be noted that only filter on Cassandra Table key columns are pushed down as predicates. Any other column predicates result in a runtime CQL exception.
Pushdown predicate Conditions can be programmed using_Where_api on CassandraTableRDD.
val myTableRdd = sc.loadFromCassandraTable[MyTableCaseClass](keyspace, tableName)
.where("key1 = ?", key1Val)
.where("key2 >= ?", key2Min)
.where("key2 <= ?", key2Max)
Use special functions as needed
CassandraTableRDD provides few special functions which can be used to utilize Cassandra Capabilities in improving performance of your Spark Application. Couple of the Apis below
joinWithCassandra
- Use this to join a spark RDD with a cassandraTable. Each key in source Spark RDD is translated to Spark CQL and hence reduces the amount of data to transferred between Brings only needed data to Spark
cassandraCount
Spark's rdd.count() method brings all rows to be fetched from Cto Spark, adding significant memory and network overhead._cassRdd.cassandraCount()_however performs count() on cassandra and only fetching the numbers to spark. This is highly beneficial if you do not need all data later on in your Application.
Cassandra table schema
Model your Source & Intermediate table schema such that it serves Spark push down predicates (refer above). If more than one jobs are using the source tables with varying query patterns, optimize for the most important/frequent pattern.
Destination table schema should serve CF API or similar queries.
Select only Columns you need
Many a times, developers write one caseclass per Cassandra Table and reuse it in all their Application. This case class usually represents all columns in the table. When we load a cassandraTable with such caseclass, spark brings_all_data from cassandra table to spark. This is undesirable when the Application uses only subset of the columns from the source table. Recommendation is to use specific caseClasses per Application needs. Do not be lazy and use same caseclass for all reads from a Cassandra Table.
Persist data
Its ok to be aggressive with persist when you know that the data is reasonable sized for your executor memory. Especially when the upfront cost of prepping the RDD is high. Refer above persistent_Persist RDDs which are used repeatedly in your application_topic for storegeLevel recommendations.
Spark + Kafka Programming
Parallelism of reading data from kafka topic
Always be aware of your kafka topic's partition count. Reach out to platform team when in doubt.
- Always keep Number of executors in your Streaming App <= partitions for given topic. Any more executors will be useless.
- For maximum Parallelism, executors should be = topic partitions
Checkpointing
Always checkpoint your offsets. CF platform baseclass uses custom checkpointing to persist topic offsets into Cassandra table. This is enabled by default. Metadata checkpointing only available in CommonConnector currently.
Keep batch time under mini-batch frequency
Avoid scheduling delays and backlogs in streaming jobs, by ensuring that the processing time per batch is < mini-batch-frequency. We can control the amount of data ingested by spark streaming application's batch, by using throttling settings:
- Static-throttling: spark.streaming.kafka.maxRatePerPartition = <number-of-events-to-be-read-by-each-executor>
- dynamic-throttling : spark.streaming.backpressure.enabled = true. automatically adjusts ingestion rate when scheduling delays are found.
NOTE: When restarting job, consider offset selection of earliest vs latest according to your scenario.