1. 程式人生 > >java利用WatchService實時監控某個目錄下的文件變化並按行解析(註:附源代碼)

java利用WatchService實時監控某個目錄下的文件變化並按行解析(註:附源代碼)

tomcat啟動 interrupt extend red -name 利用 end eba tor

首先說下需求:通過ftp上傳約定格式的文件到服務器指定目錄下,應用程序能實時監控該目錄下文件變化,如果上傳的文件格式符合要求,將將按照每一行讀取解析再寫入到數據庫,解析完之後再將文件改名。

一. 一開始的思路

  設置一個定時任務,每隔一分鐘讀取下指定目錄下的文件變化,如果有滿足格式的文件,就進行解析。

這種方式很繁瑣,而且效率低,效率都消耗在了遍歷、保存狀態、對比狀態上了! 而且無法利用OS的很多功能。

二. WatchService介紹

1、 該類的對象就是操作系統原生的文件系統監控器!我們都知道OS自己的文件系統監控器可以監控系統上所有文件的變化,這種監控是無需遍歷、無需比較的,是一種基於信號收發的監控,因此效率一定是最高的;現在Java對其進行了包裝,可以直接在Java程序    中使用OS的文件系統監控器了;

2、 獲取當前OS平臺下的文件系統監控器:

i. WatchService watcher = FileSystems.getDefault().newWatchService();

ii. 從FileSystems這個類名就可以看出這肯定是屬於OS平臺文件系統的,接下來可以看出這一連串方法直接可以得到一個文件監控器;

  這裏暫時不用深入理解這串方法的具體含義,先知道怎麽用就行了;

3、 我們都知道,操作系統上可以同時開啟多個監控器,因此在Java程序中也不例外,上面的代碼只是獲得了一個監控器,你還可以用同樣的代碼同時獲得多個監控器;

4、 監控器其實就是一個後臺線程,在後臺監控文件變化所發出的信號,這裏通過上述代碼獲得的監控器還只是一個剛剛初始化的線程,連就緒狀態都沒有進入,只是初始化而已;

三、實現過程

  其實就是在初始化的時候創建一個線程,然後用watchService實時監控該目錄下文件變化,如果有滿足條件文件加進來,就按照約定的格式解析文件再寫入數據庫,詳細步驟如下!

  1、web.xml監聽器配置文件監控監聽器  

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation
="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:root-context.xml</param-value> </context-param> <filter> <filter-name>CharacterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>CharacterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <filter> <filter-name>sitemesh</filter-name> <filter-class>com.opensymphony.sitemesh.webapp.SiteMeshFilter</filter-class> </filter> <filter-mapping> <filter-name>sitemesh</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <servlet> <servlet-name>appServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:servlet-context.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>appServlet</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> <!-- 配置spring監聽器 --> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <!-- 配置監控文件變化監聽器 --> <listener> <listener-class>com.zealer.ad.listener.ThreadStartUpListenser</listener-class> </listener> <listener> <listener-class>com.zealer.ad.listener.SessionLifecycleListener</listener-class> </listener> <jsp-config> <taglib> <taglib-uri>/tag</taglib-uri> <taglib-location>/WEB-INF/tag/tag.tld</taglib-location> </taglib> </jsp-config> <welcome-file-list> <welcome-file>index.jsp</welcome-file> </welcome-file-list> <session-config> <session-timeout>45</session-timeout> </session-config> </web-app>

  2、編寫一個ThreadStartUpListenser類,實現ServletContextListener,tomcat啟動時創建後臺線程

  ThreadStartUpListenser.java

package com.zealer.ad.listener;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

import com.zealer.ad.task.WatchFilePathTask;

@Component
public class ThreadStartUpListenser implements ServletContextListener
{
    private static WatchFilePathTask r = new WatchFilePathTask();

    private Log log = LogFactory.getLog(ThreadStartUpListenser.class);
    
    /*
     * tomcat啟動的時候創建一個線程
     * */
    @Override
    public void contextInitialized(ServletContextEvent paramServletContextEvent)
    {
        r.start();
        log.info("ImportUserFromFileTask is started!");
    }
    
    /*
     * tomcat關閉的時候銷毀這個線程
     * */
    @Override
    public void contextDestroyed(ServletContextEvent paramServletContextEvent)
    {
        r.interrupt();
    }

}

  3、創建指定目錄文件變化監控類

    WatchFilePathTask.java

package com.zealer.ad.task;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;

import com.zealer.ad.util.ConfigUtils;
import com.zealer.ad.util.SpringUtils;
/**
 * 指定目錄文件變化監控類
 * @author cancer
 *
 */
public class WatchFilePathTask extends Thread
{
    private Log log = LogFactory.getLog(WatchFilePathTask.class);
    
    private static final String filePath = ConfigUtils.getInstance().getValue("userfile_path");
    
    private WatchService watchService;
    
