Source code for awstin.dynamodb.table

import os

import boto3
from botocore.exceptions import ClientError

from awstin.config import aws_config
from awstin.constants import TEST_DYNAMODB_ENDPOINT
from awstin.dynamodb.utils import to_decimal

# Testing parameter to change table listing page size
_PAGE_SIZE = 100


[docs]class DynamoDB: """ A client for use of DynamoDB via awstin. Tables are accessed via data models. See documentation for details. """ def __init__(self, timeout=5.0, max_retries=3): """ Parameters ---------- timeout : float, optional Timeout for establishing a connection to DynamoDB (default 5.0) max_retries : int, optional Max retries for establishing a connection to DynamoDB (default 3) Raises ------ EnvironmentError If neither the TEST_DYNAMODB_ENDPOINT or AWS_REGION environment variables are set """ self.timeout = timeout self.max_retries = max_retries test_endpoint = os.environ.get(TEST_DYNAMODB_ENDPOINT) self.config = aws_config( timeout=timeout, max_retries=max_retries, endpoint=test_endpoint, ) self.client = boto3.client("dynamodb", **self.config) self.resource = boto3.resource("dynamodb", **self.config)
[docs] def list_tables(self): """ Return a list of all table names in this DynamoDB instance. Returns ------- list of str Table names """ response = self.client.list_tables(Limit=_PAGE_SIZE) tables = response["TableNames"] while "LastEvaluatedTableName" in response: response = self.client.list_tables( Limit=_PAGE_SIZE, ExclusiveStartTableName=response["LastEvaluatedTableName"], ) tables.extend(response["TableNames"]) return tables
def __getitem__(self, data_model): """ Indexed access to DynamoDB tables via Python data models. Returns ------- Table the dynamodb table, if it exists. """ return Table(self, data_model)
[docs]class Table: """ Interface to a DynamoDB table. Items can be retrieved from the table by a shorthand depending on the primary key. If it's only a partition key, items can be retrieved by the value of the partition key: ``my_table["hashval"]`` If it's a partition and sort key, items can be retrived by a hashkey, sortkey tuple: ``my_table["hashval", 123]`` Items can also be retrieved in a dict-like way: ``my_table[{"HashKeyName": "hashval", "SortKeyName": 123}]`` """ def __init__(self, dynamodb_client, data_model): """ Paramters --------- client : DynamoDB DynamoDB client data_model : DynamoDB Table Data model for interfacing with the table's contents """ self.data_model = data_model self.name = data_model._table_name_ self._dynamodb = dynamodb_client self._boto3_table = dynamodb_client.resource.Table(self.name) def _get_primary_key(self, key): if isinstance(key, dict): key = {k: to_decimal(v) for k, v in key.items()} primary_key = key else: table_description = self._dynamodb.client.describe_table( TableName=self.name, ) (partition_key,) = [ entry["AttributeName"] for entry in table_description["Table"]["KeySchema"] if entry["KeyType"] == "HASH" ] if isinstance(key, tuple): (sort_key,) = [ entry["AttributeName"] for entry in table_description["Table"]["KeySchema"] if entry["KeyType"] == "RANGE" ] primary_key = { partition_key: to_decimal(key[0]), sort_key: to_decimal(key[1]), } else: primary_key = {partition_key: to_decimal(key)} return primary_key def __getitem__(self, key): """ Get an item, given either a primary key as a dict, or given simply the value of the partition key if there is no sort key Parameters ---------- key : Any Primary key, specified as a hash key value, composite key tuple, or a dict """ primary_key = self._get_primary_key(key) item = self._boto3_table.get_item( Key=primary_key, **self.data_model._dynamo_projection(), )["Item"] return self.data_model.deserialize(item)
[docs] def put_item(self, item): """ Put an item in the table Parameters ---------- item : DynamoModel The item to put in the table """ data = item.serialize() return self._boto3_table.put_item(Item=data)
[docs] def update_item(self, key, update_expression, condition_expression=None): """ Update an item in the table given an awstin update expression. Can optionally have a condition expression. Parameters --------- key : Any Primary key, specified as a hash key value, composite key tuple, or a dict update_expression : awstin.dynamodb.orm.UpdateOperator Update expression. See docs for construction. condition_expression : Query, optional Optional condition expression Returns ------- DynamoModel or None Updated model, or None if the condition expression fails """ boto_query = dict( Key=self._get_primary_key(key), ReturnValues="ALL_NEW", **update_expression.serialize(), ) if condition_expression: boto_query["ConditionExpression"] = condition_expression try: result = self._boto3_table.update_item(**boto_query) except ClientError as e: if "ConditionalCheckFailedException" in str(e): return None else: raise e return self.data_model.deserialize(result["Attributes"])
[docs] def delete_item(self, key, condition_expression=None): """ Delete an item, given either a primary key as a dict, or given simply the value of the partition key if there is no sort key Parameters ---------- key : Any Primary key of the entry to delete, specified as a hash key value, composite key tuple, or a dict condition_expression : Query, optional Optional condition expression for the delete, intended to make the operation idempotent Returns ------- deleted : bool True if the delete, False if the condition was not satisfied Raises ------ botocore.exceptions.ClientError If there's an error in the request. """ primary_key = self._get_primary_key(key) condition_kwargs = ( {"ConditionExpression": condition_expression} if condition_expression is not None else {} ) try: self._boto3_table.delete_item( Key=primary_key, **condition_kwargs, ) return True except ClientError as e: if "ConditionalCheckFailedException" in str(e): return False else: raise e
[docs] def scan(self, scan_filter=None): """ Yield items in from the table, optionally matching the given filter expression. Lazily paginates items internally. Parameters ---------- scan_filter : Query An optional query constructed with awstin's query framework Yields ------ item : DynamoModel An item in the table matching the filter """ filter_kwargs = {} if scan_filter is not None: filter_kwargs["FilterExpression"] = scan_filter results = self._boto3_table.scan( **filter_kwargs, **self.data_model._get_kwargs(), ) items = [self.data_model.deserialize(item) for item in results["Items"]] yield from items while "LastEvaluatedKey" in results: results = self._boto3_table.scan( ExclusiveStartKey=results["LastEvaluatedKey"], **filter_kwargs, **self.data_model._get_kwargs(), ) items = [self.data_model.deserialize(item) for item in results["Items"]] yield from items
[docs] def query(self, query_expression, filter_expression=None): """ Yield items from the table matching some query expression and optional filter expression. Lazily paginates items internally. Parameters ---------- query_expression : Query A Key query constructed with awstin's query syntax filter_expression : Query An additional post-query filter expression constructed with awstin's query syntax Yields ------ item : DynamoModel An item in the table matching thw query """ query_kwargs = {} query_kwargs["KeyConditionExpression"] = query_expression if filter_expression is not None: query_kwargs["FilterExpression"] = filter_expression results = self._boto3_table.query( **query_kwargs, **self.data_model._get_kwargs(), ) items = [self.data_model.deserialize(item) for item in results["Items"]] yield from items while "LastEvaluatedKey" in results: results = self._boto3_table.query( ExclusiveStartKey=results["LastEvaluatedKey"], **query_kwargs, **self.data_model._get_kwargs(), ) items = [self.data_model.deserialize(item) for item in results["Items"]] yield from items