飞书API(10):通过阿里云MaxCompute数仓入库 - 转为阿里云 DataFrame 再入库

2024-05-26 09:28

本文主要是介绍飞书API(10):通过阿里云MaxCompute数仓入库 - 转为阿里云 DataFrame 再入库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、引入

上一小节介绍了怎么入库到阿里云的 MaxCompute 数仓,其中涉及到 2 种入库方式,一种是转为阿里云的 DataFrame,然后类似 pandas 的 DataFrame 直接写入 MySQL 的方法,将数据写入表中;另外一种是转为列表,再写入 MaxCompute 表。上一小节主要对后者进行展开描述。
有粉丝私聊我说介绍下第一种,本文就来重点探讨下第一种的处理方式。
前面的数据我们通过 pandas 处理,所以这里涉及到将 pandas 的 DataFrame 转为 PyODPS 的 DataFrame 。
用Pandas DataFrame初始化时,PyODPS DataFrame 会尝试对 NUMPY OBJECT 或 STRING 类型进行推断。如果一整列都为空,则会报错。为避免报错,我们可以设置 unknown_as_string 值为 True,将这些列指定为 STRING 类型。如果 Pandas DataFrame 中包含 LIST 或 DICT 列,系统不会推断该列的类型,必须手动使用 as_type 指定类型。as_type参数类型必须是 DICT。
下面具体来探讨一下。

二、迭代入库逻辑

2.1 新增字段类型映射

阿里云的 DataFrame 拥有自己的类型系统,进行表初始化时,MaxCompute的类型会被转换成对应的DataFrame类型,以便支持更多类型的计算后端。
目前,DataFrame的执行后端支持MaxCompute SQL、Pandas和数据库(MySQL和Postgres)。数据类型的映射关系如下:

MaxCompute类型DataFrame类型
BIGINTINT64
DOUBLEFLOAT64
STRINGSTRING
DATETIMEDATETIME
BOOLEANBOOLEAN
DECIMALDECIMAL
ARRAY<VALUE_TYPE>LIST<VALUE_TYPE>
MAP<KEY_TYPE, VALUE_TYPE>DICT<KEY_TYPE, VALUE_TYPE>
当options.sql.use_odps2_extension=True时,还支持以下数据类型。
TINYINTINT8
SMALLINTINT16
INTINT32
FLOATFLOAT32

为了保证通用性和准确性,**建议通过 as_type 参数,指定数据类型。**避免由于 PyODPS DataFrame 推断有出入导致发生报错。
所以,需要加一层字段的映射关系。前面已经加了一层字段类型的映射关系,此处直接加上 PyODPS DataFrame 的类型即可:将double映射为float64,将array<string>映射为list<string>
参考如下:

# 关联入库数据类型
data_type_map = [{"feishu_type": 1   ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 2   ,"mc_type": "double"        ,"mc_df_type": "float64"       },{"feishu_type": 3   ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 4   ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 5   ,"mc_type": "datetime"      ,"mc_df_type": "datetime"      },{"feishu_type": 7   ,"mc_type": "boolean"       ,"mc_df_type": "boolean"       },{"feishu_type": 11  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 13  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 15  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 17  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 18  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 19  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 20  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 21  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 22  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 23  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 1001,"mc_type": "datetime"      ,"mc_df_type": "datetime"      },{"feishu_type": 1002,"mc_type": "datetime"      ,"mc_df_type": "datetime"      },{"feishu_type": 1003,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 1004,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 1005,"mc_type": "string"        ,"mc_df_type": "string"        }]

2.2 构建 PyODPS 的 DataFrame 的 astype

astype参数的值,可以传递一个字典,格式如:{“字段1”:“字段1类型”, “字段2”:“字段2类型”}。
在生成astype之前,通过merge_list()将字段映射fields_map、飞书列信息feishu_fields和数据类型映射data_type_map三者进行合并,返回store_fields_info_df,所以在生成astype时,只需要从store_fields_info_df中提取tb_field_namemc_df_type,然后再加上record_idlast_modified_time即可。