    @Override
    public void run()
    {
        try
        {
            //獲取監控服務
            watchService = FileSystems.getDefault().newWatchService();
            log.debug("獲取監控服務"+watchService);
            Path path = FileSystems.getDefault().getPath(filePath);
            log.debug("@@@:Path:"+path);
            
            final String todayFormat = DateTime.now().toString("yyyyMMdd");
            
            File existFiles = new File(filePath);
            //啟動時檢查是否有未解析的符合要求的文件
            if(existFiles.isDirectory())
            {
                File[] matchFile = existFiles.listFiles(new FileFilter()
                {
                    
                    @Override
                    public boolean accept(File pathname)
                    {
                        if((todayFormat+".txt").equals(pathname.getName()))
                        {
                            return true;
                        }
                        else
                        {
                            return false;
                        }
                    }
                });
                
                if(null != matchFile)
                {
                    for (File file : matchFile)
                    {
                        //找到符合要求的文件,開始解析
                        ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");
                        task.setFileName(file.getAbsolutePath());
                        task.start();
                    }
                }
            }
       
//註冊監控服務,監控新增事件 WatchKey key = path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);        while (true) { key = watchService.take(); for (WatchEvent<?> event : key.pollEvents()) { //獲取目錄下新增的文件名 String fileName = event.context().toString(); //檢查文件名是否符合要求 if((todayFormat+".txt").equals(fileName)) { String filePath = path.toFile().getAbsolutePath()+File.separator+fileName; log.info("import filePath:"+filePath); //啟動線程導入用戶數據 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");//new ImportUserFromFileTask(filePath); task.setFileName(filePath); task.start(); log.debug("啟動線程導入用戶數據"+task); } } key.reset(); } } catch (IOException e) { log.error(e.getMessage(),e); } catch (InterruptedException e) { log.error(e.getMessage(),e); } } }

  4、創建解析用戶文件及導入數據庫線程,由WatchFilePathTask啟動

package com.zealer.ad.task;

import com.zealer.ad.entity.AutoPutUser;
import com.zealer.ad.entity.Bmsuser;
import com.zealer.ad.service.AutoPutUserService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.joda.time.DateTime;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import java.util.Date;

import javax.annotation.Resource;


/**
 * 解析用戶文件及入庫線程,由WatchFilePathTask啟動
 * @author cancer
 *
 */
public class ImportUserFromFileTask extends Thread {
    private Log log = LogFactory.getLog(ImportUserFromFileTask.class);
    private String fileName;
    @Resource(name = "autoPutUserService")
    private AutoPutUserService autoPutUserService;

    @Override
    public void run() {
        File file = new File(fileName);

        if (file.exists() && file.isFile()) {
            log.debug(":@@@準備開始休眠10秒鐘:" + file);

            //休眠十分鐘,防止文件過大還沒完全拷貝到指定目錄下,這裏的線程就開始讀取文件
            try {
                sleep(10000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            InputStreamReader read;

            try {
                read = new InputStreamReader(new FileInputStream(file), "UTF-8");

                BufferedReader bufferedReader = new BufferedReader(read);
                String lineTxt = null;
                int count = 0;
                Boolean f = false;

                while ((lineTxt = bufferedReader.readLine()) != null) {
                    if ((null == lineTxt) || "".equals(lineTxt)) {
                        continue;
                    }

                    if (lineTxt.startsWith("‘")) {
                        lineTxt = lineTxt.substring(1, lineTxt.length());
                    }

                    //解析分隔符為‘, ‘
                    String[] lines = lineTxt.split("‘, ‘");
                    int length = lines.length;

                    if (length < 2) {
                        continue;
                    }

                    Bmsuser bmsuser = new Bmsuser();
                    bmsuser.setName(lines[0]);if (!"".equals(lines[1])) {
                        bmsuser.setCity(lines[1]);
                    }
//根據唯一索引已經存在的數據則不插入
                    f = autoPutUserService.insertIgnore(bmsuser);

                    if (f) {
                        count++;
                    }
                }

                //匯總數據
                AutoPutUser autoPutUser = new AutoPutUser();
                autoPutUser.setTotalCount(autoPutUserService.getUserCount());
                autoPutUser.setCount(count);
                autoPutUser.setCountDate(new Date(System.currentTimeMillis()));

                String today = DateTime.now().toString("yyyy-MM-dd");
                Integer oldCount = autoPutUserService.getOldCount(today);

                //如果今天導入過了就更新否則插入
                if (!oldCount.equals(0)) {
                    autoPutUserService.updateUserData(autoPutUser, today,
                        oldCount);
                } else {
                    autoPutUserService.gatherUserData(autoPutUser);
                }

                //註意:要關閉流
                read.close();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }

            File newFile = new File(file.getPath() +
                    System.currentTimeMillis() + ".complate");
            file.renameTo(newFile);
        } else {
            log.error(fileName + " file is not exists");
        }
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public AutoPutUserService getAutoPutUserService() {
        return autoPutUserService;
    }

    public void setAutoPutUserService(AutoPutUserService autoPutUserService) {
        this.autoPutUserService = autoPutUserService;
    }
}

附帶:

1、sql腳本

CREATE TABLE `bmsuser` (
  `id` int(255) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(32) DEFAULT NULL ,
  `city` varchar(32) DEFAULT NULL COMMENT ,
  PRIMARY KEY (`bmsid`),
  UNIQUE KEY `bbLoginName` (`bbLoginName`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

2、文件格式,命名為yyyyMMdd.txt

‘張三‘, ‘深圳‘

java利用WatchService實時監控某個目錄下的文件變化並按行解析(註:附源代碼)