1. 程式人生 > >我的物聯網項目(十一) 單數據庫事務也需謹慎

我的物聯網項目(十一) 單數據庫事務也需謹慎

般的 back use 刪除 bin message point broker manage

單體架構模式下的數據庫基本都是單數據庫,所以應用層通過spring事務控制的本質其實就是數據庫對事務的支持,沒有數據庫的事務支持,spring是無法提供事務功能的。通過spring實現事務的方式也有聲明式事務編程式事務兩種,不管哪一種實現起來都比較簡單。像一般的業務,類型下面這種方式編程就行:

1.配置文件

    <!-- 事務控制 -->
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

    <!--  配置事務傳播特性 -->
	<tx:advice id="txAdvice" transaction-manager="transactionManager">
	    <tx:attributes>
	      <tx:method name="save*" propagation="REQUIRED"/>
	      <tx:method name="update*" propagation="REQUIRED"/>
	      <tx:method name="delete*" propagation="REQUIRED"/>
	      <tx:method name="*" read-only="true" />
	    </tx:attributes>
	</tx:advice>

	<!--  配置參與事務的類 -->
	<aop:config>
		<aop:pointcut id="interceptorPointCuts" expression="execution(* com.orange.adminweb.service.*.*(..))"/>
		<aop:advisor pointcut-ref="interceptorPointCuts" advice-ref="txAdvice" />
	</aop:config>

2.com.orange.adminweb.service包下的java代碼

public void updateBinding(String userId,String merchantId,Order order) throws Exception {
     //刪除用戶
     userService.delete(userId);
     //刪除商家
     merchantService.delete(merchantId);
     //更新訂單
     orderService.update(order);
		
}

像簡單事務類似上面編程只需註意兩個事情:

1.上面xml配置的save*,update*,delete*表示通配符以save,update,delete開頭的方法(com.orange.adminweb.service包下)都是啟用事務的。

2.以save,update,delete開頭的方法(com.orange.adminweb.service包下)必須繼續將異常往外拋。

所以,很多剛開始入行的同事基本按照這種方式寫代碼,也沒什麽問題,但是最近測試有人反映,有個業務(類似上面)數據不一致,簡單來說就是幾張表的數據原子性不一致,我找到方法類,打開看了,場景確實和上面的稍微不太一樣,以下為模擬代碼。

private void saveSubscribe(){
		StringBuilder clientBuilder = new StringBuilder();
		clientBuilder.append(GlobalConstant.GROUPID);
		clientBuilder.append("@@@");
		clientBuilder.append("ClientID_");
		clientBuilder.append(UuidUtil.get32UUID());
		String clientId=clientBuilder.toString();
		MemoryPersistence persistence = new MemoryPersistence();
		try {
			final MqttClient sampleClient = new MqttClient(GlobalConstant.BROKER, clientId, persistence);
            final MqttConnectOptions connOpts = new MqttConnectOptions();
            System.out.println("Connecting to broker: " + GlobalConstant.BROKER);
            String sign=MacSignature.macSignature(clientId.split("@@@")[0], GlobalConstant.SECRETKEY);
            final String[] topicFilters=new String[]{GlobalConstant.TOPIC + "/#"};
            final int[]qos={1};
            connOpts.setUserName(GlobalConstant.ACESSKEY);
            connOpts.setServerURIs(new String[] { GlobalConstant.BROKER });
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(false);
            connOpts.setKeepAliveInterval(100);
            sampleClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable throwable) {
                    log.info("mqtt connection lost");
                    throwable.printStackTrace();
                    while(!sampleClient.isConnected()){
                        try {
                            sampleClient.connect(connOpts);
                            sampleClient.subscribe(topicFilters,qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                public void messageArrived(String topic, MqttMessage mqttMessage){
                	try {
                		saveOrder(new String(mqttMessage.getPayload()));
				 } catch (Exception e) {
						e.printStackTrace();
				 }
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    log.ifo("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                }
            });
            sampleClient.connect(connOpts);
            sampleClient.subscribe(topicFilters,qos);
		}catch(Exception ex){
			ex.printStackTrace();
		}
	}

操作三張表的saveOrder方法如下:

private void saveOrder(String message) throws Exception{
     //修改用戶
     userService.updateUser(......);

     //修改商家
     merchantService.updateMerchant(......);
     
     //下訂單
     orderService.saveOrder(.......);
}

因為業務本身原因,當saveOrder方法裏面的修改用戶,修改商家,下訂單任何一個方法出現異常時候,saveSubscribe方法並沒有回滾數據。

重點看saveSubscribe方法裏面的代碼片段:

sampleClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable throwable) {
                    log.info("mqtt connection lost");
                    throwable.printStackTrace();
                    while(!sampleClient.isConnected()){
                        try {
                            sampleClient.connect(connOpts);
                            sampleClient.subscribe(topicFilters,qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                public void messageArrived(String topic, MqttMessage mqttMessage){
                	try {
                		saveOrder(new String(mqttMessage.getPayload()));
				 } catch (Exception e) {
						e.printStackTrace();
				 }
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    log.ifo("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                }
            });

這個裏面有setCallback回調,它裏面的異常是沒法接著往外拋的,所以不會回滾數據,簡單來說saveSubscribe方法就是一個沒有事務控制的方法。

其實這種業務場景有點類似我們之前的業務需求:

有個AService和BService都配置了事務,AService調用了BService,BService需要記錄日誌,但是當BService出現異常的時候,發現沒有記錄日誌,原因是AService和BService配置事務的時候有個參數propagation,默認都配置了REQUIRED

 <tx:method name="save*" propagation="REQUIRED"/>

使用這種策略時BService將使用Aservice的事務,所以AService回滾將整個方法體內的任何東西都回滾了。所以解決這種業務場景就需要BService配置獨立的事務,不管業務邏輯的Aservice是否有異常,BService日誌都應該能夠記錄成功。

所以解決上面setCallback回調不拋異常出去的問題,配置修改成saveOrder配置獨立事務可以解決問題。

<tx:method name="save*" propagation="REQUIRED"/>
<tx:method name="update*" propagation="REQUIRED"/>
<tx:method name="delete*" propagation="REQUIRED"/>
<tx:method name="saveOrder" propagation="REQUIRES_NEW" rollback-for="java.lang.Exception"/>

通過這次問題的解決回顧,說到底還是對Spring事務類型並沒有引起重視,具體的業務場景應該使用不同的事務類型,而並不是一味的使用REQUIRED,最後貼下Spring的七種事務傳播行為類型:

PROPAGATION_REQUIRED--支持當前事務,如果當前沒有事務,就新建一個事務。這是最常見的選擇。

PROPAGATION_SUPPORTS--支持當前事務,如果當前沒有事務,就以非事務方式執行。

PROPAGATION_MANDATORY--支持當前事務,如果當前沒有事務,就拋出異常。

PROPAGATION_REQUIRES_NEW--新建事務,如果當前存在事務,把當前事務掛起。

PROPAGATION_NOT_SUPPORTED--以非事務方式執行操作,如果當前存在事務,就把當前事務掛起。

PROPAGATION_NEVER--以非事務方式執行,如果當前存在事務,則拋出異常。

PROPAGATION_NESTED--如果當前存在事務,則在嵌套事務內執行。如果當前沒有事務,則進行與PROPAGATION_REQUIRED類似的操作。

我的物聯網項目(十一) 單數據庫事務也需謹慎