Written by
Ivan Bercovich
on
on
Tiny Slack GPT
I made a simple, single file script to run GPT in your slack organization. It can participate in conversations when mentioned, summarize long threads, and query a sqlite database containing an organizational knowledge-base (in ScOp’s case, a list of investments).
"""
Single file app to run a GPT slackbot
Includes basic Text2SQL capabilities
1-) create application on slack
2-) enable socket mode
3-) create OAuth tokens
4-) add slash commands for /query and /summarize
5-) figure out your bot user ID
6-) enable the right scopes:
Bot Token Scopes | User Token Scopes
=============================================
app_mentions:read | channels:history
channels:history | chat:write
channels:read | groups:history
chat:write | im:history
commands | im:read
groups:history | im:write
groups:read | mpim:history
im:history | mpim:read
mpim:history | mpim:write
users:read |
7-) run from terminal, cron, or as a service
"""
from __future__ import print_function
# Python built-in modules
import os
import re
import time
import socket
import sqlite3
# Third-party modules
import openai
import tiktoken
from tabulate import tabulate
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
#############################
# load local only credentials
##############################
# best to add credentials to .env file
# on same folder and keep it out of git
# you can use pip import dotenv
SLACK_BOT_TOKEN = "xoxb-..."
SLACK_APP_TOKEN = "xapp-..."
SLACK_BOT_USER = "U..."
OPENAI_API_KEY = "sk-..."
OPENAI_ORG = "org-..."
OPENAI_MODEL = "gpt-3.5-turbo" #or "gpt-4"
TOKEN_LIMIT = 8000 if OPENAI_MODEL == "gpt-4" else 4000
# other params
DATA_DB = '...' #sqlite file name _.db
DATA_TABLE = '...' #table names
#########################################
# check there is internet connection or
# program fails when connecting to Slack
#########################################
def is_connected():
"""
Check if there is an internet connection.
Returns True if connected, False otherwise.
"""
try:
conn = socket.create_connection(("www.google.com", 80))
if conn is not None:
conn.close()
return True
except OSError:
pass
return False
while not is_connected():
time.sleep(5)
print("is connected!")
########################
# initialize APIs
########################
app = App(token=SLACK_BOT_TOKEN)
web_client = WebClient(token=SLACK_BOT_TOKEN)
openai.organization = OPENAI_ORG
openai.api_key = OPENAI_API_KEY
encoding = tiktoken.encoding_for_model(OPENAI_MODEL)
########################
# Slack event listeners
########################
@app.event("app_mention")
def mention_handler(body, say):
"""
Gets called when the bot is mentioned
Message is passed to GPT, with context
"""
channel_id = body["event"]["channel"]
channel_name = get_channel_name(channel_id)
history = get_message_history(channel_id)
response = chat_gpt(history, channel_name)
say({"text": response, "response_type": "in_channel"})
# pylint: disable=unused-argument
@app.command("/summarize")
def call_gpt_summarize(ack, say, command):
"""
Summarizes conversation in existing channel, in thread
"""
ack()
channel = command["channel_id"]
prompt = f"<@{command['user_id']}> asked to summarize conversation"
m_ts = web_client.chat_postMessage(channel=channel, text=prompt)["ts"]
history = get_message_history(command["channel_id"])
text = summarize_conversation(history)
web_client.chat_postMessage(channel=channel, text=text, thread_ts=m_ts)
@app.command("/query")
def data_query_command(ack, say, command):
"""
Sends natural language question to DB
"""
ack()
question = command["text"].lower()
query = text2sql(command["text"])
if query:
# post user question
prompt = f"<@{command['user_id']}> asked: {question}"
say({"text": prompt, "response_type": "in_channel"})
# post query
say({"text": f"```\n{query}\n```\n", "response_type": "in_channel"})
data = fetch(query)
# display raw data as table
split_markdown(data, say)
# pylint: disable=unused-argument
@app.event("message")
def handle_message_events(body, logger):
"""
catch-all for other events or it throws errors
"""
#############################
# Slack helper functions
#############################
def get_slack_users():
"""
gets the list of users so they can be referred by name.
otherwise, all we know are their IDs
"""
result = web_client.users_list()
result = {usr["id"]: usr["name"] for usr in result["members"]}
return result
def get_channel_name(channel_id):
"""
get channel name
"""
# Get the channel information using the conversations.info API method
response = web_client.conversations_info(channel=channel_id)
# Extract the channel name from the response
return response["channel"]["name"]
USERS = get_slack_users()
def get_message_history(channel_id):
"""
gets message history, including threads, and removes unnecessary components
"""
messages = get_channel_messages(channel_id)
history = get_threads(channel_id, messages)
history = [
{"text": replace_user_id(conv["text"]), "user": USERS[conv["user"]]}
for conv in history
]
history = list(reversed(history))
return history
def get_channel_messages(channel_id):
"""
retrieves messages from a channel
"""
try:
result = web_client.conversations_history(channel=channel_id)
messages = result["messages"]
return messages
except SlackApiError as error:
print(f"Error: {error}")
return []
def get_threads(channel_id, messages):
"""
retrieves threads for messages that contain them
flattens the whole structure into a single thread
"""
all_messages = []
for message in messages:
all_messages.append(message)
if "thread_ts" in message:
try:
result = web_client.conversations_replies(
channel=channel_id, ts=message["ts"]
)
# for some reason threads are ordered opposite to messages
thread_messages = list(reversed(result["messages"]))
all_messages.extend(thread_messages)
except SlackApiError as error:
print(f"Error: {error}")
return all_messages
def split_markdown(code, say):
"""
slack is bad at printing large comment blocks so I need this work around
this will split blocks of text intended as markdown into multiple blocks
"""
if isinstance(code, str):
lines = code.splitlines()
sections = ["\n".join(lines[i : i + 10]) for i in range(0, len(lines), 10)]
for section in sections:
table = "```\n" + section + "\n```"
say({"text": table, "response_type": "in_channel"})
def replace_user_id(message):
"""
Function to replace user IDs with user names
Also remove duplicate mentions to users
"""
def helper(match):
"""
Function to replace user IDs with user names
"""
user_id = match.group(1)
return USERS.get(user_id, f"<@{user_id}>")
pattern = r"<@([A-Z0-9]+)>"
# remove name: at the start of the message
message = re.sub(r"^\w+:\s*", "", message)
# replace mention/ids with names
return re.sub(pattern, helper, message)
def get_root_dir():
"""
Returns root dir for the main file, in this case app.py
"""
# pylint: disable=no-member
main_script_path = os.path.abspath(__file__)
main_script_dir = os.path.dirname(os.path.realpath(main_script_path))
return main_script_dir
#############################
# SQLite functions
#############################
def text2sql(text):
"""
takes a text prompt and returns sql
"""
schema = get_json_schema(DATA_TABLE)
rows, cols = data_query(f"SELECT * FROM {DATA_TABLE};")
markdown_table = get_markdown_table(rows, cols, pretty=False)
query = call_gpt(
get_prompt_template(
"query",
params={
"table": DATA_TABLE,
"schema": schema,
"data": markdown_table,
"question": text,
},
)
).lower()
# remove code blocks which appear ocassionally
query = query.replace("sql", "")
query = query.replace("```", "")
return query.strip()
def fetch(query):
"""
takes a query and returns data and explanation
call_gpt(
get_prompt_template(
"explain_query",
params={"question": question, "query": query, "response": rows},
)
)
"""
rows, cols = data_query(query)
data = get_markdown_table(rows, cols)
return data
def get_markdown_table(rows, cols, pretty=True):
"""
returns markdown table for one or more items
"""
# make headers multi-line
if pretty:
cols = [col.replace("_", "\n") for col in cols]
return tabulate(rows, cols, floatfmt=".2f", intfmt=",")
def data_query(query):
"""
runs the actual query
"""
rows = []
cols = []
if query:
conn, cursor = get_db_conn()
cursor.execute(query)
cols = [col[0] for col in cursor.description]
rows = cursor.fetchall()
conn.close()
return rows, cols
def get_json_schema(table):
"""
get's the schema for the table
"""
# Retrieve the schema of the table and convert it to a JSON object
rows, _ = data_query(f"PRAGMA table_info({table})")
schema = [f'column: "{row[1]}", type: "{row[2]}"' for row in rows]
schema = "\n".join(schema)
return schema
def get_db_conn(mode="ro", db_path=None):
"""
Parameters:
mode (string): default 'ro' (read only), alternatively 'rw' (read write).
db_path (path): path where the db is located, alternatively it gets resolved from .env
Returns:
conn (sqlite connection):
cursor (sqlite cursor):
"""
if db_path is None:
db_path = os.path.join(get_root_dir(), DATA_DB)
# Connect to the database, read only
conn = sqlite3.connect(f"file:{db_path}?mode={mode}", uri=True)
# Create a cursor object
cursor = conn.cursor()
return conn, cursor
#############################
# GPT functions
#############################
def trim_list_by_tokens(str_list, prompt_header):
"""
Takes a list and trims it based on the number of tokens allowed
This approach is useful to avoid cutting messages in half
Removes the token legth of the prompt header
"""
count = 0
ret = []
if isinstance(str_list, list):
prompt_token_len = len(encoding.encode(prompt_header)) + 25 # add a buffer
# discount tokens for separator
prompt_token_len += len(encoding.encode("\n\n###\n\n")) * len(str_list)
# we reverse it so we only end up cropping the oldest messages
for item in reversed(str_list):
tokens = encoding.encode(item)
count += len(tokens)
if count > TOKEN_LIMIT - prompt_token_len:
break
ret.append(item)
# we reverse the list again so it's in chronological order
ret = list(reversed(ret))
ret = "\n\n###\n\n".join(ret)
return ret
def summarize_conversation(history):
"""
uses GPT to summarize a conversation between various participants
"""
if len(history) > 0:
history = [f'{conv["user"]}: {conv["text"]}' for conv in history]
history = trim_list_by_tokens(history, get_prompt_template("summarize"))
return call_gpt(get_prompt_template("summarize", {"history": history}))
return None
def chat_gpt(history, channel):
"""
calls GPT with context, so it behaves like chat
"""
history = [f'{conv["user"]}: {conv["text"]}' for conv in history]
history = trim_list_by_tokens(history, get_prompt_template("bot_mention"))
prompt = get_prompt_template(
"bot_mention", {"history": history, "channel": channel}
)
response = call_gpt(prompt)
return response
def call_gpt(prompt, model=OPENAI_MODEL):
"""
calls GPT
"""
response = openai.ChatCompletion.create(
model=model,
messages=[
{"role": "system", "content": get_prompt_template("system")},
{"role": "user", "content": prompt},
],
)
return response.choices[0].message.content
def get_prompt_template(template, params=None):
"""
Params need to be a dict with keys that match template
"""
content = PROMPTS[template]
if isinstance(params, dict):
content = content.format(**params)
return content
PROMPTS = {
"system": """You are a helpful assistant named scopy that works for a venture capital company named ScOp VC.""",
"bot_mention": """You are a helpful assistant that is part of a venture capital firm, named ScOp. Your name is scopy.
Below you will see a conversation in a particular slack channel among several members of this veture
capital firm named ScOp. The next time someone asks a question, please consider the existing conversation
context and provide a helpful answer. Sometimes, the answer will be unrelated to the conversation,
in which case you should come up with your own. The name of the channel is {channel}.
The channel name might be related to one of the companies in ScOp's portfolio. Always be nice and respecful,
but if the person asking the question is named Kevin, answer accurately but in a funny tone, as if you are
exasperated because Kevin is always asking silly questions. The last paragraph is the user's most recent
question, which is intended for you scopy. \n\n {history} \n\n###\n\n """,
"query": """Please regard the following table, named "{table}" with the following schema: \n###\n ```\n {schema} \n``` \n###\n
Here's what the data looks like: \n###\n ```\n {data} \n``` \n###\n
Write a SQL query, without any explanation of any kind, just the SQL query,
to answer the following question: {question}""",
"summarize": """Please write a thorough summary the following conversation. There are many participants.
Each time a participant speaks, a paragraph with start with the person's name.
The conversation follows below: \n\n###\n\n {history}"""
}
if __name__ == "__main__":
handler = SocketModeHandler(app, SLACK_APP_TOKEN)
handler.start()