CYPHER-TASK Design and TASK Lock Mechanism for Homogeneous and Heterogeneous Graphs

2024-02-25 06:32

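  • Background
    • CYPHER-TASK design
      • Homogeneous graph
      • Heterogeneous graph
      • check-point table design
      • task-lock table design
    • Task module breakdown
      • Data chunking
      • Task state rollback
      • Task state synchronization
      • Task state lock
    • Complete implementation examples
      • Homogeneous graph
        • Node TASK
        • Relationship TASK
      • Heterogeneous graph
        • Node TASK
        • Relationship TASK
    • Notes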

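Background

Running large numbers of duplicate write tasks concurrently piles up write requests on the graph database service, which degrades performance and can even bring the service down. A TASK lock is therefore essential: it must guarantee that the same write task never runs twice at the same moment. The checkpoint mechanism guarantees the consistency and completeness of data synchronization. A TASK that uses too much system memory (especially when processing large data volumes) puts the graph database service at risk of crashing; data chunking avoids this problem.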

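CYPHER-TASK design

Homogeneous graph

  • Every task must acquire the lock before running its data construction logic, and must release the lock when the TASK ends, whether or not the construction logic succeeded
  • [NODE-TASK] updates node_check_point and sets rel_check_point for the follow-up task (to the value node_check_point held before the update)
  • [REL-TASK] rolls back node_check_point on failure and synchronizes task state on success (rel_check_point=node_check_point)
# TASK execution flow
[NODE-TASK]->[REL-TASK]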
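
The checkpoint transitions described in the bullets above can be modelled outside the database. The following is a minimal Python sketch, not the article's implementation: the class `CheckPoint` and its method names are hypothetical, and only the two field names mirror columns of the check-point table.

```python
from dataclasses import dataclass


@dataclass
class CheckPoint:
    """In-memory stand-in for one row of the check-point table (hypothetical)."""
    node_check_point: str
    rel_check_point: str

    def node_task_succeeded(self, now: str) -> None:
        # Keep the pre-update value so the relationship TASK knows where to start.
        self.rel_check_point = self.node_check_point
        self.node_check_point = now

    def rel_task_succeeded(self) -> None:
        # State synchronization: both checkpoints now agree.
        self.rel_check_point = self.node_check_point

    def rel_task_failed(self) -> None:
        # Rollback: the next run starts again from the node TASK at the old point.
        self.node_check_point = self.rel_check_point


cp = CheckPoint("1900-01-01 00:00:00", "1900-01-01 00:00:00")
cp.node_task_succeeded("2024-02-25 06:00:00")
cp.rel_task_failed()
# After the rollback both fields are back at "1900-01-01 00:00:00".
```

Keeping the old value in rel_check_point is what makes the rollback possible without any extra history table.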

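Heterogeneous graph

  • Every task must acquire the lock before running its data construction logic, and must release the lock when the TASK ends, whether or not the construction logic succeeded
  • [FROM-NODE-TASK] updates node_check_point and sets rel_check_point for the follow-up tasks (to the value node_check_point held before the update)
  • [TO-NODE-TASK] is responsible only for rolling back node_check_point
  • [REL-TASK] rolls back node_check_point on failure and synchronizes task state on success (rel_check_point=node_check_point)
# TASK execution flow
[FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]

check-point table design

The task state table stores the state of node TASKs and relationship TASKs, passes that state along the whole task flow, and keeps the data consistent and complete.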

CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment primary key',
`hcode` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Code: HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Name',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Relationship type',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'hcode of MSTR_ORG',
`node_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT 'Checkpoint time that the node TASK reads and may update; the relationship TASK can read it too [a complete graph-data DAG-TASK must include node and relationship construction TASKs]',
`rel_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT 'Stores the value of node_check_point before its update',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Detailed description of this checkpoint task',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'CYPHER for full data synchronization: data chunking script',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'CYPHER for full data synchronization: script without a time-range filter',
`hcreatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`hupdatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Created by',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Updated by',
`hisvalid` int(11) NOT NULL DEFAULT '1' COMMENT 'Logical deletion flag: 0 = invalid; 1 = valid',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT 'Data source tag',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE KEY `unique_key_02` (`hcode`) USING BTREE COMMENT 'Unique index',
UNIQUE KEY `unique_key_01` (`from`,`to`,`relationship`) USING BTREE COMMENT 'Unique index',
KEY `updateTime` (`hupdatetime`) USING BTREE,
KEY `name` (`from`) USING BTREE,
KEY `hisvalid` (`hisvalid`) USING BTREE,
KEY `type` (`relationship`) USING BTREE,
KEY `check_point` (`node_check_point`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=742715632 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='ONgDB DAG TASK checkpoint record table';

task-lock table design

The task lock table is used to lock task state and guarantees that only one instance of a TASK runs at a time.
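
The article does not show the DDL of the lock table. The sketch below is an assumption-laden illustration using Python's built-in `sqlite3` instead of MySQL: only the table name `ONGDB_TASK_CHECK_POINT_LOCK` and the columns `hcode` and `task_lock` are taken from the queries later in the article; the column types, the helper names `create_lock_table`, `acquire` and `release`, and the simplified single-table `UPDATE` are hypothetical.

```python
import sqlite3


def create_lock_table(conn: sqlite3.Connection) -> None:
    # Assumed minimal schema: one row per task code, 0 = free, 1 = locked.
    conn.execute(
        "CREATE TABLE ONGDB_TASK_CHECK_POINT_LOCK ("
        "hcode TEXT PRIMARY KEY, task_lock INTEGER NOT NULL DEFAULT 0)"
    )


def acquire(conn: sqlite3.Connection, hcode: str) -> bool:
    # Conditional update: it only changes the row while the lock is free,
    # so the affected-row count tells us whether we obtained the lock.
    cur = conn.execute(
        "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=1 "
        "WHERE hcode=? AND task_lock=0",
        (hcode,),
    )
    conn.commit()
    return cur.rowcount > 0


def release(conn: sqlite3.Connection, hcode: str) -> None:
    conn.execute(
        "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?",
        (hcode,),
    )
    conn.commit()
```

Because the check and the update happen in one statement, two concurrent callers cannot both see the lock as free and both take it.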

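Task module breakdown

Data chunking

Limit the amount of data loaded into memory so that the task does not consume too much heap and the graph database keeps running reliably.

// Data chunking: split the ID range after the checkpoint into chunks of the given size [a default chunk is added so the lock can always be released]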
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value
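
To make the chunking step concrete, here is a small Python sketch of the idea. It is an approximation under assumptions: the exact boundaries produced by `olab.ids.batch` are not documented in this article, so `id_batches` is a hypothetical stand-in that yields inclusive `[low, high]` pairs, and `with_default_chunk` imitates the effect of the union with `[[0,1]]`.

```python
from typing import List, Optional


def id_batches(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Split the inclusive ID range [min_id, max_id] into chunks of at most `size` IDs."""
    if min_id is None or max_id is None:
        # No rows changed since the checkpoint: MIN/MAX come back as NULL.
        return []
    return [
        [low, min(low + size - 1, max_id)]
        for low in range(min_id, max_id + 1, size)
    ]


def with_default_chunk(batches: List[List[int]]) -> List[List[int]]:
    """Add the default chunk [0, 1] so at least one row always flows downstream."""
    default = [0, 1]
    return batches if default in batches else batches + [default]


chunks = with_default_chunk(id_batches(1, 25000, 10000))
# chunks == [[1, 10000], [10001, 20000], [20001, 25000], [0, 1]]
```

Without the default chunk, an empty ID range would produce no rows after UNWIND, and the later statements, including the one that releases the lock, would never run.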

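Task state rollback

Roll back to the state of the node construction task so that the next relationship build resumes from the rollback point [every run starts from the node TASK].

// When the number of failed batches is greater than 0, roll back node_check_point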
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint

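Task state synchronization

Synchronize the relationship TASK checkpoint with the node TASK checkpoint.

// batchFailedSize>0: roll back the task state [triggered when any relationship-building batch fails] [rollback: set node_check_point to the current rel_check_point]
// batchFailedSize<=0: [on success the node TASK and relationship TASK states are synchronized] update rel_check_point=node_check_point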
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
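
Because the query unwinds the chunk list, `apoc.periodic.iterate` runs once per chunk and `SUM(batch.failed)` adds up the failures across all chunks before a single branch is chosen. The Python sketch below models that decision only; the function names are hypothetical, and each chunk result is assumed to carry a `batch` map with a `failed` count, as yielded by `apoc.periodic.iterate`.

```python
from typing import Dict, List

ROLLBACK_SQL = "UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?"
SYNC_SQL = "UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?"


def total_failed_batches(chunk_results: List[Dict]) -> int:
    # Equivalent of SUM(batch.failed) over the rows produced for every chunk.
    return sum(result["batch"]["failed"] for result in chunk_results)


def choose_statement(chunk_results: List[Dict]) -> str:
    # One failed batch in any chunk is enough to roll the whole task back.
    if total_failed_batches(chunk_results) > 0:
        return ROLLBACK_SQL
    return SYNC_SQL


results = [
    {"batch": {"total": 1, "committed": 1, "failed": 0}},
    {"batch": {"total": 1, "committed": 0, "failed": 1}},
]
statement = choose_statement(results)  # the rollback statement is selected
```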

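Task state lock

The task state lock for graph data construction guarantees that, at any given moment, only one TASK in the DAG of a given relationship is running.

  • Acquire the lock
// Acquire the task lock and mark the task as locked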
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
  • Release the lock
// Release the lock [done when the TASK finishes] [the default chunk added during data chunking guarantees this step runs]
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
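
The acquire/release contract can be summarised as "skip if locked, otherwise always release". The sketch below shows that contract with a plain Python dictionary standing in for the lock table; `run_with_lock` and the dictionary are hypothetical, and the check-then-set here is not atomic, so it illustrates control flow only, not a concurrency-safe lock.

```python
from typing import Callable, Dict


def run_with_lock(locks: Dict[str, int], hcode: str, build: Callable[[], None]) -> bool:
    """Run `build` only if the task is not locked; always release afterwards."""
    if locks.get(hcode, 0) != 0:
        # Another run holds the lock: do nothing and leave its lock alone.
        return False
    locks[hcode] = 1
    try:
        build()
    finally:
        # Released whether the build succeeded or raised.
        locks[hcode] = 0
    return True


locks = {"HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)": 0}
ran = run_with_lock(locks, "HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)", lambda: None)
```

In the Cypher version the same effect depends on rows still flowing to the final statement, which is why the default chunk matters; an exception that aborts the whole query would presumably leave the lock set, so an external reset may be needed in that case.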

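Complete implementation examples

Homogeneous graph

Node TASK
// =========================== Acquire the lock and run the TASK ===========================
// Acquire the task lock and mark the task as locked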
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
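// Read the checkpoint time [for a full load, set the check point to the earliest timestamp] [data chunking is mandatory when the data volume exceeds the heap memory limit]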
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: fetch the min and max auto-increment IDs of rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into chunks of the given size [a default chunk is added so the lock can always be released]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL query that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV001 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
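// When the number of failed batches is less than 1 [all operations succeeded], update the checkpoint [node_check_point is set to the system time] [rel_check_point is set to the pre-update value of node_check_point]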
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock [done when the TASK finishes] [the default chunk added during data chunking guarantees this step runs]
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
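
One detail of the listing above is easy to miss: the first argument of `apoc.periodic.iterate` is a string, so the placeholder text `check_point,batchMin,batchMax` inside it is replaced with literal values by `REPLACE` before the inner query runs. That is presumably why `check_point` is wrapped in single quotes beforehand. A minimal Python sketch of the same substitution follows; `build_inner_query` and the shortened template are hypothetical.

```python
def build_inner_query(template: str, check_point: str, batch_min: int, batch_max: int) -> str:
    """Inline the checkpoint and chunk bounds into the inner query text."""
    # The timestamp must become a quoted string literal inside the inner query.
    quoted_check_point = "'" + check_point + "'"
    values = quoted_check_point + "," + str(batch_min) + "," + str(batch_max)
    return template.replace("check_point,batchMin,batchMax", values)


template = (
    "CALL apoc.load.jdbc(url,"
    "'SELECT hcode FROM t WHERE hupdatetime>=? AND huid>=? AND huid<=?',"
    "[check_point,batchMin,batchMax])"
)
inner = build_inner_query(template, "1900-01-01 00:00:00", 1, 10000)
```

After substitution the parameter list reads `['1900-01-01 00:00:00',1,10000]`, so each chunk gets its own self-contained inner query.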
Relationship TASK
// =========================== Acquire the lock and run the TASK ===========================
// Acquire the task lock and mark the task as locked
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
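// Read the checkpoint time [for a full load, set the check point to the earliest timestamp] [data chunking is mandatory when the data volume exceeds the heap memory limit]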
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: fetch the min and max auto-increment IDs of rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
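// Data chunking: split the ID range after the checkpoint into chunks of the given size [a default chunk is added so the lock can always be released]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Define the SQL query that fetches the data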
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build relationships in batches
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV001 {hcode:row.from}),(to:HORGGuaranteeV001 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
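// batchFailedSize>0: roll back the task state [triggered when any relationship-building batch fails] [rollback: set node_check_point to the current rel_check_point]
// batchFailedSize<=0: [on success the node TASK and relationship TASK states are synchronized] update rel_check_point=node_check_point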
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock [done when the TASK finishes] [the default chunk added during data chunking guarantees this step runs]
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

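Heterogeneous graph

The TASK execution flow for a heterogeneous graph is (FROM-TASK)->(TO-TASK)->(REL-TASK). The (TO-TASK) only rolls back node_check_point; it neither updates node_check_point nor synchronizes task state.

Node TASK
  • FROM node TASK
// =========================== Acquire the lock and run the TASK ===========================
// Acquire the task lock and mark the task as locked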
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
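// Read the checkpoint time [for a full load, set the check point to the earliest timestamp] [data chunking is mandatory when the data volume exceeds the heap memory limit]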
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: fetch the min and max auto-increment IDs of rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into chunks of the given size [a default chunk is added so the lock can always be released]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL query that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT org_hcode AS hcode,org_name AS name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HBondOrg {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
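// When the number of failed batches is less than 1 [all operations succeeded], update the checkpoint [node_check_point is set to the system time] [rel_check_point is set to the pre-update value of node_check_point]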
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock [done when the TASK finishes] [the default chunk added during data chunking guarantees this step runs]
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
  • TO node TASK
// =========================== Acquire the lock and run the TASK ===========================
// Acquire the task lock and mark the task as locked
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
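// Read the checkpoint time [for a full load, set the check point to the earliest timestamp] [data chunking is mandatory when the data volume exceeds the heap memory limit]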
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: fetch the min and max auto-increment IDs of rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into chunks of the given size [a default chunk is added so the lock can always be released]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL query that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode,name,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HEventBond {hcode:row.hcode}) SET n+=row', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When the number of failed batches is greater than 0, roll back node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock [done when the TASK finishes] [the default chunk added during data chunking guarantees this step runs]
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
Relationship TASK
// =========================== Acquire the lock and run the TASK ===========================
// Acquire the task lock and mark the task as locked
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
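// Read the checkpoint time [for a full load, set the check point to the earliest timestamp] [data chunking is mandatory when the data volume exceeds the heap memory limit]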
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: fetch the min and max auto-increment IDs of rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
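// Data chunking: split the ID range after the checkpoint into chunks of the given size [a default chunk is added so the lock can always be released]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Define the SQL query that fetches the data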
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode AS `to`,org_hcode AS `from`,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build relationships in batches [the SQL above aliases org_hcode AS `from` and hcode AS `to`, so the row is matched on row.from and row.to]
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HBondOrg {hcode:row.from}),(to:HEventBond {hcode:row.to}) MERGE (from)-[r:发行证券]->(to) SET r+=olab.reset.map(row,[\'from\',\'to\'])', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
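// batchFailedSize>0: roll back the task state [triggered when any relationship-building batch fails] [rollback: set node_check_point to the current rel_check_point]
// batchFailedSize<=0: [on success the node TASK and relationship TASK states are synchronized] update rel_check_point=node_check_point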
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock [done when the TASK finishes] [the default chunk added during data chunking guarantees this step runs]
CALL apoc.load.jdbcUpdate('jdbc:mysql://test.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

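Notes

Breaking graph data tasks down in this way suggests that generating the CYPHER scripts automatically is entirely feasible. Going a step further, designing and building a system that extracts graph data automatically would be very valuable. Covering everything from graph schema design to data model recommendation and automatic graph data extraction, such a system could free up a great deal of data engineers' time.

# Graph database plugin downloads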
https://github.com/neo4j-contrib/neo4j-apoc-procedures
https://github.com/ongdb-contrib/ongdb-lab-apoc
  • Related articles
    Building graph data construction tasks based on check-point
    Task state rollback and data chunking tasks based on the check-point mechanism
