Python 全栈系列230 轻全局函数服务 GFGoLite

2024-03-03 20:20

本文主要是介绍Python 全栈系列230 轻全局函数服务 GFGoLite,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

为了将GlobalFunc推向正常的应用轨道,构造一个功能简单的服务。

内容

1 工作模式

Server模式:以API服务方式执行

Worker模式: 以Worker方式执行。通过left/right文件夹和rsync方式执行任务并写结果。

2 构造方法

重载和执行;从标准镜像环境启动服务,通过数据库更新函数,然后保存为新的镜像。

2.1 step0 选择基础镜像建立容器gfgo_lite_build

docker run -it --name=gfgo_lite_build myregistry.domain.com:24052/worker.andy.globalfunc:v101 bash
退出,然后重启
docker restart gfgo_lite_build

由于容器设置为退出不删除,所以可以分步进行操作,每次执行

docker exec -it gfgo_lite_build bash

2.2 step1 从数据库获取数据并生成py

用最基础的方式构造生成方法,先获取三个基础文件

  • 1 RedisOrMongo_v100.py :从Redis或者Mongo中获取数据
  • 2 WMongo_V9000_012.py : 提供Mongo的操作, 是RedisOrMongo的依赖对象
  • 3 worker_gen_file.py: 只提供两个功能,将文本生成为文件,以及根绝py生成包的init文件

GFGoLite将会发布为服务,要可以通过接口实现文件的增/删,所以要将这部分功能进行接口封装。
为了不让项目入口的文件看起来太乱,建立init_funcs文件夹,把这些文件放进去。
目前的源文件主要来自本地文件,先进行上传

gfbase = GFBase(project_path = git_project_path,redis_agent_host = redis_agent_host,tier1 = tier1 )
# 2 扫描所有文件的信息
scan_dict = gfbase._get_scan_files()
# 选择某个包的文件全部上传
some_pack_list = [x for x in scan_dict.keys() if x .startswith('Base.')]
for some_file in some_pack_list:gfbase._save_a_file_rom(some_file)

第一次建立基础的程序包,在入口位置(/workspace/)执行程序

