Skip to content

Commit

Permalink
dev:tablestore adapter组件增加对目标列常量值的配置支持
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-yun committed Jan 9, 2023
1 parent a10698e commit f7b78e0
Show file tree
Hide file tree
Showing 10 changed files with 502 additions and 53 deletions.
15 changes: 15 additions & 0 deletions client-adapter/tablestore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@
</exclusions>
</dependency>

<!--mysql依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
<scope>test</scope>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static Map<String, MappingConfig> load(Properties envProperties) {
}
try {
config.validate();
config.getDbMapping().init(config);
} catch (Exception e) {
throw new RuntimeException("ERROR Config: " + fileName + " " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.alibaba.otter.canal.client.adapter.tablestore.enums.TablestoreFieldType;
import com.alibaba.otter.canal.client.adapter.tablestore.support.SyncUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSetMetaData;
import java.util.*;
Expand Down Expand Up @@ -154,6 +156,7 @@ public int hashCode() {


public static class DbMapping implements AdapterMapping {
protected Logger logger = LoggerFactory.getLogger(this.getClass());

private String database; // 数据库名或schema名
private String table; // 表名
Expand All @@ -168,6 +171,13 @@ public static class DbMapping implements AdapterMapping {
private int readBatch = 5000;
private int commitBatch = 5000; // etl等批量提交大小

private Map<String, String> constantTargetColumns; //目标字段常量值映射

private Map<String, String> constantTargetColumnsParsed; //目标字段常量值映射解析

private Map<String, ConstantColumnItem> constantColumnItems = new LinkedHashMap<>(); // 转换后的静态常量字段映射列表


private Map<String, ColumnItem> columnItems = new LinkedHashMap<>(); // 转换后的字段映射列表

public String getDatabase() {
Expand Down Expand Up @@ -249,60 +259,98 @@ public void setColumnItems(Map<String, ColumnItem> columnItems) {
this.columnItems = columnItems;
}

public Map<String, String> getConstantTargetColumns() {
return constantTargetColumns;
}

public void setConstantTargetColumns(Map<String, String> constantTargetColumns) {
this.constantTargetColumns = constantTargetColumns;
}

public Map<String, String> getConstantTargetColumnsParsed() {
return constantTargetColumnsParsed;
}

public void setConstantTargetColumnsParsed(Map<String, String> constantTargetColumnsParsed) {
this.constantTargetColumnsParsed = constantTargetColumnsParsed;
}



public static class ConstantColumnItem {
private String targetColumn;
private String column;
private TablestoreFieldType type;

public String getColumn() {
return column;
}

public void setColumn(String column) {
this.column = column;
}

public TablestoreFieldType getType() {
return type;
}

public void setType(TablestoreFieldType type) {
this.type = type;
}

public String getTargetColumn() {
return targetColumn;
}

public void setTargetColumn(String targetColumn) {
this.targetColumn = targetColumn;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ColumnItem that = (ColumnItem) o;
return Objects.equals(column, that.column);
}

@Override
public int hashCode() {
return Objects.hash(column);
}
}



public void init(MappingConfig config) {
logger.info("=========dbMapping begin init.=========");
String splitBy = "$";
if (targetColumns != null) {
boolean needTypeInference = false;
for (Map.Entry<String, String> columnField : targetColumns.entrySet()) {
String field = columnField.getValue();
String type = null;
if (field != null) {
// 解析类型
int i = field.indexOf(splitBy);
if (i > -1) {
type = field.substring(i + 1);
field = field.substring(0, i);
}
}
ColumnItem columnItem = new ColumnItem();
columnItem.setColumn(columnField.getKey());
columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);

TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
if (fieldType == null) {
needTypeInference = true;
}
columnItem.setType(fieldType);
columnItems.put(columnField.getKey(), columnItem);
}
if (needTypeInference) {
// 认为有field没有配置映射类型,需要进行类型推断
DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
try {
ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = rsd.getColumnName(i);
if (columnItems.containsKey(columnName) && columnItems.get(columnName).getType() == null) {
int columnType = rsd.getColumnType(i);
columnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

} else {
//普通列
if (targetColumns != null) {
parseTargetColumns(config, splitBy);
}
if (targetColumns == null) {
this.targetColumns = new LinkedHashMap<>();
}
targetColumnsParsed = new HashMap<>();

//常量列
if (constantTargetColumns != null) {
parseTargetConstant(config, splitBy);
}
if (constantTargetColumns == null) {
this.constantColumnItems = new LinkedHashMap<>();
}

initParsed(splitBy);

logger.info("=========dbMapping success init.=========");

}


private void initParsed(String splitBy) {
targetColumnsParsed = new HashMap<>();
targetColumns.forEach((key, value) -> {
if (StringUtils.isEmpty(value)) {
targetColumnsParsed.put(key, key);
Expand All @@ -312,6 +360,114 @@ public void init(MappingConfig config) {
targetColumnsParsed.put(key, value);
}
});

constantTargetColumnsParsed = new HashMap<>();
constantTargetColumns.forEach((key, value) -> {
if (StringUtils.isEmpty(value)) {
constantTargetColumnsParsed.put(key, key);
} else if (value.contains(splitBy) && constantColumnItems.containsKey(key)) {
constantTargetColumnsParsed.put(key, constantColumnItems.get(key).targetColumn);
} else {
constantTargetColumnsParsed.put(key, value);
}
});
}


private void parseTargetColumns(MappingConfig config, String splitBy) {
boolean needTypeInference = false;
for (Map.Entry<String, String> columnField : targetColumns.entrySet()) {
String field = columnField.getValue();
String type = null;
if (field != null) {
// 解析类型
int i = field.indexOf(splitBy);
if (i > -1) {
type = field.substring(i + 1);
field = field.substring(0, i);
}
}
ColumnItem columnItem = new ColumnItem();
columnItem.setColumn(columnField.getKey());
columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);

TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
if (fieldType == null) {
needTypeInference = true;
}
columnItem.setType(fieldType);
columnItems.put(columnField.getKey(), columnItem);
}
if (needTypeInference) {
// 认为有field没有配置映射类型,需要进行类型推断
DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
try {
ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = rsd.getColumnName(i);
if (columnItems.containsKey(columnName) && columnItems.get(columnName).getType() == null) {
int columnType = rsd.getColumnType(i);
columnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

private void parseTargetConstant(MappingConfig config, String splitBy) {
boolean needTypeInference = false;
for (Map.Entry<String, String> columnField : constantTargetColumns.entrySet()) {
String field = columnField.getValue();
String type = null;
if (field != null) {
// 解析类型
int i = field.indexOf(splitBy);
if (i > -1) {
type = field.substring(i + 1);
field = field.substring(0, i);
}
}
ConstantColumnItem columnItem = new ConstantColumnItem();
columnItem.setColumn(columnField.getKey());
columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);

TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
if (fieldType == null) {
needTypeInference = true;
}
columnItem.setType(fieldType);
constantColumnItems.put(columnField.getKey(), columnItem);
}
if (needTypeInference) {
// 认为有field没有配置映射类型,需要进行类型推断
DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
try {
ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = rsd.getColumnName(i);
if (constantColumnItems.containsKey(columnName) && constantColumnItems.get(columnName).getType() == null) {
int columnType = rsd.getColumnType(i);
constantColumnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

}
Expand Down
Loading

0 comments on commit f7b78e0

Please sign in to comment.