What to do when HDFS files are deleted by mistake: one trick to get them back, so you never have to fear "deleting the database and running away" again.
This article is based on Hadoop 3.1.2.
The HDFS file deletion process
Below is the method HDFS uses to delete a path. The source is org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete():
/**
 * Remove the indicated file from namespace.
 *
 * @see ClientProtocol#delete(String, boolean) for detailed description and
 * description of exceptions
 */
boolean delete(String src, boolean recursive, boolean logRetryCache)
    throws IOException {
  waitForLoadingFSImage();
  BlocksMapUpdateInfo toRemovedBlocks = null;
  writeLock();
  boolean ret = false;
  try {
    checkOperation(OperationCategory.WRITE);
    checkNameNodeSafeMode("Cannot delete " + src);
    toRemovedBlocks = FSDirDeleteOp.delete( // @1
        this, src, recursive, logRetryCache);
    ret = toRemovedBlocks != null;
  } catch (AccessControlException e) {
    logAuditEvent(false, "delete", src);
    throw e;
  } finally {
    writeUnlock();
  }
  getEditLog().logSync();
  if (toRemovedBlocks != null) {
    removeBlocks(toRemovedBlocks); // @2
  }
  logAuditEvent(true, "delete", src);
  return ret;
}
The call at @1 does three important things:
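① It removes the path from the directory tree maintained by the NameNode. This is the root reason why, once the delete has run, the path can no longer be seen with hdfs dfs -ls xxx or through any other API: external access to the deleted files is cut off.
② It collects the block information associated with the deleted path. Each file consists of several blocks spread across the DataNodes. At this point the blocks on the DataNodes' physical disks have not actually been deleted.
③ It records the delete in the EditLog. This step is also very important; it is the key to recovering the data later.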
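The call at @2 adds the blocks to be deleted to the InvalidateBlocks object maintained by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager. InvalidateBlocks exists specifically to hold block replicas that are waiting to be deleted.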
None of the steps above performs any actual physical deletion.
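To make the two phases concrete, here is a toy Java model of the delete path described above. ToyNameNode and its fields are hypothetical stand-ins, not Hadoop classes: a map plays the NameNode directory tree, a list plays the EditLog, and a queue plays InvalidateBlocks. It only illustrates that after delete() the path is gone while its block IDs are merely queued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of the delete path; none of these are real Hadoop classes. */
final class ToyNameNode {
    // Stands in for the NameNode directory tree: path -> IDs of that file's blocks
    final Map<String, List<Long>> namespace = new HashMap<>();
    // Stands in for the EditLog
    final List<String> editLog = new ArrayList<>();
    // Stands in for BlockManager's InvalidateBlocks: replicas waiting for physical deletion
    final Deque<Long> pendingInvalidation = new ArrayDeque<>();

    boolean delete(String path) {
        // Steps 1 and 2: the path leaves the namespace and its blocks are collected
        List<Long> blocks = namespace.remove(path);
        if (blocks == null) {
            return false; // nothing to delete
        }
        editLog.add("OP_DELETE " + path); // step 3: the delete is logged
        pendingInvalidation.addAll(blocks); // @2: blocks are only queued, nothing is erased here
        return true;
    }
}
```

After delete("/data/a") the map no longer contains the path, but the block IDs are still sitting in pendingInvalidation. In the real system, the replicas behind those IDs are what make recovery possible.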
BlockManager
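BlockManager manages the lifecycle of HDFS blocks and maintains block-related information for the Hadoop cluster. This covers block reports, replication, deletion, monitoring, marking, and more.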
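BlockManager has a method, invalidateWorkForOneNode(), that is dedicated to periodically processing the blocks waiting in InvalidateBlocks. Once the NameNode has started, BlockManager's internal thread class ReplicationMonitor polls on a timer and moves the blocks to be deleted into the corresponding DatanodeDescriptor. The call path is: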
org.apache.hadoop.hdfs.server.namenode.NameNode#initialize(Configuration conf)
org.apache.hadoop.hdfs.server.namenode.NameNode#startCommonServices(Configuration conf)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#startCommonServices(Configuration conf, HAContext haContext)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#activate(Configuration conf)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.ReplicationMonitor#run()
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#computeDatanodeWork()
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#computeInvalidateWork(int nodesToProcess)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#invalidateWorkForOneNode(DatanodeInfo dn)
org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks#invalidateWork(final DatanodeDescriptor dn)
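BlockManager maintains InvalidateBlocks, which holds the blocks waiting to be deleted. When the NameNode starts, BlockManager launches a separate thread that periodically takes block information out of InvalidateBlocks. In each round it takes at most blockInvalidateLimit blocks per DataNode (set by dfs.block.invalidate.limit, default 1000).

This logic lives in BlockManager.computeInvalidateWork(), which puts the blocks into the invalidateBlocks collection of the DatanodeDescriptor. When a DataNode sends its heartbeat to the NameNode, DatanodeManager builds the set of block-deletion commands, and the NameNode sends them to the DataNode. Heartbeats are invoked through DatanodeProtocol, and the call path is: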
org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol#sendHeartbeat()
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#sendHeartbeat()
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#handleHeartbeat()
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#handleHeartbeat()
DatanodeManager.handleHeartbeat() builds the deletion commands that the NameNode then sends to the DataNode. The code is:
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  final DatanodeDescriptor nodeinfo;
  try {
    nodeinfo = getDatanode(nodeReg);
  } catch (UnregisteredNodeException e) {
    return new DatanodeCommand[]{RegisterCommand.REGISTER};
  }

  // Check if this datanode should actually be shutdown instead.
  if (nodeinfo != null && nodeinfo.isDisallowed()) {
    setDatanodeDead(nodeinfo);
    throw new DisallowedDatanodeException(nodeinfo);
  }

  if (nodeinfo == null || !nodeinfo.isRegistered()) {
    return new DatanodeCommand[]{RegisterCommand.REGISTER};
  }
  heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
      cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);

  // If we are in safemode, do not send back any recovery / replication
  // requests. Don't even drain the existing queue of work.
  if (namesystem.isInSafeMode()) {
    return new DatanodeCommand[0];
  }

  // block recovery command
  final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
      nodeinfo);
  if (brCommand != null) {
    return new DatanodeCommand[]{brCommand};
  }

  final List<DatanodeCommand> cmds = new ArrayList<>();
  // Allocate _approximately_ maxTransfers pending tasks to DataNode.
  // NN chooses pending tasks based on the ratio between the lengths of
  // replication and erasure-coded block queues.
  int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
  int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
  int totalBlocks = totalReplicateBlocks + totalECBlocks;
  if (totalBlocks > 0) {
    int numReplicationTasks = (int) Math.ceil(
        (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
    int numECTasks = (int) Math.ceil(
        (double) (totalECBlocks * maxTransfers) / totalBlocks);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Pending replication tasks: " + numReplicationTasks
          + " erasure-coded tasks: " + numECTasks);
    }
    // check pending replication tasks
    List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
        numReplicationTasks);
    if (pendingList != null && !pendingList.isEmpty()) {
      cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
          pendingList));
    }
    // check pending erasure coding tasks
    List<BlockECReconstructionInfo> pendingECList = nodeinfo
        .getErasureCodeCommand(numECTasks);
    if (pendingECList != null && !pendingECList.isEmpty()) {
      cmds.add(new BlockECReconstructionCommand(
          DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
    }
  }

  // check block invalidation
  Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
  if (blks != null) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
        blks));
  }
  // cache commands
  addCacheCommands(blockPoolId, nodeinfo, cmds);
  // key update command
  blockManager.addKeyUpdateCommand(cmds, nodeinfo);

  // check for balancer bandwidth update
  if (nodeinfo.getBalancerBandwidth() > 0) {
    cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
    // set back to 0 to indicate that datanode has been sent the new value
    nodeinfo.setBalancerBandwidth(0);
  }

  if (slowPeerTracker != null) {
    final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
    if (!slowPeersMap.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
            slowPeersMap);
      }
      for (String slowNodeId : slowPeersMap.keySet()) {
        slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
      }
    }
  }

  if (slowDiskTracker != null) {
    if (!slowDisks.getSlowDisks().isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
            slowDisks.getSlowDisks());
      }
      slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
    }
  }

  if (!cmds.isEmpty()) {
    return cmds.toArray(new DatanodeCommand[cmds.size()]);
  }

  return new DatanodeCommand[0];
}
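Periodic polling, combined with the limit of 1000 blocks per batch, means that HDFS does not physically delete data right away, and each round deletes only a limited number of blocks. So if something is deleted by mistake, stop HDFS immediately. Some blocks may already have been removed in earlier polling rounds. The sooner the cluster is stopped after the incident, the fewer blocks are lost and the smaller the damage.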
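A rough back-of-the-envelope sketch of why stopping early helps. It assumes, hypothetically, that one DataNode holds all the blocks of the deleted path and that every heartbeat carries a full batch of blockInvalidateLimit deletion commands. InvalidationDrain is an illustrative helper, not part of Hadoop. The real scheduling is more uneven than this, because ReplicationMonitor only processes some of the nodes in each round.

```java
/**
 * Hypothetical back-of-the-envelope model: one DataNode, a fixed backlog of blocks to delete,
 * and every heartbeat carrying a full batch of `limit` deletion commands.
 */
final class InvalidationDrain {
    /** Blocks handed to the DataNode for deletion after `heartbeats` heartbeats. */
    static long sentAfter(long pendingBlocks, int limit, int heartbeats) {
        return Math.min(pendingBlocks, (long) limit * heartbeats);
    }

    /** Heartbeats needed until every pending block has been handed out (ceiling division). */
    static long heartbeatsToDrain(long pendingBlocks, int limit) {
        return (pendingBlocks + limit - 1) / limit;
    }

    public static void main(String[] args) {
        long pending = 25_000; // made-up backlog for illustration
        int limit = 1000;      // default dfs.block.invalidate.limit as stated in the text
        System.out.println(heartbeatsToDrain(pending, limit));       // 25
        System.out.println(pending - sentAfter(pending, limit, 10)); // 15000 not yet sent
    }
}
```

Under these assumptions, 25,000 pending blocks need 25 heartbeats before every deletion command has been sent. With the default dfs.heartbeat.interval of 3 seconds, that is on the order of 75 seconds, and after 10 heartbeats 15,000 blocks have not even been scheduled yet.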
EditLog
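The EditLog records every HDFS operation, deletes included. We usually think of file operations as just create, delete, and modify, but HDFS has far more operation types than that. Here is the enum of EditLog operation types, org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes: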
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;

/**
 * Op codes for edits file
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public enum FSEditLogOpCodes {
  // last op code in file
  OP_ADD ((byte) 0, AddOp.class),
  // deprecated operation
  OP_RENAME_OLD ((byte) 1, RenameOldOp.class),
  OP_DELETE ((byte) 2, DeleteOp.class),
  OP_MKDIR ((byte) 3, MkdirOp.class),
  OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class),
  @Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete
  @Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete
  OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class),
  OP_SET_OWNER ((byte) 8, SetOwnerOp.class),
  OP_CLOSE ((byte) 9, CloseOp.class),
  OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class),
  OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete
  OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete
  OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime
  OP_SET_QUOTA ((byte) 14, SetQuotaOp.class),
  // filecontext rename
  OP_RENAME ((byte) 15, RenameOp.class),
  // concat files
  OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class),
  OP_SYMLINK ((byte) 17, SymlinkOp.class),
  OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class),
  OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class),
  OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class),
  OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class),
  OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class),
  OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class),
  OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class),
  OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class),
  OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class),
  OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class),
  OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class),
  OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class),
  OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class),
  OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class),
  OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class),
  OP_ADD_BLOCK ((byte) 33, AddBlockOp.class),
  OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class),
  OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class),
  OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class),
  OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class),
  OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class),
  OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class),
  OP_SET_ACL ((byte) 40, SetAclOp.class),
  OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class),
  OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class),
  OP_SET_XATTR ((byte) 43, SetXAttrOp.class),
  OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class),
  OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class),
  OP_TRUNCATE ((byte) 46, TruncateOp.class),
  OP_APPEND ((byte) 47, AppendOp.class),
  OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class),
  OP_ADD_ERASURE_CODING_POLICY ((byte) 49, AddErasureCodingPolicyOp.class),
  OP_ENABLE_ERASURE_CODING_POLICY((byte) 50, EnableErasureCodingPolicyOp.class),
  OP_DISABLE_ERASURE_CODING_POLICY((byte) 51,
      DisableErasureCodingPolicyOp.class),
  OP_REMOVE_ERASURE_CODING_POLICY((byte) 52, RemoveErasureCodingPolicyOp.class),

  // Note that the current range of the valid OP code is 0~127
  OP_INVALID ((byte) -1);

  private final byte opCode;
  private final Class<? extends FSEditLogOp> opClass;

  /**
   * Constructor
   *
   * @param opCode byte value of constructed enum
   */
  FSEditLogOpCodes(byte opCode) {
    this(opCode, null);
  }

  FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) {
    this.opCode = opCode;
    this.opClass = opClass;
  }

  /**
   * return the byte value of the enum
   *
   * @return the byte value of the enum
   */
  public byte getOpCode() {
    return opCode;
  }

  public Class<? extends FSEditLogOp> getOpClass() {
    return opClass;
  }

  private static final FSEditLogOpCodes[] VALUES;

  static {
    byte max = 0;
    for (FSEditLogOpCodes code : FSEditLogOpCodes.values()) {
      if (code.getOpCode() > max) {
        max = code.getOpCode();
      }
    }
    VALUES = new FSEditLogOpCodes[max + 1];
    for (FSEditLogOpCodes code : FSEditLogOpCodes.values()) {
      if (code.getOpCode() >= 0) {
        VALUES[code.getOpCode()] = code;
      }
    }
  }

  /**
   * Converts byte to FSEditLogOpCodes enum value
   *
   * @param opCode get enum for this opCode
   * @return enum with byte value of opCode
   */
  public static FSEditLogOpCodes fromByte(byte opCode) {
    if (opCode >= 0 && opCode < VALUES.length) {
      return VALUES[opCode];
    }
    return opCode == -1 ? OP_INVALID : null;
  }
}
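That makes 54 operation types in total (op codes 0 through 52 plus OP_INVALID), far more than the create, delete, modify, and read operations most people associate with files. The EditLog files can be found under the directory configured by the Hadoop parameter dfs.namenode.name.dir.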
EditLog files are serialized binary files and cannot be read directly. HDFS ships with a command that decodes them into plain-text XML:
hdfs oev -i edits_0000000000035854978-0000000000035906741 -o edits.xml
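Every HDFS operation is recorded as a RECORD. Different operations carry different fields, but every RECORD has an OPCODE, which corresponds to one of the operations in the enum org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes shown above.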
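Once the EditLog has been decoded, you can find the delete to undo by scanning for OP_DELETE records. Below is a minimal Java sketch that uses the JDK's built-in DOM parser. It assumes the RECORD / OPCODE / DATA / TXID / PATH layout shown in this article's XML sample, and FindDeletes is a hypothetical helper name. It prints the TXID and PATH of each delete, which tells you which record needs to be changed.

```java
import java.io.File;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical helper: list the OP_DELETE records in an "hdfs oev" XML dump. */
final class FindDeletes {
    static Document parseXml(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("not a well-formed XML document", e);
        }
    }

    /** Returns "TXID PATH" for every record whose OPCODE is OP_DELETE, in file order. */
    static List<String> findDeletes(Document doc) {
        List<String> result = new ArrayList<>();
        NodeList records = doc.getElementsByTagName("RECORD");
        for (int i = 0; i < records.getLength(); i++) {
            Element rec = (Element) records.item(i);
            if ("OP_DELETE".equals(firstText(rec, "OPCODE"))) {
                result.add(firstText(rec, "TXID") + " " + firstText(rec, "PATH"));
            }
        }
        return result;
    }

    // Text of the first descendant element with this tag, or null if there is none
    private static String firstText(Element parent, String tag) {
        Node n = parent.getElementsByTagName(tag).item(0);
        return n == null ? null : n.getTextContent().trim();
    }

    // Usage: java FindDeletes edits.xml
    public static void main(String[] args) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new File(args[0]));
        findDeletes(doc).forEach(System.out::println);
    }
}
```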
Loading HDFS metadata
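When HDFS starts, the NameNode loads the fsimage, which records the complete set of paths currently in HDFS. Does startup load only the fsimage? Not quite! It also loads the EditLog files that have not yet been merged into an fsimage. The details of the fsimage are beyond the scope of this article. An example: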
Suppose Hadoop runs a checkpoint every 3 minutes, producing an fsimage file, and rolls a new EditLog file every minute. The files are produced in this order:
fsimage_1
editlog_1
editlog_2
editlog_3
fsimage_2
editlog_4
editlog_5
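When the NameNode starts, it loads the fsimage file with the largest suffix, together with the EditLog files produced after it. In this example, fsimage_2, editlog_4, and editlog_5 are loaded into NameNode memory. In a real storage directory the suffix is a transaction ID rather than a timestamp: files are named fsimage_<txid> and edits_<startTxId>-<endTxId>.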
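In a real NameNode storage directory the suffixes are transaction IDs (fsimage_<txid>, edits_<startTxId>-<endTxId>). The Java sketch below picks the image with the largest txid and lists, in order, the finalized edit segments that end after it. It is a simplification. StartupFiles is a hypothetical helper, and it ignores in-progress segments, checksum files, and the gap checks the real loader performs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: given the file names in a NameNode storage directory, return the
 * newest fsimage followed by the finalized edit segments that must be replayed on top of it.
 */
final class StartupFiles {
    // Full-string matches only, so fsimage_*.md5 and edits_inprogress_* are skipped
    private static final Pattern IMAGE = Pattern.compile("fsimage_(\\d+)");
    private static final Pattern EDITS = Pattern.compile("edits_(\\d+)-(\\d+)");

    static List<String> filesToLoad(Collection<String> names) {
        String image = null;
        long imageTxId = -1;
        for (String name : names) {
            Matcher m = IMAGE.matcher(name);
            if (m.matches() && Long.parseLong(m.group(1)) > imageTxId) {
                imageTxId = Long.parseLong(m.group(1));
                image = name;
            }
        }
        if (image == null) {
            throw new IllegalStateException("no fsimage found");
        }
        // Segments that end after the image's last txid, sorted by their first txid
        TreeMap<Long, String> segments = new TreeMap<>();
        for (String name : names) {
            Matcher m = EDITS.matcher(name);
            if (m.matches() && Long.parseLong(m.group(2)) > imageTxId) {
                segments.put(Long.parseLong(m.group(1)), name);
            }
        }
        List<String> result = new ArrayList<>();
        result.add(image);
        result.addAll(segments.values());
        return result;
    }
}
```

Removing the newest image only makes the sketch fall back to the older one. Every later segment is still selected, including the one that holds the delete. This is why deleting an fsimage on its own is not enough to bring a path back.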
Recovery methods
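Suppose the hdfs dfs -rmr xxx command we ran was recorded in editlog_5. After the NameNode restarts, the path xxx can no longer be seen. Now suppose we delete fsimage_2 and also move editlog_4 and editlog_5 out of the way. The NameNode replays every edit segment that follows the image it loads, so without moving them the delete would simply be applied again. The NameNode then loads fsimage_1, editlog_1, editlog_2, and editlog_3, and in that metadata xxx has not been deleted yet. As long as the DataNodes have not physically deleted the blocks, the data can be recovered. The price is that the HDFS operations recorded in editlog_4 and editlog_5 are lost. Is there a better way?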
Option 1:
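Delete fsimage_2 and recover from the previous checkpoint, fsimage_1. Our cluster is actually configured to produce an fsimage once an hour, so this approach loses every file added to HDFS in roughly the last hour. Nobody knows how much happened in that hour, and the predictable result is a flood of errors after the recovery. Clearly this is not the best approach.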
Option 2:
Edit editlog_5 and change the record that deletes xxx into another, harmless operation type. After the NameNode restarts, the path is visible again.
Steps:
- Stop the HDFS cluster.
- Decode the EditLog.
  Find the EditLog file that covers the time of the delete and decode it:
  hdfs oev -i edits_0000000000000000336-0000000000000000414 -o edits.xml
  Open edits.xml; the delete operation has already been recorded in it.
- Replace the delete operation.
  Replace the OP_DELETE operation with a relatively safe operation. For example, the delete record to change looks like this:
  <RECORD>
    <OPCODE>OP_DELETE</OPCODE>
    <DATA>
      <TXID>374</TXID>
      <LENGTH>0</LENGTH>
      <PATH>/hbase/oldWALs/test%2C16020%2C1630456996531.1630464212306</PATH>
      <TIMESTAMP>1630724840046</TIMESTAMP>
      <RPC_CLIENTID>d1e2ae59-ba0c-4385-87d4-4da3d9ec019b</RPC_CLIENTID>
      <RPC_CALLID>1049</RPC_CALLID>
    </DATA>
  </RECORD>
  Note: change only the OPCODE and nothing else!
- Convert the XML back into an EditLog.
  # Convert the edited XML file back into a binary editlog
  hdfs oev -i edits.xml -o edits_0000000000000000336-0000000000000000414.tmp -p binary
  # Rename the original editlog as a backup
  mv edits_0000000000000000336-0000000000000000414 edits_0000000000000000336-0000000000000000414.bak
  # Put the converted editlog in its place
  mv edits_0000000000000000336-0000000000000000414.tmp edits_0000000000000000336-0000000000000000414
- Use scp to sync the modified editlog to the other JournalNode nodes.
- Restart HDFS to complete the recovery.
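The "replace the delete operation" step above can be done by hand in an editor, but for a large edits.xml a small program is less error-prone. The Java sketch below changes only the OPCODE of the OP_DELETE record with a given TXID and writes the document back, leaving every other element untouched. The replacement op code is a command-line parameter because the text does not name one. PatchOpcode is a hypothetical helper. Check the result by running it through hdfs oev with -p binary before putting anything back on the cluster.

```java
import java.io.File;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical helper: rewrite the OPCODE of one OP_DELETE record in an "hdfs oev" XML dump. */
final class PatchOpcode {
    static Document parseXml(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("not a well-formed XML document", e);
        }
    }

    /** Changes only the OPCODE text of the OP_DELETE record with this TXID; false if none matches. */
    static boolean patch(Document doc, String txid, String newOpcode) {
        NodeList records = doc.getElementsByTagName("RECORD");
        for (int i = 0; i < records.getLength(); i++) {
            Element rec = (Element) records.item(i);
            Node op = rec.getElementsByTagName("OPCODE").item(0);
            Node tx = rec.getElementsByTagName("TXID").item(0);
            if (op != null && tx != null
                    && "OP_DELETE".equals(op.getTextContent().trim())
                    && txid.equals(tx.getTextContent().trim())) {
                op.setTextContent(newOpcode); // TXID, PATH, TIMESTAMP, RPC_* stay as they were
                return true;
            }
        }
        return false;
    }

    // Usage: java PatchOpcode edits.xml 374 <NEW_OPCODE> edits.patched.xml
    public static void main(String[] args) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new File(args[0]));
        if (!patch(doc, args[1], args[2])) {
            throw new IllegalArgumentException("no OP_DELETE record with TXID " + args[1]);
        }
        TransformerFactory.newInstance().newTransformer()
                .transform(new DOMSource(doc), new StreamResult(new File(args[3])));
    }
}
```

The program refuses to patch anything that is not an OP_DELETE record, so a mistyped TXID fails loudly instead of silently altering an unrelated operation.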