1
0
Эх сурвалжийг харах

= code cleanup

  - for loop replaceable with enhanced 'for' loop
  - using diamond in generic type
  - collapse try catch exception
  - using try-with-resource
  - using  List#sort instead of Collections#sort
  - replaced lambda with method reference
  - using native compare method to compare numbers
  - using switch instead of if for in ESSyncUtil
zavakid 4 жил өмнө
parent
commit
df24596092
33 өөрчлөгдсөн 265 нэмэгдсэн , 353 устгасан
  1. 1 3
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/connector/SimpleAdminConnector.java
  2. 2 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/bind/DefaultPropertyNamePatternsMatcher.java
  3. 2 5
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/common/YamlProcessor.java
  4. 16 21
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  5. 156 141
      client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/support/ESSyncUtil.java
  6. 33 38
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduEtlService.java
  7. 1 10
      common/src/main/java/com/alibaba/otter/canal/common/utils/AddressUtils.java
  8. 1 4
      common/src/main/java/com/alibaba/otter/canal/common/utils/JsonUtils.java
  9. 2 8
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/filter/AviaterRegexFilter.java
  10. 4 4
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/CanalMessageSerializerUtil.java
  11. 1 7
      dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java
  12. 2 2
      deployer/src/main/java/com/alibaba/otter/canal/deployer/admin/CanalAdminController.java
  13. 2 2
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MysqlGTIDSet.java
  14. 1 1
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/UUIDSet.java
  15. 2 8
      filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java
  16. 1 1
      meta/src/main/java/com/alibaba/otter/canal/meta/ZooKeeperMetaManager.java
  17. 6 16
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  18. 3 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  19. 0 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  20. 1 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
  21. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java
  22. 2 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/HttpHelper.java
  23. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java
  24. 1 7
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java
  25. 2 10
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java
  26. 5 5
      protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java
  27. 2 2
      protocol/src/main/java/com/alibaba/otter/canal/protocol/SecurityUtil.java
  28. 4 4
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  29. 4 4
      server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java
  30. 1 1
      store/src/main/java/com/alibaba/otter/canal/store/model/Events.java
  31. 2 5
      store/src/test/java/com/alibaba/otter/canal/store/memory/buffer/MemoryEventStoreMemBatchTest.java
  32. 1 2
      store/src/test/java/com/alibaba/otter/canal/store/memory/buffer/MemoryEventStoreMultiThreadTest.java
  33. 2 5
      store/src/test/java/com/alibaba/otter/canal/store/memory/buffer/MemoryEventStorePutAndGetTest.java

+ 1 - 3
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/connector/SimpleAdminConnector.java

@@ -108,9 +108,7 @@ public class SimpleAdminConnector implements AdminConnector {
             }
 
             connected = true;
-        } catch (IOException e) {
-            throw new ServiceException(e);
-        } catch (NoSuchAlgorithmException e) {
+        } catch (IOException | NoSuchAlgorithmException e) {
             throw new ServiceException(e);
         }
     }

+ 2 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/bind/DefaultPropertyNamePatternsMatcher.java

@@ -70,8 +70,8 @@ class DefaultPropertyNamePatternsMatcher implements PropertyNamePatternsMatcher
                 return false;
             }
         }
