【Python】FANUC机器人OPC UA通信并记录数据

2024-04-11 18:04

本文主要是介绍【Python】FANUC机器人OPC UA通信并记录数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):client = Client(url)client.connect()return client
  • 获取根节点和对象节点
def get_root_node(client: Client):return client.get_root_node()
def get_objects_node(client: Client):return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variables

5. 备份旧CSV文件

def backup_csv_file(filename):if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):"""创建客户端实例并连接到服务端"""client = Client(url)client.connect()return clientdef get_root_node(client: Client):"""获取服务器命名空间中的根节点"""return client.get_root_node()def get_objects_node(client: Client):"""获取服务器的对象节点"""return client.get_objects_node()def get_variables(node: Node, path=""):"""遍历所有子节点并返回变量节点的路径和数值"""variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variablesdef backup_csv_file(filename):"""如果CSV文件已存在则备份"""if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

这篇关于【Python】FANUC机器人OPC UA通信并记录数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/894798

相关文章

Python中局部变量和全局变量举例详解

《Python中局部变量和全局变量举例详解》:本文主要介绍如何通过一个简单的Python代码示例来解释命名空间和作用域的概念,它详细说明了内置名称、全局名称、局部名称以及它们之间的查找顺序,文中通... 目录引入例子拆解源码运行结果如下图代码解析 python3命名空间和作用域命名空间命名空间查找顺序命名空

Python如何将大TXT文件分割成4KB小文件

《Python如何将大TXT文件分割成4KB小文件》处理大文本文件是程序员经常遇到的挑战,特别是当我们需要把一个几百MB甚至几个GB的TXT文件分割成小块时,下面我们来聊聊如何用Python自动完成这... 目录为什么需要分割TXT文件基础版:按行分割进阶版:精确控制文件大小完美解决方案:支持UTF-8编码

基于Python打造一个全能文本处理工具

《基于Python打造一个全能文本处理工具》:本文主要介绍一个基于Python+Tkinter开发的全功能本地化文本处理工具,它不仅具备基础的格式转换功能,更集成了中文特色处理等实用功能,有需要的... 目录1. 概述:当文本处理遇上python图形界面2. 功能全景图:六大核心模块解析3.运行效果4. 相

SpringValidation数据校验之约束注解与分组校验方式

《SpringValidation数据校验之约束注解与分组校验方式》本文将深入探讨SpringValidation的核心功能,帮助开发者掌握约束注解的使用技巧和分组校验的高级应用,从而构建更加健壮和可... 目录引言一、Spring Validation基础架构1.1 jsR-380标准与Spring整合1

Python中的魔术方法__new__详解

《Python中的魔术方法__new__详解》:本文主要介绍Python中的魔术方法__new__的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、核心意义与机制1.1 构造过程原理1.2 与 __init__ 对比二、核心功能解析2.1 核心能力2.2

Python虚拟环境终极(含PyCharm的使用教程)

《Python虚拟环境终极(含PyCharm的使用教程)》:本文主要介绍Python虚拟环境终极(含PyCharm的使用教程),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录一、为什么需要虚拟环境?二、虚拟环境创建方式对比三、命令行创建虚拟环境(venv)3.1 基础命令3

Python Transformer 库安装配置及使用方法

《PythonTransformer库安装配置及使用方法》HuggingFaceTransformers是自然语言处理(NLP)领域最流行的开源库之一,支持基于Transformer架构的预训练模... 目录python 中的 Transformer 库及使用方法一、库的概述二、安装与配置三、基础使用:Pi

Python 中的 with open文件操作的最佳实践

《Python中的withopen文件操作的最佳实践》在Python中,withopen()提供了一个简洁而安全的方式来处理文件操作,它不仅能确保文件在操作完成后自动关闭,还能处理文件操作中的异... 目录什么是 with open()?为什么使用 with open()?使用 with open() 进行

Python中使用正则表达式精准匹配IP地址的案例

《Python中使用正则表达式精准匹配IP地址的案例》Python的正则表达式(re模块)是完成这个任务的利器,但你知道怎么写才能准确匹配各种合法的IP地址吗,今天我们就来详细探讨这个问题,感兴趣的朋... 目录为什么需要IP正则表达式?IP地址的基本结构基础正则表达式写法精确匹配0-255的数字验证IP地

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2