
Spark Source Code Analysis 13 - Tuning Spark

 

We can refer to http://spark.incubator.apache.org/docs/latest/tuning.html for the detailed tuning documentation.

 

     After tuning, Spark can process 200 MB of logs per minute on a single worker with 1 GB of memory. Processing the logs of each batch interval takes about 35-40 seconds.

     Here are the points I tuned.

    

    I changed the logs so that each user contains 20 KB of logs; 5000 users therefore produce about 100 MB of logs. When each user's share of the logs is smaller, the concurrent processing speed improves.

    JavaDStream<String> stringStream = jsc.socketTextStream("0.0.0.0",
            ConfigUtil.getInt(ConfigUtil.KEY_SPARK_REMOTE_FLUME_LISTENER_PORT),
            StorageLevel.MEMORY_AND_DISK());

 

Pass StorageLevel.MEMORY_AND_DISK() when creating the stream. The default, StorageLevel.MEMORY_AND_DISK_2(), keeps two replicas of the received data and therefore uses double the memory.

    Replace the code String user = currentLine.substring(start, end); with String user = new String(currentLine.substring(start, end).toCharArray());

On older JDKs, substring does not copy the characters: the new String shares the same backing char array as currentLine. So as long as some object holds a reference to user, the whole currentLine buffer cannot be garbage collected even after currentLine itself is no longer needed.
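A minimal sketch of the difference, assuming an older JDK (before 7u6) where substring shares the parent's backing char array; the class name, log line, and offsets are made up for illustration:

    // Hypothetical example of the substring copy described above.
    public class SubstringCopyExample {
        static String extractUser(String currentLine, int start, int end) {
            // Shares currentLine's backing char[] on older JDKs, so holding the
            // result keeps the whole log line in memory:
            // String user = currentLine.substring(start, end);

            // Copying the characters gives the result its own small char[],
            // so the large currentLine can be collected once nothing else uses it:
            return new String(currentLine.substring(start, end).toCharArray());
        }

        public static void main(String[] args) {
            String line = "2014-01-01 10:00:00 user=alice action=login";
            System.out.println(extractUser(line, 20, 30)); // prints "user=alice"
        }
    }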

 

    Update the Spark configuration. Below is the latest configuration; when the data changes, it sometimes needs to be adjusted.

 

        sparkConf.setMaster(ConfigUtil.getString(ConfigUtil.KEY_SPARK_REMOTE_MASTER)).setAppName(appName)
                .setJars(new String[]{ConfigUtil.getString(ConfigUtil.KEY_SPARK_REMOTE_JAR_LOCATION)})
                .set("spark.executor.memory", "1024m")
                .set("spark.streaming.unpersist", "true")
                .set("spark.rdd.compress", "true")
                .set("spark.default.parallelism", "12")
                .set("spark.storage.memoryFraction", "0.3")
                .set("spark.cleaner.ttl", "1200")
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                //.set("spark.kryo.registrator", "com.seven.oi.spark.KryoRegistratorEx")
                //.setExecutorEnv("SPARK_JAVA_OPTS", "-XX:NewRatio=1 -XX:+UseCompressedStrings -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
                //.setExecutorEnv("SPARK_JAVA_OPTS", "-XX:NewRatio=1 -XX:+UseCompressedStrings")
                //use fastutil to store map
                .set("spark.shuffle.memoryFraction", "0.3");

 

   Set spark.rdd.compress = true so that cached RDDs use less memory.

   When an OutOfMemoryError happens, we can also consider increasing spark.default.parallelism.

   Sometimes we can use org.apache.spark.serializer.KryoSerializer. Kryo is significantly faster and more compact than Java serialization, which is the default. (This needs more testing; so far I can't see any difference in my tests.)
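For reference, a minimal sketch of what a Kryo registrator could look like (the class name and registered types are only illustrative; the commented-out .set(...) lines in the configuration above show where spark.serializer and spark.kryo.registrator are wired in):

    import com.esotericsoftware.kryo.Kryo;
    import org.apache.spark.serializer.KryoRegistrator;

    // Hypothetical registrator: register the classes that are shuffled or cached
    // so Kryo can encode them compactly instead of writing full class names.
    public class KryoRegistratorEx implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            kryo.register(String[].class);
            kryo.register(java.util.HashMap.class);
        }
    }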

   When there is a lot of young-generation GC, we can consider enlarging the young generation by setting -XX:NewRatio.

   When the JDK is 64-bit, we can use -XX:+UseCompressedStrings to compress Strings. I found that a 64-bit JDK uses more memory than a 32-bit one.

   We can also consider increasing spark.shuffle.memoryFraction when there are a lot of shuffle operations.

   If you want Spark to submit more tasks at a time, you can increase SPARK_WORKER_CORES (as shown below); Spark will never submit more tasks than SPARK_WORKER_CORES at a time.
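For example, this could be set in conf/spark-env.sh on each worker before starting it (the value 4 here is only an illustration):

    # conf/spark-env.sh: let this worker offer 4 cores, i.e. run up to 4 tasks concurrently
    export SPARK_WORKER_CORES=4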

 
