Browse Source

Merge pull request #737 from lcybo/master

Parallel参数细节的完善
agapple 7 years ago
parent
commit
3308bf0662

+ 3 - 1
deployer/src/main/resources/canal.properties

@@ -53,7 +53,9 @@ canal.instance.get.ddl.isolation = false
 
 # parallel parser config
 canal.instance.parser.parallel = true
-canal.instance.parser.parallelThreadSize = 16
+## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
+#canal.instance.parser.parallelThreadSize = 16
+## disruptor ringbuffer size, must be power of 2
 canal.instance.parser.parallelBufferSize = 256
 
 #################################################

+ 1 - 1
deployer/src/main/resources/spring/default-instance.xml

@@ -195,7 +195,7 @@
 		
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 1 - 1
deployer/src/main/resources/spring/file-instance.xml

@@ -180,7 +180,7 @@
 		
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 2 - 2
deployer/src/main/resources/spring/group-instance.xml

@@ -169,7 +169,7 @@
 		
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 	
@@ -268,7 +268,7 @@
 		
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 1 - 1
deployer/src/main/resources/spring/local-instance.xml

@@ -140,7 +140,7 @@
 		
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 1 - 1
deployer/src/main/resources/spring/memory-instance.xml

@@ -168,7 +168,7 @@
 		
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 7 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -90,9 +90,9 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 
     protected boolean                                isGTIDMode                 = false;                                   // 是否是GTID模式
     protected boolean                                parallel                   = true;                                    // 是否开启并行解析模式
-    protected int                                    parallelThreadSize         = Runtime.getRuntime()
+    protected Integer                                parallelThreadSize         = Runtime.getRuntime()
                                                                                     .availableProcessors() * 60 / 100;     // 60%的能力跑解析,剩余部分处理网络
-    protected int                                    parallelBufferSize         = 16 * parallelThreadSize;
+    protected int                                    parallelBufferSize         = 256;                                     // 必须为2的幂
     protected MultiStageCoprocessor                  multiStageCoprocessor;
 
     protected abstract BinlogParser buildParser();
@@ -595,11 +595,13 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         return parallelThreadSize;
     }
 
-    public void setParallelThreadSize(int parallelThreadSize) {
-        this.parallelThreadSize = parallelThreadSize;
+    public void setParallelThreadSize(Integer parallelThreadSize) {
+        if (parallelThreadSize != null) {
+            this.parallelThreadSize = parallelThreadSize;
+        }
     }
 
-    public int getParallelBufferSize() {
+    public Integer getParallelBufferSize() {
         return parallelBufferSize;
     }