def generate_astype(store_fields_info_df):"""功能:对整合了飞书列入库的列、飞书列类型、和阿里云 DataFrame 数据类型的 pandas DataFrame,提取字段名和阿里云DataFrame 数据类型"""#生成阿里云DataFrame的as_type,用于将处理好的数据转为阿里云DataFrame,格式:{'field_no': 'string', 'field_select': 'float64', 'field_text': 'datatime'}mc_df_astype = store_fields_info_df.set_index('tb_field_name').to_dict()['mc_df_type']#加上record_idmc_df_astype['record_id'] = 'string'#加一个表更新时间mc_df_astype['last_modified_time'] = 'DATETIME'# print(mc_df_astype)print('成功生成阿里云 DataFrame 的 as_type。方法:generate_astype')return mc_df_astype

2.3 生成 PyODPS DataFrame 并写入数仓表

def insert_mc_table(feishu_df, mc_table_name, mc_df_astype):"""生成 PyODPS DataFrame 并将数据插入 Maxcompute 表"""aliyun_df = DataFrame(feishu_df,as_type=mc_df_astype)aliyun_df.persist(mc_table_name)    print(f'成功将飞书数据写入 MaxCompute 数据表:{mc_table_name}。关联方法:insert_mc_table。')

2.4 迭代定制化函数

定制化函数也需要修改,原有的逻辑不用变,新增对astype的改动即可。
PyODPS DataFrame 包括以下类型:int8,int16,int32,int64,float32,float64,boolean,string,decimal,datetime,list,dict,不含 date,所以日期列不需要处理。

def custom_field(df_return, columns, columns_index, mc_df_astype):# 2.1 场景一:把数字入库为 int 类型# 修改 SQL 即可# cre_ddl = cre_ddl.replace('field_number float','field_number int')column = Column(name='field_number', type='bigint', comment=columns_index['field_number'][1])columns[columns_index['field_number'][0]] = column#修改mc_df_astypemc_df_astype['field_number'] = 'int64'# 2.2 场景二:把日期入库为 date 类型# 修改 df,MySQL会自动截断,Maxcompute不行,需要使用 x.date() 处理df_return['field_createdtime'] = df_return['field_createdtime'].apply(lambda x:x.date())# 修改 SQL# cre_ddl = cre_ddl.replace('field_createdtime datetime','field_createdtime date')column = Column(name='field_createdtime', type='date', comment=columns_index['field_createdtime'][1])columns[columns_index['field_createdtime'][0]] = column# #修改mc_df_astype# mc_df_astype['field_createdtime'] = 'date'# 2.3 场景三:日期给定默认最大值# 修改 df 即可#默认值改为 2222-01-01 00:00:00mask = df_return['field_date'] == pd.Timestamp('1970-01-01 08:00:01')df_return.loc[mask, 'field_date'] = pd.Timestamp('2222-01-01 00:00:00')# 2.4 场景四:公式保留具体值# 修改 df# 修改 SQLdf_return['field_numformula'] = df_return['field_numformula'].apply(lambda x:json.loads(x)['value'][0])# cre_ddl = cre_ddl.replace('field_numformula varchar(256)','field_numformula int')column = Column(name='field_numformula', type='bigint', comment=columns_index['field_numformula'][1])columns[columns_index['field_numformula'][0]] = column#修改mc_df_astypemc_df_astype['field_numformula'] = 'int64'# 创建新的 schemaschema = Schema(columns=columns)print('定制函数打印数据和建表语句')print('----------------------------------------------\n', df_return[['field_number','field_createdtime','field_date','field_numformula']].head(5))print('----------------------------------------------\n', schema.columns)return df_return, schema

三、整合代码

将上一小节最终的整合代码结合上面的三项内容进行修改。

  • 修改data_type_map:加上键值对mc_df_type
  • 使用generate_astype()+insert_mc_table() 替换replace_nan_with_none()insert_mc_table()
  • 替换定制化函数的内容

最终参考代码如下:


