Browse Source

kafka consumer 测试类修改

rewerma 7 years ago
parent
commit
19f4c5ae1b

+ 1 - 3
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java

@@ -28,11 +28,11 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
         final ExecutorService executor = Executors.newFixedThreadPool(1);
 
         final KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
-        connector.subscribe();
 
         executor.submit(new Runnable() {
             @Override
             public void run() {
+                connector.subscribe();
                 while (running) {
                     try {
                         Message message = connector.getWithoutAck(3L, TimeUnit.SECONDS);
@@ -53,8 +53,6 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
         running = false;
         executor.shutdown();
         logger.info("shutdown completed");
-
-        sleep(10000);
     }
 
 }