from init_funcs import create_file,generate_init_py,RedisOrMongo, Naiveimport os 
import json
# 声明空间# 在容器中启动
redis_cfg = Naive()
redis_cfg.redis_agent_host = 'http://172.17.0.1:24021/'
redis_cfg.redis_connection_hash = None# 声明当前要创建的程序文件夹:默认为funcs
target_folder = 'GlobalFunc'
tier1 = 'sp_GlobalFunc'
var_space_name = 'Base'
# 分支,一般默认为master
branch_name = 'master' 
tier2 = '_'.join([var_space_name, branch_name])
the_space_name = '.'.join([tier1,tier2])target_folder = target_folder + '/' + var_space_name
os.makedirs(target_folder, exist_ok=True)rom = RedisOrMongo(the_space_name, redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')# 这个一般根据需要,或者代码中得来 --- 需要的列表项
func_list = [ 'from_pickle','to_pickle']
for some_name in func_list:# 获取 meta,data : data就是代码字符the_data = rom.getx(some_name)filename = the_data['meta']['name']filedata = the_data['data']create_file(target_folder, filename, filedata)# 生成初始化文件
generate_init_py(target_folder)

此时文件结构为

root@a1f87a061e62:/workspace# tree /workspace/
/workspace/
├── GFGo.py
├── GlobalFunc
│   └── Base
│       ├── from_pickle.py
│       ├── __init__.py
│       └── to_pickle.py
├── init_funcs
│   ├── __init__.py
│   ├── __pycache__
│   │   ├── __init__.cpython-38.pyc
│   │   ├── RedisOrMongo_v100.cpython-38.pyc
│   │   ├── WMongo_V9000_012.cpython-38.pyc
│   │   └── worker_gen_file.cpython-38.pyc
│   ├── RedisOrMongo_v100.py
│   ├── WMongo_V9000_012.py
│   └── worker_gen_file.py
├── m7.24065.pkl
├── mymeta.pkl
└── __pycache__└── GFGo.cpython-38.pyc

再给GlobalFunc添加一个包的初始化文件(引入Base)

__init__.py

from . import Base

2.3 step2 建立server_funcs.py

接下来建立server_funcs.py。在这个文件中,保留服务所依赖的基础函数和配置,非常少。

# 保留,用于提供server.py的引入
# 【引入时处于和GlobalFunc同级别的位置】import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):def default(self, obj):if isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):return float(obj)elif isinstance(obj, np.ndarray):return obj.tolist()if isinstance(obj, datetime):return obj.__str__()if isinstance(obj, dd.timedelta):return obj.__str__()else:return super(MyEncoder, self).default(obj)# 【创建tornado所需问文件夹】
import os 
# 如果路径不存在则创建
def create_folder_if_notexist(somepath):if not os.path.exists(somepath):os.makedirs(somepath)return True
m_static = os.path.join(os.getcwd(),'m_static')
m_template = os.path.join(os.getcwd(),'m_template')create_folder_if_notexist(m_static)
create_folder_if_notexist(m_template)settings = {
'static_path':m_static,
'template_path':m_template
}

还有一些函数,需要依赖较大的离线文件,因此这部分函数也放在这个文件里,在服务启动时一次性导入。

# 【创建时间轴】
from GlobalFunc import Base base_year=1970
next_years=100
time_zone=8time_axis_name = 'base_year{0}_next{1}_tz{2}'.format(base_year,next_years,time_zone)
if Base.is_file_exists(filename ='%s.pkl' % time_axis_name):print('时间轴文件 %s 已存在' % time_axis_name)
else:yymon_dict = Base.gen_time_axis(base_year=base_year, next_years=next_years,time_zone=time_zone)Base.to_pickle(data = yymon_dict, file_name=time_axis_name)yymon_dict = Base.from_pickle(time_axis_name)
at2 = Base.ATimer2(yymon_dict = yymon_dict)
# 样例字符
some_dt_str = '2024-01-31 11:11:11'
# 字符转数值
at2.char2num(some_dt_str)
# 数值转字符
at2.num2char(the_ts=1706670671)
# 计算字符的时间差:可同比与datetime的delta(3.37 微秒)
at2.c_period_btw('2024-01-31 11:11:11', '2024-02-01 11:11:11')时间轴文件 base_year1970_next100_tz8 已存在
Out[1]: 86400

这部分知识为了引入时间轴对象,通过偏移/查找的方式快速进行时间的转换和偏移计算。

3 服务

使用tornado建立服务,可以使用web进行简单的应用。

3.1 设计

GlobalFunc本身有两种类型的操作:

  • 1 开发。
  • 2 应用。

目前使用GFBase对象进行文件的操作,使用GFGo对象进行调用操作。

因为有些细节还没有想清楚,但是又不能停下来,所以才会想创建GFGoLite:急用先行,边做边完善。到积累成熟时再创建完整版的GlobalFunc服务。

第一步,我想仅仅使用GFGo的内容。这种模式下,在用户侧【client use】使用API调用已经存在于数据库中的函数;使用代码编辑的方式在本地进行开发【server produce】,然后将文件覆盖到数据库。

3.2 GFGo

GFGo.py提供了位置参数调用以及关键字调用两种方式,需要注意的是,未来将规范所有的函数均采用关键字参数。

另外对象还提供了一个包的重载方法,主要对应在运行时函数包可能发生增改。

import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like 
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)

3.3 server.py

在这里会导入所有必须的函数、对象提供服务。

目前假设,该服务唯一会导致的数据变化是函数的增改,所以容器的提交仅仅意味着函数变多了。

