|
@@ -7,6 +7,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.ServiceLoader;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
@@ -266,9 +267,10 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
|
boolean raw = isRaw(canalInstance.getEventStore());
|
|
|
List entrys = null;
|
|
|
if (raw) {
|
|
|
- entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
|
|
|
+ // new list
|
|
|
+ entrys = events.getEvents().stream().map(Event::getRawEntry).collect(Collectors.toList());
|
|
|
} else {
|
|
|
- entrys = Lists.transform(events.getEvents(), Event::getEntry);
|
|
|
+ entrys = events.getEvents().stream().map(Event::getEntry).collect(Collectors.toList());
|
|
|
}
|
|
|
if (logger.isInfoEnabled()) {
|
|
|
logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
|
|
@@ -348,9 +350,10 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
|
boolean raw = isRaw(canalInstance.getEventStore());
|
|
|
List entrys = null;
|
|
|
if (raw) {
|
|
|
- entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
|
|
|
+ // new list
|
|
|
+ entrys = events.getEvents().stream().map(Event::getRawEntry).collect(Collectors.toList());
|
|
|
} else {
|
|
|
- entrys = Lists.transform(events.getEvents(), Event::getEntry);
|
|
|
+ entrys = events.getEvents().stream().map(Event::getEntry).collect(Collectors.toList());
|
|
|
}
|
|
|
if (logger.isInfoEnabled()) {
|
|
|
logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
|