|
@@ -2,10 +2,8 @@ package cn.iocoder.yudao.module.pms.job.sap;
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
|
-import cn.hutool.core.util.ObjUtil;
|
|
|
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
|
|
|
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
|
|
|
-import cn.iocoder.yudao.module.pms.dal.mysql.iotsappickinglist.IotSapPickingListMapper;
|
|
|
import cn.iocoder.yudao.module.pms.sap.SapConnector;
|
|
|
import cn.iocoder.yudao.module.pms.sap.service.IotSapService;
|
|
|
import cn.iocoder.yudao.module.pms.sap.vo.IotSapStockVO;
|
|
@@ -20,7 +18,9 @@ import org.springframework.stereotype.Component;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static cn.iocoder.yudao.module.pms.framework.config.MultiThreadConfiguration.PMS_THREAD_POOL_TASK_EXECUTOR;
|
|
@@ -35,8 +35,7 @@ public class SyncSapStockJob implements JobHandler {
|
|
|
private SapConnector sapConnector;
|
|
|
@Autowired
|
|
|
private SapOrgApi sapOrgApi;
|
|
|
- @Autowired
|
|
|
- private IotSapPickingListMapper sapPickingListMapper;
|
|
|
+
|
|
|
@Resource(name = PMS_THREAD_POOL_TASK_EXECUTOR)
|
|
|
private ThreadPoolTaskExecutor pmsThreadPoolTaskExecutor;
|
|
|
@Autowired
|
|
@@ -50,59 +49,169 @@ public class SyncSapStockJob implements JobHandler {
|
|
|
if (CollUtil.isEmpty(factoryCodes)) {
|
|
|
return "No SAP Factory";
|
|
|
}
|
|
|
- factoryCodes.forEach(factory -> {
|
|
|
- CountDownLatch latch = new CountDownLatch(factoryCodes.size());;
|
|
|
- pmsThreadPoolTaskExecutor.execute(() -> {
|
|
|
+ log.info("共找到 {} 个工厂需要同步: {}", factoryCodes.size(), factoryCodes);
|
|
|
+
|
|
|
+ // 提前获取SAP连接,避免每个线程重复创建
|
|
|
+ JCoDestination destination;
|
|
|
+ try {
|
|
|
+ destination = sapConnector.getDestination();
|
|
|
+ // 测试连接是否有效
|
|
|
+ destination.ping();
|
|
|
+ log.info("SAP连接建立成功");
|
|
|
+ } catch (JCoException e) {
|
|
|
+ log.error("SAP连接建立失败", e);
|
|
|
+ return "SAP连接失败: " + e.getMessage();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 使用CountDownLatch等待所有线程完成
|
|
|
+ CountDownLatch latch = new CountDownLatch(factoryCodes.size());
|
|
|
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
|
|
|
+ List<String> failedFactories = new ArrayList<>();
|
|
|
+ List<String> successFactories = new ArrayList<>();
|
|
|
+
|
|
|
+
|
|
|
+ // 为每个工厂创建异步任务
|
|
|
+ for (String factory : factoryCodes) {
|
|
|
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
try {
|
|
|
- JCoDestination destination = sapConnector.getDestination();
|
|
|
- JCoFunction function = destination.getRepository().getFunction("ZPMS_002");
|
|
|
- if (ObjUtil.isNotEmpty(function)) {
|
|
|
- // 设置输入参数
|
|
|
- JCoParameterList input = function.getImportParameterList();
|
|
|
- input.setValue("IV_MATNR", ""); // 物料编号
|
|
|
- input.setValue("IV_WERKS", factory); // 工厂
|
|
|
- input.setValue("IV_LGORT", ""); // 库存地点
|
|
|
- // 执行 RFC 调用
|
|
|
- function.execute(destination);
|
|
|
- // 获取输出参数
|
|
|
- JCoParameterList output = function.getTableParameterList();
|
|
|
- JCoTable etStockTable = function.getTableParameterList().getTable("ET_STOCK");
|
|
|
- System.out.println(factory + "当前工厂对应的SAP库存数量:" + etStockTable.getNumRows());
|
|
|
- List<IotSapStockVO> sapStocks = new ArrayList<>();
|
|
|
- if (etStockTable != null && etStockTable.getNumRows() > 0) {
|
|
|
- for (int i = 0; i < etStockTable.getNumRows(); i++) {
|
|
|
- etStockTable.setRow(i);
|
|
|
- IotSapStockVO sapStock = new IotSapStockVO();
|
|
|
- sapStock.setWERKS(etStockTable.getString("WERKS"));
|
|
|
- sapStock.setMATNR(etStockTable.getString("MATNR"));
|
|
|
- sapStock.setMAKTX(etStockTable.getString("MAKTX"));
|
|
|
- sapStock.setLABST(etStockTable.getBigDecimal("LABST"));
|
|
|
- sapStock.setLGORT(etStockTable.getString("LGORT"));
|
|
|
- sapStock.setCHARG(etStockTable.getString("CHARG"));
|
|
|
- sapStock.setSOBKZ(etStockTable.getString("SOBKZ"));
|
|
|
- sapStock.setPSPNR(etStockTable.getString("PSPNR"));
|
|
|
- sapStock.setPOSID(etStockTable.getString("POSID"));
|
|
|
- sapStock.setMEINS(etStockTable.getString("MEINS"));
|
|
|
- sapStock.setINSME(etStockTable.getString("INSME"));
|
|
|
- sapStock.setSPEME(etStockTable.getString("SPEME"));
|
|
|
- sapStock.setJIAGE(etStockTable.getBigDecimal("JIAGE"));
|
|
|
- sapStocks.add(sapStock);
|
|
|
- }
|
|
|
- }
|
|
|
- // 处理头部表信息 和 明细表数据
|
|
|
- if (CollUtil.isNotEmpty(sapStocks)) {
|
|
|
- iotSapService.processSapStock(factory, sapStocks);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (JCoException e) {
|
|
|
- // 记录调用接口异常日志
|
|
|
- throw new RuntimeException(e);
|
|
|
+ syncFactoryStock(destination, factory);
|
|
|
+ successFactories.add(factory);
|
|
|
+ log.info("工厂 {} 库存同步完成", factory);
|
|
|
+ } catch (Exception e) {
|
|
|
+ failedFactories.add(factory);
|
|
|
+ log.error("工厂 {} 库存同步失败", factory, e);
|
|
|
} finally {
|
|
|
latch.countDown();
|
|
|
}
|
|
|
- });
|
|
|
- });
|
|
|
- return "SAP Stock Info: ET_STOCK";
|
|
|
+ }, pmsThreadPoolTaskExecutor);
|
|
|
+
|
|
|
+ futures.add(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 等待所有任务完成,设置超时时间
|
|
|
+ try {
|
|
|
+ boolean completed = latch.await(3, TimeUnit.MINUTES);
|
|
|
+ if (!completed) {
|
|
|
+ log.warn("SAP库存同步任务超时,可能有些工厂数据未完成同步");
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("SAP库存同步任务被中断", e);
|
|
|
+ return "任务被中断";
|
|
|
+ }
|
|
|
+
|
|
|
+ // 汇总执行结果
|
|
|
+ String result = buildResultMessage(factoryCodes.size(), successFactories.size(), failedFactories);
|
|
|
+ log.info("SAP库存同步任务完成: {}", result);
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步单个工厂的库存数据
|
|
|
+ */
|
|
|
+ private void syncFactoryStock(JCoDestination destination, String factory) {
|
|
|
+ JCoFunction function = null;
|
|
|
+ try {
|
|
|
+ // 每个线程使用独立的JCoFunction实例
|
|
|
+ function = destination.getRepository().getFunction("ZPMS_002");
|
|
|
+ if (function == null) {
|
|
|
+ throw new RuntimeException("未找到SAP函数 ZPMS_002");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 设置输入参数
|
|
|
+ JCoParameterList input = function.getImportParameterList();
|
|
|
+ input.setValue("IV_MATNR", ""); // 物料编号
|
|
|
+ input.setValue("IV_WERKS", factory); // 工厂
|
|
|
+ input.setValue("IV_LGORT", ""); // 库存地点
|
|
|
+
|
|
|
+ // 执行 RFC 调用
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ function.execute(destination);
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ log.debug("工厂 {} SAP调用耗时: {}ms", factory, (endTime - startTime));
|
|
|
+
|
|
|
+ // 处理返回数据
|
|
|
+ JCoTable etStockTable = function.getTableParameterList().getTable("ET_STOCK");
|
|
|
+ if (etStockTable == null) {
|
|
|
+ log.warn("工厂 {} 未返回ET_STOCK表数据", factory);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ int rowCount = etStockTable.getNumRows();
|
|
|
+ log.info("工厂 {} 获取到 {} 条库存数据", factory, rowCount);
|
|
|
+
|
|
|
+ if (rowCount > 0) {
|
|
|
+ List<IotSapStockVO> sapStocks = parseSapStockData(etStockTable, factory);
|
|
|
+ if (CollUtil.isNotEmpty(sapStocks)) {
|
|
|
+ iotSapService.processSapStock(factory, sapStocks);
|
|
|
+ log.info("工厂 {} 成功处理 {} 条库存记录", factory, sapStocks.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (JCoException e) {
|
|
|
+ throw new RuntimeException("工厂 " + factory + " SAP调用失败", e);
|
|
|
+ } finally {
|
|
|
+ // 清理JCo资源
|
|
|
+ if (function != null) {
|
|
|
+ try {
|
|
|
+ // JCoFunction没有close方法,但可以显式清理
|
|
|
+ function = null;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("清理JCoFunction资源时发生警告", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解析SAP库存数据
|
|
|
+ */
|
|
|
+ private List<IotSapStockVO> parseSapStockData(JCoTable etStockTable, String factory) {
|
|
|
+ List<IotSapStockVO> sapStocks = new ArrayList<>();
|
|
|
+
|
|
|
+ for (int i = 0; i < etStockTable.getNumRows(); i++) {
|
|
|
+ etStockTable.setRow(i);
|
|
|
+ try {
|
|
|
+ IotSapStockVO sapStock = new IotSapStockVO();
|
|
|
+ sapStock.setWERKS(etStockTable.getString("WERKS"));
|
|
|
+ sapStock.setMATNR(etStockTable.getString("MATNR"));
|
|
|
+ sapStock.setMAKTX(etStockTable.getString("MAKTX"));
|
|
|
+ sapStock.setLABST(etStockTable.getBigDecimal("LABST"));
|
|
|
+ sapStock.setLGORT(etStockTable.getString("LGORT"));
|
|
|
+ sapStock.setCHARG(etStockTable.getString("CHARG"));
|
|
|
+ sapStock.setSOBKZ(etStockTable.getString("SOBKZ"));
|
|
|
+ sapStock.setPSPNR(etStockTable.getString("PSPNR"));
|
|
|
+ sapStock.setPOSID(etStockTable.getString("POSID"));
|
|
|
+ sapStock.setMEINS(etStockTable.getString("MEINS"));
|
|
|
+ sapStock.setINSME(etStockTable.getString("INSME"));
|
|
|
+ sapStock.setSPEME(etStockTable.getString("SPEME"));
|
|
|
+ sapStock.setJIAGE(etStockTable.getBigDecimal("JIAGE"));
|
|
|
+
|
|
|
+ sapStocks.add(sapStock);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析工厂 {} 第 {} 行库存数据时发生错误", factory, i, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return sapStocks;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构建结果消息
|
|
|
+ */
|
|
|
+ private String buildResultMessage(int total, int success, List<String> failedFactories) {
|
|
|
+ StringBuilder result = new StringBuilder();
|
|
|
+ result.append("SAP库存同步完成。总计: ").append(total)
|
|
|
+ .append(", 成功: ").append(success)
|
|
|
+ .append(", 失败: ").append(total - success);
|
|
|
+
|
|
|
+ if (CollUtil.isNotEmpty(failedFactories)) {
|
|
|
+ result.append("。失败工厂: ").append(failedFactories);
|
|
|
+ }
|
|
|
+
|
|
|
+ return result.toString();
|
|
|
}
|
|
|
|
|
|
/**
|