緩存一致性 每次逢年過節的時候搶票非常艱難,放票的時候那么多人同時去搶票,如果所有人查詢、購票等都去訪問數據庫,那數據庫的壓力得有多大,這時候很多都會引入緩存, 把車票信息放入緩存,這樣可以減少數據庫壓力。當乘客購買成功之后,數據庫發生了變化,需要及時更新緩存中的數據,以便于其他乘客能從緩存中及時獲取最新車票信息。這就是緩存一致性。
解決數據庫與緩存一致性主要思路:
1、同步雙寫:也就是修改db的時候同時修改一下緩存,這種模式下會出現無法保證數據庫與緩存的原子性。
如果出現多線程同時修改db的情況,網絡延遲導致數據庫修改順序與請求順序錯位
例如:A 先操作數據庫修改 x=1
B也修改數據庫 x=2
但是網絡延遲導致
B先修改緩存 x=2
A再修改緩存 x=1
這樣就導致了數據庫中x=2,而緩存中則是 x=1,導致數據庫與緩存不一致。
2、設置有效期:給緩存設置有效期,到期后自動刪除。再次查詢時更新
優勢:簡單、方便
缺點:時效性差,緩存過期之前可能不一致
場景:更新頻率較低,時效性要求低的業務
那我們有沒有什么更加好的解決方案呢?
阿里云的canal就為我們很好的解決了這一問題:
canal: 是Alibaba旗下的一款開源項目,純Java開發.它是基于數據庫增量日志解析,提供 增量數據訂閱&消費 ,目前主要支持mysql。
canal工作原理
mysql的主從復制原理:
MySQL master 將數據變更寫入二進制日志( binary log , 其中記錄叫做二進制日志事件 binary log events ,可以通過 show binlog events 進行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志( relay log )
MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
canal工作原理
canal模擬mysql salve的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議;
mysql master收到dump請求,開始推送binary log給slave(也就是canal);
canal解析binary log對象(原始byte流).
canal的安裝配置(以windows為例)
一、登進Mysql后,使用show variables like'log_bin';查詢是否開啟binlog,如果開啟(ON),進行下一步,如果沒開啟(OFF),在數據庫的my.ini配置文件添加配置
[mysqld]
# 開啟 binlog
log-bin=mysql-bin
# 選擇 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
server_id=1
二、binlog開啟后,創建一個canal用戶并授權,官網配置是@%,表示所有服務器,所以改為localhost就可以,在mysql中,運行如下代碼,設置完成之后重啟:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' identified by 'canal';
FLUSH PRIVILEGES;
三、安裝canal
- 下載地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.6-alpha-1
在conf文件夾里找到confcanal.properties
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
- 說明:這個文件是 canal 的基本通用配置,canal 端口號默認就是 11111,修改 canal 的輸出 model,默認 tcp,改為輸出到 kafka
- 重點關注上面的:canal.serverMode = tcp 這個配置,默認情況,如果是使用mysql,可以不做修改,如果需要將數據同步到kafka,或者rocketmq,可以分別修改即可,此處暫不做修改
- 解壓到適當位置,解壓后在conf文件夾里找到exampleinstance.properties,
canal.instance.mysql.slaveId=20 #只要和mysql的master的不一樣即可
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
- canal.instance.mysql.slaveId=20 #只要和mysql的master的不一樣即可
- canal.instance.master.address=127.0.0.1:3306 ,監聽的mysql的master節點信息
- 配置連接 MySQL 的用戶名和密碼,默認就是我們前面授權的 canal
- 修改數據庫配置信息,canal.instance.dbUsername、canal.instance.dbPassword為數據庫賬戶密碼,均為canal,剛剛創建賬號密碼,
- 到bin目錄下啟動 startup.bat,出現如下界面表示啟動成功
四、spring boot中整合canal maven依賴
< dependency >
< groupId >com.alibaba.otter< /groupId >
< artifactId >canal.client< /artifactId >
< version >1.1.4< /version >
< /dependency >
java 示例:
public class CanalService {
public static void main(String[] args) throws Exception{
//1.獲取 canal 連接對象,我在本機上部署的,所以是127.0.0.1
CanalConnector canalConnector =
CanalConnectors.newSingleConnector(new
InetSocketAddress("127.0.0.1", 11111), "example", "", "");
System.out.println("canal啟動并開始監聽數據 ...... ");
while (true){
canalConnector.connect();
//訂閱表 test數據庫下的所有表
canalConnector.subscribe("test.*");
//獲取數據
Message message = canalConnector.get(100);
//解析message
List< CanalEntry.Entry > entries = message.getEntries();
if(entries.size() <=0){
System.out.println("未檢測到數據");
Thread.sleep(1000);
}
for(CanalEntry.Entry entry : entries){
//1、獲取表名
String tableName = entry.getHeader().getTableName();
//2、獲取類型
CanalEntry.EntryType entryType = entry.getEntryType();
//3、獲取序列化后的數據
ByteString storeValue = entry.getStoreValue();
//判斷是否rowdata類型數據
if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
//對第三步中的數據進行解析
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//獲取當前事件的操作類型
CanalEntry.EventType eventType = rowChange.getEventType();
//獲取數據集
List< CanalEntry.RowData > rowDatasList = rowChange.getRowDatasList();
//便利數據
for(CanalEntry.RowData rowData : rowDatasList){
//數據變更之前的內容
JSONObject beforeData = new JSONObject();
List< CanalEntry.Column > beforeColumnsList = rowData.getAfterColumnsList();
for(CanalEntry.Column column : beforeColumnsList){
beforeData.put(column.getName(),column.getValue());
}
//數據變更之后的內容
List< CanalEntry.Column > afterColumnsList = rowData.getAfterColumnsList();
JSONObject afterData = new JSONObject();
for(CanalEntry.Column column : afterColumnsList){
afterData.put(column.getName(),column.getValue());
}
System.out.println("Table :" + tableName +
",eventType :" + eventType +
",beforeData :" + beforeData +
",afterData : " + afterData);
//操作緩存
}
}else {
System.out.println("當前操作類型為:" + entryType);
}
}
}
}
}
我手動在book表中操作數據,可以看到程序監控輸出結果
五、 最后我們拿到數據之后可以放入消息隊列,這樣可以加入重試機制,還可以防止冪等問題,最后再寫入緩存。
- 消息隊列保證可靠性:寫到隊列中的消息,成功消費之前不會丟失(重啟項目也不擔心)。
- 消息隊列保證消息成功投遞:下游從隊列拉取消息,成功消費后才會刪除消息,否則還會繼續投遞消息給消費者(符合我們重試的場景)。
-
緩存
+關注
關注
1文章
241瀏覽量
26757 -
數據庫
+關注
關注
7文章
3846瀏覽量
64685 -
開源
+關注
關注
3文章
3402瀏覽量
42711 -
dump
+關注
關注
0文章
13瀏覽量
9525
發布評論請先 登錄
相關推薦
Cache一致性協議優化研究
![Cache<b class='flag-5'>一致性</b>協議優化研究](https://file.elecfans.com/web2/M00/49/86/poYBAGKhwMOAGKC_AAAc6iUvydw085.jpg)
自主駕駛系統將使用緩存一致性互連IP和非一致性互連IP
緩存與數據庫雙寫一致性幾種策略分析
如何保證緩存一致性
深入理解數據備份的關鍵原則:應用一致性與崩潰一致性的區別
![深入理解<b class='flag-5'>數據</b>備份的關鍵原則:應用<b class='flag-5'>一致性</b>與崩潰<b class='flag-5'>一致性</b>的區別](https://file1.elecfans.com/web2/M00/C4/A2/wKgaomXueUOAUC9kAAUkG4ifnAc542.png)
評論