-        for (int i = 0; i < match.length; i++) {
-            if (match[i]) {
+        for (boolean b : match) {
+            if (b) {
                 return true;
             }
         }

+ 2 - 5
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/common/YamlProcessor.java

@@ -130,8 +130,7 @@ public abstract class YamlProcessor {
             if (logger.isDebugEnabled()) {
                 logger.debug("Loading from YAML: " + resource);
             }
-            Reader reader = new UnicodeReader(resource.getInputStream());
-            try {
+            try (Reader reader = new UnicodeReader(resource.getInputStream())) {
                 for (Object object : yaml.loadAll(reader)) {
                     if (object != null && process(asMap(object), callback)) {
                         count++;
@@ -142,10 +141,8 @@ public abstract class YamlProcessor {
                 }
                 if (logger.isDebugEnabled()) {
                     logger.debug(
-                        "Loaded " + count + " document" + (count > 1 ? "s" : "") + " from YAML resource: " + resource);
+                            "Loaded " + count + " document" + (count > 1 ? "s" : "") + " from YAML resource: " + resource);
                 }
-            } finally {
-                reader.close();
             }
         } catch (IOException ex) {
             handleProcessError(resource, ex);

+ 16 - 21
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -6,6 +6,7 @@ import java.io.InputStreamReader;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -317,10 +318,8 @@ public class ExtensionLoader<T> {
                 while (urls.hasMoreElements()) {
                     URL url = urls.nextElement();
                     try {
-                        BufferedReader reader = null;
-                        try {
-                            reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
-                            String line = null;
+                        try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
+                            String line;
                             while ((line = reader.readLine()) != null) {
                                 final int ci = line.indexOf('#');
                                 if (ci >= 0) line = line.substring(0, ci);
@@ -340,10 +339,10 @@ public class ExtensionLoader<T> {
                                             // classLoader);
                                             if (!type.isAssignableFrom(clazz)) {
                                                 throw new IllegalStateException(
-                                                    "Error when load extension class(interface: " + type
-                                                                                + ", class line: " + clazz.getName()
-                                                                                + "), class " + clazz.getName()
-                                                                                + "is not subtype of interface.");
+                                                        "Error when load extension class(interface: " + type
+                                                                + ", class line: " + clazz.getName()
+                                                                + "), class " + clazz.getName()
+                                                                + "is not subtype of interface.");
                                             } else {
                                                 try {
                                                     clazz.getConstructor(type);
@@ -361,10 +360,10 @@ public class ExtensionLoader<T> {
                                                             } else if (c != clazz) {
                                                                 cachedNames.remove(clazz);
                                                                 throw new IllegalStateException(
-                                                                    "Duplicate extension " + type.getName() + " name "
-                                                                                                + n + " on "
-                                                                                                + c.getName() + " and "
-                                                                                                + clazz.getName());
+                                                                        "Duplicate extension " + type.getName() + " name "
+                                                                                + n + " on "
+                                                                                + c.getName() + " and "
+                                                                                + clazz.getName());
                                                             }
                                                         }
                                                     }
@@ -373,19 +372,15 @@ public class ExtensionLoader<T> {
                                         }
                                     } catch (Throwable t) {
                                         IllegalStateException e = new IllegalStateException(
-                                            "Failed to load extension class(interface: " + type + ", class line: "
-                                                                                            + line + ") in " + url
-                                                                                            + ", cause: "
-                                                                                            + t.getMessage(),
-                                            t);
+                                                "Failed to load extension class(interface: " + type + ", class line: "
+                                                        + line + ") in " + url
+                                                        + ", cause: "
+                                                        + t.getMessage(),
+                                                t);
                                         exceptions.put(line, e);
                                     }
                                 }
                             } // end of while read lines
-                        } finally {
-                            if (reader != null) {
-                                reader.close();
-                            }
                         }
                     } catch (Throwable t) {
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url

+ 156 - 141
client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/support/ESSyncUtil.java

@@ -63,172 +63,187 @@ public class ESSyncUtil {
             return val;
         }
         Object res = null;
-        if ("integer".equals(esType)) {
-            if (val instanceof Number) {
-                res = ((Number) val).intValue();
-            } else {
-                res = Integer.parseInt(val.toString());
-            }
-        } else if ("long".equals(esType)) {
-            if (val instanceof Number) {
-                res = ((Number) val).longValue();
-            } else {
-                res = Long.parseLong(val.toString());
-            }
-        } else if ("short".equals(esType)) {
-            if (val instanceof Number) {
-                res = ((Number) val).shortValue();
-            } else {
-                res = Short.parseShort(val.toString());
-            }
-        } else if ("byte".equals(esType)) {
-            if (val instanceof Number) {
-                res = ((Number) val).byteValue();
-            } else {
-                res = Byte.parseByte(val.toString());
-            }
-        } else if ("double".equals(esType)) {
-            if (val instanceof Number) {
-                res = ((Number) val).doubleValue();
-            } else {
-                res = Double.parseDouble(val.toString());
-            }
-        } else if ("float".equals(esType) || "half_float".equals(esType) || "scaled_float".equals(esType)) {
-            if (val instanceof Number) {
-                res = ((Number) val).floatValue();
-            } else {
-                res = Float.parseFloat(val.toString());
-            }
-        } else if ("boolean".equals(esType)) {
-            if (val instanceof Boolean) {
-                res = val;
-            } else if (val instanceof Number) {
-                int v = ((Number) val).intValue();
-                res = v != 0;
-            } else {
-                res = Boolean.parseBoolean(val.toString());
-            }
-        } else if ("date".equals(esType)) {
-            if (val instanceof java.sql.Time) {
-                DateTime dateTime = new DateTime(((java.sql.Time) val).getTime());
-                if (dateTime.getMillisOfSecond() != 0) {
-                    res = dateTime.toString("HH:mm:ss.SSS");
+        switch (esType) {
+            case "integer":
+                if (val instanceof Number) {
+                    res = ((Number) val).intValue();
                 } else {
-                    res = dateTime.toString("HH:mm:ss");
+                    res = Integer.parseInt(val.toString());
                 }
-            } else if (val instanceof java.sql.Timestamp) {
-                DateTime dateTime = new DateTime(((java.sql.Timestamp) val).getTime());
-                if (dateTime.getMillisOfSecond() != 0) {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
+                break;
+            case "long":
+                if (val instanceof Number) {
+                    res = ((Number) val).longValue();
                 } else {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
+                    res = Long.parseLong(val.toString());
                 }
-            } else if (val instanceof java.sql.Date || val instanceof Date) {
-                DateTime dateTime;
-                if (val instanceof java.sql.Date) {
-                    dateTime = new DateTime(((java.sql.Date) val).getTime());
+                break;
+            case "short":
+                if (val instanceof Number) {
+                    res = ((Number) val).shortValue();
                 } else {
-                    dateTime = new DateTime(((Date) val).getTime());
+                    res = Short.parseShort(val.toString());
                 }
-                if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
-                    && dateTime.getMillisOfSecond() == 0) {
-                    res = dateTime.toString("yyyy-MM-dd");
+                break;
+            case "byte":
+                if (val instanceof Number) {
+                    res = ((Number) val).byteValue();
                 } else {
+                    res = Byte.parseByte(val.toString());
+                }
+                break;
+            case "double":
+                if (val instanceof Number) {
+                    res = ((Number) val).doubleValue();
+                } else {
+                    res = Double.parseDouble(val.toString());
+                }
+                break;
+            case "float":
+            case "half_float":
+            case "scaled_float":
+                if (val instanceof Number) {
+                    res = ((Number) val).floatValue();
+                } else {
+                    res = Float.parseFloat(val.toString());
+                }
+                break;
+            case "boolean":
+                if (val instanceof Boolean) {
+                    res = val;
+                } else if (val instanceof Number) {
+                    int v = ((Number) val).intValue();
+                    res = v != 0;
+                } else {
+                    res = Boolean.parseBoolean(val.toString());
+                }
+                break;
+            case "date":
+                if (val instanceof java.sql.Time) {
+                    DateTime dateTime = new DateTime(((java.sql.Time) val).getTime());
+                    if (dateTime.getMillisOfSecond() != 0) {
+                        res = dateTime.toString("HH:mm:ss.SSS");
+                    } else {
+                        res = dateTime.toString("HH:mm:ss");
+                    }
+                } else if (val instanceof java.sql.Timestamp) {
+                    DateTime dateTime = new DateTime(((java.sql.Timestamp) val).getTime());
                     if (dateTime.getMillisOfSecond() != 0) {
                         res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
                     } else {
                         res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
                     }
-                }
-            } else if (val instanceof Long) {
-                DateTime dateTime = new DateTime(((Long) val).longValue());
-                if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
-                    && dateTime.getMillisOfSecond() == 0) {
-                    res = dateTime.toString("yyyy-MM-dd");
-                } else if (dateTime.getMillisOfSecond() != 0) {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
-                } else {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
-                }
-            } else if (val instanceof String) {
-                String v = ((String) val).trim();
-                if (v.length() > 18 && v.charAt(4) == '-' && v.charAt(7) == '-' && v.charAt(10) == ' '
-                    && v.charAt(13) == ':' && v.charAt(16) == ':') {
-                    String dt = v.substring(0, 10) + "T" + v.substring(11);
-                    Date date = Util.parseDate(dt);
-                    if (date != null) {
-                        DateTime dateTime = new DateTime(date);
+                } else if (val instanceof java.sql.Date || val instanceof Date) {
+                    DateTime dateTime;
+                    if (val instanceof java.sql.Date) {
+                        dateTime = new DateTime(((java.sql.Date) val).getTime());
+                    } else {
+                        dateTime = new DateTime(((Date) val).getTime());
+                    }
+                    if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
+                            && dateTime.getMillisOfSecond() == 0) {
+                        res = dateTime.toString("yyyy-MM-dd");
+                    } else {
                         if (dateTime.getMillisOfSecond() != 0) {
                             res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
                         } else {
                             res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
                         }
                     }
-                } else if (v.length() == 10 && v.charAt(4) == '-' && v.charAt(7) == '-') {
-                    Date date = Util.parseDate(v);
-                    if (date != null) {
-                        DateTime dateTime = new DateTime(date);
+                } else if (val instanceof Long) {
+                    DateTime dateTime = new DateTime(((Long) val).longValue());
+                    if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
+                            && dateTime.getMillisOfSecond() == 0) {
                         res = dateTime.toString("yyyy-MM-dd");
+                    } else if (dateTime.getMillisOfSecond() != 0) {
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
+                    } else {
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
+                    }
+                } else if (val instanceof String) {
+                    String v = ((String) val).trim();
+                    if (v.length() > 18 && v.charAt(4) == '-' && v.charAt(7) == '-' && v.charAt(10) == ' '
+                            && v.charAt(13) == ':' && v.charAt(16) == ':') {
+                        String dt = v.substring(0, 10) + "T" + v.substring(11);
+                        Date date = Util.parseDate(dt);
+                        if (date != null) {
+                            DateTime dateTime = new DateTime(date);
+                            if (dateTime.getMillisOfSecond() != 0) {
+                                res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
+                            } else {
+                                res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
+                            }
+                        }
+                    } else if (v.length() == 10 && v.charAt(4) == '-' && v.charAt(7) == '-') {
+                        Date date = Util.parseDate(v);
+                        if (date != null) {
+                            DateTime dateTime = new DateTime(date);
+                            res = dateTime.toString("yyyy-MM-dd");
+                        }
                     }
                 }
-            }
-        } else if ("binary".equals(esType)) {
-            if (val instanceof byte[]) {
-                Base64 base64 = new Base64();
-                res = base64.encodeAsString((byte[]) val);
-            } else if (val instanceof Blob) {
-                byte[] b = blobToBytes((Blob) val);
-                Base64 base64 = new Base64();
-                res = base64.encodeAsString(b);
-            } else if (val instanceof String) {
-                // 对应canal中的单字节编码
-                byte[] b = ((String) val).getBytes(StandardCharsets.ISO_8859_1);
-                Base64 base64 = new Base64();
-                res = base64.encodeAsString(b);
-            }
-        } else if ("geo_point".equals(esType)) {
-            if (!(val instanceof String)) {
-                logger.error("es type is geo_point, but source type is not String");
-                return val;
-            }
+                break;
+            case "binary":
+                if (val instanceof byte[]) {
+                    Base64 base64 = new Base64();
+                    res = base64.encodeAsString((byte[]) val);
+                } else if (val instanceof Blob) {
+                    byte[] b = blobToBytes((Blob) val);
+                    Base64 base64 = new Base64();
+                    res = base64.encodeAsString(b);
+                } else if (val instanceof String) {
+                    // 对应canal中的单字节编码
+                    byte[] b = ((String) val).getBytes(StandardCharsets.ISO_8859_1);
+                    Base64 base64 = new Base64();
+                    res = base64.encodeAsString(b);
+                }
+                break;
+            case "geo_point":
+                if (!(val instanceof String)) {
+                    logger.error("es type is geo_point, but source type is not String");
+                    return val;
+                }
 
-            if (!((String) val).contains(",")) {
-                logger.error("es type is geo_point, source value not contains ',' separator");
-                return val;
-            }
+                if (!((String) val).contains(",")) {
+                    logger.error("es type is geo_point, source value not contains ',' separator");
+                    return val;
+                }
 
-            String[] point = ((String) val).split(",");
-            Map<String, Double> location = new HashMap<>();
-            location.put("lat", Double.valueOf(point[0].trim()));
-            location.put("lon", Double.valueOf(point[1].trim()));
-            return location;
-        } else if ("array".equals(esType)) {
-            if ("".equals(val.toString().trim())) {
-                res = new ArrayList<>();
-            } else {
-                String value = val.toString();
-                String separator = ",";
-                if (!value.contains(",")) {
-                    if (value.contains(";")) {
-                        separator = ";";
-                    } else if (value.contains("|")) {
-                        separator = "\\|";
-                    } else if (value.contains("-")) {
-                        separator = "-";
+                String[] point = ((String) val).split(",");
+                Map<String, Double> location = new HashMap<>();
+                location.put("lat", Double.valueOf(point[0].trim()));
+                location.put("lon", Double.valueOf(point[1].trim()));
+                return location;
+            case "array":
+                if ("".equals(val.toString().trim())) {
+                    res = new ArrayList<>();
+                } else {
+                    String value = val.toString();
+                    String separator = ",";
+                    if (!value.contains(",")) {
+                        if (value.contains(";")) {
+                            separator = ";";
+                        } else if (value.contains("|")) {
+                            separator = "\\|";
+                        } else if (value.contains("-")) {
+                            separator = "-";
+                        }
                     }
+                    String[] values = value.split(separator);
+                    return Arrays.asList(values);
                 }
-                String[] values = value.split(separator);
-                return Arrays.asList(values);
-            }
-        } else if ("object".equals(esType)) {
-            if ("".equals(val.toString().trim())) {
-                res = new HashMap<>();
-            } else {
-                res = JSON.parseObject(val.toString(), Map.class);
-            }
-        } else {
-            // 其他类全以字符串处理
-            res = val.toString();
+                break;
+            case "object":
+                if ("".equals(val.toString().trim())) {
+                    res = new HashMap<>();
+                } else {
+                    res = JSON.parseObject(val.toString(), Map.class);
+                }
+                break;
+            default:
+                // 其他类全以字符串处理
+                res = val.toString();
+                break;
         }
 
         return res;

+ 33 - 38
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduEtlService.java

@@ -88,45 +88,40 @@ public class KuduEtlService extends AbstractEtlService {
                         Map<String, Object> data = new HashMap<>();
                         for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
                             String mysqlColumnName = entry.getKey();// mysql字段名
-                String kuduColumnName = entry.getValue();// kudu字段名
-                if (kuduColumnName == null) {
-                    kuduColumnName = mysqlColumnName;
-                }
-                Object value = rs.getObject(kuduColumnName);
-                if (value != null) {
-                    data.put(kuduColumnName, value);
-                } else {
-                    data.put(kuduColumnName, null);
-                }
-            }
-            dataList.add(data);
-            idx++;
-            impCount.incrementAndGet();
-            if (logger.isDebugEnabled()) {
-                logger.debug("successful import count:" + impCount.get());
-            }
-            if (idx % kuduMapping.getCommitBatch() == 0) {
-                kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
-                dataList.clear();
-            }
-        }
-        if (!dataList.isEmpty()) {
-            kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
-        }
-        return true;
+                            String kuduColumnName = entry.getValue();// kudu字段名
+                            if (kuduColumnName == null) {
+                                kuduColumnName = mysqlColumnName;
+                            }
+                            Object value = rs.getObject(kuduColumnName);
+                            if (value != null) {
+                                data.put(kuduColumnName, value);
+                            } else {
+                                data.put(kuduColumnName, null);
+                            }
+                        }
+                        dataList.add(data);
+                        idx++;
+                        impCount.incrementAndGet();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("successful import count:" + impCount.get());
+                        }
+                        if (idx % kuduMapping.getCommitBatch() == 0) {
+                            kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                            dataList.clear();
+                        }
+                    }
+                    if (!dataList.isEmpty()) {
+                        kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                    }
+                    return true;
 
-    } catch (SQLException e) {
-        e.printStackTrace();
-        logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
-        errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
-        return false;
-    } catch (KuduException e) {
-        e.printStackTrace();
-        logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
-        errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
-        return false;
-    }
-}           );
+                } catch (SQLException | KuduException e) {
+                    e.printStackTrace();
+                    logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
+                    return false;
+                }
+            });
             return true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);

