本文主要是介绍Python 全栈系列230 轻全局函数服务 GFGoLite,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
说明
为了将GlobalFunc推向正常的应用轨道,构造一个功能简单的服务。
内容
1 工作模式
Server模式:以API服务方式执行
Worker模式: 以Worker方式执行。通过left/right文件夹和rsync方式执行任务并写结果。
2 构造方法
重载和执行;从标准镜像环境启动服务,通过数据库更新函数,然后保存为新的镜像。
2.1 step0 选择基础镜像建立容器gfgo_lite_build
docker run -it --name=gfgo_lite_build myregistry.domain.com:24052/worker.andy.globalfunc:v101 bash
退出,然后重启
docker restart gfgo_lite_build
由于容器设置为退出不删除,所以可以分步进行操作,每次执行
docker exec -it gfgo_lite_build bash
2.2 step1 从数据库获取数据并生成py
用最基础的方式构造生成方法,先获取三个基础文件
- 1 RedisOrMongo_v100.py :从Redis或者Mongo中获取数据
- 2 WMongo_V9000_012.py : 提供Mongo的操作, 是RedisOrMongo的依赖对象
- 3 worker_gen_file.py: 只提供两个功能,将文本生成为文件,以及根绝py生成包的init文件
GFGoLite将会发布为服务,要可以通过接口实现文件的增/删,所以要将这部分功能进行接口封装。
为了不让项目入口的文件看起来太乱,建立init_funcs文件夹,把这些文件放进去。
目前的源文件主要来自本地文件,先进行上传
gfbase = GFBase(project_path = git_project_path,redis_agent_host = redis_agent_host,tier1 = tier1 )
# 2 扫描所有文件的信息
scan_dict = gfbase._get_scan_files()
# 选择某个包的文件全部上传
some_pack_list = [x for x in scan_dict.keys() if x .startswith('Base.')]
for some_file in some_pack_list:gfbase._save_a_file_rom(some_file)
第一次建立基础的程序包,在入口位置(/workspace/
)执行程序
from init_funcs import create_file,generate_init_py,RedisOrMongo, Naiveimport os
import json
# 声明空间# 在容器中启动
redis_cfg = Naive()
redis_cfg.redis_agent_host = 'http://172.17.0.1:24021/'
redis_cfg.redis_connection_hash = None# 声明当前要创建的程序文件夹:默认为funcs
target_folder = 'GlobalFunc'
tier1 = 'sp_GlobalFunc'
var_space_name = 'Base'
# 分支,一般默认为master
branch_name = 'master'
tier2 = '_'.join([var_space_name, branch_name])
the_space_name = '.'.join([tier1,tier2])target_folder = target_folder + '/' + var_space_name
os.makedirs(target_folder, exist_ok=True)rom = RedisOrMongo(the_space_name, redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')# 这个一般根据需要,或者代码中得来 --- 需要的列表项
func_list = [ 'from_pickle','to_pickle']
for some_name in func_list:# 获取 meta,data : data就是代码字符the_data = rom.getx(some_name)filename = the_data['meta']['name']filedata = the_data['data']create_file(target_folder, filename, filedata)# 生成初始化文件
generate_init_py(target_folder)
此时文件结构为
root@a1f87a061e62:/workspace# tree /workspace/
/workspace/
├── GFGo.py
├── GlobalFunc
│ └── Base
│ ├── from_pickle.py
│ ├── __init__.py
│ └── to_pickle.py
├── init_funcs
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-38.pyc
│ │ ├── RedisOrMongo_v100.cpython-38.pyc
│ │ ├── WMongo_V9000_012.cpython-38.pyc
│ │ └── worker_gen_file.cpython-38.pyc
│ ├── RedisOrMongo_v100.py
│ ├── WMongo_V9000_012.py
│ └── worker_gen_file.py
├── m7.24065.pkl
├── mymeta.pkl
└── __pycache__└── GFGo.cpython-38.pyc
再给GlobalFunc添加一个包的初始化文件(引入Base)
__init__.py
from . import Base
2.3 step2 建立server_funcs.py
接下来建立server_funcs.py
。在这个文件中,保留服务所依赖的基础函数和配置,非常少。
# 保留,用于提供server.py的引入
# 【引入时处于和GlobalFunc同级别的位置】import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):def default(self, obj):if isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):return float(obj)elif isinstance(obj, np.ndarray):return obj.tolist()if isinstance(obj, datetime):return obj.__str__()if isinstance(obj, dd.timedelta):return obj.__str__()else:return super(MyEncoder, self).default(obj)# 【创建tornado所需问文件夹】
import os
# 如果路径不存在则创建
def create_folder_if_notexist(somepath):if not os.path.exists(somepath):os.makedirs(somepath)return True
m_static = os.path.join(os.getcwd(),'m_static')
m_template = os.path.join(os.getcwd(),'m_template')create_folder_if_notexist(m_static)
create_folder_if_notexist(m_template)settings = {
'static_path':m_static,
'template_path':m_template
}
还有一些函数,需要依赖较大的离线文件,因此这部分函数也放在这个文件里,在服务启动时一次性导入。
# 【创建时间轴】
from GlobalFunc import Base base_year=1970
next_years=100
time_zone=8time_axis_name = 'base_year{0}_next{1}_tz{2}'.format(base_year,next_years,time_zone)
if Base.is_file_exists(filename ='%s.pkl' % time_axis_name):print('时间轴文件 %s 已存在' % time_axis_name)
else:yymon_dict = Base.gen_time_axis(base_year=base_year, next_years=next_years,time_zone=time_zone)Base.to_pickle(data = yymon_dict, file_name=time_axis_name)yymon_dict = Base.from_pickle(time_axis_name)
at2 = Base.ATimer2(yymon_dict = yymon_dict)
# 样例字符
some_dt_str = '2024-01-31 11:11:11'
# 字符转数值
at2.char2num(some_dt_str)
# 数值转字符
at2.num2char(the_ts=1706670671)
# 计算字符的时间差:可同比与datetime的delta(3.37 微秒)
at2.c_period_btw('2024-01-31 11:11:11', '2024-02-01 11:11:11')时间轴文件 base_year1970_next100_tz8 已存在
Out[1]: 86400
这部分知识为了引入时间轴对象,通过偏移/查找的方式快速进行时间的转换和偏移计算。
3 服务
使用tornado建立服务,可以使用web进行简单的应用。
3.1 设计
GlobalFunc本身有两种类型的操作:
- 1 开发。
- 2 应用。
目前使用GFBase对象进行文件的操作,使用GFGo对象进行调用操作。
因为有些细节还没有想清楚,但是又不能停下来,所以才会想创建GFGoLite:急用先行,边做边完善。到积累成熟时再创建完整版的GlobalFunc服务。
第一步,我想仅仅使用GFGo的内容。这种模式下,在用户侧【client use】使用API调用已经存在于数据库中的函数;使用代码编辑的方式在本地进行开发【server produce】,然后将文件覆盖到数据库。
3.2 GFGo
GFGo.py
提供了位置参数调用以及关键字调用两种方式,需要注意的是,未来将规范所有的函数均采用关键字参数。
另外对象还提供了一个包的重载方法,主要对应在运行时函数包可能发生增改。
import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)
3.3 server.py
在这里会导入所有必须的函数、对象提供服务。
目前假设,该服务唯一会导致的数据变化是函数的增改,所以容器的提交仅仅意味着函数变多了。
from server_funcs import *
from GFGo import GFGo
import GlobalFunc as funcs import tornado.httpserver # http服务器
import tornado.ioloop # ?
import tornado.options # 指定服务端口和路径解析
import tornado.web # web模块
from tornado.options import define, options
import os.path # 获取和生成template文件路径# 增加GF的创建实例
gfgo = GFGo()app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)GFGoHandler_path = r'/gfgo/'
class GFGoHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)kwargs = some_dict['kwargs']pack_func = some_dict['pack_func']res = gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs)self.write(json.dumps(res))
gfgo_tuple = (GFGoHandler_path,GFGoHandler)
app_list.append(gfgo_tuple)# 时间轴服务作为基础的服务
'''
1 start_dt 开始时间
2 end_dt 结束时间(选填),默认调用当前
3 time_unit 时间单位,默认秒
4 bias_hours 偏移时间(选填),默认采取东八区
'''# 单元素调用
TimeAxisHandler_path = r'/time_gap/'
class TimeAxisHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)start_dt = some_dict['start_dt']bias_hours = some_dict.get('bias_hours') or 8end_dt = some_dict.get('end_dt') or Base.get_time_str1(bias_hours = bias_hours)time_unit = some_dict['time_unit'] or 'seconds'time_gap_seconds = at2.c_period_btw(start_dt,end_dt)if time_unit.lower() =='seconds':res = time_gap_secondselif time_unit.lower() =='minutes':res = time_gap_seconds // 60elif time_unit.lower() =='hours':res = time_gap_seconds // 3600elif time_unit.lower() =='days':res = time_gap_seconds //86400else:res = Noneself.write(json.dumps(res))
timeaxis_tuple = (TimeAxisHandler_path,TimeAxisHandler)
app_list.append(timeaxis_tuple)# Next: 时间间隔的列表处理,以及
if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动print('Server Started')tornado.ioloop.IOLoop.instance().start()root@a1f87a061e62:/workspace# python3 server.py
时间轴文件 base_year1970_next100_tz8 已存在
Server Started
[I 240303 09:05:47 web:2239] 200 POST /gfgo/ (127.0.0.1) 0.54ms
[I 240303 09:05:57 web:2239] 200 POST /time_gap/ (127.0.0.1) 0.44ms
[I 240303 09:06:13 web:2239] 200 POST /time_gap/ (127.0.0.1) 0.57ms
功能测试
import requests as req # 测试1:调用Base包的函数
kwargs = {'ts':None, 'bias_hours':8}
pack_func = 'Base.get_time_str1'some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_funcres = req.post('http://127.0.0.1:8000/gfgo/', json = some_dict).json()
In [8]: res
Out[8]: '2024-03-03 16:30:53'# 测试2:调用基础时间轴函数
some_dict = {}
some_dict['start_dt'] = '2024-03-03 00:00:00'
some_dict['time_unit'] ='hours'
res = req.post('http://127.0.0.1:8000/time_gap/', json = some_dict).json()In [14]: res
Out[14]: 17
3.4 部署
3.4.1 保存为镜像
将容器进行提交就可以了
┌─root@m7:~
└─ $ docker commit gfgo_lite_build myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100
sha256:d2791b11a8fd3b5d307c7ce7ccc0622b8f42a84970a0c5f877ff88da54caeb8a
┌─root@m7:~
└─ $ docker push myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100
The push refers to repository [myregistry.domain.com:24052/worker.andy.gfgo_lite_24090]
e83f4383e7b7: Pushed
f1db272a1809: Mounted from worker.andy.globalfunc
...
805802706667: Mounted from worker.andy.globalfunc
v100: digest: sha256:9bd2d0a8674a3b3acf843aed6ab5b4372190dcae55edc6ef7a24181295f83549 size: 6221
3.4.2 启动服务
容器无需挂载任何数据卷启动
docker run -d \--restart=always \--name=gfgo_lite_24090 \-v /etc/localtime:/etc/localtime \-v /etc/timezone:/etc/timezone\-v /etc/hostname:/etc/hostname\-e "LANG=C.UTF-8" \-w /workspace\-p 24090:8000\myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100 \sh -c "python3 server.py"
在挂载了时区后,时间偏移参数就要置为0了
import requests as req # 测试1:调用Base包的函数
kwargs = {'ts':None, 'bias_hours':0}
pack_func = 'Base.get_time_str1'some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_funcres = req.post('http://127.0.0.1:24090/gfgo/', json = some_dict).json()
print(res)# 测试2:调用基础时间轴函数
some_dict = {}
some_dict['start_dt'] = '2024-03-03 00:00:00'
some_dict['time_unit'] ='hours'
some_dict['bias_hours'] =-8
res = req.post('http://127.0.0.1:24090/time_gap/', json = some_dict).json()
print(res)
4 后续
一方面是函数的继续增多,调用(例如重载)。
另一方面是函数的增改。
很多应用需要分为单个和批次调用,例如对时间的计算可以为单个执行,也可以输入一个列表。
这篇关于Python 全栈系列230 轻全局函数服务 GFGoLite的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!