本文主要是介绍通过Slack接收HeadSpin信息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
构建应用程序并对其进行测试需要在流程的每个步骤中各个利益相关者之间进行大量协作。每次运行完成后,与每个利益相关者手动监控、编译和共享会话数据可能是一场噩梦。如果我们可以通过通信工具将会话数据作为警报发送,它将一次性覆盖更多受众。
Slack 是适用于几乎所有组织的团队通信工具,它允许用户在自动化脚本的末尾插入警报。一旦脚本完成执行,会话数据将被自动收集并广播给选定的用户或频道。
用户可以使用 Webhooks 或 Slack API 通过 Slack 发送警报,也可以将文件附加到它们。通过上述任一方法发送警报的第一步是创建一个 Slack 插件。
通过创建 Slack 应用程序,用户可以访问 Slack 的大量功能,其中发送自动警报只是其中之一。在组织级别创建 Slack 应用程序允许整个组织根据需要接收警报和通知。请参阅此快速指南以创建 Slack 插件。
在 Slack 中使用 Webhooks 发送警报
传入 Webhooks 是将警报从应用程序发送到 Slack 的最简单方法。
使用 Webhooks 创建 Slack 警报的步骤:
第 1 步:创建一个 Slack 插件(如果您还没有,请参阅本指南)。
第 2 步:在 Slack 插件中启用传入 Webhooks 功能,以允许它在 Slack 中发布来自外部来源的警报。
第 3 步:启用 Incoming Webhooks 后,将新的 Webhook 添加到您的工作区。 选择您希望应用程序在其中发送警报的 Slack 频道。
第 4 步:将生成特定于您选择的频道的webhook URL。
第 5 步:生成 webhook URL 后,您可以使用 HTTP POST 请求将警报添加到 URL。
import requests
import jsonparams = {'text': "Run ID:<run_id> \n Session Link:<session_link>" }
webhook_url= "https://hooks.slack.com/services/XxxxXxxXXx/XxxXXx/XxxxXxxxxX"
requests.post(url = webhook_url, data =json.dumps(params) )
第 6 步:应用程序会将警报发布到您选择的频道,并为其生成了 webhook URL。 警报将如下所示:
上述警报具有已完成运行的会话的 ID 及其链接。 该频道的成员可以单击链接查看会话日志和其他详细信息。
使用 Post Message Slack API 发送警报
虽然传入 Webhook 是在 Slack 中发布警报的一种简单方法,但它们的功能有限。 例如,Webhooks 允许用户广播消息,但在发布后不能修改或删除它们。 它们也不允许用户覆盖他们为其创建 Webhook URL 的初始频道首选项。
chat.postMessage 方法在功能和范围方面要先进得多。 它们使应用程序能够回复消息、在多个频道上广播相同的消息等。
使用 postMessage API 创建 Slack 警报的步骤:
第 1 步:创建一个 Slack 应用程序(如果您还没有,请参阅本指南)。
第 2 步:从应用管理页面转到应用设置,然后从导航菜单中选择 OAuth 和权限功能。 向下滚动到 Scopes 部分,然后从下拉菜单中选择 chat:write。 此范围将使应用程序能够代表其发送消息。
第 3 步:如果应用程序之前没有授予上述范围,您必须使用重新安装应用程序将应用程序重新安装到其原始工作区。
第 4 步:使用任何 Slack API 都需要工作区令牌。 用户首次在工作区中安装应用程序时会生成工作区令牌,并在“机器人用户 OAuth 令牌”下的应用程序设置中可用。 要使 API 正常工作,令牌需要具有正确的权限。
第 5 步:
- Python 代码段需要一个参数通道来找到正确的对话。
- 参数 text 指定警报的内容。
- 使用来自 Slack Web 客户端的 chat.postMessage API 发布警报的 Python 片段如下:
from slack import WebClient
client = WebClient(token=<workspace_token>)
client.chat_postMessage(channel=<channel_id>, text=<alert_message>)
第 6 步:应用程序会将警报发布到您选择的频道,并为其生成了 webhook URL。 警报将如下所示:
第 7 步:您可以使用参数 thread_ts 将警报作为对上一条消息的回复发送。 当警报与上一条消息相关并且必须发布而不造成混淆时,此命令会派上用场。
from slack import WebClient
client = WebClient(token=<workspace_token>)
client.chat_postMessage(channel=<channel_id>,text=<alert_message>,thread_ts= <ts_value_of_parent_message>
)
步骤 8:您可以通过为参数 thread_ts 提供父消息的 ts 值,使警报显示为对消息的线程回复。 您可以从发送父消息时获得的响应负载中获取 ts 值。
在警报中发布附件
Slack 允许用户将文件作为消息与文本消息一起上传,甚至作为对消息线程的回复。 您将图片、日志和其他工件标记为在会话运行后广播的警报。 files.upload 是用于将附件上传到 Slack 的 API 方法。
使用 files.upload API 将文件附加到 Slack 警报的步骤:
第 1 步:创建一个 Slack 应用程序(如果您还没有,请参阅本指南)。
第 2 步:从应用管理页面转到应用设置,然后从导航菜单中选择 OAuth 和权限功能。 向下滚动到 Scopes 部分,然后从下拉菜单中选择 files:write。 此范围将使应用程序能够代表其发送消息。
第 3 步:如果应用程序之前没有授予上述范围,您必须使用重新安装应用程序将应用程序重新安装到其原始工作区。
第 4 步:使用任何 Slack API 都需要 Workspace 令牌参数。 要使 API 正常工作,令牌需要具有正确的权限。
第5步:
- Python 代码段还需要一个参数通道来找到正确的对话。 您可以使用逗号分隔的列表添加多个频道。
- 您可以使用参数 initial_comment 来指定与警报相关的任何注释。
- 参数 title 为附件提供了一个标题。
- 您必须在参数文件中提及附件路径的需要。
第 6 步:从 Slack Web 客户端使用 files.upload API 上传文件的 Python 片段如下所示:
from slack import WebClient
client = WebClient(token=<workspace_token>)
client.files_upload(channels=<channel_id>,initial_comment=<alert_message>,title=<title_of_the_alert>,file=<file_path>
)
第 7 步:一些附加参数有助于在 Slack 中使用线程功能。
- 您可以使用参数 thread_ts 将文件附加到相应的会话警报。
该应用程序会将附件发布到使用 channel 参数选择的频道,并在 initial_comment 中提供详细信息,如下所示:
Slack 通知集成代码
使用 start HeadSpin 捕获,以下 Python 代码模板展示了如何将测试与用户流关联,并设置状态(取决于自动化脚本的成功或失败)。 该代码使用 postMessage API 将包含会话链接的通知推送到 Slack。
# -*- coding: utf-8 -*-
#!/usr/bin/python
import os
from time import sleep
import sys
import time
import unittest
import requests
from appium import webdriver
from slack import WebClientclass AutomationTests(unittest.TestCase):def setUp(self):self.package = <app_package>self.os = <os_name>self.udid = <device_udid> #Selecting the device using device udid#Initialising desired capabilitiesdesired_caps = {}desired_caps['platformName'] = <platform_name>desired_caps['udid'] = self.udiddesired_caps['deviceName'] = self.udiddesired_caps['appPackage'] = <app_package>desired_caps['appActivity'] = <app_activity>desired_caps['headspin:capture.video'] = Trueself.status = "Fail"appium_input = <appium_url> #Argument for appium urlself.url = appium_inputself.auth_token = appium_input.split('/')[-3]#Initialising driver to start test in required deviceself.driver = webdriver.Remote(self.url, desired_caps)def tearDown(self):self.session_id = self.driver.session_idprint ("https://ui-dev.headspin.io/sessions/" + self.session_id + "/waterfall")#Completing test and releasing the device from the driverself.driver.quit()#Setting the test statusif self.status != "Pass":session_status = "Failed"else: session_status= "Passed"self.associate_userflow()self.update_description()#push notifications to Slack containing the session link and few data derived from the HeadSpin session capturenotification_content = "Session Link: "+ str(self.session_id) + "\n Data 1: " +str(self.data[0]) + "\n Data 2: " +str(self.data[1]) + "\n Data 3: " +str(self.data[2]) + "\n Data 4: " +str(self.data[3])workspace_token= "xoxb-123456621-1234567123-xxxxxxxxxxxxxxxxxxxxxxxx"channel_id = "XXXXXXXXXXX"client = WebClient(token=workspace_token)client.chat_postMessage(channel= channel_id, text= notification_content )def test_workflow(self):#Main script to be executed self.driver.implicitly_wait(30)begin_button = self.driver.find_element_by_class_name('begin-btn')begin_button.click()sleep(10)data_list = self.driver.find_elements_by_class_name('data-list')for i in range(0,4):self.data[i] = data_list[i].get_attribute('text')self.status = "Pass"def associate_userflow(self):#Associate a test with a User Flow session_data = {"session_id": str(self.session_id), "status": session_status, "test_name": self.test_name, "data":[{"key":"bundle_id","value": self.package}, {"key": "status", "value": self.status},{"key":"Data_1", "value":str(self.data[0])},{"key":"Data_2","value":str(self.data[1])},{"key":"Data_3","value":str(self.data[2])},{"key":"Data_4","value":str(self.data[3])}]}api_endpoint= "https://api-dev.headspin.io/v0/perftests/upload"output = requests.post(api_endpoint, headers={'Authorization': 'Bearer {}'.format(self.auth_token)}, json=session_data)def update_description(self):#Adding description with relevant data to the sessiondescription_string = ""for data in session_data['data']:description_string += data['key'] + " : " + str(data['value']) + "\n"data_payload = {}data_payload['name'] = self.test_namedata_payload['description'] = description_stringrequest_url= 'https://api-dev.headspin.io/v0/sessions/' + self.session_id + '/description'response = requests.post(request_url, headers={'Authorization': 'Bearer {}'.format(self.auth_token)}, json=data_payload)if __name__ == '__main__':suite = unittest.TestLoader().loadTestsFromTestCase(AutomationTests)unittest.TextTestRunner(verbosity=2).run(suite)
总结
使用上述方法,用户可以避免手动监控会话的麻烦,而是将会话数据直接传送到他们的 Slack 通道。 他们可以在 Webhooks 的简单但受限的功能或具有更大功能的更高级 API 方法(如 chat.postMessage 和 files.upload)之间进行选择。
这篇关于通过Slack接收HeadSpin信息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!