/*
 * @File: pgBatchCopy.js
 * @Description: Test of bulk-inserting data into PostgreSQL via the COPY command
 * @Author: clownce.deng
 * @Date: 2020-05-14 08:57:00
 */

var _ = require("lodash");
var fs = require("fs");
var moment = require("moment");
var logger = require("topsin.logger");
var DB = require("topsin.database");
var error = require("topsin.error");
var console = require("console");
var process = require('process');
var config = require("./config");

// Script flow: validate arguments -> connect -> generate a CSV file ->
// COPY it into oee_machine_log_v2 inside a transaction -> record elapsed time.
try {
    var argv = process.argv;
    // Work center ID
    var wid = argv[1];
    // Number of data rows to insert
    var data_row_count = _.toNumber(argv[2]);
    // Require a positive integer: NaN, 0, negative, fractional and infinite
    // values are all rejected (an infinite count would never finish generating).
    if (!_.isInteger(data_row_count) || data_row_count <= 0) {
        throw "invalid parameter: " + argv[2];
    }

    // Initialize the database connection
    var LOCAL_DB_CONN = 'LOCAL_DB_CONN';
    DB.addConnection(config.database_conf, LOCAL_DB_CONN);

    // Test the database connection
    var isConnected = DB.query(LOCAL_DB_CONN, function (q) {
        return q.isConnected();
    });
    if (!isConnected) {
        throw "Connect to local database failed.";
    }
    console.info("Connect to database sucessful.");

    // Generate the local CSV file
    randomBuildLogFile(wid, data_row_count);

    console.info("start to batchInsert data...");
    var query = DB.query(LOCAL_DB_CONN);
    var beforeTime = moment();
    query.begin();
    // COPY makes the server look for the file; \COPY makes the client look for it
    // (note: \COPY can only be executed in the SQL shell; it is a disguised call
    // to COPY FROM STDIN).
    var sql = "COPY oee_machine_log_v2 (workcenter_id,log_time,log_type,lot_no,partnumber,subpart,lot_serial,station,state,"
        + "programe_name,daq_time,analysis_flag,log_data) FROM '{0}/data{1}.csv' delimiter ',' csv header";
    sql = _.format(sql, config.work_path, wid);
    // Exporting data to a file: a partitioned table cannot be exported directly,
    // a SELECT query has to be used instead.
    // var sql = "COPY ( select workcenter_id,log_time,log_time2,log_type,lot_no,partnumber,subpart,lot_serial,station,state,programe_name,"
    //     + "daq_time,analysis_flag,log_data from oee_machine_log_v2) TO 'F:/workspace/pg_batch_insert/data1.csv' CSV HEADER";
    query.execSql(sql);
    if (query.lastError().isValid()) {
        // Capture the message before rolling back, in case the rollback
        // resets the query's last error.
        var errorText = query.lastError().text();
        query.rollback();
        throw "batchInsert data faild. \n" + errorText;
    }
    query.commit();
    var afterTime = moment();
    console.info("batchInsert data success.");
    var duration = moment.duration(afterTime.diff(beforeTime));
    console.info("elapsed time(seconds): " + duration.as("seconds"));
    fs.writeFile(config.work_path + "/process_" + wid + ".txt", duration.as("seconds"));
} catch (e) {
    console.error(e);
}

/**
 * Writes "<config.work_path>/data<workcenter_id>.csv" containing a header line
 * followed by `row_count` rows of fixed test values.
 *
 * NOTE(review): fs.writeFile is called without a callback, so it is presumably
 * the host runtime's synchronous API (the COPY statement reads this file right
 * afterwards) - confirm.
 *
 * @param {*} workcenter_id - value for the workcenter_id column; also part of the file name
 * @param {number} row_count - number of data rows to generate
 */
function randomBuildLogFile(workcenter_id, row_count) {
    var header = [
        "workcenter_id",
        "log_time",
        "log_type",
        "lot_no",
        "partnumber",
        "subpart",
        "lot_serial",
        "station",
        "state",
        "programe_name",
        "daq_time",
        "analysis_flag",
        "log_data"
    ];
    // The log_data cell is identical for every row, so build it only once.
    var logData = randomBuildMapData();
    var lines = [_.join(header, ",")];
    for (var index = 0; index < row_count; index++) {
        var rowData = [
            workcenter_id,
            moment().format("YYYY-MM-DD HH:mm:ss"),
            "info",
            "1234567890",
            "ABCDEFGH",
            "test_part",
            "12345",
            "test_station",
            "test_state",
            "test_program",
            moment().format("YYYY-MM-DD HH:mm:ss"),
            "t",
            logData
        ];
        lines.push(_.join(rowData, ","));
    }
    // Every line, including the last one, is terminated by "\n".
    var content = _.join(lines, "\n") + "\n";
    fs.writeFile(config.work_path + "/data" + workcenter_id + ".csv", content);
}

/**
 * Builds the CSV cell used for the log_data column: a double-quoted field
 * holding a JSON-like object with 100 entries (test_key_N -> test_value_N),
 * where inner double quotes are escaped by doubling them.
 *
 * @returns {string} the quoted cell text, e.g. "{""test_key_1"": ""test_value_1"",...}"
 */
function randomBuildMapData() {
    var retList = [];
    for (var count = 1; count <= 100; count++) {
        retList.push("\"\"test_key_" + count + "\"\"" + ": " + "\"\"test_value_" + count + "\"\"");
    }
    return "\"{" + _.join(retList, ",") + "}\"";
}