瀏覽代碼

fixed bug

agapple 6 年之前
父節點
當前提交
531b535228

+ 17 - 1
client-adapter/launcher/src/main/bin/startup.sh

@@ -40,6 +40,22 @@ if [ -z "$JAVA" ]; then
   fi
 fi
 
+case "$#" 
+in
+0 ) 
+  ;;
+2 ) 
+  if [ "$1" = "debug" ]; then
+    DEBUG_PORT=$2
+    DEBUG_SUSPEND="n"
+    JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
+  fi
+  ;;
+* )
+  echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
+  exit;;
+esac
+
 str=`file -L $JAVA | grep 64-bit`
 if [ -n "$str" ]; then
 	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
@@ -60,7 +76,7 @@ echo "cd to $bin_abs_path for workaround relative path"
 cd $bin_abs_path
 
 echo CLASSPATH :$CLASSPATH
-$JAVA $JAVA_OPTS $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>$base/logs/adapter.log 2>&1 &
+$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>$base/logs/adapter.log 2>&1 &
 echo $! > $base/bin/adapter.pid
 
 echo "cd to $current_path for continue"

+ 1 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -13,6 +13,7 @@ spring:
     default-property-inclusion: non_null
 
 canal.conf:
+  mode: tcp # kafka rocketMQ
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
 #  mqServers: 127.0.0.1:9092 #or rocketmq
@@ -23,7 +24,6 @@ canal.conf:
   timeout:
   accessKey:
   secretKey:
-  mode: tcp # kafka rocketMQ
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true

+ 2 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -12,6 +12,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.slf4j.Logger;
@@ -83,6 +84,7 @@ public class RocketMQCanalConnector implements CanalMQConnector {
         }
         rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely());
         rocketMQConsumer.setVipChannelEnabled(false);
+        rocketMQConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -126,7 +126,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 producerTmp.commitTransaction();
             }
             callback.commit();
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error(e.getMessage(), e);
             if (kafkaProperties.getTransaction()) {
                 producerTmp.abortTransaction();

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -75,7 +75,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 send(destination, destination.getTopic(), data);
             }
             callback.commit();
-        } catch (Exception e) {
+        } catch (Throwable e) {
             callback.rollback();
         }
     }