A Fast Kettle-Based Solution for Exporting Large Datasets (Exporting Millions of Rows via SQL, Quite Quickly)
阿新 • Published: 2020-11-18
1. Background
In web projects we frequently need to export query results to Excel, and the usual tool for this has been POI. POI can be used in two ways: either dump a collection into an Excel file in one go, or append rows in batches, which suits larger datasets. POI supports both xls and xlsx: the 2003 format (xls) holds at most 65,536 rows per sheet, while the 2007 format (xlsx) holds about a million. Once the data volume grows, however, this approach becomes very expensive in both memory and time.
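The format limits mentioned above can be captured in a small helper that decides which extension a given row count fits into. This is my own sketch for illustration; the class and method names are not from the original code:

```java
// Row capacities of the two Excel formats POI can write:
// .xls (Excel 97-2003) and .xlsx (Excel 2007+).
class ExcelFormatPicker {
    static final int XLS_MAX_ROWS = 65_536;       // 2^16 rows per sheet
    static final int XLSX_MAX_ROWS = 1_048_576;   // 2^20 rows per sheet

    // Pick the file extension whose single-sheet capacity covers rowCount.
    static String pickExtension(int rowCount) {
        if (rowCount <= XLS_MAX_ROWS) {
            return "xls";
        } else if (rowCount <= XLSX_MAX_ROWS) {
            return "xlsx";
        }
        // Beyond the xlsx cap the export must be split across sheets or files.
        throw new IllegalArgumentException("row count exceeds a single xlsx sheet");
    }
}
```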
After getting into ETL I tried using Kettle to move the data instead, and testing showed it works well: hundreds of thousands of rows, even a million, export quickly, and the code is very simple.
2. Maven Dependencies for Kettle
```xml
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-vfs2</artifactId>
  <version>2.0</version>
</dependency>
<dependency>
  <groupId>org.scannotation</groupId>
  <artifactId>scannotation</artifactId>
  <version>1.0.3</version>
</dependency>
<dependency>
  <groupId>dom4j</groupId>
  <artifactId>dom4j</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>pentaho-kettle</groupId>
  <artifactId>kettle-vfs</artifactId>
  <version>5.2.0.0</version>
  <classifier>pentaho</classifier>
</dependency>
<dependency>
  <groupId>pentaho-kettle</groupId>
  <artifactId>kettle-engine</artifactId>
  <version>5.2.0.0</version>
</dependency>
<dependency>
  <groupId>pentaho-kettle</groupId>
  <artifactId>kettle-core</artifactId>
  <version>5.2.0.0</version>
</dependency>
```
OK, with the preparation done, let's write the Kettle utility class:
```java
package com.sckj.kettle;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLDecoder;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.apache.struts2.ServletActionContext;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class Kettle {

    public static InputStream export(String sql, String savePath, String hostIp,
            String dbName, String port, String userName, String password) {
        FileInputStream in = null;
        try {
            HttpServletResponse response = ServletActionContext.getResponse();

            // Locate the classes directory so export.ktr can be found at runtime.
            URL url = Kettle.class.getResource("/");
            String paths = url.getFile().substring(0, url.getFile().indexOf("classes"));
            paths = URLDecoder.decode(paths, "UTF-8");

            Long times = System.currentTimeMillis();
            File path = new File(savePath);
            if (!path.exists() && !path.isDirectory()) {
                path.mkdir();
            }

            // Initialize the Kettle engine and load the transformation definition.
            KettleEnvironment.init();
            TransMeta tm = new TransMeta(paths + "classes/" + "export.ktr");
            Trans trans = new Trans(tm);

            // Pass the connection settings and the query into the .ktr as variables.
            trans.setVariable("hostIp", hostIp);
            trans.setVariable("dbName", dbName);
            trans.setVariable("port", port);
            trans.setVariable("userName", userName);
            trans.setVariable("password", password);
            trans.setVariable("sql", sql);
            trans.setVariable("savePath", savePath + "/" + times);

            // Run the transformation and block until it finishes.
            trans.prepareExecution((String[]) null);
            trans.startThreads();
            trans.waitUntilFinished();

            // The transformation writes either .xlsx or .xls; stream whichever exists.
            File file = new File(savePath + "/" + times + ".xlsx");
            File file2 = new File(savePath + "/" + times + ".xls");
            if (file.exists()) {
                in = new FileInputStream(file);
                response.reset();
                response.setContentType("application/x-download;charset=UTF-8");
                response.addHeader("Content-Disposition",
                        "attachment;filename='" + times + ".xlsx'");
                response.addHeader("Content-Length", String.valueOf(file.length()));
                ServletOutputStream out = response.getOutputStream();
                writeFile(out, in);
                file.delete();
            } else if (file2.exists()) {
                in = new FileInputStream(file2);
                response.reset();
                response.setContentType("application/x-download;charset=UTF-8");
                response.addHeader("Content-Disposition",
                        "attachment;filename='" + times + ".xls'");
                response.addHeader("Content-Length", String.valueOf(file2.length()));
                ServletOutputStream out = response.getOutputStream();
                writeFile(out, in);
                file2.delete();
            } else {
                response.setContentType("text/html;charset=UTF-8");
                PrintWriter writer = response.getWriter();
                writer.write("<script>alert('File not found!');window.close();</script>");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return in;
    }

    public static void writeFile(OutputStream fos, InputStream fis) throws IOException {
        byte[] buffer = new byte[1024];
        int len;
        while ((len = fis.read(buffer)) > 0) {
            fos.write(buffer, 0, len);
            fos.flush();
        }
        fis.close();
        fos.close();
    }
}
```
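One weak spot in the class above: writeFile only closes its streams when the copy succeeds, so an exception mid-transfer leaks file handles. A more defensive variant using try-with-resources could look like this (my own sketch, not part of the original class):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class StreamCopy {
    // Copy all bytes from in to out; both streams are closed even if the copy fails.
    static long copy(InputStream in, OutputStream out) throws IOException {
        long total = 0;
        try (InputStream is = in; OutputStream os = out) {
            byte[] buffer = new byte[8192];
            int len;
            while ((len = is.read(buffer)) > 0) {
                os.write(buffer, 0, len);
                total += len;
            }
            os.flush();
        }
        return total;
    }
}
```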
The utility class above reads a file named export.ktr, which we place under \src\main\java (you can put it anywhere you like, as long as you adjust the code above so it can still be found). Make sure your build doesn't exclude it when packaging, or it won't be found at runtime.
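Note that reconstructing a filesystem path from getResource("/") breaks once the application is packaged into a jar. A more robust approach is to read the .ktr straight off the classpath (the Kettle 5.x TransMeta class also has InputStream-based constructors you could feed this into; check the API). A minimal sketch, assuming the resource name:

```java
import java.io.InputStream;

class KtrLoader {
    // Open a classpath resource, e.g. "/export.ktr"; returns null if absent.
    static InputStream open(String resource) {
        return KtrLoader.class.getResourceAsStream(resource);
    }
}
```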
Contents of export.ktr (you can just copy mine; if it doesn't work, find one online and look up what the parameters mean. The database I'm using is MySQL):
```xml
<?xml version="1.0" encoding="UTF-8"?>
<transformation>
  <info>
    <name>export</name>
    <description/>
    <extended_description/>
    <trans_version/>
    <trans_type>Normal</trans_type>
    <trans_status>0</trans_status>
    <directory>/</directory>
    <parameters>
    </parameters>
    <log>
      <trans-log-table>
        <connection/><schema/><table/><size_limit_lines/><interval/><timeout_days/>
        <field><id>ID_BATCH</id><enabled>Y</enabled><name>ID_BATCH</name></field><field><id>CHANNEL_ID</id><enabled>Y</enabled><name>CHANNEL_ID</name></field><field><id>TRANSNAME</id><enabled>Y</enabled><name>TRANSNAME</name></field><field><id>STATUS</id><enabled>Y</enabled><name>STATUS</name></field><field><id>LINES_READ</id><enabled>Y</enabled><name>LINES_READ</name><subject/></field><field><id>LINES_WRITTEN</id><enabled>Y</enabled><name>LINES_WRITTEN</name><subject/></field><field><id>LINES_UPDATED</id><enabled>Y</enabled><name>LINES_UPDATED</name><subject/></field><field><id>LINES_INPUT</id><enabled>Y</enabled><name>LINES_INPUT</name><subject/></field><field><id>LINES_OUTPUT</id><enabled>Y</enabled><name>LINES_OUTPUT</name><subject/></field><field><id>LINES_REJECTED</id><enabled>Y</enabled><name>LINES_REJECTED</name><subject/></field><field><id>ERRORS</id><enabled>Y</enabled><name>ERRORS</name></field><field><id>STARTDATE</id><enabled>Y</enabled><name>STARTDATE</name></field><field><id>ENDDATE</id><enabled>Y</enabled><name>ENDDATE</name></field><field><id>LOGDATE</id><enabled>Y</enabled><name>LOGDATE</name></field><field><id>DEPDATE</id><enabled>Y</enabled><name>DEPDATE</name></field><field><id>REPLAYDATE</id><enabled>Y</enabled><name>REPLAYDATE</name></field><field><id>LOG_FIELD</id><enabled>Y</enabled><name>LOG_FIELD</name></field><field><id>EXECUTING_SERVER</id><enabled>N</enabled><name>EXECUTING_SERVER</name></field><field><id>EXECUTING_USER</id><enabled>N</enabled><name>EXECUTING_USER</name></field><field><id>CLIENT</id><enabled>N</enabled><name>CLIENT</name></field>
      </trans-log-table>
      <perf-log-table>
        <connection/><schema/><table/><interval/><timeout_days/>
        <field><id>ID_BATCH</id><enabled>Y</enabled><name>ID_BATCH</name></field><field><id>SEQ_NR</id><enabled>Y</enabled><name>SEQ_NR</name></field><field><id>LOGDATE</id><enabled>Y</enabled><name>LOGDATE</name></field><field><id>TRANSNAME</id><enabled>Y</enabled><name>TRANSNAME</name></field><field><id>STEPNAME</id><enabled>Y</enabled><name>STEPNAME</name></field><field><id>STEP_COPY</id><enabled>Y</enabled><name>STEP_COPY</name></field><field><id>LINES_READ</id><enabled>Y</enabled><name>LINES_READ</name></field><field><id>LINES_WRITTEN</id><enabled>Y</enabled><name>LINES_WRITTEN</name></field><field><id>LINES_UPDATED</id><enabled>Y</enabled><name>LINES_UPDATED</name></field><field><id>LINES_INPUT</id><enabled>Y</enabled><name>LINES_INPUT</name></field><field><id>LINES_OUTPUT</id><enabled>Y</enabled><name>LINES_OUTPUT</name></field><field><id>LINES_REJECTED</id><enabled>Y</enabled><name>LINES_REJECTED</name></field><field><id>ERRORS</id><enabled>Y</enabled><name>ERRORS</name></field><field><id>INPUT_BUFFER_ROWS</id><enabled>Y</enabled><name>INPUT_BUFFER_ROWS</name></field><field><id>OUTPUT_BUFFER_ROWS</id><enabled>Y</enabled><name>OUTPUT_BUFFER_ROWS</name></field>
      </perf-log-table>
      <channel-log-table>
        <connection/><schema/><table/><timeout_days/>
        <field><id>ID_BATCH</id><enabled>Y</enabled><name>ID_BATCH</name></field><field><id>CHANNEL_ID</id><enabled>Y</enabled><name>CHANNEL_ID</name></field><field><id>LOG_DATE</id><enabled>Y</enabled><name>LOG_DATE</name></field><field><id>LOGGING_OBJECT_TYPE</id><enabled>Y</enabled><name>LOGGING_OBJECT_TYPE</name></field><field><id>OBJECT_NAME</id><enabled>Y</enabled><name>OBJECT_NAME</name></field><field><id>OBJECT_COPY</id><enabled>Y</enabled><name>OBJECT_COPY</name></field><field><id>REPOSITORY_DIRECTORY</id><enabled>Y</enabled><name>REPOSITORY_DIRECTORY</name></field><field><id>FILENAME</id><enabled>Y</enabled><name>FILENAME</name></field><field><id>OBJECT_ID</id><enabled>Y</enabled><name>OBJECT_ID</name></field><field><id>OBJECT_REVISION</id><enabled>Y</enabled><name>OBJECT_REVISION</name></field><field><id>PARENT_CHANNEL_ID</id><enabled>Y</enabled><name>PARENT_CHANNEL_ID</name></field><field><id>ROOT_CHANNEL_ID</id><enabled>Y</enabled><name>ROOT_CHANNEL_ID</name></field>
      </channel-log-table>
      <step-log-table>
        <connection/><schema/><table/><timeout_days/>
        <field><id>ID_BATCH</id><enabled>Y</enabled><name>ID_BATCH</name></field><field><id>CHANNEL_ID</id><enabled>Y</enabled><name>CHANNEL_ID</name></field><field><id>LOG_DATE</id><enabled>Y</enabled><name>LOG_DATE</name></field><field><id>TRANSNAME</id><enabled>Y</enabled><name>TRANSNAME</name></field><field><id>STEPNAME</id><enabled>Y</enabled><name>STEPNAME</name></field><field><id>STEP_COPY</id><enabled>Y</enabled><name>STEP_COPY</name></field><field><id>LINES_READ</id><enabled>Y</enabled><name>LINES_READ</name></field><field><id>LINES_WRITTEN</id><enabled>Y</enabled><name>LINES_WRITTEN</name></field><field><id>LINES_UPDATED</id><enabled>Y</enabled><name>LINES_UPDATED</name></field><field><id>LINES_INPUT</id><enabled>Y</enabled><name>LINES_INPUT</name></field><field><id>LINES_OUTPUT</id><enabled>Y</enabled><name>LINES_OUTPUT</name></field><field><id>LINES_REJECTED</id><enabled>Y</enabled><name>LINES_REJECTED</name></field><field><id>ERRORS</id><enabled>Y</enabled><name>ERRORS</name></field><field><id>LOG_FIELD</id><enabled>N</enabled><name>LOG_FIELD</name></field>
      </step-log-table>
      <metrics-log-table>
        <connection/><schema/><table/><timeout_days/>
        <field><id>ID_BATCH</id><enabled>Y</enabled><name>ID_BATCH</name></field><field><id>CHANNEL_ID</id><enabled>Y</enabled><name>CHANNEL_ID</name></field><field><id>LOG_DATE</id><enabled>Y</enabled><name>LOG_DATE</name></field><field><id>METRICS_DATE</id><enabled>Y</enabled><name>METRICS_DATE</name></field><field><id>METRICS_CODE</id><enabled>Y</enabled><name>METRICS_CODE</name></field><field><id>METRICS_DESCRIPTION</id><enabled>Y</enabled><name>METRICS_DESCRIPTION</name></field><field><id>METRICS_SUBJECT</id><enabled>Y</enabled><name>METRICS_SUBJECT</name></field><field><id>METRICS_TYPE</id><enabled>Y</enabled><name>METRICS_TYPE</name></field><field><id>METRICS_VALUE</id><enabled>Y</enabled><name>METRICS_VALUE</name></field>
      </metrics-log-table>
    </log>
    <maxdate>
      <connection/><table/><field/><offset>0.0</offset><maxdiff>0.0</maxdiff>
    </maxdate>
    <size_rowset>10000</size_rowset>
    <sleep_time_empty>50</sleep_time_empty>
    <sleep_time_full>50</sleep_time_full>
    <unique_connections>N</unique_connections>
    <feedback_shown>Y</feedback_shown>
    <feedback_size>50000</feedback_size>
    <using_thread_priorities>Y</using_thread_priorities>
    <shared_objects_file/>
    <capture_step_performance>N</capture_step_performance>
    <step_performance_capturing_delay>1000</step_performance_capturing_delay>
    <step_performance_capturing_size_limit>100</step_performance_capturing_size_limit>
    <dependencies>
    </dependencies>
    <partitionschemas>
    </partitionschemas>
    <slaveservers>
    </slaveservers>
    <clusterschemas>
    </clusterschemas>
    <created_user>-</created_user>
    <created_date>2016/03/01 17:21:47.083</created_date>
    <modified_user>-</modified_user>
    <modified_date>2016/09/01 09:29:12.684</modified_date>
    <key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key>
    <is_key_private>N</is_key_private>
  </info>
  <notepads>
  </notepads>
  <connection>
    <name>111</name>
    <server>${hostIp}</server>
    <type>MYSQL</type>
    <access>Native</access>
    <database>${dbName}</database>
    <port>${port}</port>
    <username>${userName}</username>
    <password>${password}</password>
    <servername/>
    <data_tablespace/>
    <index_tablespace/>
    <attributes>
      <attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>
      <attribute><code>FORCE_IDENTIFIERS_TO_LOWERCASE</code><attribute>N</attribute></attribute>
      <attribute><code>FORCE_IDENTIFIERS_TO_UPPERCASE</code><attribute>N</attribute></attribute>
      <attribute><code>IS_CLUSTERED</code><attribute>N</attribute></attribute>
      <attribute><code>PORT_NUMBER</code><attribute>${port}</attribute></attribute>
      <attribute><code>PRESERVE_RESERVED_WORD_CASE</code><attribute>N</attribute></attribute>
      <attribute><code>QUOTE_ALL_FIELDS</code><attribute>N</attribute></attribute>
      <attribute><code>STREAM_RESULTS</code><attribute>Y</attribute></attribute>
      <attribute><code>SUPPORTS_BOOLEAN_DATA_TYPE</code><attribute>Y</attribute></attribute>
      <attribute><code>SUPPORTS_TIMESTAMP_DATA_TYPE</code><attribute>Y</attribute></attribute>
      <attribute><code>USE_POOLING</code><attribute>N</attribute></attribute>
    </attributes>
  </connection>
  <order>
    <hop> <from>表输入</from><to>Excel输出</to><enabled>Y</enabled> </hop>
  </order>
  <step>
    <name>Excel输出</name>
    <type>ExcelOutput</type>
    <description/>
    <distribute>Y</distribute>
    <custom_distribution/>
    <copies>1</copies>
    <partitioning>
      <method>none</method>
      <schema_name/>
    </partitioning>
    <header>Y</header>
    <footer>N</footer>
    <encoding>UTF-8</encoding>
    <append>N</append>
    <add_to_result_filenames>Y</add_to_result_filenames>
    <file>
      <name>${savePath}</name>
      <extention>xls</extention>
      <do_not_open_newfile_init>N</do_not_open_newfile_init>
      <create_parent_folder>N</create_parent_folder>
      <split>N</split>
      <add_date>N</add_date>
      <add_time>N</add_time>
      <SpecifyFormat>N</SpecifyFormat>
      <date_time_format/>
      <sheetname>Sheet1</sheetname>
      <autosizecolums>Y</autosizecolums>
      <nullisblank>N</nullisblank>
      <protect_sheet>N</protect_sheet>
      <password>Encrypted </password>
      <splitevery>0</splitevery>
      <usetempfiles>N</usetempfiles>
      <tempdirectory/>
    </file>
    <template>
      <enabled>N</enabled>
      <append>N</append>
      <filename>C:\Users\jiao\Desktop\通用模板.xls</filename>
    </template>
    <fields>
    </fields>
    <custom>
      <header_font_name>arial</header_font_name>
      <header_font_size>10</header_font_size>
      <header_font_bold>Y</header_font_bold>
      <header_font_italic>N</header_font_italic>
      <header_font_underline>no</header_font_underline>
      <header_font_orientation>horizontal</header_font_orientation>
      <header_font_color>white</header_font_color>
      <header_background_color>dark_teal</header_background_color>
      <header_row_height>320</header_row_height>
      <header_alignment>center</header_alignment>
      <header_image/>
      <row_font_name>arial</row_font_name>
      <row_font_size>10</row_font_size>
      <row_font_color>black</row_font_color>
      <row_background_color>none</row_background_color>
    </custom>
    <cluster_schema/>
    <remotesteps>
      <input> </input>
      <output> </output>
    </remotesteps>
    <GUI>
      <xloc>544</xloc>
      <yloc>176</yloc>
      <draw>Y</draw>
    </GUI>
  </step>
  <step>
    <name>表输入</name>
    <type>TableInput</type>
    <description/>
    <distribute>N</distribute>
    <custom_distribution/>
    <copies>1</copies>
    <partitioning>
      <method>none</method>
      <schema_name/>
    </partitioning>
    <connection>111</connection>
    <sql>${sql}</sql>
    <limit>0</limit>
    <lookup/>
    <execute_each_row>N</execute_each_row>
    <variables_active>Y</variables_active>
    <lazy_conversion_active>N</lazy_conversion_active>
    <cluster_schema/>
    <remotesteps>
      <input> </input>
      <output> </output>
    </remotesteps>
    <GUI>
      <xloc>272</xloc>
      <yloc>176</yloc>
      <draw>Y</draw>
    </GUI>
  </step>
  <step_error_handling> </step_error_handling>
  <slave-step-copy-partition-distribution> </slave-step-copy-partition-distribution>
  <slave_transformation>N</slave_transformation>
</transformation>
```
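Note how ${hostIp}, ${sql}, ${savePath} and the other placeholders in the .ktr above correspond exactly to the names passed to trans.setVariable(...) in the Java class: Kettle substitutes the variable values into the transformation at run time. Conceptually the substitution works like this (a simplified illustration of the idea, not Kettle's actual implementation):

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class VarSubst {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace each ${name} with its value from vars; unknown names are left as-is.
    static String resolve(String text, Map<String, String> vars) {
        Matcher m = VAR.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = vars.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
```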
Call it from your main method:
```java
Kettle.export(sql, SystemConstant.EXPORT_PATH, "database IP", "database name",
        "database port", "database user", "database password");
```
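Since the SQL string is passed straight into the TableInput step, it is worth restricting what callers can hand in. A minimal sanity check might look like this (my own sketch; the original code performs no such validation, and a real application should go further, e.g. whitelisting queries):

```java
class SqlGuard {
    // Accept only a single read-only statement: must start with SELECT
    // and must not chain further statements with a semicolon.
    static boolean isSafeSelect(String sql) {
        if (sql == null) {
            return false;
        }
        String trimmed = sql.trim().toLowerCase();
        return trimmed.startsWith("select") && !trimmed.contains(";");
    }
}
```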
You can also integrate this with Spring Boot; the corresponding Maven dependencies and configuration are easy to find, so look them up as needed.
3. Performance Test
* Exporting 100,000 rows
Run time: 1133 ms
Run time: 1082 ms
Run time: 1096 ms
* Exporting 1,000,000 rows
Run time: 39784 ms
Run time: 8566 ms
Run time: 8622 ms
* Exporting the Excel 2007 limit of 1,048,575 data rows (the 1,048,576-row sheet cap minus the header row): run time 9686 ms
The first export is somewhat slow because the Kettle components have to load, but every export after that is fast. If you're curious about more results, try it yourself.