+ 1 - 10
common/src/main/java/com/alibaba/otter/canal/common/utils/AddressUtils.java

@@ -18,20 +18,11 @@ public class AddressUtils {
     private static final Pattern IP_PATTERN   = Pattern.compile("[0-9]{1,3}(\\.[0-9]{1,3}){3,}");
 
     public static boolean isAvailablePort(int port) {
-        ServerSocket ss = null;
-        try {
-            ss = new ServerSocket(port);
+        try (ServerSocket ss = new ServerSocket(port)) {
             ss.bind(null);
             return true;
         } catch (IOException e) {
             return false;
-        } finally {
-            if (ss != null) {
-                try {
-                    ss.close();
-                } catch (IOException e) {
-                }
-            }
         }
     }
 

+ 1 - 4
common/src/main/java/com/alibaba/otter/canal/common/utils/JsonUtils.java

@@ -71,14 +71,11 @@ public class JsonUtils {
      */
     public static String marshalToString(Object obj, String... fliterFields) {
         final List<String> propertyFliters = Arrays.asList(fliterFields);
-        SerializeWriter out = new SerializeWriter();
-        try {
+        try (SerializeWriter out = new SerializeWriter()) {
             JSONSerializer serializer = new JSONSerializer(out);
             serializer.getPropertyFilters().add((source, name, value) -> !propertyFliters.contains(name));
             serializer.write(obj);
             return out.toString();
-        } finally {
-            out.close();
         }
     }
 

+ 2 - 8
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/filter/AviaterRegexFilter.java

@@ -51,7 +51,7 @@ public class AviaterRegexFilter {
         // 对pattern按照从长到短的排序
         // 因为 foo|foot 匹配 foot 会出错,原因是 foot 匹配了 foo 之后,会返回 foo,但是 foo 的长度和 foot
         // 的长度不一样
-        Collections.sort(list, COMPARATOR);
+        list.sort(COMPARATOR);
         // 对pattern进行头尾完全匹配
         list = completionPattern(list);
         this.pattern = StringUtils.join(list, PATTERN_SPLIT);
@@ -88,13 +88,7 @@ public class AviaterRegexFilter {
 
         @Override
         public int compare(String str1, String str2) {
-            if (str1.length() > str2.length()) {
-                return -1;
-            } else if (str1.length() < str2.length()) {
-                return 1;
-            } else {
-                return 0;
-            }
+            return Integer.compare(str2.length(), str1.length());
         }
     }
 

+ 4 - 4
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/CanalMessageSerializerUtil.java

@@ -34,8 +34,8 @@ public class CanalMessageSerializerUtil {
                         messageSize += CodedOutputStream.computeInt64Size(1, data.getId());
 
                         int dataSize = 0;
-                        for (int i = 0; i < rowEntries.size(); i++) {
-                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                        for (ByteString rowEntry : rowEntries) {
+                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntry);
                         }
                         messageSize += dataSize;
                         messageSize += 1 * rowEntries.size();
@@ -53,8 +53,8 @@ public class CanalMessageSerializerUtil {
                         output.writeRawVarint32(messageSize);
                         // message
                         output.writeInt64(1, data.getId());
-                        for (int i = 0; i < rowEntries.size(); i++) {
-                            output.writeBytes(2, rowEntries.get(i));
+                        for (ByteString rowEntry : rowEntries) {
+                            output.writeBytes(2, rowEntry);
                         }
                         output.checkNoSpaceLeft();
                         return body;

+ 1 - 7
dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java

@@ -12,8 +12,7 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
 public class FetcherPerformanceTest {
 
     public static void main(String args[]) {
-        DirectLogFetcher fetcher = new DirectLogFetcher();
-        try {
+        try (DirectLogFetcher fetcher = new DirectLogFetcher()) {
             Class.forName("com.mysql.jdbc.Driver");
             Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
             Statement statement = connection.createStatement();
@@ -40,11 +39,6 @@ public class FetcherPerformanceTest {
             }
         } catch (Exception e) {
             e.printStackTrace();
-        } finally {
-            try {
-                fetcher.close();
-            } catch (IOException e) {
-            }
         }
     }
 }

+ 2 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/admin/CanalAdminController.java

@@ -192,7 +192,7 @@ public class CanalAdminController implements CanalAdmin {
         Collection<File> files = org.apache.commons.io.FileUtils.listFiles(new File("../logs/canal/"),
             TrueFileFilter.TRUE,
             TrueFileFilter.TRUE);
-        List<String> names = files.stream().map(f -> f.getName()).collect(Collectors.toList());
+        List<String> names = files.stream().map(File::getName).collect(Collectors.toList());
         return Joiner.on(",").join(names);
     }
 
@@ -206,7 +206,7 @@ public class CanalAdminController implements CanalAdmin {
         Collection<File> files = org.apache.commons.io.FileUtils.listFiles(new File("../logs/" + destination + "/"),
             TrueFileFilter.TRUE,
             TrueFileFilter.TRUE);
-        List<String> names = files.stream().map(f -> f.getName()).collect(Collectors.toList());
+        List<String> names = files.stream().map(File::getName).collect(Collectors.toList());
         return Joiner.on(",").join(names);
     }
 

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MysqlGTIDSet.java

@@ -89,8 +89,8 @@ public class MysqlGTIDSet implements GTIDSet {
             // 存在多个GTID时会有回车符
             String[] uuidStrs = gtidData.replaceAll("\n", "").split(",");
             m = new HashMap<>(uuidStrs.length);
-            for (int i = 0; i < uuidStrs.length; i++) {
-                UUIDSet uuidSet = UUIDSet.parse(uuidStrs[i]);
+            for (String uuidStr : uuidStrs) {
+                UUIDSet uuidSet = UUIDSet.parse(uuidStr);
                 m.put(uuidSet.SID.toString(), uuidSet);
             }
         }

+ 1 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/UUIDSet.java

@@ -80,7 +80,7 @@ public class UUIDSet {
             if (equals(o)) {
                 return 1;
             }
-            return start > o.start ? 1 : (start == o.start ? 0 : -1);
+            return Long.compare(start, o.start);
         }
     }
 

+ 2 - 8
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

@@ -53,7 +53,7 @@ public class AviaterRegexFilter implements CanalEventFilter<String> {
         // 对pattern按照从长到短的排序
         // 因为 foo|foot 匹配 foot 会出错,原因是 foot 匹配了 foo 之后,会返回 foo,但是 foo 的长度和 foot
         // 的长度不一样
-        Collections.sort(list, COMPARATOR);
+        list.sort(COMPARATOR);
         // 对pattern进行头尾完全匹配
         list = completionPattern(list);
         this.pattern = StringUtils.join(list, PATTERN_SPLIT);
@@ -90,13 +90,7 @@ public class AviaterRegexFilter implements CanalEventFilter<String> {
 
         @Override
         public int compare(String str1, String str2) {
-            if (str1.length() > str2.length()) {
-                return -1;
-            } else if (str1.length() < str2.length()) {
-                return 1;
-            } else {
-                return 0;
-            }
+            return Integer.compare(str2.length(), str1.length());
         }
     }
 

+ 1 - 1
meta/src/main/java/com/alibaba/otter/canal/meta/ZooKeeperMetaManager.java

@@ -191,7 +191,7 @@ public class ZooKeeperMetaManager extends AbstractCanalLifeCycle implements Cana
         }
 
         // 找到最小的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        ArrayList<Long> batchIds = new ArrayList<>(nodes.size());
         for (String batchIdString : nodes) {
             batchIds.add(Long.valueOf(batchIdString));
         }

+ 6 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -81,10 +81,9 @@ public class LocalBinLogConnection implements ErosaConnection {
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         File current = new File(directory, binlogfilename);
 
-        FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
-        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
-        LogContext context = new LogContext();
-        try {
+        try (FileLogFetcher fetcher = new FileLogFetcher(bufferSize)) {
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            LogContext context = new LogContext();
             fetcher.open(current, binlogPosition);
             context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             while (running) {
@@ -129,10 +128,6 @@ public class LocalBinLogConnection implements ErosaConnection {
             }
         } catch (InterruptedException e) {
             logger.warn("LocalBinLogConnection dump interrupted");
-        } finally {
-            if (fetcher != null) {
-                fetcher.close();
-            }
         }
     }
 
@@ -226,10 +221,9 @@ public class LocalBinLogConnection implements ErosaConnection {
             throw new CanalParseException("binlog:" + binlogfilename + " is not found");
         }
 
-        FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
-        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
-        LogContext context = new LogContext();
-        try {
+        try (FileLogFetcher fetcher = new FileLogFetcher(bufferSize)) {
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            LogContext context = new LogContext();
             fetcher.open(current, binlogPosition);
             context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             while (running) {
@@ -273,10 +267,6 @@ public class LocalBinLogConnection implements ErosaConnection {
             }
         } catch (InterruptedException e) {
             logger.warn("LocalBinLogConnection dump interrupted");
-        } finally {
-            if (fetcher != null) {
-                fetcher.close();
-            }
         }
     }
 

+ 3 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -191,8 +191,7 @@ public class MysqlConnection implements ErosaConnection {
         loadBinlogChecksum();
         sendBinlogDumpGTID(gtidSet);
 
-        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
-        try {
+        try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
             fetcher.start(connector.getChannel());
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
@@ -212,8 +211,6 @@ public class MysqlConnection implements ErosaConnection {
                     break;
                 }
             }
-        } finally {
-            fetcher.close();
         }
     }
 
@@ -229,8 +226,7 @@ public class MysqlConnection implements ErosaConnection {
         sendBinlogDump(binlogfilename, binlogPosition);
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
         ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
-        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
-        try {
+        try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
             fetcher.start(connector.getChannel());
             while (fetcher.fetch()) {
                 accumulateReceivedBytes(fetcher.limit());
@@ -240,8 +236,6 @@ public class MysqlConnection implements ErosaConnection {
                     break;
                 }
             }
-        } finally {
-            fetcher.close();
         }
     }
 
@@ -257,8 +251,7 @@ public class MysqlConnection implements ErosaConnection {
         sendBinlogDumpGTID(gtidSet);
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
         ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
-        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
-        try {
+        try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
             fetcher.start(connector.getChannel());
             while (fetcher.fetch()) {
                 accumulateReceivedBytes(fetcher.limit());
@@ -268,8 +261,6 @@ public class MysqlConnection implements ErosaConnection {
                     break;
                 }
             }
-        } finally {
-            fetcher.close();
         }
     }
 

+ 0 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -245,18 +245,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 if (haController != null && haController instanceof HeartBeatCallback) {
                     ((HeartBeatCallback) haController).onSuccess(costTime);
                 }
-            } catch (SocketTimeoutException e) {
-                if (haController != null && haController instanceof HeartBeatCallback) {
-                    ((HeartBeatCallback) haController).onFailed(e);
-                }
-                reconnect = true;
-                logger.warn("connect failed by ", e);
-            } catch (IOException e) {
-                if (haController != null && haController instanceof HeartBeatCallback) {
-                    ((HeartBeatCallback) haController).onFailed(e);
-                }
-                reconnect = true;
-                logger.warn("connect failed by ", e);
             } catch (Throwable e) {
                 if (haController != null && haController instanceof HeartBeatCallback) {
                     ((HeartBeatCallback) haController).onFailed(e);

+ 1 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -150,11 +150,7 @@ public class DirectLogFetcher extends LogFetcher {
             close(); /* Do cleanup */
             logger.error("Socket timeout expired, closing connection", e);
             throw e;
-        } catch (InterruptedIOException e) {
-            close(); /* Do cleanup */
-            logger.info("I/O interrupted while reading from client socket", e);
-            throw e;
-        } catch (ClosedByInterruptException e) {
+        } catch (InterruptedIOException | ClosedByInterruptException e) {
             close(); /* Do cleanup */
             logger.info("I/O interrupted while reading from client socket", e);
             throw e;

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

@@ -230,7 +230,7 @@ public class BinLogFileQueue {
             }
         }, null));
         // 排一下序列
-        Collections.sort(files, Comparator.comparing(File::getName));
+        files.sort(Comparator.comparing(File::getName));
         return files;
     }
 

+ 2 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/HttpHelper.java

@@ -56,8 +56,7 @@ public class HttpHelper {
         HttpGet httpGet = new HttpGet(uri);
         HttpClientContext context = HttpClientContext.create();
         context.setRequestConfig(config);
-        CloseableHttpResponse response = httpclient.execute(httpGet, context);
-        try {
+        try (CloseableHttpResponse response = httpclient.execute(httpGet, context)) {
             int statusCode = response.getStatusLine().getStatusCode();
             long end = System.currentTimeMillis();
             long cost = end - start;
@@ -69,10 +68,9 @@ public class HttpHelper {
             } else {
                 String errorMsg = EntityUtils.toString(response.getEntity());
                 throw new RuntimeException("requestGet remote error, url=" + uri.toString() + ", code=" + statusCode
-                                           + ", error msg=" + errorMsg);
+                        + ", error msg=" + errorMsg);
             }
         } finally {
-            response.close();
             httpGet.releaseConnection();
         }
     }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java

@@ -83,7 +83,7 @@ public class PeriodMixedLogPositionManager extends AbstractLogPositionManager {
 
         // 启动定时工作任务
         executorService.scheduleAtFixedRate(() -> {
-            List<String> tasks = new ArrayList<String>(persistTasks);
+            List<String> tasks = new ArrayList<>(persistTasks);
             for (String destination : tasks) {
                 try {
                     // 定时将内存中的最新值刷到zookeeper中,多次变更只刷一次

+ 1 - 7
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java

@@ -22,8 +22,7 @@ public class MysqlBinlogEventPerformanceTest {
     protected static Charset charset = Charset.forName("utf-8");
 
     public static void main(String args[]) {
-        DirectLogFetcher fetcher = new DirectLogFetcher();
-        try {
+        try (DirectLogFetcher fetcher = new DirectLogFetcher()) {
             MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
             connector.connect();
             updateSettings(connector);
@@ -49,11 +48,6 @@ public class MysqlBinlogEventPerformanceTest {
             }
         } catch (Exception e) {
             e.printStackTrace();
-        } finally {
-            try {
-                fetcher.close();
-            } catch (IOException e) {
-            }
         }
     }
 

+ 2 - 10
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

@@ -34,8 +34,7 @@ public class MysqlBinlogParsePerformanceTest {
     protected static Charset charset = Charset.forName("utf-8");
 
     public static void main(String args[]) {
-        DirectLogFetcher fetcher = new DirectLogFetcher();
-        try {
+        try (DirectLogFetcher fetcher = new DirectLogFetcher()) {
             MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
             connector.connect();
             updateSettings(connector);
@@ -45,9 +44,7 @@ public class MysqlBinlogParsePerformanceTest {
             Thread thread = new Thread(() -> {
                 try {
                     consumer(buffer);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                } catch (InterruptedException e) {
+                } catch (IOException | InterruptedException e) {
                     e.printStackTrace();
                 }
             });
@@ -59,11 +56,6 @@ public class MysqlBinlogParsePerformanceTest {
             }
         } catch (Exception e) {
             e.printStackTrace();
-        } finally {
-            try {
-                fetcher.close();
-            } catch (IOException e) {
-            }
         }
     }
 

+ 5 - 5
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -18,24 +18,24 @@ public class Message implements Serializable {
 
     private static final long      serialVersionUID = 1234034768477580009L;
     private long                   id;
-    private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
+    private List<CanalEntry.Entry> entries          = new ArrayList<>();
     // row data for performance, see:
     // https://github.com/alibaba/canal/issues/726
     private boolean                raw              = true;
-    private List<ByteString>       rawEntries       = new ArrayList<ByteString>();
+    private List<ByteString>       rawEntries       = new ArrayList<>();
 
     public Message(long id, List<Entry> entries){
         this.id = id;
-        this.entries = entries == null ? new ArrayList<Entry>() : entries;
+        this.entries = entries == null ? new ArrayList<>() : entries;
         this.raw = false;
     }
 
     public Message(long id, boolean raw, List entries){
         this.id = id;
         if (raw) {
-            this.rawEntries = entries == null ? new ArrayList<ByteString>() : entries;
+            this.rawEntries = entries == null ? new ArrayList<>() : entries;
         } else {
-            this.entries = entries == null ? new ArrayList<Entry>() : entries;
+            this.entries = entries == null ? new ArrayList<>() : entries;
         }
         this.raw = raw;
     }

+ 2 - 2
protocol/src/main/java/com/alibaba/otter/canal/protocol/SecurityUtil.java

@@ -84,8 +84,8 @@ public class SecurityUtil {
      */
     public static String byte2HexStr(byte[] b) {
         StringBuilder hs = new StringBuilder();
-        for (int n = 0; n < b.length; n++) {
-            String hex = (Integer.toHexString(b[n] & 0XFF));
+        for (byte value : b) {
+            String hex = (Integer.toHexString(value & 0XFF));
             if (hex.length() == 1) {
                 hs.append("0" + hex);
             } else {

+ 4 - 4
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -161,8 +161,8 @@ public class SessionHandler extends SimpleChannelHandler {
                             messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());
 
                             int dataSize = 0;
-                            for (int i = 0; i < rowEntries.size(); i++) {
-                                dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                            for (ByteString rowEntry : rowEntries) {
+                                dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntry);
                             }
                             messageSize += dataSize;
                             messageSize += 1 * rowEntries.size();
@@ -194,8 +194,8 @@ public class SessionHandler extends SimpleChannelHandler {
                             output.writeRawVarint32(messageSize);
                             // message
                             output.writeInt64(1, message.getId());
-                            for (int i = 0; i < rowEntries.size(); i++) {
-                                output.writeBytes(2, rowEntries.get(i));
+                            for (ByteString rowEntry : rowEntries) {
+                                output.writeBytes(2, rowEntry);
                             }
                             output.checkNoSpaceLeft();
                             NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),

+ 4 - 4
server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java

@@ -63,8 +63,8 @@ public class ProtocolTest {
         messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());
 
         int dataSize = 0;
-        for (int i = 0; i < rowEntries.size(); i++) {
-            dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+        for (ByteString rowEntry : rowEntries) {
+            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntry);
         }
         messageSize += dataSize;
         messageSize += 1 * rowEntries.size();
@@ -82,8 +82,8 @@ public class ProtocolTest {
         output.writeRawVarint32(messageSize);
         // message
         output.writeInt64(1, message.getId());
-        for (int i = 0; i < rowEntries.size(); i++) {
-            output.writeBytes(2, rowEntries.get(i));
+        for (ByteString rowEntry : rowEntries) {
+            output.writeBytes(2, rowEntry);
         }
         output.checkNoSpaceLeft();
 

+ 1 - 1
store/src/main/java/com/alibaba/otter/canal/store/model/Events.java

@@ -20,7 +20,7 @@ public class Events<EVENT> implements Serializable {
     private static final long serialVersionUID = -7337454954300706044L;
 
     private PositionRange     positionRange    = new PositionRange();
-    private List<EVENT>       events           = new ArrayList<EVENT>();
+    private List<EVENT>       events           = new ArrayList<>();
 
     public List<EVENT> getEvents() {
         return events;

+ 2 - 5
store/src/test/java/com/alibaba/otter/canal/store/memory/buffer/MemoryEventStoreMemBatchTest.java

@@ -81,9 +81,7 @@ public class MemoryEventStoreMemBatchTest extends MemoryEventStoreBase {
 
         try {
             result = eventStore.put(buildEvent("1", 1L, 1L + bufferSize), 1000L, TimeUnit.MILLISECONDS);
-        } catch (CanalStoreException e) {
-            Assert.fail(e.getMessage());
-        } catch (InterruptedException e) {
+        } catch (CanalStoreException | InterruptedException e) {
             Assert.fail(e.getMessage());
         }
 
@@ -162,8 +160,7 @@ public class MemoryEventStoreMemBatchTest extends MemoryEventStoreBase {
             Assert.assertTrue(entrys.getEvents().size() == batchSize);
             Assert.assertEquals(position, entrys.getPositionRange().getStart());
             Assert.assertEquals(position, entrys.getPositionRange().getEnd());
-        } catch (CanalStoreException e) {
-        } catch (InterruptedException e) {
+        } catch (CanalStoreException | InterruptedException e) {
         }
 
         ExecutorService executor = Executors.newFixedThreadPool(1);

+ 1 - 2
store/src/test/java/com/alibaba/otter/canal/store/memory/buffer/MemoryEventStoreMultiThreadTest.java

@@ -109,8 +109,7 @@ public class MemoryEventStoreMultiThreadTest extends MemoryEventStoreBase {
                 }
                 try {
                     eventStore.put(event);
-                } catch (CanalStoreException e) {
-                } catch (InterruptedException e) {
+                } catch (CanalStoreException | InterruptedException e) {
                 }
             }
         }

+ 2 - 5
store/src/test/java/com/alibaba/otter/canal/store/memory/buffer/MemoryEventStorePutAndGetTest.java

@@ -64,9 +64,7 @@ public class MemoryEventStorePutAndGetTest extends MemoryEventStoreBase {
 
         try {
             result = eventStore.put(buildEvent("1", 1L, 1L + bufferSize), 1000L, TimeUnit.MILLISECONDS);
-        } catch (CanalStoreException e) {
-            Assert.fail(e.getMessage());
-        } catch (InterruptedException e) {
+        } catch (CanalStoreException | InterruptedException e) {
             Assert.fail(e.getMessage());
         }
 
@@ -140,8 +138,7 @@ public class MemoryEventStorePutAndGetTest extends MemoryEventStoreBase {
             Assert.assertTrue(entrys.getEvents().size() == batchSize);
             Assert.assertEquals(position, entrys.getPositionRange().getStart());
             Assert.assertEquals(position, entrys.getPositionRange().getEnd());
-        } catch (CanalStoreException e) {
-        } catch (InterruptedException e) {
+        } catch (CanalStoreException | InterruptedException e) {
         }
 
         ExecutorService executor = Executors.newFixedThreadPool(1);