from server_funcs import * 
from GFGo import GFGo
import GlobalFunc as funcs import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径# 增加GF的创建实例
gfgo = GFGo()app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)GFGoHandler_path = r'/gfgo/'
class GFGoHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)kwargs = some_dict['kwargs']pack_func = some_dict['pack_func']res = gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs)self.write(json.dumps(res))
gfgo_tuple = (GFGoHandler_path,GFGoHandler)
app_list.append(gfgo_tuple)# 时间轴服务作为基础的服务
'''
1 start_dt 开始时间
2 end_dt 结束时间(选填),默认调用当前
3 time_unit 时间单位,默认秒
4 bias_hours 偏移时间(选填),默认采取东八区
'''# 单元素调用
TimeAxisHandler_path = r'/time_gap/'
class TimeAxisHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)start_dt = some_dict['start_dt']bias_hours = some_dict.get('bias_hours') or 8end_dt = some_dict.get('end_dt') or Base.get_time_str1(bias_hours = bias_hours)time_unit = some_dict['time_unit'] or 'seconds'time_gap_seconds = at2.c_period_btw(start_dt,end_dt)if time_unit.lower() =='seconds':res = time_gap_secondselif time_unit.lower() =='minutes':res = time_gap_seconds // 60elif time_unit.lower() =='hours':res = time_gap_seconds // 3600elif time_unit.lower() =='days':res = time_gap_seconds //86400else:res = Noneself.write(json.dumps(res))
timeaxis_tuple = (TimeAxisHandler_path,TimeAxisHandler)
app_list.append(timeaxis_tuple)# Next: 时间间隔的列表处理,以及
if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动print('Server Started')tornado.ioloop.IOLoop.instance().start()root@a1f87a061e62:/workspace# python3 server.py
时间轴文件 base_year1970_next100_tz8 已存在
Server Started
[I 240303 09:05:47 web:2239] 200 POST /gfgo/ (127.0.0.1) 0.54ms
[I 240303 09:05:57 web:2239] 200 POST /time_gap/ (127.0.0.1) 0.44ms
[I 240303 09:06:13 web:2239] 200 POST /time_gap/ (127.0.0.1) 0.57ms

功能测试

import requests as req # 测试1:调用Base包的函数
kwargs = {'ts':None, 'bias_hours':8}
pack_func = 'Base.get_time_str1'some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_funcres = req.post('http://127.0.0.1:8000/gfgo/', json = some_dict).json()
In [8]: res
Out[8]: '2024-03-03 16:30:53'# 测试2:调用基础时间轴函数
some_dict = {}
some_dict['start_dt'] = '2024-03-03 00:00:00'
some_dict['time_unit'] ='hours'
res = req.post('http://127.0.0.1:8000/time_gap/', json = some_dict).json()In [14]: res
Out[14]: 17

3.4 部署

3.4.1 保存为镜像

将容器进行提交就可以了

┌─root@m7:~
└─ $ docker commit gfgo_lite_build myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100
sha256:d2791b11a8fd3b5d307c7ce7ccc0622b8f42a84970a0c5f877ff88da54caeb8a
┌─root@m7:~
└─ $ docker push myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100
The push refers to repository [myregistry.domain.com:24052/worker.andy.gfgo_lite_24090]
e83f4383e7b7: Pushed
f1db272a1809: Mounted from worker.andy.globalfunc
...
805802706667: Mounted from worker.andy.globalfunc
v100: digest: sha256:9bd2d0a8674a3b3acf843aed6ab5b4372190dcae55edc6ef7a24181295f83549 size: 6221
3.4.2 启动服务

容器无需挂载任何数据卷启动

docker run -d \--restart=always \--name=gfgo_lite_24090 \-v /etc/localtime:/etc/localtime  \-v /etc/timezone:/etc/timezone\-v /etc/hostname:/etc/hostname\-e "LANG=C.UTF-8" \-w /workspace\-p 24090:8000\myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100 \sh -c "python3 server.py"

在挂载了时区后,时间偏移参数就要置为0了

import requests as req # 测试1:调用Base包的函数
kwargs = {'ts':None, 'bias_hours':0}
pack_func = 'Base.get_time_str1'some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_funcres = req.post('http://127.0.0.1:24090/gfgo/', json = some_dict).json()
print(res)# 测试2:调用基础时间轴函数
some_dict = {}
some_dict['start_dt'] = '2024-03-03 00:00:00'
some_dict['time_unit'] ='hours'
some_dict['bias_hours'] =-8
res = req.post('http://127.0.0.1:24090/time_gap/', json = some_dict).json()
print(res)

4 后续

一方面是函数的继续增多,调用(例如重载)。

另一方面是函数的增改。

很多应用需要分为单个和批次调用,例如对时间的计算可以为单个执行,也可以输入一个列表。

这篇关于Python 全栈系列230 轻全局函数服务 GFGoLite的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/770796

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

hdu1171(母函数或多重背包)

题意:把物品分成两份,使得价值最接近 可以用背包,或者是母函数来解,母函数(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v) 其中指数为价值,每一项的数目为(该物品数+1)个 代码如下: #include<iostream>#include<algorithm>

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。