瀏覽代碼

Merge branch 'masterback'

zhangcl 9 小時之前
父節點
當前提交
2fee77dd60

+ 6 - 0
yudao-module-pms/yudao-module-pms-biz/src/main/java/cn/iocoder/yudao/module/pms/dal/mysql/iotsapstock/IotSapStockMapper.java

@@ -3,10 +3,12 @@ package cn.iocoder.yudao.module.pms.dal.mysql.iotsapstock;
 import cn.iocoder.yudao.framework.common.pojo.PageResult;
 import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
 import cn.iocoder.yudao.framework.mybatis.core.query.MPJLambdaWrapperX;
+import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
 import cn.iocoder.yudao.module.pms.controller.admin.iotsapstock.vo.IotSapStockPageReqVO;
 import cn.iocoder.yudao.module.pms.dal.dataobject.iotsapstock.IotSapStockDO;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Select;
 
 import java.util.List;
 
@@ -96,4 +98,8 @@ public interface IotSapStockMapper extends BaseMapperX<IotSapStockDO> {
                 .gt(IotSapStockDO::getSafetyStock, 0));
     }
 
+    @Select("SELECT * FROM rq_iot_sap_stock WHERE deleted = 0 AND tenant_id = 1")
+    @TenantIgnore
+    List<IotSapStockDO> allSapStocks();
+
 }

+ 1 - 1
yudao-module-pms/yudao-module-pms-biz/src/main/java/cn/iocoder/yudao/module/pms/dal/mysql/iotsapstocklog/IotSapStockLogMapper.java

@@ -53,7 +53,7 @@ public interface IotSapStockLogMapper extends BaseMapperX<IotSapStockLogDO> {
      *
      * @return 删除条数
      */
-    @Delete("DELETE FROM rq_iot_sap_stock_log WHERE 1=1 AND factory_code = #{factoryCode}")
+    @Delete("DELETE FROM rq_iot_sap_stock_log WHERE 1=1")
     Integer deleteSapStockLogs(@Param("factoryCode") String factoryCode);
 
 }

+ 72 - 0
yudao-module-pms/yudao-module-pms-biz/src/main/java/cn/iocoder/yudao/module/pms/job/sap/SyncSapStockContext.java

@@ -0,0 +1,72 @@
+package cn.iocoder.yudao.module.pms.job.sap;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.module.pms.dal.dataobject.iotsapstock.IotSapStockDO;
+import cn.iocoder.yudao.module.system.api.dept.dto.DeptRespDTO;
+import cn.iocoder.yudao.module.system.api.saporg.dto.SapOrgRespDTO;
+import com.sap.conn.jco.JCoDestination;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SyncSapStockContext {
+    public JCoDestination destination;
+    public List<IotSapStockDO> existStocks;
+    public List<SapOrgRespDTO> storageLocations;
+    public List<SapOrgRespDTO> factories;
+    public List<DeptRespDTO> depts;
+
+    // 映射关系
+    public Map<String, Long> factoryIdPair = new HashMap<>();
+    public Map<String, String> factoryNamePair = new HashMap<>();
+    public Map<String, Long> storageLocationIdPair = new HashMap<>();
+    public Map<Long, String> stockLocationCodeIdPair = new HashMap<>();
+    public Map<String, Long> locationCodeDeptIdPair = new HashMap<>();
+    public Map<String, String> stockLocationNamePair = new HashMap<>();
+
+    boolean valid = false;
+
+    SyncSapStockContext invalid() {
+        SyncSapStockContext context = new SyncSapStockContext();
+        valid = false;
+        return context;
+    }
+
+    boolean isValid() {
+        return valid && destination != null;
+    }
+
+    void buildMappings() {
+        // 工厂映射
+        factories.forEach(factory -> {
+            factoryIdPair.put(factory.getFactoryCode(), factory.getId());
+            factoryNamePair.put(factory.getFactoryCode(), factory.getFactoryName());
+        });
+
+        // 库存地点映射
+        storageLocations.forEach(location -> {
+            String uniqueKey = StrUtil.join("-", location.getFactoryCode(), location.getStorageLocationCode());
+            stockLocationCodeIdPair.put(location.getId(), uniqueKey);
+            storageLocationIdPair.put(uniqueKey, location.getId());
+            stockLocationNamePair.put(uniqueKey, location.getStorageLocationName());
+        });
+
+        // 部门映射
+        depts.forEach(dept -> {
+            Set<Long> stockLocationIds = dept.getStockLocationIds();
+            if (CollUtil.isNotEmpty(stockLocationIds)) {
+                stockLocationIds.forEach(locationId -> {
+                    if (stockLocationCodeIdPair.containsKey(locationId)) {
+                        String uniqueKey = stockLocationCodeIdPair.get(locationId);
+                        locationCodeDeptIdPair.put(uniqueKey, dept.getId());
+                    }
+                });
+            }
+        });
+
+        valid = true;
+    }
+}

+ 138 - 62
yudao-module-pms/yudao-module-pms-biz/src/main/java/cn/iocoder/yudao/module/pms/job/sap/SyncSapStockJob.java

@@ -3,13 +3,17 @@ package cn.iocoder.yudao.module.pms.job.sap;
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.collection.CollectionUtil;
 import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
-import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
+import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
+import cn.iocoder.yudao.module.pms.dal.mysql.iotsapstock.IotSapStockMapper;
 import cn.iocoder.yudao.module.pms.sap.SapConnector;
 import cn.iocoder.yudao.module.pms.sap.service.IotSapService;
 import cn.iocoder.yudao.module.pms.sap.vo.IotSapStockVO;
+import cn.iocoder.yudao.module.system.api.dept.DeptApi;
 import cn.iocoder.yudao.module.system.api.saporg.SapOrgApi;
 import cn.iocoder.yudao.module.system.api.saporg.dto.SapOrgRespDTO;
-import com.sap.conn.jco.*;
+import com.sap.conn.jco.JCoFunction;
+import com.sap.conn.jco.JCoParameterList;
+import com.sap.conn.jco.JCoTable;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -17,10 +21,8 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static cn.iocoder.yudao.module.pms.framework.config.MultiThreadConfiguration.PMS_THREAD_POOL_TASK_EXECUTOR;
@@ -40,81 +42,93 @@ public class SyncSapStockJob implements JobHandler {
     private ThreadPoolTaskExecutor pmsThreadPoolTaskExecutor;
     @Autowired
     private IotSapService iotSapService;
+    @Autowired
+    private IotSapStockMapper iotSapStockMapper;
+    @Autowired
+    private DeptApi deptApi;
 
     @Override
-    @TenantJob
+    @TenantIgnore
     public String execute(String param) throws Exception {
         // 查询所有工厂 根据工厂 多线程处理 SAP 库存
         List<String> factoryCodes = factoryCodes();
         if (CollUtil.isEmpty(factoryCodes)) {
             return "No SAP Factory";
         }
-        log.info("共找到 {} 个工厂需要同步: {}", factoryCodes.size(), factoryCodes);
+        log.error("共找到 {} 个工厂需要同步: {}", factoryCodes.size(), factoryCodes);
 
-        // 提前获取SAP连接,避免每个线程重复创建
-        JCoDestination destination;
-        try {
-            destination = sapConnector.getDestination();
-            // 测试连接是否有效
-            destination.ping();
-            log.info("SAP连接建立成功");
-        } catch (JCoException e) {
-            log.error("SAP连接建立失败", e);
-            return "SAP连接失败: " + e.getMessage();
+        // 提前获取并缓存基础数据,避免每个线程重复查询
+        SyncSapStockContext syncContext = preloadSyncContext();
+        if (!syncContext.isValid()) {
+            return "基础数据加载失败,无法继续同步";
         }
 
-        // 使用CountDownLatch等待所有线程完成
-        CountDownLatch latch = new CountDownLatch(factoryCodes.size());
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        long startTime = System.currentTimeMillis();
+
+        // 使用单线程循环处理每个工厂(移除了多线程)
+        List<FactorySyncResult> results = new ArrayList<>();
+        for (String factory : factoryCodes) {
+            try {
+                FactorySyncResult result = syncFactoryStockData(factory, syncContext);
+                results.add(result);
+                log.info("工厂 {} 数据处理完成", factory);
+            } catch (Exception e) {
+                log.error("工厂 {} 处理过程中发生异常", factory, e);
+                results.add(FactorySyncResult.failure(factory, e));
+            }
+        }
+
+        // 汇总所有工厂的数据
+        List<IotSapStockVO> allStockData = new ArrayList<>();
         List<String> failedFactories = new ArrayList<>();
         List<String> successFactories = new ArrayList<>();
 
-
-        // 为每个工厂创建异步任务
-        for (String factory : factoryCodes) {
-            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-                try {
-                    syncFactoryStock(destination, factory);
-                    successFactories.add(factory);
-                    log.info("工厂 {} 库存同步完成", factory);
-                } catch (Exception e) {
-                    failedFactories.add(factory);
-                    log.error("工厂 {} 库存同步失败", factory, e);
-                } finally {
-                    latch.countDown();
+        for (FactorySyncResult result : results) {
+            if (result.success) {
+                if (CollUtil.isNotEmpty(result.stockData)) {
+                    allStockData.addAll(result.stockData);
                 }
-            }, pmsThreadPoolTaskExecutor);
-
-            futures.add(future);
+                successFactories.add(result.factoryCode);
+                log.error("工厂 {} SAP数据获取成功,获取 {} 条记录", result.factoryCode,
+                        CollUtil.size(result.stockData));
+            } else {
+                failedFactories.add(result.factoryCode);
+                log.error("工厂 {} SAP数据获取失败", result.factoryCode, result.error);
+            }
         }
 
-        // 等待所有任务完成,设置超时时间
-        try {
-            boolean completed = latch.await(3, TimeUnit.MINUTES);
-            if (!completed) {
-                log.warn("SAP库存同步任务超时,可能有些工厂数据未完成同步");
+        // 统一处理所有库存数据
+        if (CollUtil.isNotEmpty(allStockData)) {
+            try {
+                log.error("开始统一处理所有工厂的库存数据,总计 {} 条记录", allStockData.size());
+                iotSapService.processSapStockBatch(allStockData, syncContext);
+                log.error("库存数据处理完成");
+            } catch (Exception e) {
+                log.error("统一处理库存数据时发生错误", e);
+                // 如果数据处理失败,认为所有工厂都失败
+                failedFactories.addAll(successFactories);
+                successFactories.clear();
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            log.error("SAP库存同步任务被中断", e);
-            return "任务被中断";
         }
 
+        long endTime = System.currentTimeMillis();
+        log.error("所有工厂同步完成,总耗时: {}ms", (endTime - startTime));
+
         // 汇总执行结果
         String result = buildResultMessage(factoryCodes.size(), successFactories.size(), failedFactories);
-        log.info("SAP库存同步任务完成: {}", result);
+        log.error("SAP库存同步任务完成: {}", result);
 
         return result;
     }
 
     /**
-     * 同步单个工厂的库存数据
+     * 同步单个工厂的库存数据(仅获取数据,不处理)
      */
-    private void syncFactoryStock(JCoDestination destination, String factory) {
+    private FactorySyncResult syncFactoryStockData(String factory, SyncSapStockContext context) {
         JCoFunction function = null;
         try {
             // 每个线程使用独立的JCoFunction实例
-            function = destination.getRepository().getFunction("ZPMS_002");
+            function = context.destination.getRepository().getFunction("ZPMS_002");
             if (function == null) {
                 throw new RuntimeException("未找到SAP函数 ZPMS_002");
             }
@@ -127,31 +141,32 @@ public class SyncSapStockJob implements JobHandler {
 
             // 执行 RFC 调用
             long startTime = System.currentTimeMillis();
-            function.execute(destination);
+            function.execute(context.destination);
             long endTime = System.currentTimeMillis();
 
-            log.debug("工厂 {} SAP调用耗时: {}ms", factory, (endTime - startTime));
+            log.error("工厂 {} SAP调用耗时: {}ms", factory, (endTime - startTime));
 
             // 处理返回数据
             JCoTable etStockTable = function.getTableParameterList().getTable("ET_STOCK");
             if (etStockTable == null) {
-                log.warn("工厂 {} 未返回ET_STOCK表数据", factory);
-                return;
+                log.error("工厂 {} 未返回ET_STOCK表数据", factory);
+                return FactorySyncResult.success(factory, Collections.emptyList());
             }
 
             int rowCount = etStockTable.getNumRows();
-            log.info("工厂 {} 获取到 {} 条库存数据", factory, rowCount);
+            log.error("工厂 {} 获取到 {} 条库存数据", factory, rowCount);
 
+            List<IotSapStockVO> sapStocks = Collections.emptyList();
             if (rowCount > 0) {
-                List<IotSapStockVO> sapStocks = parseSapStockData(etStockTable, factory);
-                if (CollUtil.isNotEmpty(sapStocks)) {
-                    iotSapService.processSapStock(factory, sapStocks);
-                    log.info("工厂 {} 成功处理 {} 条库存记录", factory, sapStocks.size());
-                }
+                sapStocks = parseSapStockData(etStockTable, factory);
+                log.error("工厂 {} 解析出 {} 条库存记录", factory, sapStocks.size());
             }
 
-        } catch (JCoException e) {
-            throw new RuntimeException("工厂 " + factory + " SAP调用失败", e);
+            return FactorySyncResult.success(factory, sapStocks);
+
+        } catch (Exception e) {
+            log.error("工厂 {} SAP数据获取失败", factory, e);
+            return FactorySyncResult.failure(factory, e);
         } finally {
             // 清理JCo资源
             if (function != null) {
@@ -159,7 +174,7 @@ public class SyncSapStockJob implements JobHandler {
                     // JCoFunction没有close方法,但可以显式清理
                     function = null;
                 } catch (Exception e) {
-                    log.warn("清理JCoFunction资源时发生警告", e);
+                    log.error("清理JCoFunction资源时发生警告", e);
                 }
             }
         }
@@ -227,4 +242,65 @@ public class SyncSapStockJob implements JobHandler {
                 .collect(Collectors.toList());
         return CollUtil.isEmpty(factoryCodes) ? CollUtil.empty(String.class) : factoryCodes;
     }
+
+    /**
+     * 预加载同步上下文数据
+     */
+    private SyncSapStockContext preloadSyncContext() {
+        SyncSapStockContext context = new SyncSapStockContext();
+        try {
+
+            // 提前获取SAP连接
+            context.destination = sapConnector.getDestination();
+            context.destination.ping(); // 测试连接
+
+            context.existStocks = iotSapStockMapper.allSapStocks();
+            context.depts = deptApi.ignoredTenantDepts();
+            context.storageLocations = sapOrgApi.getSapOrgByType(3);
+            context.factories = sapOrgApi.getSapOrgByType(1);
+
+            // 构建映射关系
+            context.buildMappings();
+
+            log.error("同步上下文预加载完成: 工厂{}个, 库存地点{}个, 部门{}个, 现有库存{}条",
+                    CollUtil.size(context.factories),
+                    CollUtil.size(context.storageLocations),
+                    CollUtil.size(context.depts),
+                    CollUtil.size(context.existStocks));
+
+            System.out.println(String.format("工厂=%s, 库存地点=%s, 部门=%s, 现有库存=%s",
+                    CollUtil.size(context.factories), CollUtil.size(context.storageLocations), CollUtil.size(context.depts), CollUtil.size(context.existStocks)));
+
+            return context;
+        } catch (Exception e) {
+            log.error("预加载同步上下文失败", e);
+            return context.invalid();
+        }
+    }
+
+    // 内部类用于封装工厂同步结果
+    private static class FactorySyncResult {
+        String factoryCode;
+        List<IotSapStockVO> stockData;
+        Exception error;
+        boolean success;
+
+        FactorySyncResult(String factoryCode) {
+            this.factoryCode = factoryCode;
+            this.success = false;
+        }
+
+        static FactorySyncResult success(String factoryCode, List<IotSapStockVO> stockData) {
+            FactorySyncResult result = new FactorySyncResult(factoryCode);
+            result.stockData = stockData;
+            result.success = true;
+            return result;
+        }
+
+        static FactorySyncResult failure(String factoryCode, Exception error) {
+            FactorySyncResult result = new FactorySyncResult(factoryCode);
+            result.error = error;
+            return result;
+        }
+    }
 }

+ 7 - 0
yudao-module-pms/yudao-module-pms-biz/src/main/java/cn/iocoder/yudao/module/pms/sap/service/IotSapService.java

@@ -40,4 +40,11 @@ public interface IotSapService {
      * @return
      */
     void processSapStock(String factory, List<IotSapStockVO> sapStocks);
+
+    /**
+     * 批量处理SAP库存数据
+     * @param allStockData 所有工厂的库存数据
+     * @param context 同步上下文,包含基础数据映射
+     */
+    void processSapStockBatch(List<IotSapStockVO> allStockData, Object context);
 }

+ 408 - 0
yudao-module-pms/yudao-module-pms-biz/src/main/java/cn/iocoder/yudao/module/pms/sap/service/IotSapServiceImpl.java

@@ -19,6 +19,7 @@ import cn.iocoder.yudao.module.pms.dal.mysql.iotmaterial.IotSapMaterialMapper;
 import cn.iocoder.yudao.module.pms.dal.mysql.iotsappickinglist.IotSapPickingListMapper;
 import cn.iocoder.yudao.module.pms.dal.mysql.iotsapstock.IotSapStockMapper;
 import cn.iocoder.yudao.module.pms.dal.mysql.iotsapstocklog.IotSapStockLogMapper;
+import cn.iocoder.yudao.module.pms.job.sap.SyncSapStockContext;
 import cn.iocoder.yudao.module.pms.sap.vo.*;
 import cn.iocoder.yudao.module.supplier.dal.dataobject.product.SupplierDO;
 import cn.iocoder.yudao.module.supplier.dal.mysql.product.SupplierMapper;
@@ -72,6 +73,10 @@ public class IotSapServiceImpl implements IotSapService {
     @Autowired
     private IotSapPickingListMapper iotSapPickingListMapper;
 
+    // 批量操作的分批大小
+    private static final int BATCH_SIZE = 1000;
+    private static final int LOG_BATCH_SIZE = 2000;
+
     @Override
     public void processMaterials(List<IotSapMaterialVO> sapMaterials) {
         // SAP接口返回的物料编码 砍掉前端的 00000000
@@ -575,6 +580,7 @@ public class IotSapServiceImpl implements IotSapService {
         reqVO.setFactoryCode(factoryCode);
         List<IotSapStockDO> existStocks = TenantUtils.execute(1L, () -> iotSapStockMapper.selectList(reqVO));
         if (CollUtil.isNotEmpty(existStocks)) {
+            log.info("当前工厂{} 当前库中已有sap库存数量{}:", factoryCode, existStockKeys.size());
             existStockKeys = existStocks.stream()
                     // 过滤非空对象和非空code
                     .filter(stk -> ObjUtil.isNotEmpty(stk) && StrUtil.isNotBlank(stk.getMaterialCode())
@@ -726,6 +732,250 @@ public class IotSapServiceImpl implements IotSapService {
         }
     }
 
+    @Override
+    public void processSapStockBatch(List<IotSapStockVO> allStockData, Object contextObj) {
+        if (!(contextObj instanceof SyncSapStockContext)) {
+            throw new IllegalArgumentException("无效的上下文对象");
+        }
+
+        SyncSapStockContext context = (SyncSapStockContext) contextObj;
+
+        if (CollUtil.isEmpty(allStockData)) {
+            log.error("没有需要处理的SAP库存数据");
+            return;
+        }
+
+        log.error("开始批量处理SAP库存数据,总计 {} 条记录", allStockData.size());
+        long startTime = System.currentTimeMillis();
+
+        try {
+            // 按工厂分组处理
+            Map<String, List<IotSapStockVO>> factoryStocksMap = allStockData.stream()
+                    .collect(Collectors.groupingBy(IotSapStockVO::getWERKS));
+
+            log.error("按工厂分组完成,共 {} 个工厂的数据需要处理", factoryStocksMap.size());
+
+            List<IotSapStockDO> totalActualUpdatedStocks = new ArrayList<>();
+            List<IotSapStockDO> totalTobeAddedStocks = new ArrayList<>();
+
+            // 处理每个工厂的数据
+            factoryStocksMap.forEach((factoryCode, stocks) -> {
+                try {
+                    processSingleFactoryStocks(factoryCode, stocks, context, totalTobeAddedStocks, totalActualUpdatedStocks);
+                    log.error("工厂 {} 库存数据处理完成,共处理 {} 条记录", factoryCode, stocks.size());
+                } catch (Exception e) {
+                    log.error("处理工厂 {} 库存数据时发生错误", factoryCode, e);
+                    // 记录错误但继续处理其他工厂
+                }
+            });
+
+            // 分批处理库存日志数据
+            processStockLogsInBatches(allStockData);
+
+            // 分批处理新增库存数据
+            processNewStocksInBatches(totalTobeAddedStocks);
+
+            // 分批处理更新库存数据
+            processUpdatedStocksInBatches(totalActualUpdatedStocks);
+
+            long endTime = System.currentTimeMillis();
+            log.error("所有工厂库存数据处理完成,总耗时: {}ms", (endTime - startTime));
+
+        } catch (Exception e) {
+            log.error("批量处理SAP库存数据时发生错误", e);
+            throw new RuntimeException("SAP库存数据处理失败", e);
+        }
+    }
+
+
+    /**
+     * 分批处理库存日志数据
+     */
+    private void processStockLogsInBatches(List<IotSapStockVO> allStockData) {
+        if (CollUtil.isEmpty(allStockData)) {
+            return;
+        }
+
+        // 准备SAP库存日志数据
+        List<IotSapStockLogDO> tobeAddedSapStockLogs = allStockData.stream()
+                .map(stock -> {
+                    IotSapStockLogDO sapStockLog = new IotSapStockLogDO();
+                    sapStockLog.setFactoryCode(stock.getWERKS());
+                    sapStockLog.setStorageLocationCode(stock.getLGORT());
+                    sapStockLog.setMaterialCode(stock.getMATNR().replaceFirst("^0+", ""));
+                    sapStockLog.setMaterialName(stock.getMAKTX());
+                    sapStockLog.setQuantity(stock.getLABST());
+                    sapStockLog.setUnitPrice(stock.getJIAGE());
+                    sapStockLog.setUnit(stock.getMEINS());
+                    sapStockLog.setSyncTime(LocalDateTime.now());
+                    return sapStockLog;
+                })
+                .collect(Collectors.toList());
+
+        try {
+            // 先删除所有日志记录(分批删除)
+            deleteAllStockLogsInBatches();
+
+            // 分批插入新的日志记录
+            insertStockLogsInBatches(tobeAddedSapStockLogs);
+
+        } catch (Exception e) {
+            log.error("处理库存日志数据时发生错误", e);
+            // 不抛出异常,继续处理其他数据
+        }
+    }
+
+    /**
+     * 分批删除所有库存日志
+     */
+    private void deleteAllStockLogsInBatches() {
+        try {
+            // 直接调用删除方法,不传递工厂代码
+            TenantUtils.execute(1L, () -> {
+                iotSapStockLogMapper.deleteSapStockLogs(null);
+                return null;
+            });
+            log.error("成功删除所有库存日志记录");
+        } catch (Exception e) {
+            log.error("删除库存日志记录时发生错误", e);
+            throw new RuntimeException("删除库存日志失败", e);
+        }
+    }
+
+    /**
+     * 分批插入库存日志
+     */
+    private void insertStockLogsInBatches(List<IotSapStockLogDO> stockLogs) {
+        if (CollUtil.isEmpty(stockLogs)) {
+            return;
+        }
+
+        log.error("开始分批插入库存日志,总计 {} 条记录", stockLogs.size());
+
+        // 分批处理
+        List<List<IotSapStockLogDO>> batches = CollUtil.split(stockLogs, LOG_BATCH_SIZE);
+        CountDownLatch latch = new CountDownLatch(batches.size());
+
+        for (List<IotSapStockLogDO> batch : batches) {
+            pmsThreadPoolTaskExecutor.execute(() -> {
+                try {
+                    TenantUtils.execute(1L, () -> {
+                        try {
+                            iotSapStockLogMapper.insertBatch(batch);
+                            log.error("成功插入一批库存日志,数量: {}", batch.size());
+                        } catch (Exception e) {
+                            log.error("插入库存日志批次时发生错误,批次大小: {}", batch.size(), e);
+                            // 记录错误但继续处理其他批次
+                        }
+                        return null;
+                    });
+                } catch (Exception e) {
+                    log.error("执行租户上下文操作时发生错误", e);
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+            log.error("所有库存日志批次插入完成");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("等待库存日志插入完成时被中断", e);
+        }
+    }
+
+    /**
+     * 分批处理新增库存数据
+     */
+    private void processNewStocksInBatches(List<IotSapStockDO> newStocks) {
+        if (CollUtil.isEmpty(newStocks)) {
+            log.error("没有需要新增的库存数据");
+            return;
+        }
+
+        log.error("开始分批插入新增库存数据,总计 {} 条记录", newStocks.size());
+
+        List<List<IotSapStockDO>> batches = CollUtil.split(newStocks, BATCH_SIZE);
+        CountDownLatch latch = new CountDownLatch(batches.size());
+        final int[] successCount = {0};
+
+        for (List<IotSapStockDO> batch : batches) {
+            pmsThreadPoolTaskExecutor.execute(() -> {
+                try {
+                    TenantUtils.execute(1L, () -> {
+                        try {
+                            iotSapStockMapper.insertBatch(batch);
+                            successCount[0] += batch.size();
+                            log.error("成功插入一批新增库存,数量: {}", batch.size());
+                        } catch (Exception e) {
+                            log.error("插入新增库存批次时发生错误,批次大小: {}", batch.size(), e);
+                        }
+                        return null;
+                    });
+                } catch (Exception e) {
+                    log.error("执行新增库存租户上下文操作时发生错误", e);
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+            log.error("新增库存数据插入完成,成功插入 {} 条记录", successCount[0]);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("等待新增库存插入完成时被中断", e);
+        }
+    }
+
+    /**
+     * 分批处理更新库存数据
+     */
+    private void processUpdatedStocksInBatches(List<IotSapStockDO> updatedStocks) {
+        if (CollUtil.isEmpty(updatedStocks)) {
+            log.error("没有需要更新的库存数据");
+            return;
+        }
+
+        log.error("开始分批更新库存数据,总计 {} 条记录", updatedStocks.size());
+
+        List<List<IotSapStockDO>> batches = CollUtil.split(updatedStocks, BATCH_SIZE);
+        CountDownLatch latch = new CountDownLatch(batches.size());
+        final int[] successCount = {0};
+
+        for (List<IotSapStockDO> batch : batches) {
+            pmsThreadPoolTaskExecutor.execute(() -> {
+                try {
+                    TenantUtils.execute(1L, () -> {
+                        try {
+                            iotSapStockMapper.updateBatch(batch);
+                            successCount[0] += batch.size();
+                            log.error("成功更新一批库存,数量: {}", batch.size());
+                        } catch (Exception e) {
+                            log.error("更新库存批次时发生错误,批次大小: {}", batch.size(), e);
+                        }
+                        return null;
+                    });
+                } catch (Exception e) {
+                    log.error("执行更新库存租户上下文操作时发生错误", e);
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            latch.await();
+            log.error("库存数据更新完成,成功更新 {} 条记录", successCount[0]);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("等待库存更新完成时被中断", e);
+        }
+    }
+
     /**
      * 创建物料编码到描述的映射
      * @param sapMaterials SAP返回的物料列表
@@ -828,4 +1078,162 @@ public class IotSapServiceImpl implements IotSapService {
                         (existing, replacement) -> existing // 如果有重复的key,保留第一个值
                 ));
     }
+
+    private void processSingleFactoryStocks(String factoryCode, List<IotSapStockVO> stocks, SyncSapStockContext context,
+                                            List<IotSapStockDO> totalTobeAddedStocks, List<IotSapStockDO> totalActualUpdatedStocks) {
+        if (CollUtil.isEmpty(stocks)) {
+            log.error("工厂 {} 没有库存数据需要处理", factoryCode);
+            return;
+        }
+
+        // 筛选出pms系统中未维护的库存地点信息(工厂-库存地点编码)
+        Set<String> notExistStorageLocations = new HashSet<>();
+        stocks.forEach(stock -> {
+            String uniqueKey = StrUtil.join("-", stock.getWERKS(), stock.getLGORT());
+            if (!context.storageLocationIdPair.containsKey(uniqueKey)) {
+                notExistStorageLocations.add(uniqueKey);
+            }
+        });
+
+        if (CollUtil.isNotEmpty(notExistStorageLocations)) {
+            log.error("工厂 {} 发现 {} 个未维护的库存地点: {}", factoryCode,
+                    notExistStorageLocations.size(), notExistStorageLocations);
+            notExistStorageLocations.forEach(location ->
+                    log.error("系统中未维护的工厂编码-库存地点编码: {}", location));
+        }
+
+        // 查询当前工厂的现有库存数据
+        Set<String> existStockKeys = new HashSet<>();
+        IotSapStockPageReqVO reqVO = new IotSapStockPageReqVO();
+        reqVO.setFactoryCode(factoryCode);
+        log.error("当前工厂 {} 已经存在的SAP库存数量:{}", factoryCode, context.existStocks.size());
+        // 从预加载的数据中过滤出当前工厂的库存
+        List<IotSapStockDO> factoryExistStocks = context.existStocks.stream()
+                .filter(stock -> factoryCode.equals(stock.getFactoryCode()))
+                .collect(Collectors.toList());
+
+        if (CollUtil.isNotEmpty(factoryExistStocks)) {
+            log.error("工厂 {} 当前库中已有sap库存数量: {}", factoryCode, factoryExistStocks.size());
+            existStockKeys = factoryExistStocks.stream()
+                    .filter(stk -> ObjUtil.isNotEmpty(stk) && StrUtil.isNotBlank(stk.getMaterialCode())
+                            && StrUtil.isNotBlank(stk.getFactoryCode()) && StrUtil.isNotBlank(stk.getStorageLocationCode()))
+                    .map(stk -> StrUtil.join("-", stk.getFactoryCode(), stk.getStorageLocationCode(), stk.getMaterialCode()))
+                    .collect(Collectors.toSet());
+        }
+
+        log.error("工厂 {} 当前库中已有sap库存唯一键数量: {}", factoryCode, existStockKeys.size());
+
+        // 构建待更新库存映射
+        Map<String, IotSapStockDO> tobeUpdatedStockPair = factoryExistStocks.stream()
+                .filter(stk -> StrUtil.isNotBlank(stk.getMaterialCode()) && StrUtil.isNotBlank(stk.getFactoryCode())
+                        && StrUtil.isNotBlank(stk.getStorageLocationCode()))
+                .collect(Collectors.toMap(
+                        stk -> StrUtil.join("-", stk.getFactoryCode(), stk.getStorageLocationCode(), stk.getMaterialCode()),
+                        stk -> stk,
+                        (existing, replacement) -> existing
+                ));
+
+        // 找出需要新增的库存记录
+        Set<String> finalExistStockKeys = existStockKeys;
+        List<IotSapStockVO> newStocks = stocks.stream()
+                .filter(stk -> factoryCode.equals(stk.getWERKS()) && StrUtil.isNotBlank(stk.getMATNR()) && StrUtil.isNotBlank(stk.getLGORT())
+                        && context.storageLocationIdPair.containsKey(StrUtil.join("-", stk.getWERKS(), stk.getLGORT())))
+                .filter(stk -> {
+                    String processedCode = stk.getMATNR().replaceFirst("^0+", "");
+                    String uniqueKey = StrUtil.join("-", stk.getWERKS(), stk.getLGORT());
+                    return !finalExistStockKeys.contains(StrUtil.join("-", uniqueKey, processedCode));
+                })
+                .collect(Collectors.toList());
+
+        log.error("工厂 {} 需要新增的sap库存数量: {}", factoryCode, newStocks.size());
+
+        // 准备新增的库存数据
+        List<IotSapStockDO> tobeAddedStocks = new ArrayList<>();
+        if (CollUtil.isNotEmpty(newStocks)) {
+            newStocks.forEach(stock -> {
+                String uniqueKey = StrUtil.join("-", stock.getWERKS(), stock.getLGORT());
+                IotSapStockDO sapStock = new IotSapStockDO();
+
+                // 设置部门ID
+                if (context.locationCodeDeptIdPair.containsKey(uniqueKey)) {
+                    sapStock.setDeptId(context.locationCodeDeptIdPair.get(uniqueKey));
+                } else {
+                    // 没有关联组织部门的SAP工厂库存地点,设置默认部门
+                    sapStock.setDeptId(163L); // 科瑞石油技术
+                }
+
+                // 设置工厂信息
+                if (context.factoryIdPair.containsKey(stock.getWERKS())) {
+                    sapStock.setFactoryId(context.factoryIdPair.get(stock.getWERKS()));
+                }
+                sapStock.setFactoryCode(stock.getWERKS());
+                if (context.factoryNamePair.containsKey(stock.getWERKS())) {
+                    sapStock.setFactory(context.factoryNamePair.get(stock.getWERKS()));
+                }
+
+                // 设置库存地点信息
+                if (context.storageLocationIdPair.containsKey(uniqueKey)) {
+                    sapStock.setStorageLocationId(context.storageLocationIdPair.get(uniqueKey));
+                }
+                sapStock.setStorageLocationCode(stock.getLGORT());
+                if (context.stockLocationNamePair.containsKey(uniqueKey)) {
+                    sapStock.setProjectDepartment(context.stockLocationNamePair.get(uniqueKey));
+                }
+
+                // 设置物料信息
+                sapStock.setMaterialCode(stock.getMATNR().replaceFirst("^0+", ""));
+                sapStock.setMaterialName(stock.getMAKTX());
+                sapStock.setQuantity(stock.getLABST());
+                sapStock.setUnitPrice(stock.getJIAGE());
+                sapStock.setUnit(stock.getMEINS());
+                sapStock.setSyncTime(LocalDateTime.now());
+
+                tobeAddedStocks.add(sapStock);
+            });
+        }
+
+        // 准备更新的库存数据
+        List<IotSapStockDO> actualUpdatedStocks = new ArrayList<>();
+        Map<String, IotSapStockVO> existStockPair = createExistStockMap(factoryCode, stocks, existStockKeys, context.storageLocationIdPair);
+
+        if (CollUtil.isNotEmpty(existStockPair)) {
+            Map<String, IotSapStockDO> finalTobeUpdatedStockPair = tobeUpdatedStockPair;
+            existStockPair.forEach((key, sapStockVO) -> {
+                if (finalTobeUpdatedStockPair.containsKey(key)) {
+                    IotSapStockDO existingStock = finalTobeUpdatedStockPair.get(key);
+                    String uniqueKey = StrUtil.join("-", existingStock.getFactoryCode(), existingStock.getStorageLocationCode());
+
+                    // 更新部门关联
+                    if (context.locationCodeDeptIdPair.containsKey(uniqueKey)) {
+                        existingStock.setDeptId(context.locationCodeDeptIdPair.get(uniqueKey));
+                    }
+
+                    // 更新库存信息
+                    existingStock.setUnitPrice(sapStockVO.getJIAGE());
+                    existingStock.setQuantity(sapStockVO.getLABST());
+                    existingStock.setUnit(sapStockVO.getMEINS());
+                    existingStock.setSyncTime(LocalDateTime.now());
+
+                    actualUpdatedStocks.add(existingStock);
+                }
+            });
+        }
+
+        log.error("工厂 {} 需要更新的SAP库存数量: {}", factoryCode, actualUpdatedStocks.size());
+
+        // 本地库存 初始化 批量插入SAP 库存 工厂 库存地点 已经关联了组织架构部门
+        if (CollUtil.isNotEmpty(tobeAddedStocks)) {
+            totalTobeAddedStocks.addAll(tobeAddedStocks);
+        }
+
+        // pms中存在而且SAP接口也返回的库存数据:更新
+        if (CollUtil.isNotEmpty(actualUpdatedStocks)) {
+            totalActualUpdatedStocks.addAll(actualUpdatedStocks);
+        }
+
+        log.error("工厂 {} 库存数据处理完成: 新增 {} 条, 更新 {} 条, 日志 {} 条",
+                factoryCode,
+                CollUtil.size(tobeAddedStocks),
+                CollUtil.size(actualUpdatedStocks));
+    }
 }

+ 7 - 0
yudao-module-system/yudao-module-system-api/src/main/java/cn/iocoder/yudao/module/system/api/dept/DeptApi.java

@@ -37,6 +37,13 @@ public interface DeptApi {
      */
     List<DeptRespDTO> getDeptList();
 
+    /**
+     * 忽略租户 查询所有部门集合
+     *
+     * @return 所有部门
+     */
+    List<DeptRespDTO> ignoredTenantDepts();
+
     /**
      * 校验部门们是否有效。如下情况,视为无效:
      * 1. 部门编号不存在

+ 7 - 0
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/api/dept/DeptApiImpl.java

@@ -47,6 +47,13 @@ public class DeptApiImpl implements DeptApi {
         return BeanUtils.toBean(depts, DeptRespDTO.class);
     }
 
+    @Override
+    public List<DeptRespDTO> ignoredTenantDepts() {
+        DeptListReqVO reqVO = new DeptListReqVO();
+        List<DeptDO> depts = deptService.ignoredTenantDepts(reqVO);
+        return BeanUtils.toBean(depts, DeptRespDTO.class);
+    }
+
     @Override
     public void validateDeptList(Collection<Long> ids) {
         deptService.validateDeptList(ids);

+ 5 - 0
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/dal/mysql/dept/DeptMapper.java

@@ -2,9 +2,11 @@ package cn.iocoder.yudao.module.system.dal.mysql.dept;
 
 import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
 import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
+import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
 import cn.iocoder.yudao.module.system.controller.admin.dept.vo.dept.DeptListReqVO;
 import cn.iocoder.yudao.module.system.dal.dataobject.dept.DeptDO;
 import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Select;
 
 import java.util.Collection;
 import java.util.List;
@@ -39,4 +41,7 @@ public interface DeptMapper extends BaseMapperX<DeptDO> {
         return selectList(DeptDO::getLeaderUserId, id);
     }
 
+    @Select("SELECT * FROM system_dept WHERE deleted = 0 AND tenant_id = 1")
+    @TenantIgnore
+    List<DeptDO> allDepts();
 }

+ 8 - 0
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dept/DeptService.java

@@ -60,6 +60,14 @@ public interface DeptService {
      */
     List<DeptDO> getDeptList(DeptListReqVO reqVO);
 
+    /**
+     * 忽略租户 查询所有部门列表
+     *
+     * @param reqVO 筛选条件请求 VO
+     * @return 部门列表
+     */
+    List<DeptDO> ignoredTenantDepts(DeptListReqVO reqVO);
+
     /**
      * 筛选公司层级的部门列表
      *

+ 5 - 0
yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dept/DeptServiceImpl.java

@@ -258,6 +258,11 @@ public class DeptServiceImpl implements DeptService {
         return list;
     }
 
+    @Override
+    public List<DeptDO> ignoredTenantDepts(DeptListReqVO reqVO) {
+        return deptMapper.allDepts();
+    }
+
     @Override
     public List<DeptDO> companyLevelDepts(DeptListReqVO reqVO) {
         Set<Long> parentIds = new HashSet<>();