import requests
import json
import datetime
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import urlparse, parse_qs
from odps.models import Schema, Column
import mathdef get_table_params(bitable_url):# bitable_url = "https://feishu.cn/base/aaaaaaaa?table=tblccc&view=vewddd"parsed_url = urlparse(bitable_url)              #解析url:(ParseResult(scheme='https', netloc='feishu.cn', path='/base/aaaaaaaa', params='', query='table=tblccc&view=vewddd', fragment='')query_params = parse_qs(parsed_url.query)       #解析url参数:{'table': ['tblccc'], 'view': ['vewddd']}app_token = parsed_url.path.split('/')[-1]table_id = query_params.get('table', [None])[0]view_id = query_params.get('view', [None])[0]print(f'成功解析链接,app_token:{app_token},table_id:{table_id},view_id:{view_id}。关联方法:get_table_params。')return app_token, table_id, view_iddef get_tenant_access_token(app_id, app_secret):url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"payload = json.dumps({"app_id": app_id,"app_secret": app_secret})headers = {'Content-Type': 'application/json'}response = requests.request("POST", url, headers=headers, data=payload)tenant_access_token = response.json()['tenant_access_token']print(f'成功获取tenant_access_token:{tenant_access_token}。关联函数:get_table_params。')return tenant_access_tokendef get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search?page_size={page_size}&page_token={page_token}&user_id_type=user_id"payload = json.dumps({"view_id": view_id})headers = {'Content-Type': 'application/json','Authorization': f'Bearer {tenant_access_token}'}response = requests.request("POST", url, headers=headers, data=payload)print(f'成功获取page_token为【{page_token}】的数据。关联函数:get_bitable_datas。')return response.json()def get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):has_more = Truefeishu_datas = []while has_more:response = get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token, page_size)if response['code'] == 0:page_token = response['data'].get('page_token')has_more = response['data'].get('has_more')# print(response['data'].get('items'))# print('\n--------------------------------------------------------------------\n')feishu_datas.extend(response['data'].get('items'))else:raise Exception(response['msg'])print(f'成功获取飞书多维表所有数据,返回 feishu_datas。关联函数:get_all_bitable_datas。')return feishu_datasdef get_bitable_fields(tenant_access_token, app_token, table_id, page_size=500):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/fields?page_size={page_size}"payload = ''headers = {'Authorization': f'Bearer {tenant_access_token}'}response = requests.request("GET", url, headers=headers, data=payload)field_infos = response.json().get('data').get('items')print('成功获取飞书字段信息,关联函数:get_bitable_fields。')return field_infosdef merge_list(ls_from, ls_join, on=None, left_on=None, right_on=None):"""将两个[{},{}]结构的数据合并"""df_from = pd.DataFrame(ls_from)df_join = pd.DataFrame(ls_join)if on is not None:df_merge = df_from.merge(df_join, how='left', on=on)else:df_merge = df_from.merge(df_join, how='left', left_on=left_on, right_on=right_on) # , suffixes=('', '_y')print(f'成功合并列表或DataFrame。关联方法:merge_list。')return df_mergedef extract_key_fields(feishu_datas, store_fields_info_df):"""处理飞书数据类型编号的数据"""print('开始处理飞书多维表关键字段数据...')# 需要record_id 和 订单号,用于和数据库数据匹配df_feishu = pd.DataFrame(feishu_datas)df_return = pd.DataFrame()#根据列的数据类型,分别处理对应的数据。注意:仅返回以下列举的数据类型,如果fields_map的内容包含按钮、流程等数据类型的飞书列,忽略。for index, row in store_fields_info_df.iterrows():if row['type'] == 1:       #文本df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],[{}])[0].get('text'))elif row['type'] in (2, 3, 4, 7, 13, 1005):  #数字、单选、多选、复选框、手机号、自动编号df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name']))elif row['type'] in (5, 1001, 1002):         #日期(包含创建和更新),需要加 8 小时,即 8*60*60*1000=28800 秒df_return[row['tb_field_name']] = pd.to_datetime(df_feishu['fields'].apply(lambda x:28800 + int(x.get(row['field_name'],1000)/1000)), unit='s')elif row['type'] in(11, 23, 1003, 1004):       #人员、群组、创建人、修改人,遍历取namedf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x: ','.join([i.get('name') for i in x.get(row['field_name'],[{"name":""}])]))  # 需要遍历elif row['type'] == 15:    #链接df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('link'))elif row['type'] == 17:    #附件,遍历取urldf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:[i.get('url') for i in x.get(row['field_name'],[{}])]) #需要遍历elif row['type'] in(18, 21):    #单向关联、双向关联,取link_record_idsdf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('link_record_ids'))elif row['type'] in(19, 20):    #查找引用和公式df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps(x.get(row['field_name'])))elif row['type'] == 22:    #地理位置df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('location'))#加上record_iddf_return['record_id'] = df_feishu.record_id#加上表更新字段df_return['last_modified_time'] = datetime.datetime.now()print(f'成功提取入库字段的数据。关联方法:extract_key_fields。')return df_returndef generate_create_schema(store_fields_info_df):columns = []columns_index = {}for index, row in store_fields_info_df.iterrows():name = row['tb_field_name']; type=row['mc_type']; comment=row['feishu_field_name']# print(name,type,comment)columns.append(Column(name=name, type=type, comment=comment))columns_index[name] = (index, comment)columns.append(Column(name='record_id', type='string', comment='飞书表行record_id'))columns.append(Column(name='last_modified_time', type='datetime', comment='数据更新时间'))schema = Schema(columns=columns)print(f'成功生成 MaxCompute 建表 schema。关联方法:generate_create_schema。')return schema, columns, columns_indexdef cre_mc_table(db_table_name, schema):if o.exist_table(db_table_name):print(f'表单 {db_table_name} 已存在,不需要新建。关联方法:cre_mc_table。')else:table = o.create_table(db_table_name, schema, if_not_exists=True)print(f'成功创建 MaxCompute 表:{db_table_name}。关联方法:cre_mc_table。')def generate_astype(store_fields_info_df):"""功能:对整合了飞书列入库的列、飞书列类型、和阿里云 DataFrame 数据类型的 pandas DataFrame,提取字段名和阿里云DataFrame 数据类型"""#生成阿里云DataFrame的as_type,用于将处理好的数据转为阿里云DataFrame,格式:{'field_no': 'string', 'field_select': 'float64', 'field_text': 'datatime'}mc_df_astype = store_fields_info_df.set_index('tb_field_name').to_dict()['mc_df_type']#加上record_idmc_df_astype['record_id'] = 'string'#加一个表更新时间mc_df_astype['last_modified_time'] = 'DATETIME'# print(mc_df_astype)print('成功生成阿里云 DataFrame 的 as_type。方法:generate_astype')return mc_df_astypedef insert_mc_table(feishu_df, mc_table_name, mc_df_astype):"""生成 PyODPS DataFrame 并将数据插入 Maxcompute 表"""aliyun_df = DataFrame(feishu_df,as_type=mc_df_astype)aliyun_df.persist(mc_table_name)    print(f'成功将飞书数据写入 MaxCompute 数据表:{mc_table_name}。关联方法:insert_mc_table。')def custom_field(df_return, columns, columns_index, mc_df_astype):# 2.1 场景一:把数字入库为 int 类型# 修改 SQL 即可# cre_ddl = cre_ddl.replace('field_number float','field_number int')column = Column(name='field_number', type='bigint', comment=columns_index['field_number'][1])columns[columns_index['field_number'][0]] = column#修改mc_df_astypemc_df_astype['field_number'] = 'int64'# 2.2 场景二:把日期入库为 date 类型# 修改 df,MySQL会自动截断,Maxcompute不行,需要使用 x.date() 处理df_return['field_createdtime'] = df_return['field_createdtime'].apply(lambda x:x.date())# 修改 SQL# cre_ddl = cre_ddl.replace('field_createdtime datetime','field_createdtime date')column = Column(name='field_createdtime', type='date', comment=columns_index['field_createdtime'][1])columns[columns_index['field_createdtime'][0]] = column# #修改mc_df_astype# mc_df_astype['field_createdtime'] = 'date'# 2.3 场景三:日期给定默认最大值# 修改 df 即可#默认值改为 2222-01-01 00:00:00mask = df_return['field_date'] == pd.Timestamp('1970-01-01 08:00:01')df_return.loc[mask, 'field_date'] = pd.Timestamp('2222-01-01 00:00:00')# 2.4 场景四:公式保留具体值# 修改 df# 修改 SQLdf_return['field_numformula'] = df_return['field_numformula'].apply(lambda x:json.loads(x)['value'][0])# cre_ddl = cre_ddl.replace('field_numformula varchar(256)','field_numformula int')column = Column(name='field_numformula', type='bigint', comment=columns_index['field_numformula'][1])columns[columns_index['field_numformula'][0]] = column#修改mc_df_astypemc_df_astype['field_numformula'] = 'int64'# 创建新的 schemaschema = Schema(columns=columns)print('定制函数打印数据和建表语句')print('----------------------------------------------\n', df_return[['field_number','field_createdtime','field_date','field_numformula']].head(5))print('----------------------------------------------\n', schema.columns)return df_return, schema, mc_df_astypedef main(mc_table_name, bitable_url, fields_map):# 基本配置app_token, table_id, view_id = get_table_params(bitable_url)app_id = 'your_app_id'app_secret = 'your_app_secret'tenant_access_token = get_tenant_access_token(app_id, app_secret)page_size = 50# 获取飞书多维表所有数据feishu_datas = get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_size=page_size)#获取飞书字段信息feishu_fields = get_bitable_fields(tenant_access_token, app_token, table_id)# 以 fields_map 为准关联数据store_fields_info_df = merge_list(fields_map, feishu_fields, left_on='feishu_field_name', right_on='field_name')# 处理入库字段数据feishu_df = extract_key_fields(feishu_datas, store_fields_info_df)# 关联入库数据类型data_type_map = [{"feishu_type": 1   ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 2   ,"mc_type": "double"        ,"mc_df_type": "float64"       },{"feishu_type": 3   ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 4   ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 5   ,"mc_type": "datetime"      ,"mc_df_type": "datetime"      },{"feishu_type": 7   ,"mc_type": "boolean"       ,"mc_df_type": "boolean"       },{"feishu_type": 11  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 13  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 15  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 17  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 18  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 19  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 20  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 21  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  },{"feishu_type": 22  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 23  ,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 1001,"mc_type": "datetime"      ,"mc_df_type": "datetime"      },{"feishu_type": 1002,"mc_type": "datetime"      ,"mc_df_type": "datetime"      },{"feishu_type": 1003,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 1004,"mc_type": "string"        ,"mc_df_type": "string"        },{"feishu_type": 1005,"mc_type": "string"        ,"mc_df_type": "string"        }]store_fields_info_df = merge_list(store_fields_info_df, data_type_map, left_on='type', right_on='feishu_type')# 生成 MaxCompute schemaschema, columns, columns_index = generate_create_schema(store_fields_info_df)# 生成 DataFrame astypemc_df_astype = generate_astype(store_fields_info_df)# 定制化feishu_df, schema, mc_df_astype = custom_field(feishu_df, columns, columns_index, mc_df_astype)# 建 MaxCompute 数据表cre_mc_table(mc_table_name, schema)# MaxCompute 表插入数据insert_mc_table(feishu_df, mc_table_name, mc_df_astype)print(f'成功将飞书多维表({bitable_url})的数据入库到 mysql 数据表:{mc_table_name}。')if __name__ == '__main__':mc_table_name = 'for_ods.feishu_data_type_test'bitable_url = "https://forchangesz.feishu.cn/base/SpY3b9LMFaodpOsE0kdcGEyonbg?table=tbl5BZE0Aubjz5Yy&view=vewDM4NGlP"fields_map = [{'tb_field_name': 'field_text','feishu_field_name': '文本'},{'tb_field_name': 'field_email','feishu_field_name': 'email'},{'tb_field_name': 'field_select','feishu_field_name': '单选'},{'tb_field_name': 'field_mobile','feishu_field_name': '电话号码'},{'tb_field_name': 'field_no','feishu_field_name': '自动编号'},{'tb_field_name': 'field_member1','feishu_field_name': '人员1'},{'tb_field_name': 'field_group1','feishu_field_name': '群组1'},{'tb_field_name': 'field_creator','feishu_field_name': '创建人'},{'tb_field_name': 'field_modifier','feishu_field_name': '修改人'},{'tb_field_name': 'field_member2','feishu_field_name': '人员2'},{'tb_field_name': 'field_group2','feishu_field_name': '群组2'},{'tb_field_name': 'field_url','feishu_field_name': '超链接'},{'tb_field_name': 'field_location','feishu_field_name': '地理位置'},{'tb_field_name': 'field_findnum','feishu_field_name': '查找引用数值'},{'tb_field_name': 'field_numformula','feishu_field_name': '数字公式'},{'tb_field_name': 'field_number','feishu_field_name': '数字'},{'tb_field_name': 'field_progress','feishu_field_name': '进度'},{'tb_field_name': 'field_money','feishu_field_name': '货币'},{'tb_field_name': 'field_Rating','feishu_field_name': '评分'},{'tb_field_name': 'field_bool','feishu_field_name': '复选框'},{'tb_field_name': 'field_date','feishu_field_name': '日期'},{'tb_field_name': 'field_createdtime','feishu_field_name': '创建时间'},{'tb_field_name': 'field_updatedtime','feishu_field_name': '更新时间'},{'tb_field_name': 'field_mulselect','feishu_field_name': '多选'},{'tb_field_name': 'field_singleunion','feishu_field_name': '单向关联'},{'tb_field_name': 'field_doubleunion','feishu_field_name': '双向关联'},{'tb_field_name': 'field_file','feishu_field_name': '附件'}]main(mc_table_name, bitable_url, fields_map)

