Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/43908.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dynamodb_table: Adds `global_table_witness_region_name` argument in support of [multi-Region strong consistency](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/V2globaltables_HowItWorks.html#V2globaltables_HowItWorks.choosing-consistency-mode) for Amazon DynamoDB global tables
```
108 changes: 87 additions & 21 deletions internal/service/dynamodb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ func resourceTable() *schema.Resource {
Computed: true,
ConflictsWith: []string{"on_demand_throughput"},
},
"global_table_witness_region_name": {
Type: schema.TypeString,
Optional: true,
Computed: true,
},
"replica": {
Type: schema.TypeSet,
Optional: true,
Expand Down Expand Up @@ -831,7 +836,11 @@ func resourceTableCreate(ctx context.Context, d *schema.ResourceData, meta any)
}

if v := d.Get("replica").(*schema.Set); v.Len() > 0 {
if err := createReplicas(ctx, conn, d.Id(), v.List(), true, d.Timeout(schema.TimeoutCreate)); err != nil {
var global_table_witness_region_name = ""
if v, ok := d.GetOk("global_table_witness_region_name"); ok {
global_table_witness_region_name = v.(string)
}
if err := createReplicas(ctx, conn, d.Id(), v.List(), true, d.Timeout(schema.TimeoutCreate), global_table_witness_region_name); err != nil {
return create.AppendDiagError(diags, names.DynamoDB, create.ErrActionCreating, resNameTable, d.Id(), fmt.Errorf("replicas: %w", err))
}

Expand Down Expand Up @@ -913,6 +922,10 @@ func resourceTableRead(ctx context.Context, d *schema.ResourceData, meta any) di
d.Set(names.AttrStreamARN, table.LatestStreamArn)
d.Set("stream_label", table.LatestStreamLabel)

if err := d.Set("global_table_witness_region_name", flattenGlobalTableWitnesses(table.GlobalTableWitnesses)); err != nil {
return create.AppendDiagSettingError(diags, names.DynamoDB, resNameTable, d.Id(), "global_table_witness_region_name", err)
}

sse := flattenTableServerSideEncryption(table.SSEDescription)
sse = clearSSEDefaultKey(ctx, c, sse)

Expand Down Expand Up @@ -1169,6 +1182,10 @@ func resourceTableUpdate(ctx context.Context, d *schema.ResourceData, meta any)
return create.AppendDiagError(diags, names.DynamoDB, create.ErrActionUpdating, resNameTable, d.Id(), fmt.Errorf("%s GSI (%s): %w", create.ErrActionWaitingForCreation, idxName, err))
}
}
global_table_witness_region_name := ""
if v, ok := d.GetOk("global_table_witness_region_name"); ok {
global_table_witness_region_name = v.(string)
}

if d.HasChange("server_side_encryption") {
if replicas, sseSpecification := d.Get("replica").(*schema.Set), expandEncryptAtRestOptions(d.Get("server_side_encryption").([]any)); replicas.Len() > 0 && sseSpecification.KMSMasterKeyId != nil {
Expand Down Expand Up @@ -1261,7 +1278,7 @@ func resourceTableUpdate(ctx context.Context, d *schema.ResourceData, meta any)
if d.HasChange("replica") {
replicaTagsChange = true

if err := updateReplica(ctx, conn, d); err != nil {
if err := updateReplica(ctx, conn, d, global_table_witness_region_name); err != nil {
return create.AppendDiagError(diags, names.DynamoDB, create.ErrActionUpdating, resNameTable, d.Id(), err)
}
}
Expand Down Expand Up @@ -1293,11 +1310,16 @@ func resourceTableDelete(ctx context.Context, d *schema.ResourceData, meta any)

if replicas := d.Get("replica").(*schema.Set).List(); len(replicas) > 0 {
log.Printf("[DEBUG] Deleting DynamoDB Table replicas: %s", d.Id())
if err := deleteReplicas(ctx, conn, d.Id(), replicas, d.Timeout(schema.TimeoutDelete)); err != nil {
var global_table_witness_region_name = ""
if v, ok := d.GetOk("global_table_witness_region_name"); ok {
global_table_witness_region_name = v.(string)
}
if err := deleteReplicas(ctx, conn, d.Id(), replicas, d.Timeout(schema.TimeoutDelete), global_table_witness_region_name); err != nil {
// ValidationException: Replica specified in the Replica Update or Replica Delete action of the request was not found.
// ValidationException: Cannot add, delete, or update the local region through ReplicaUpdates. Use CreateTable, DeleteTable, or UpdateTable as required.
if !tfawserr.ErrMessageContains(err, errCodeValidationException, "request was not found") &&
!tfawserr.ErrMessageContains(err, errCodeValidationException, "Cannot add, delete, or update the local region through ReplicaUpdates") {
!tfawserr.ErrMessageContains(err, errCodeValidationException, "Cannot add, delete, or update the local region through ReplicaUpdates") &&
!tfawserr.ErrMessageContains(err, errCodeValidationException, "MultiRegionConsistency must be set as STRONG when GlobalTableWitnessUpdates parameter is present") {
return create.AppendDiagError(diags, names.DynamoDB, create.ErrActionDeleting, resNameTable, d.Id(), err)
}
}
Expand Down Expand Up @@ -1371,7 +1393,7 @@ func cycleStreamEnabled(ctx context.Context, conn *dynamodb.Client, id string, s
return nil
}

func createReplicas(ctx context.Context, conn *dynamodb.Client, tableName string, tfList []any, create bool, timeout time.Duration) error {
func createReplicas(ctx context.Context, conn *dynamodb.Client, tableName string, tfList []any, create bool, timeout time.Duration, gt_witness_reg_name string) error {
// Duplicating this for MRSC Adoption. If using MRSC and CreateReplicationGroupMemberAction list isn't initiated for at least 2 replicas
// then the update table action will fail with
// "Unsupported table replica count for global tables with MultiRegionConsistency set to STRONG"
Expand All @@ -1390,16 +1412,21 @@ func createReplicas(ctx context.Context, conn *dynamodb.Client, tableName string
}
}
}
log.Printf("[DEBUG] global table witness region: %v and numReplicas (%v) and numReplicasMRSC (%v)\n", gt_witness_reg_name, numReplicas, numReplicasMRSC)

if numReplicasMRSC > 0 {
MRSCErrorMsg := "creating replicas: Using MultiRegionStrongConsistency requires exactly 2 replicas, or 1 replica and 1 witness region."
if numReplicasMRSC > 0 && numReplicasMRSC != numReplicas {
return fmt.Errorf("creating replicas: Using MultiRegionStrongConsistency requires all replicas to use 'consistency_mode' set to 'STRONG' ")
return fmt.Errorf("%s", MRSCErrorMsg)
}
if numReplicasMRSC == 1 {
return fmt.Errorf("creating replicas: Using MultiRegionStrongConsistency requires exactly 2 replicas. ")
if numReplicasMRSC == 1 && gt_witness_reg_name == "" {
return fmt.Errorf("%s Only MRSC Replica count of 1 was provided but no Witness region was provided", MRSCErrorMsg)
}
if numReplicasMRSC == 2 && (numReplicasMRSC == numReplicas && gt_witness_reg_name != "") {
return fmt.Errorf("%s MRSC Replica count of 2 was provided and a Witness region was also provided", MRSCErrorMsg)
}
if numReplicasMRSC > 2 {
return fmt.Errorf("creating replicas: Using MultiRegionStrongConsistency supports at most 2 replicas. ")
return fmt.Errorf("%s Too many Replicas were provided %v", MRSCErrorMsg, numReplicasMRSC)
}

mrscInput = awstypes.MultiRegionConsistencyStrong
Expand Down Expand Up @@ -1430,10 +1457,21 @@ func createReplicas(ctx context.Context, conn *dynamodb.Client, tableName string
})
}

var witnessCreate []awstypes.GlobalTableWitnessGroupUpdate
if gt_witness_reg_name != "" {
var witnessInput = &awstypes.CreateGlobalTableWitnessGroupMemberAction{
RegionName: aws.String(gt_witness_reg_name),
}
witnessCreate = append(witnessCreate, awstypes.GlobalTableWitnessGroupUpdate{
Create: witnessInput,
})
}

input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
ReplicaUpdates: replicaCreates,
MultiRegionConsistency: mrscInput,
TableName: aws.String(tableName),
ReplicaUpdates: replicaCreates,
GlobalTableWitnessUpdates: witnessCreate,
MultiRegionConsistency: mrscInput,
}

err := retry.RetryContext(ctx, max(replicaUpdateTimeout, timeout), func() *retry.RetryError {
Expand Down Expand Up @@ -1563,7 +1601,7 @@ func createReplicas(ctx context.Context, conn *dynamodb.Client, tableName string
// ValidationException: One or more parameter values were invalid: KMSMasterKeyId must be specified for each replica.

if create && tfawserr.ErrMessageContains(err, errCodeValidationException, "already exist") {
return createReplicas(ctx, conn, tableName, tfList, false, timeout)
return createReplicas(ctx, conn, tableName, tfList, false, timeout, gt_witness_reg_name)
}

if err != nil && !tfawserr.ErrMessageContains(err, errCodeValidationException, "no actions specified") {
Expand Down Expand Up @@ -1714,7 +1752,7 @@ func updateReplicaDeletionProtection(ctx context.Context, conn *dynamodb.Client,
return nil
}

func updateReplica(ctx context.Context, conn *dynamodb.Client, d *schema.ResourceData) error {
func updateReplica(ctx context.Context, conn *dynamodb.Client, d *schema.ResourceData, gt_witness_reg_name string) error {
oRaw, nRaw := d.GetChange("replica")
o := oRaw.(*schema.Set)
n := nRaw.(*schema.Set)
Expand Down Expand Up @@ -1811,19 +1849,19 @@ func updateReplica(ctx context.Context, conn *dynamodb.Client, d *schema.Resourc
}

if len(removeFirst) > 0 { // mini ForceNew, recreates replica but doesn't recreate the table
if err := deleteReplicas(ctx, conn, d.Id(), removeFirst, d.Timeout(schema.TimeoutUpdate)); err != nil {
if err := deleteReplicas(ctx, conn, d.Id(), removeFirst, d.Timeout(schema.TimeoutUpdate), gt_witness_reg_name); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
}
}

if len(toRemove) > 0 {
if err := deleteReplicas(ctx, conn, d.Id(), toRemove, d.Timeout(schema.TimeoutUpdate)); err != nil {
if err := deleteReplicas(ctx, conn, d.Id(), toRemove, d.Timeout(schema.TimeoutUpdate), gt_witness_reg_name); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
}
}

if len(toAdd) > 0 {
if err := createReplicas(ctx, conn, d.Id(), toAdd, true, d.Timeout(schema.TimeoutCreate)); err != nil {
if err := createReplicas(ctx, conn, d.Id(), toAdd, true, d.Timeout(schema.TimeoutCreate), gt_witness_reg_name); err != nil {
return fmt.Errorf("updating replicas, while creating: %w", err)
}
}
Expand Down Expand Up @@ -2004,10 +2042,11 @@ func deleteTable(ctx context.Context, conn *dynamodb.Client, tableName string) e
return err
}

func deleteReplicas(ctx context.Context, conn *dynamodb.Client, tableName string, tfList []any, timeout time.Duration) error {
func deleteReplicas(ctx context.Context, conn *dynamodb.Client, tableName string, tfList []any, timeout time.Duration, gt_witness_reg_name string) error {
var g multierror.Group

var replicaDeletes []awstypes.ReplicationGroupUpdate
var witnessDeletes []awstypes.GlobalTableWitnessGroupUpdate
for _, tfMapRaw := range tfList {
tfMap, ok := tfMapRaw.(map[string]any)

Expand Down Expand Up @@ -2035,18 +2074,27 @@ func deleteReplicas(ctx context.Context, conn *dynamodb.Client, tableName string
}
}
}

if gt_witness_reg_name != "" {
witnessDeletes = append(witnessDeletes, awstypes.GlobalTableWitnessGroupUpdate{
Delete: &awstypes.DeleteGlobalTableWitnessGroupMemberAction{
RegionName: aws.String(gt_witness_reg_name),
},
})
}
// We built an array of MultiRegionStrongConsistency replicas that need deletion.
// These need to all happen concurrently
if len(replicaDeletes) > 0 {
input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
ReplicaUpdates: replicaDeletes,
TableName: aws.String(tableName),
ReplicaUpdates: replicaDeletes,
GlobalTableWitnessUpdates: witnessDeletes,
}
log.Printf("[DEBUG] Deleting Replicas: %+v\n, tablename: %+v", input, aws.String(tableName))
err := retry.RetryContext(ctx, updateTableTimeout, func() *retry.RetryError {
_, err := conn.UpdateTable(ctx, input)
notFoundRetries := 0
if err != nil {
log.Printf("[DEBUG] UpdateTable Error: %+v\n", err)
if tfawserr.ErrCodeEquals(err, errCodeThrottlingException) {
return retry.RetryableError(err)
}
Expand Down Expand Up @@ -2482,6 +2530,24 @@ func flattenOnDemandThroughput(apiObject *awstypes.OnDemandThroughput) []any {
return []any{m}
}

func flattenGlobalTableWitnesses(apiObjects []awstypes.GlobalTableWitnessDescription) string {
if apiObjects == nil {
return ""
}

if len(apiObjects) > 1 {
return ""
}

for _, apiObject := range apiObjects {
if apiObject.RegionName != nil {
return aws.ToString(apiObject.RegionName)
}
}

return ""
}

func flattenReplicaDescription(apiObject *awstypes.ReplicaDescription) map[string]any {
if apiObject == nil {
return nil
Expand Down
Loading
Loading