[Spring cloud 一步步實現廣告系統] 16. 增量索引實現以及投送資料到MQ(kafka)
實現增量資料索引
上一節中,我們為實現增量索引的載入做了充足的準備,使用到mysql-binlog-connector-java
開源元件來實現MySQL 的binlog監聽,關於binlog的相關知識,大家可以自行網路查閱。或者可以mailto:[email protected]
本節我們將根據binlog 的資料物件,來實現增量資料的處理,我們構建廣告的增量資料,其實說白了就是為了在後期能把廣告投放到索引服務,實現增量資料到增量索引的生成。Let's code.
- 定義一個投遞增量資料的介面(接收引數為我們上一節定義的binlog日誌的轉換物件)
/** * ISender for 投遞增量資料 方法定義介面 * * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a> */ public interface ISender { void sender(MysqlRowData rowData); }
- 建立增量索引監聽器
/** * IncrementListener for 增量資料實現監聽 * * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a> * @since 2019/6/27 */ @Slf4j @Component public class IncrementListener implements Ilistener { private final AggregationListener aggregationListener; @Autowired public IncrementListener(AggregationListener aggregationListener) { this.aggregationListener = aggregationListener; } //根據名稱選擇要注入的投遞方式 @Resource(name = "indexSender") private ISender sender; /** * 標註為 {@link PostConstruct}, * 即表示在服務啟動,Bean完成初始化之後,立刻初始化 */ @Override @PostConstruct public void register() { log.info("IncrementListener register db and table info."); Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this)); } @Override public void onEvent(BinlogRowData eventData) { TableTemplate table = eventData.getTableTemplate(); EventType eventType = eventData.getEventType(); //包裝成最後需要投遞的資料 MysqlRowData rowData = new MysqlRowData(); rowData.setTableName(table.getTableName()); rowData.setLevel(eventData.getTableTemplate().getLevel()); //將EventType轉為OperationTypeEnum OperationTypeEnum operationType = OperationTypeEnum.convert(eventType); rowData.setOperationTypeEnum(operationType); //獲取模版中該操作對應的欄位列表 List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType); if (null == fieldList) { log.warn("{} not support for {}.", operationType, table.getTableName()); return; } for (Map<String, String> afterMap : eventData.getAfter()) { Map<String, String> _afterMap = new HashMap<>(); for (Map.Entry<String, String> entry : afterMap.entrySet()) { String colName = entry.getKey(); String colValue = entry.getValue(); _afterMap.put(colName, colValue); } rowData.getFieldValueMap().add(_afterMap); } sender.sender(rowData); } }
開啟binlog監聽
- 首先來配置監聽binlog的資料庫連線資訊
adconf:
mysql:
host: 127.0.0.1
port: 3306
username: root
password: 12345678
binlogName: ""
position: -1 # 從當前位置開始監聽
編寫配置類:
/** * BinlogConfig for 定義監聽Binlog的配置資訊 * * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a> */ @Component @ConfigurationProperties(prefix = "adconf.mysql") @Data @AllArgsConstructor @NoArgsConstructor public class BinlogConfig { private String host; private Integer port; private String username; private String password; private String binlogName; private Long position; }
在我們實現 監聽binlog那節,我們實現了一個自定義client CustomBinlogClient
,需要實現binlog的監聽,這個監聽的客戶端就必須是一個獨立執行的執行緒,並且要在程式啟動的時候進行監聽,我們來實現運行當前client的方式,這裡我們會使用到一個新的Runnerorg.springframework.boot.CommandLineRunner
,let's code.
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {
@Autowired
private CustomBinlogClient binlogClient;
@Override
public void run(String... args) throws Exception {
log.info("BinlogRunner is running...");
binlogClient.connect();
}
}
增量資料投遞
在binlog監聽的過程中,我們看到針對於int, String 這類資料欄位,mysql的記錄是沒有問題的,但是針對於時間型別,它被格式化成了字串型別:Fri Jun 21 15:07:53 CST 2019
。
--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
對於這個時間格式,我們需要關注2點資訊:
- CST,這個時間格式會比我們的時間+ 8h(中國標準時間 China Standard Time UT+8:00)
- 需要對這個日期進行解釋處理
當然,我們也可以通過設定mysql的日期格式來改變該行為,在此,我們通過編碼來解析該時間格式:
/**
* Thu Jun 27 08:00:00 CST 2019
*/
public static Date parseBinlogString2Date(String dateString) {
try {
DateFormat dateFormat = new SimpleDateFormat(
"EEE MMM dd HH:mm:ss zzz yyyy",
Locale.US
);
return DateUtils.addHours(dateFormat.parse(dateString), -8);
} catch (ParseException ex) {
log.error("parseString2Date error:{}", dateString);
return null;
}
}
因為我們在定義索引的時候,是根據表之間的層級關係(Level)來設定的,根據程式碼規範,不允許出現Magic Number, 因此我們定義一個數據層級列舉,來表達資料層級。
/**
* AdDataLevel for 廣告資料層級
*
* @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
*/
@Getter
public enum AdDataLevel {
LEVEL2("2", "level 2"),
LEVEL3("3", "level 3"),
LEVEL4("4", "level 4");
private String level;
private String desc;
AdDataLevel(String level, String desc) {
this.level = level;
this.desc = desc;
}
}
實現資料投遞
因為增量資料可以投遞到不同的位置以及用途,我們之前實現了一個投遞介面com.sxzhongf.ad.sender.ISender
,接下來我們實現一個投遞類:
@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {
/**
* 根據廣告級別,投遞Binlog資料
*/
@Override
public void sender(MysqlRowData rowData) {
if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
Level2RowData(rowData);
} else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
Level3RowData(rowData);
} else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
Level4RowData(rowData);
} else {
log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
}
}
private void Level2RowData(MysqlRowData rowData) {
if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
List<AdPlanTable> planTables = new ArrayList<>();
for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
AdPlanTable planTable = new AdPlanTable();
//Map的第二種迴圈方式
fieldValueMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
planTable.setPlanId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
planTable.setUserId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
planTable.setPlanStatus(Integer.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
break;
}
});
planTables.add(planTable);
}
//投遞推廣計劃
planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
} else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
List<AdCreativeTable> creativeTables = new LinkedList<>();
rowData.getFieldValueMap().forEach(afterMap -> {
AdCreativeTable creativeTable = new AdCreativeTable();
afterMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
creativeTable.setAdId(Long.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
creativeTable.setType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
creativeTable.setMaterialType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
creativeTable.setHeight(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
creativeTable.setWidth(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
creativeTable.setAuditStatus(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
creativeTable.setAdUrl(v);
break;
}
});
creativeTables.add(creativeTable);
});
//投遞廣告創意
creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
}
}
private void Level3RowData(MysqlRowData rowData) {
...
}
/**
* 處理4級廣告
*/
private void Level4RowData(MysqlRowData rowData) {
...
}
}
投放增量資料到MQ(kafka)
為了我們的資料投放更加靈活,方便資料統計,分析等系統的需求,我們來實現一個投放到訊息中的介面,其他服務可以訂閱當前MQ 的TOPIC來實現資料訂閱。
配置檔案中配置TOPIC
adconf:
kafka:
topic: ad-search-mysql-data
--------------------------------------
/**
* KafkaSender for 投遞Binlog增量資料到kafka訊息佇列
*
* @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
* @since 2019/7/1
*/
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {
@Value("${adconf.kafka.topic}")
private String topic;
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 傳送資料到kafka佇列
*/
@Override
public void sender(MysqlRowData rowData) {
kafkaTemplate.send(
topic, JSON.toJSONString(rowData)
);
}
/**
* 測試消費kafka訊息
*/
@KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
public void processMysqlRowData(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMsg = Optional.ofNullable(record.value());
if (kafkaMsg.isPresent()) {
Object message = kafkaMsg.get();
MysqlRowData rowData = JSON.parseObject(
message.toString(),
MysqlRowData.class
);
System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
//sender.sender();
}
}
}