# Copyright (c) Microsoft. All rights reserved.

# An example for building a simple chat assistant using the AssistantApp from
# the semantic-workbench-assistant package.
#
# This example demonstrates how to use the AssistantApp to create a chat assistant,
# to add additional configuration fields and UI schema for the configuration fields,
# and to handle conversation events to respond to messages in the conversation.

# region Required
#
# The code in this region demonstrates the minimal code required to create a chat assistant
# using the AssistantApp class from the semantic-workbench-assistant package. This code
# demonstrates how to create an AssistantApp instance, define the service ID, name, and
# description, and create the FastAPI app instance. Start here to build your own chat
# assistant using the AssistantApp class.
#
# The code that follows this region is optional and demonstrates how to add event handlers
# to respond to conversation events. You can use this code as a starting point for building
# your own chat assistant with additional functionality.
#
import logging
import re
from typing import Any

import deepmerge
import openai_client
import tiktoken
from content_safety.evaluators import CombinedContentSafetyEvaluator
from openai.types.chat import ChatCompletionMessageParam
from semantic_workbench_api_model.workbench_model import (
    ConversationEvent,
    ConversationMessage,
    MessageType,
    NewConversationMessage,
    UpdateParticipant,
)
from semantic_workbench_assistant.assistant_app import (
    AssistantApp,
    BaseModelAssistantConfig,
    ContentSafety,
    ContentSafetyEvaluator,
    ConversationContext,
)

from .config import AssistantConfigModel

logger = logging.getLogger(__name__)

#
# define the service ID, name, and description
#
# the service id to be registered in the workbench to identify the assistant
service_id = "python-02-simple-chatbot.workbench-explorer"
# the name of the assistant service, as it will appear in the workbench UI
service_name = "Python Example 02: Simple Chatbot"
# a description of the assistant service, as it will appear in the workbench UI
service_description = "A simple OpenAI chat assistant using the Semantic Workbench Assistant SDK."
#
# create the configuration provider, using the extended configuration model
#
assistant_config = BaseModelAssistantConfig(AssistantConfigModel)
# define the content safety evaluator factory
async def content_evaluator_factory(context: ConversationContext) -> ContentSafetyEvaluator:
    config = await assistant_config.get(context.assistant)
    return CombinedContentSafetyEvaluator(config.content_safety_config)


content_safety = ContentSafety(content_evaluator_factory)

# create the AssistantApp instance
assistant = AssistantApp(
    assistant_service_id=service_id,
    assistant_service_name=service_name,
    assistant_service_description=service_description,
    config_provider=assistant_config.provider,
    content_interceptor=content_safety,
)
#
# create the FastAPI app instance
#
app = assistant.fastapi_app()
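# note: `app` is a standard FastAPI (ASGI) application, so outside of the workbench
# tooling it can be served with any ASGI server, for example: `uvicorn chat:app`
# (the module name in that command is an assumption based on this file's name)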
# endregion
# region Optional
#
# Note: The code in this region is specific to this example and is not required for a basic assistant.
#
# The AssistantApp class provides a set of decorators for adding event handlers to respond to conversation
# events. In VS Code, typing "@assistant." (or the name of your AssistantApp instance) will show available
# events and methods.
#
# See the semantic-workbench-assistant AssistantApp class for more information on available events and methods.
# Examples:
# - @assistant.events.conversation.on_created (event triggered when the assistant is added to a conversation)
# - @assistant.events.conversation.participant.on_created (event triggered when a participant is added)
# - @assistant.events.conversation.message.on_created (event triggered when a new message of any type is created)
# - @assistant.events.conversation.message.chat.on_created (event triggered when a new chat message is created)
#
@assistant.events.conversation.message.chat.on_created
async def on_message_created(
    context: ConversationContext, event: ConversationEvent, message: ConversationMessage
) -> None:
    """
    Handle the event triggered when a new chat message is created in the conversation.

    **Note**
    - This event handler is specific to chat messages.
    - To handle other message types, you can add additional event handlers for those
      message types (see the commented-out sketch after this handler):
      - @assistant.events.conversation.message.log.on_created
      - @assistant.events.conversation.message.command.on_created
      - ...additional message types
    - To handle all message types, you can use the root event handler for all message types:
      - @assistant.events.conversation.message.on_created
    """

    # update the participant status to indicate the assistant is thinking
    await context.update_participant_me(UpdateParticipant(status="thinking..."))
    try:
        # replace the following with your own logic for processing a message created event
        await respond_to_conversation(
            context,
            message=message,
            metadata={"debug": {"content_safety": event.data.get(content_safety.metadata_key, {})}},
        )
    finally:
        # update the participant status to indicate the assistant is done thinking
        await context.update_participant_me(UpdateParticipant(status=None))

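# As noted in the docstring above, handlers for other message types follow the same
# pattern. A minimal sketch for command messages (commented out so it is not
# registered; the handler name and notice text are illustrative, not part of this example):
#
# @assistant.events.conversation.message.command.on_created
# async def on_command_message_created(
#     context: ConversationContext, event: ConversationEvent, message: ConversationMessage
# ) -> None:
#     await context.send_messages(
#         NewConversationMessage(
#             content=f"received command: {message.content}",
#             message_type=MessageType.notice,
#         )
#     )
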
# Handle the event triggered when the assistant is added to a conversation.
@assistant.events.conversation.on_created
async def on_conversation_created(context: ConversationContext) -> None:
    """
    Handle the event triggered when the assistant is added to a conversation.
    """

    # replace the following with your own logic for processing a conversation created event

    # get the assistant's configuration
    config = await assistant_config.get(context.assistant)

    # get the welcome message from the assistant's configuration
    welcome_message = config.welcome_message

    # send the welcome message to the conversation
    await context.send_messages(
        NewConversationMessage(
            content=welcome_message,
            message_type=MessageType.chat,
            metadata={"generated_content": False},
        )
    )

# endregion
# region Custom
#
# This code was added specifically for this example to demonstrate how to respond to conversation
# messages using the OpenAI API. For your own assistant, you could replace this code with your own
# logic for responding to conversation messages and add any additional functionality as needed.
#
# demonstrates how to respond to a conversation message using the OpenAI API.
async def respond_to_conversation(
    context: ConversationContext, message: ConversationMessage, metadata: dict[str, Any] | None = None
) -> None:
    """
    Respond to a conversation message.
    """

    # use a fresh dict per call rather than a shared mutable default argument
    if metadata is None:
        metadata = {}

    # define the metadata key for any metadata created within this method
    method_metadata_key = "respond_to_conversation"
    # get the assistant's configuration, supports overwriting defaults from environment variables
    config = await assistant_config.get(context.assistant)

    # get the list of conversation participants
    participants_response = await context.get_participants(include_inactive=True)

    # establish a token to be used by the AI model to indicate no response
    silence_token = "{{SILENCE}}"

    # create a system message, start by adding the guardrails prompt
    system_message_content = config.guardrails_prompt

    # add the instruction prompt and the assistant name
    system_message_content += f'\n\n{config.instruction_prompt}\n\nYour name is "{context.assistant.name}".'

    # if this is a multi-participant conversation, add a note about the participants
    if len(participants_response.participants) > 2:
        system_message_content += (
            "\n\n"
            f"There are {len(participants_response.participants)} participants in the conversation,"
            " including you as the assistant and the following users:"
            + ",".join([
                f' "{participant.name}"'
                for participant in participants_response.participants
                if participant.id != context.assistant.id
            ])
            + "\n\nYou do not need to respond to every message. Do not respond if the last thing said was a closing"
            " statement such as 'bye' or 'goodbye', or just a general acknowledgement like 'ok' or 'thanks'. Do not"
            f' respond as another user in the conversation, only as "{context.assistant.name}".'
            " Sometimes the other users need to talk amongst themselves and that is ok. If the conversation seems to"
            f' be directed at you or the general audience, go ahead and respond.\n\nSay "{silence_token}" to skip'
            " your turn."
        )

    # create the completion messages for the AI model and add the system message
    completion_messages: list[ChatCompletionMessageParam] = [
        {
            "role": "system",
            "content": system_message_content,
        }
    ]

    # get the current token count and track the tokens used as messages are added
    current_tokens = 0
    # add the token count for the system message
    current_tokens += get_token_count(system_message_content)
    # consistent formatter that includes the participant name for multi-participant and name references
    def format_message(message: ConversationMessage) -> str:
        # get the participant name for the message sender
        conversation_participant = next(
            (
                participant
                for participant in participants_response.participants
                if participant.id == message.sender.participant_id
            ),
            None,
        )
        participant_name = conversation_participant.name if conversation_participant else "unknown"

        # format the message content with the participant name and message timestamp
        message_datetime = message.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        return f"[{participant_name} - {message_datetime}]: {message.content}"

    # get messages before the current message
    messages_response = await context.get_messages(before=message.id)
    messages = messages_response.messages + [message]

    # create a list of the recent chat history messages to send to the AI model
    history_messages: list[ChatCompletionMessageParam] = []

    # iterate over the messages in reverse order to get the most recent messages first
    for message in reversed(messages):
        # add the token count for the message and check if the token limit has been reached
        message_tokens = get_token_count(format_message(message))
        current_tokens += message_tokens
        if current_tokens > config.request_config.max_tokens - config.request_config.response_tokens:
            # if the token limit has been reached, stop adding messages
            break

        # add the message to the history messages
        if message.sender.participant_id == context.assistant.id:
            # this is an assistant message
            history_messages.append({
                "role": "assistant",
                "content": format_message(message),
            })
        else:
            # this is a user message
            history_messages.append({
                "role": "user",
                "content": format_message(message),
            })

    # reverse the history messages to restore chronological order (oldest first) before sending them to the model
    history_messages.reverse()

    # add the history messages to the completion messages
    completion_messages.extend(history_messages)
    # evaluate the content for safety
    # disabled because the OpenAI and Azure OpenAI services already have content safety checks
    # and we are more interested in running the generated responses through the content safety checks
    # which are being handled by the content safety interceptor on the assistant
    # this code is therefore included here for reference on how to call the content safety evaluator
    # from within the assistant code

    # content_evaluator = await content_evaluator_factory(context)
    # evaluation = await content_evaluator.evaluate([message.content for message in messages])

    # deepmerge.always_merger.merge(
    #     metadata,
    #     {
    #         "debug": {
    #             f"{assistant.content_interceptor.metadata_key}": {
    #                 f"{method_metadata_key}": {
    #                     "evaluation": evaluation.model_dump(),
    #                 },
    #             },
    #         },
    #     },
    # )

    # if evaluation.result == ContentSafetyEvaluationResult.Fail:
    #     # send a notice to the user that the content safety evaluation failed
    #     deepmerge.always_merger.merge(
    #         metadata,
    #         {"generated_content": False},
    #     )
    #     await context.send_messages(
    #         NewConversationMessage(
    #             content=evaluation.note or "Content safety evaluation failed.",
    #             message_type=MessageType.notice,
    #             metadata=metadata,
    #         )
    #     )
    #     return
    # generate a response from the AI model
    async with openai_client.create_client(config.service_config, api_version="2024-06-01") as client:
        try:
            # call the OpenAI chat completion endpoint to get a response
            completion = await client.chat.completions.create(
                messages=completion_messages,
                model=config.request_config.openai_model,
                max_tokens=config.request_config.response_tokens,
            )

            # get the content from the completion response
            content = completion.choices[0].message.content

            # merge the completion response into the passed in metadata
            deepmerge.always_merger.merge(
                metadata,
                {
                    "debug": {
                        f"{method_metadata_key}": {
                            "request": {
                                "model": config.request_config.openai_model,
                                "messages": completion_messages,
                                "max_tokens": config.request_config.response_tokens,
                            },
                            "response": completion.model_dump() if completion else "[no response from openai]",
                        },
                    }
                },
            )
        except Exception as e:
            logger.exception(f"exception occurred calling openai chat completion: {e}")

            # if there is an error, set the content to an error message
            content = "An error occurred while calling the OpenAI API. Is it configured correctly?"

            # merge the error into the passed in metadata
            deepmerge.always_merger.merge(
                metadata,
                {
                    "debug": {
                        f"{method_metadata_key}": {
                            "request": {
                                "model": config.request_config.openai_model,
                                "messages": completion_messages,
                            },
                            "error": str(e),
                        },
                    }
                },
            )
    # set the message type based on the content
    message_type = MessageType.chat

    # various behaviors based on the content
    if content:
        # strip out the username from the response
        if content.startswith("["):
            content = re.sub(r"\[.*\]:\s", "", content)

        # check for the silence token, in case the model chooses not to respond
        # model sometimes puts extra spaces in the response, so remove them
        # when checking for the silence token
        if content.replace(" ", "") == silence_token:
            # normal behavior is to not respond if the model chooses to remain silent
            # but we can override this behavior for debugging purposes via the assistant config
            if config.enable_debug_output:
                # update the metadata to indicate the assistant chose to remain silent
                deepmerge.always_merger.merge(
                    metadata,
                    {
                        "debug": {
                            f"{method_metadata_key}": {
                                "silence_token": True,
                            },
                        },
                        "attribution": "debug output",
                        "generated_content": False,
                    },
                )
                # send a notice to the user that the assistant chose to remain silent
                await context.send_messages(
                    NewConversationMessage(
                        message_type=MessageType.notice,
                        content="[assistant chose to remain silent]",
                        metadata=metadata,
                    )
                )
            return

        # override message type if content starts with "/", indicating a command response
        if content.startswith("/"):
            message_type = MessageType.command_response

    # send the response to the conversation
    await context.send_messages(
        NewConversationMessage(
            content=content or "[no response from openai]",
            message_type=message_type,
            metadata=metadata,
        )
    )

# this function is used to get the token count of a string.
def get_token_count(string: str) -> int:
    """
    Get the token count of a string.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(string))
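
# Illustrative usage (not part of the example): cl100k_base is the encoding used by
# the gpt-3.5/gpt-4 family of models; tiktoken.encoding_for_model(model_name) can be
# used instead to select the encoding that matches a specific model.
#
# system_prompt_tokens = get_token_count("You are a helpful assistant.")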
# endregion