最终执行的结果和上一小节一致,参考如下:
image.png

image.png

image.png

四、小结

本文探讨了怎么通过 PyODPS 的 DataFrame 将飞书数据入库,主要涉及四点:新增 PyODPS 的 DataFrame 的数据类型映射、定义 astype、将飞书数据转为 PyODPS 的 DataFrame 并入库和定制化中新增 astype 的修改。

PyODPS 的 DataFrame 更多用于数据科学计算,方便将分析的结果数据保存到表中,此处仅用它作为一个中间桥梁,将 pandas 的 DataFrame 和数仓表连接起来。

这篇关于飞书API(10):通过阿里云MaxCompute数仓入库 - 转为阿里云 DataFrame 再入库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1004077

相关文章

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

Python中DataFrame转列表的最全指南

《Python中DataFrame转列表的最全指南》在Python数据分析中,Pandas的DataFrame是最常用的数据结构之一,本文将为你详解5种主流DataFrame转换为列表的方法,大家可以... 目录引言一、基础转换方法解析1. tolist()直接转换法2. values.tolist()矩阵

C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)

《C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)》本文主要介绍了C#集成DeepSeek模型实现AI私有化的方法,包括搭建基础环境,如安装Ollama和下载DeepS... 目录前言搭建基础环境1、安装 Ollama2、下载 DeepSeek R1 模型客户端 ChatBo

Java中将异步调用转为同步的五种实现方法

《Java中将异步调用转为同步的五种实现方法》本文介绍了将异步调用转为同步阻塞模式的五种方法:wait/notify、ReentrantLock+Condition、Future、CountDownL... 目录异步与同步的核心区别方法一:使用wait/notify + synchronized代码示例关键

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

一分钟带你上手Python调用DeepSeek的API

《一分钟带你上手Python调用DeepSeek的API》最近DeepSeek非常火,作为一枚对前言技术非常关注的程序员来说,自然都想对接DeepSeek的API来体验一把,下面小编就来为大家介绍一下... 目录前言免费体验API-Key申请首次调用API基本概念最小单元推理模型智能体自定义界面总结前言最

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep