
Building a Chatbot API with AWS Chalice: Part 3 - Maintaining Chat Memory with DynamoDB

  • Carlo Sansonetti
  • 19 hours ago
  • 8 min read

Welcome to the final part of our series on building a serverless chatbot API! In Part 1, we set up our Chalice project and deployed a simple API. In Part 2, we integrated AWS Bedrock to give our chatbot AI capabilities. Now, we'll add the missing piece: conversation memory using DynamoDB.

Why Conversation Memory Matters

A chatbot without memory is like meeting someone with amnesia every time you talk to them. When I was rebuilding CGCircuit's customer support system, I found that maintaining conversation context was critical for several reasons:

  1. Continuity - Users can reference previous questions or answers

  2. Personalization - The chatbot can adapt to the user's knowledge level and interests

  3. Efficiency - Users don't need to repeat context in every message

  4. Analytics - Historical conversations provide insights for improvement


Why DynamoDB?

I chose DynamoDB for CGCircuit's chatbot conversation storage because:

  1. Serverless - Pairs perfectly with our Chalice API and has no servers to manage

  2. Low latency - Consistently fast response times, which is crucial for interactive applications

  3. Scalability - Handles any volume of conversations without performance degradation

  4. Cost-effective - Pay only for what you use with predictable pricing

  5. Flexible schema - Easily adapts as conversation storage requirements evolve


Setting Up IAM Permissions

First, let's update our IAM permissions to allow DynamoDB access. Update your .chalice/policy.json file (note the trailing wildcard on the table ARN so the policy matches the stage-suffixed tables we'll create in the next section):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:InvokeModel",
        "bedrock:InvokeModelWithResponseStream"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Query",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/ChatbotConversations"
    }
  ]
}
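
For Chalice to use this hand-written policy instead of auto-generating one, the stage needs autogen_policy turned off and the file name pointed at explicitly. If Part 2 didn't already set this up, a minimal fragment for .chalice/config.json (shown for the dev stage; merge it into the full config in the next section) looks like this:

{
  "stages": {
    "dev": {
      "autogen_policy": false,
      "iam_policy_file": "policy.json"
    }
  }
}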

Creating the DynamoDB Table and Configuring Chalice

Chalice only manages the Lambda function and the API Gateway endpoint, so it won't create the DynamoDB table for us. Instead, we'll point our code at the right table per stage through an environment variable in .chalice/config.json, and create the table itself separately (see the CLI sketch after the config):

{
  "version": "2.0",
  "app_name": "chatbot-api",
  "stages": {
    "dev": {
      "api_gateway_stage": "api",
      "environment_variables": {
        "STAGE": "dev",
        "CONVERSATIONS_TABLE": "ChatbotConversations-dev"
      }
    },
    "prod": {
      "api_gateway_stage": "api",
      "environment_variables": {
        "STAGE": "prod",
        "CONVERSATIONS_TABLE": "ChatbotConversations-prod"
      }
    }
  }
}
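
Since Chalice only deploys the function and API, the table has to exist before the first request hits it. One way to create the dev table is with the AWS CLI; the prod table is the same command with ChatbotConversations-prod and, matching the capacity plan above, 10 read and write capacity units:

aws dynamodb create-table \
  --table-name ChatbotConversations-dev \
  --attribute-definitions AttributeName=conversation_id,AttributeType=S \
  --key-schema AttributeName=conversation_id,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5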

Implementing DynamoDB Integration

Now, let's create our DynamoDB handler in the chalicelib/dynamodb.py file:

import os
import uuid
import json
import boto3
from datetime import datetime, timedelta
from decimal import Decimal

# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb')
table_name = os.environ.get('CONVERSATIONS_TABLE', 'ChatbotConversations-dev')
conversations_table = dynamodb.Table(table_name)

class DecimalEncoder(json.JSONEncoder):
    """Helper class to convert Decimal objects to floats for JSON serialization"""
    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        return super(DecimalEncoder, self).default(o)

def create_conversation(user_id):
    """
    Create a new conversation
    
    Args:
        user_id (str): Identifier for the user
        
    Returns:
        str: The conversation ID
    """
    conversation_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()
    
    conversations_table.put_item(
        Item={
            'conversation_id': conversation_id,
            'user_id': user_id,
            'created_at': timestamp,
            'updated_at': timestamp,
            'messages': [],
            'metadata': {
                'message_count': 0
            }
        }
    )
    
    return conversation_id

def get_conversation(conversation_id):
    """
    Retrieve a conversation by ID
    
    Args:
        conversation_id (str): The conversation ID
        
    Returns:
        dict: The conversation data or None if not found
    """
    response = conversations_table.get_item(
        Key={
            'conversation_id': conversation_id
        }
    )
    
    return response.get('Item')

def add_message_to_conversation(conversation_id, role, content):
    """
    Add a message to an existing conversation
    
    Args:
        conversation_id (str): The conversation ID
        role (str): The message sender role (user/assistant)
        content (str): The message content
        
    Returns:
        bool: True if successful, False otherwise
    """
    timestamp = datetime.utcnow().isoformat()
    
    try:
        response = conversations_table.update_item(
            Key={
                'conversation_id': conversation_id
            },
            UpdateExpression="SET messages = list_append(messages, :message), "
                            "metadata.message_count = metadata.message_count + :count, "
                            "updated_at = :timestamp",
            ExpressionAttributeValues={
                ':message': [{
                    'role': role,
                    'content': content,
                    'timestamp': timestamp
                }],
                ':count': 1,
                ':timestamp': timestamp
            },
            ReturnValues="UPDATED_NEW"
        )
        return True
    except Exception as e:
        print(f"Error adding message to conversation: {str(e)}")
        return False

def get_conversation_history(conversation_id, max_messages=10):
    """
    Get the recent conversation history
    
    Args:
        conversation_id (str): The conversation ID
        max_messages (int): Maximum number of messages to return
        
    Returns:
        list: List of messages or empty list if conversation not found
    """
    conversation = get_conversation(conversation_id)
    
    if not conversation or 'messages' not in conversation:
        return []
    
    messages = conversation['messages']
    # Return the most recent messages up to max_messages
    return messages[-max_messages:] if len(messages) > max_messages else messages

def format_history_for_bedrock(messages):
    """
    Format conversation history for Bedrock
    
    Args:
        messages (list): List of message objects
        
    Returns:
        list: List of formatted messages for Bedrock
    """
    formatted_messages = []
    
    for message in messages:
        formatted_messages.append({
            "role": message["role"],
            "content": message["content"]
        })
    
    return formatted_messages

def list_user_conversations(user_id, limit=10):
    """
    List conversations for a specific user
    
    Args:
        user_id (str): The user ID
        limit (int): Maximum number of conversations to return
        
    Returns:
        list: List of conversation summaries
    """
    # Note: In a production system, you would use a GSI for this query
    # This is a simple scan with a filter which is not efficient for large tables
    response = conversations_table.scan(
        FilterExpression='user_id = :user_id',
        ExpressionAttributeValues={
            ':user_id': user_id
        },
        Limit=limit
    )
    
    conversations = []
    for item in response.get('Items', []):
        conversations.append({
            'conversation_id': item['conversation_id'],
            'created_at': item['created_at'],
            'updated_at': item['updated_at'],
            'message_count': int(item['metadata']['message_count'])  # DynamoDB returns numbers as Decimal
        })
    
    # Sort by updated_at in descending order (newest first)
    conversations.sort(key=lambda x: x['updated_at'], reverse=True)
    
    return conversations[:limit]

def delete_conversation(conversation_id):
    """
    Delete a conversation
    
    Args:
        conversation_id (str): The conversation ID
        
    Returns:
        bool: True if successful, False otherwise
    """
    try:
        conversations_table.delete_item(
            Key={
                'conversation_id': conversation_id
            }
        )
        return True
    except Exception as e:
        print(f"Error deleting conversation: {str(e)}")
        return False
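
As noted in list_user_conversations, the scan-plus-filter approach reads the whole table. If you later add a global secondary index on user_id (here assumed to be named user_id-index, with updated_at as its sort key and all attributes projected), the lookup becomes a targeted query. A rough sketch is below; you'd also need to extend the IAM policy to cover the index ARN:

from boto3.dynamodb.conditions import Key

def list_user_conversations_by_index(user_id, limit=10):
    """Query a hypothetical 'user_id-index' GSI instead of scanning the table."""
    response = conversations_table.query(
        IndexName='user_id-index',
        KeyConditionExpression=Key('user_id').eq(user_id),
        ScanIndexForward=False,  # newest first, using updated_at as the sort key
        Limit=limit
    )
    return [
        {
            'conversation_id': item['conversation_id'],
            'created_at': item['created_at'],
            'updated_at': item['updated_at'],
            'message_count': int(item['metadata']['message_count'])
        }
        for item in response.get('Items', [])
    ]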

Updating the Bedrock Integration

Now we need to update our Bedrock integration to use the conversation history. Let's modify the chalicelib/bedrock.py file:


from chalicelib import dynamodb

# Update the generate_response function
def generate_response(message, conversation_id=None, user_id=None, model_id=DEFAULT_MODEL, max_tokens=500, system_prompt=None):
    """
    Generate a response using AWS Bedrock with conversation history
    
    Args:
        message (str): The user message to respond to
        conversation_id (str, optional): The conversation ID for history
        user_id (str, optional): The user ID (required if creating a new conversation)
        model_id (str, optional): The model ID to use
        max_tokens (int, optional): Maximum tokens in response
        system_prompt (str, optional): Instructions for the AI model
        
    Returns:
        dict: The response containing the AI message and conversation_id
    """
    # Create or retrieve conversation
    if not conversation_id and user_id:
        conversation_id = dynamodb.create_conversation(user_id)
        history = []
    elif conversation_id:
        conversation = dynamodb.get_conversation(conversation_id)
        if conversation is None:
            # An ID was provided but the conversation doesn't exist; create a new one
            if not user_id:
                raise ValueError("User ID is required to create a new conversation")
            conversation_id = dynamodb.create_conversation(user_id)
            history = []
        else:
            # A freshly created conversation legitimately has an empty history
            history = dynamodb.get_conversation_history(conversation_id)
    else:
        # No history tracking, just one-off response
        history = []
    
    # Add the new user message to the history if we're tracking
    if conversation_id:
        dynamodb.add_message_to_conversation(conversation_id, "user", message)
    
    # Format the history for Bedrock
    formatted_history = dynamodb.format_history_for_bedrock(history) if history else None
    
    # Generate response based on the model
    if model_id.startswith("anthropic.claude"):
        prompt = format_claude_prompt(message, formatted_history, system_prompt)
        response_data = invoke_claude(prompt, model_id, max_tokens)
        ai_response = parse_claude_response(response_data)
    else:
        # Add support for other models as needed
        raise ValueError(f"Unsupported model: {model_id}")
    
    # Add the AI response to the conversation history
    if conversation_id:
        dynamodb.add_message_to_conversation(conversation_id, "assistant", ai_response)
    
    return {
        "response": ai_response,
        "conversation_id": conversation_id
    }

# Update the format_claude_prompt function
def format_claude_prompt(message, history=None, system_prompt=None):
    """Format the prompt for Claude models with history"""
    default_system = "You are a helpful AI assistant that provides clear, concise information about technical topics."
    
    if system_prompt is None:
        system_prompt = default_system
    
    # Claude 3 format with messages array
    request_body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 500,
        "system": system_prompt,
        "messages": []
    }
    
    # Add history if available
    if history and len(history) > 0:
        request_body["messages"] = history
    
    # Add the current message
    request_body["messages"].append({"role": "user", "content": message})
    
    return request_body
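
For context, generate_response leans on invoke_claude, parse_claude_response, and DEFAULT_MODEL from Part 2. If you're jumping in at this part, here's a minimal sketch of what those helpers look like for Claude 3 models on Bedrock; your Part 2 versions may differ in model ID and error handling:

import json
import boto3

# Sketch of the Part 2 helpers referenced above; adjust to match your own code
bedrock_runtime = boto3.client('bedrock-runtime')
DEFAULT_MODEL = "anthropic.claude-3-haiku-20240307-v1:0"  # example model ID

def invoke_claude(request_body, model_id, max_tokens=500):
    """Send a Messages API request body to Bedrock and return the parsed response."""
    request_body["max_tokens"] = max_tokens
    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        body=json.dumps(request_body),
        contentType="application/json",
        accept="application/json"
    )
    return json.loads(response["body"].read())

def parse_claude_response(response_data):
    """Pull the assistant's text out of a Claude Messages API response."""
    return response_data["content"][0]["text"]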

Updating the API Endpoints

Finally, let's update our app.py file to include conversation endpoints:

from chalice import Chalice, Response
import json
import uuid
from chalicelib import bedrock, dynamodb

app = Chalice(app_name='chatbot-api')

@app.route('/')
def index():
    return {'message': 'Welcome to the Chatbot API'}

@app.route('/health')
def health_check():
    return {'status': 'healthy'}

@app.route('/chat', methods=['POST'])
def chat():
    request_body = app.current_request.json_body
    
    if not request_body or 'message' not in request_body:
        return Response(
            body=json.dumps({'error': 'Message is required'}),
            status_code=400,
            headers={'Content-Type': 'application/json'}
        )
    
    message = request_body['message']
    conversation_id = request_body.get('conversation_id')
    user_id = request_body.get('user_id')
    model_id = request_body.get('model_id', bedrock.DEFAULT_MODEL)
    system_prompt = request_body.get('system_prompt')
    
    # If no conversation_id but we want to save history
    if not conversation_id and request_body.get('save_history', False):
        if not user_id:
            user_id = f"anonymous-{str(uuid.uuid4())}"
    
    try:
        result = bedrock.generate_response(
            message, 
            conversation_id=conversation_id,
            user_id=user_id,
            model_id=model_id,
            system_prompt=system_prompt
        )
        return result
    except Exception as e:
        return Response(
            body=json.dumps({'error': str(e)}),
            status_code=500,
            headers={'Content-Type': 'application/json'}
        )

@app.route('/conversations', methods=['GET'])
def list_conversations():
    # query_params is None when the request has no query string
    query_params = app.current_request.query_params or {}
    user_id = query_params.get('user_id')
    
    if not user_id:
        return Response(
            body=json.dumps({'error': 'User ID is required'}),
            status_code=400,
            headers={'Content-Type': 'application/json'}
        )
    
    limit = int(query_params.get('limit', 10))
    conversations = dynamodb.list_user_conversations(user_id, limit)
    
    return {
        'conversations': conversations
    }

@app.route('/conversations/{conversation_id}', methods=['GET'])
def get_conversation(conversation_id):
    # Chalice passes the captured {conversation_id} URI parameter as an argument
    conversation = dynamodb.get_conversation(conversation_id)
    
    if not conversation:
        return Response(
            body=json.dumps({'error': 'Conversation not found'}),
            status_code=404,
            headers={'Content-Type': 'application/json'}
        )
    
    # DynamoDB returns numbers as Decimal, so round-trip through DecimalEncoder
    # to keep the response JSON serializable
    return json.loads(json.dumps({'conversation': conversation}, cls=dynamodb.DecimalEncoder))

@app.route('/conversations/{conversation_id}', methods=['DELETE'])
def delete_conversation(conversation_id):
    # Chalice passes the captured {conversation_id} URI parameter as an argument
    success = dynamodb.delete_conversation(conversation_id)
    
    if not success:
        return Response(
            body=json.dumps({'error': 'Failed to delete conversation'}),
            status_code=500,
            headers={'Content-Type': 'application/json'}
        )
    
    return {
        'message': 'Conversation deleted successfully'
    }

@app.route('/conversations', methods=['POST'])
def create_conversation():
    request_body = app.current_request.json_body
    
    if not request_body or 'user_id' not in request_body:
        return Response(
            body=json.dumps({'error': 'User ID is required'}),
            status_code=400,
            headers={'Content-Type': 'application/json'}
        )
    
    user_id = request_body['user_id']
    conversation_id = dynamodb.create_conversation(user_id)
    
    return {
        'conversation_id': conversation_id
    }
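
Before exercising the API over HTTP, it can be handy to have a quick unit-level check. Chalice ships a small test client; a minimal pytest-style sketch (assuming pytest is installed) might look like this:

# test_app.py - minimal sketch using Chalice's built-in test client
from chalice.test import Client
from app import app

def test_create_conversation_requires_user_id():
    with Client(app) as client:
        response = client.http.post(
            '/conversations',
            headers={'Content-Type': 'application/json'},
            body='{}'
        )
        assert response.status_code == 400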

Testing the Conversation Flow

Let's test our conversation API locally. Keep in mind that chalice local only runs the API on your machine; calls to DynamoDB and Bedrock still go to AWS, so make sure your credentials are configured and the dev table exists:

chalice local

First, let's create a new conversation:

curl -X POST http://127.0.0.1:8000/conversations \
  -H "Content-Type: application/json" \
  -d '{"user_id": "test-user-123"}'

You should receive a response with a conversation_id.
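
The ID will be a freshly generated UUID, but the response shape is:

{
  "conversation_id": "YOUR_CONVERSATION_ID"
}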

Now, let's send a message using that conversation ID:

curl -X POST http://127.0.0.1:8000/chat \
  -H "Content-Type: application/json" \
  -d '{
    "message": "What is the capital of Italy?",
    "conversation_id": "YOUR_CONVERSATION_ID",
    "user_id": "test-user-123"
  }'

You'll get a response from the chatbot, and the message will be saved to your conversation history.
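
The body combines the model's answer with the conversation ID so the client can keep threading messages, roughly:

{
  "response": "The capital of Italy is Rome.",
  "conversation_id": "YOUR_CONVERSATION_ID"
}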

Let's send a follow-up question that references the previous conversation:

curl -X POST http://127.0.0.1:8000/chat \
  -H "Content-Type: application/json" \
  -d '{
    "message": "How does it compare to Paris in terms of population?",
    "conversation_id": "YOUR_CONVERSATION_ID",
    "user_id": "test-user-123"
  }'

Notice that the chatbot remembers the context of the previous question and can answer the follow-up appropriately!

To retrieve the entire conversation:

curl http://127.0.0.1:8000/conversations/YOUR_CONVERSATION_ID
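
The response wraps the stored item, including the message list and metadata, along the lines of (abridged to the first exchange):

{
  "conversation": {
    "conversation_id": "YOUR_CONVERSATION_ID",
    "user_id": "test-user-123",
    "created_at": "2025-01-01T12:00:00",
    "updated_at": "2025-01-01T12:01:30",
    "messages": [
      {"role": "user", "content": "What is the capital of Italy?", "timestamp": "2025-01-01T12:00:10"},
      {"role": "assistant", "content": "The capital of Italy is Rome.", "timestamp": "2025-01-01T12:00:12"}
    ],
    "metadata": {"message_count": 2}
  }
}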

Deploying to AWS

Now let's deploy our complete chatbot API to AWS:

chalice deploy

After deployment, you can use the provided endpoint to interact with your chatbot API in the same way we tested locally.
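
For example, using the Rest API URL that chalice deploy prints (the placeholders below stand in for your own API ID and region):

curl -X POST https://YOUR_API_ID.execute-api.YOUR_REGION.amazonaws.com/api/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello!", "user_id": "test-user-123", "save_history": true}'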


Implementing TTL for Conversations (Optional)

For production use, you might want to automatically delete old conversations to save on storage costs. DynamoDB provides a Time-to-Live (TTL) feature for this: you store an expiration timestamp (in epoch seconds) on each item, and DynamoDB deletes expired items in the background, typically within a day or two of expiry, at no extra cost.

To implement TTL:

  1. Enable TTL on the conversations table and tell DynamoDB which attribute holds the expiration timestamp. Since Chalice doesn't manage the table, enable it directly on DynamoDB, for example with the AWS CLI:

"dynamodb_tables": {
  "conversations": {
    "name": "ChatbotConversations-dev",
    "hash_key": "conversation_id",
    "read_capacity": 5,
    "write_capacity": 5,
    "ttl": {
      "attribute_name": "ttl",
      "enabled": true
    }
  }
}
  2. Update chalicelib/dynamodb.py so that create_conversation writes a ttl attribute on new conversations and add_message_to_conversation refreshes it whenever a message is added. Both updated functions are shown below; note that ttl is a reserved word in DynamoDB update expressions, so it's referenced through an expression attribute name:

def add_message_to_conversation(conversation_id, role, content):
    """Add a message to an existing conversation and refresh its TTL"""
    timestamp = datetime.utcnow().isoformat()
    ttl_seconds = int((datetime.utcnow() + timedelta(days=30)).timestamp())
    
    try:
        response = conversations_table.update_item(
            Key={
                'conversation_id': conversation_id
            },
            UpdateExpression="SET messages = list_append(messages, :message), "
                            "metadata.message_count = metadata.message_count + :count, "
                            "updated_at = :timestamp, "
                            "#ttl = :ttl_value",
            ExpressionAttributeNames={
                # 'ttl' is a DynamoDB reserved word, so it needs a placeholder name
                '#ttl': 'ttl'
            },
            ExpressionAttributeValues={
                ':message': [{
                    'role': role,
                    'content': content,
                    'timestamp': timestamp
                }],
                ':count': 1,
                ':timestamp': timestamp,
                ':ttl_value': ttl_seconds
            },
            ReturnValues="UPDATED_NEW"
        )
        return True
    except Exception as e:
        print(f"Error adding message to conversation: {str(e)}")
        return False

def create_conversation(user_id):
    """
    Create a new conversation
    
    Args:
        user_id (str): Identifier for the user
        
    Returns:
        str: The conversation ID
    """
    conversation_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()
    
    # Calculate TTL (30 days from now)
    ttl_seconds = int((datetime.utcnow() + timedelta(days=30)).timestamp())
    
    conversations_table.put_item(
        Item={
            'conversation_id': conversation_id,
            'user_id': user_id,
            'created_at': timestamp,
            'updated_at': timestamp,
            'ttl': ttl_seconds,
            'messages': [],
            'metadata': {
                'message_count': 0
            }
        }
    )
    
    return conversation_id
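
To confirm TTL is active after enabling it, you can ask DynamoDB for the table's TTL status:

aws dynamodb describe-time-to-live --table-name ChatbotConversations-dev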

Conclusion and Next Steps

In this three-part series, we've built a complete, production-ready chatbot API using AWS Chalice, Bedrock, and DynamoDB. Our chatbot can:


  1. Process user messages through a serverless API

  2. Generate intelligent responses using AWS Bedrock models

  3. Maintain conversation context using DynamoDB

  4. Scale automatically to handle any load

  5. Manage conversation history with TTL for cost optimization


This architecture provides a solid foundation for building more advanced chatbot features, such as:

  1. User Authentication - Implement secure user authentication using AWS Cognito

  2. Multi-Language Support - Detect and respond in multiple languages

  3. Conversation Analytics - Track metrics about conversation quality, duration, and outcomes

  4. Sentiment Analysis - Analyze user sentiment to adapt responses

  5. Knowledge Base Integration - Connect to a knowledge base for domain-specific information

The CGCircuit chatbot I built using this architecture has significantly improved our vendor support experience, reducing response times and handling over 70% of common inquiries without human intervention.

I hope this series has given you a solid foundation for building your own chatbot API. If you have questions or want to share your implementation, leave a comment below!
