Skip to content

Commit

Permalink
[FLINK-35072][doris] Support applying AlterColumnTypeEvent to Doris p…
Browse files Browse the repository at this point in the history
…ipeline sink

This closes  #3473
  • Loading branch information
yuxiqian authored Jul 30, 2024
1 parent 85bfd6a commit a39959f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<name>flink-cdc-pipeline-connector-doris</name>

<properties>
<doris.connector.version>1.6.2</doris.connector.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -44,7 +48,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
<version>1.6.0</version>
<version>${doris.connector.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
Expand Down Expand Up @@ -79,6 +80,8 @@ public void applySchemaChange(SchemaChangeEvent event) {
applyDropColumnEvent((DropColumnEvent) event);
} else if (event instanceof RenameColumnEvent) {
applyRenameColumnEvent((RenameColumnEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
throw new RuntimeException("Unsupported schema change event, " + event);
}
Expand Down Expand Up @@ -146,26 +149,28 @@ private List<String> buildDistributeKeys(Schema schema) {
return new ArrayList<>();
}

private String buildTypeString(DataType dataType) {
if (dataType instanceof LocalZonedTimestampType
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(dataType);
return String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
} else {
return DorisTypeMapper.toDorisType(DataTypeUtils.toFlinkDataType(dataType));
}
}

private void applyAddColumnEvent(AddColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
Column column = col.getAddColumn();
String typeString;
if (column.getType() instanceof LocalZonedTimestampType
|| column.getType() instanceof TimestampType
|| column.getType() instanceof ZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(column.getType());
typeString =
String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
} else {
typeString =
DorisTypeMapper.toDorisType(
DataTypeUtils.toFlinkDataType(column.getType()));
}
FieldSchema addFieldSchema =
new FieldSchema(column.getName(), typeString, column.getComment());
new FieldSchema(
column.getName(),
buildTypeString(column.getType()),
column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
}
Expand All @@ -192,4 +197,21 @@ private void applyRenameColumnEvent(RenameColumnEvent event)
entry.getValue());
}
}

private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
Map<String, DataType> typeMapping = event.getTypeMapping();

for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
schemaChangeManager.modifyColumnDataType(
tableId.getSchemaName(),
tableId.getTableName(),
new FieldSchema(
entry.getKey(),
buildTypeString(entry.getValue()),
null)); // Currently, AlterColumnTypeEvent carries no comment info. This
// will be fixed after FLINK-35243 got merged.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -367,7 +366,6 @@ public void testDorisRenameColumn() throws Exception {
}

@Test
@Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.")
public void testDorisAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(
Expand Down

0 comments on commit a39959f

Please sign in to comment.