Skip to content

Mailchimp Segment Sync #97

Mailchimp Segment Sync

Mailchimp Segment Sync #97

name: Mailchimp Segment Sync

on:
  schedule:
    # Run daily at 9 AM UTC (adjust timezone as needed)
    - cron: '0 9 * * *'
  workflow_dispatch:  # Allow manual triggering

# Least privilege: no step in this workflow writes to the repository.
permissions:
  contents: read

jobs:
  sync:
    runs-on: ubuntu-latest
    timeout-minutes: 30  # Prevent runaway processes
    steps:
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests
      - name: Run Mailchimp sync
        env:
          # Required: API Configuration
          MAILCHIMP_API_KEY: ${{ secrets.MAILCHIMP_API_KEY }}
          MAILCHIMP_SERVER_PREFIX: ${{ secrets.MAILCHIMP_SERVER_PREFIX }}
          AUDIENCE_A_ID: ${{ secrets.AUDIENCE_A_ID }}
          AUDIENCE_B_ID: ${{ secrets.AUDIENCE_B_ID }}
          # Required: Segment Configuration
          SEGMENT_1_ID: ${{ secrets.SEGMENT_1_ID }}
          SEGMENT_1_TAG: ${{ secrets.SEGMENT_1_TAG }}
          SEGMENT_2_ID: ${{ secrets.SEGMENT_2_ID }}
          SEGMENT_2_TAG: ${{ secrets.SEGMENT_2_TAG }}
          SEGMENT_3_ID: ${{ secrets.SEGMENT_3_ID }}
          SEGMENT_3_TAG: ${{ secrets.SEGMENT_3_TAG }}
          SEGMENT_4_ID: ${{ secrets.SEGMENT_4_ID }}
          SEGMENT_4_TAG: ${{ secrets.SEGMENT_4_TAG }}
          # Optional: Performance Tuning. An unset secret reaches the script
          # as an empty string (not as a missing variable), so the script
          # has to treat empty values as not set.
          GET_BATCH_SIZE: ${{ secrets.GET_BATCH_SIZE }}
          BATCH_SIZE: ${{ secrets.BATCH_SIZE }}
          BATCH_DELAY: ${{ secrets.BATCH_DELAY }}
        run: |
python -c "
import os
import time
import requests
import json
import hashlib
import sys
# Configuration from environment variables.
# The workflow always defines these variables; when the backing secret is not
# set the value is an empty string, so os.getenv(name, default) would never
# fall back to its default. Empty and whitespace-only values are therefore
# treated the same as a missing variable.
def env_or_default(name, default):
    '''Return environment variable name, or default when it is unset or blank.'''
    value = (os.getenv(name) or '').strip()
    return value or default

API_KEY = os.getenv('MAILCHIMP_API_KEY')
SERVER_PREFIX = env_or_default('MAILCHIMP_SERVER_PREFIX', 'us7')
AUDIENCE_A_ID = os.getenv('AUDIENCE_A_ID')
AUDIENCE_B_ID = os.getenv('AUDIENCE_B_ID')
# Batch configuration (optional to obfuscate)
GET_BATCH_SIZE = int(env_or_default('GET_BATCH_SIZE', '1000'))
BATCH_SIZE = int(env_or_default('BATCH_SIZE', '200'))
BATCH_DELAY = int(env_or_default('BATCH_DELAY', '5'))
def load_segments():
    '''Collect the numbered SEGMENT_N_ID / SEGMENT_N_TAG pairs from the environment.

    Numbering starts at 1 and scanning stops at the first N for which either
    variable is missing or empty, so later numbers after a gap are not read.

    Returns:
        A list of dicts with the keys SEGMENT_ID and TAG_NAME, in numeric order.

    Exits the process with status 1 when no complete pair is found.
    '''
    found = []
    index = 1
    while True:
        seg_id = os.getenv(f'SEGMENT_{index}_ID')
        seg_tag = os.getenv(f'SEGMENT_{index}_TAG')
        if seg_id and seg_tag:
            found.append({'SEGMENT_ID': seg_id, 'TAG_NAME': seg_tag})
            index += 1
            continue
        break
    if found:
        return found
    print('ERROR: No segments configured. Please set SEGMENT_1_ID, SEGMENT_1_TAG, etc.')
    sys.exit(1)
SEGMENTS = load_segments()
# Validate required environment variables: report the state of each one and
# stop with status 1 if any of them is missing or empty.
required = {
    'MAILCHIMP_API_KEY': API_KEY,
    'AUDIENCE_A_ID': AUDIENCE_A_ID,
    'AUDIENCE_B_ID': AUDIENCE_B_ID,
}
if not all(required.values()):
    print('ERROR: Missing required environment variables:')
    for name, value in required.items():
        mark = '✓' if value else '✗'
        print(f' {name}: {mark}')
    sys.exit(1)
def md5_hash(email):
    '''Return the hex MD5 digest of the lower-cased, UTF-8 encoded email address.

    The result is used as the member identifier in Mailchimp member URLs.
    '''
    normalized = email.lower()
    digest = hashlib.md5(normalized.encode('utf-8'))
    return digest.hexdigest()
def get_segment_members(segment_id, tag_name):
    '''Return the email address of every member of one segment of AUDIENCE_A_ID.

    The segment members endpoint is read page by page, GET_BATCH_SIZE records
    at a time, with a two second pause between pages.

    Args:
        segment_id: Mailchimp segment identifier inside AUDIENCE_A_ID.
        tag_name: label used only in the progress and error messages.

    Returns:
        A list with the email_address of each member, in API order.

    Raises:
        requests.exceptions.RequestException: a request failed or timed out;
            the error is logged and then re-raised.
    '''
    all_emails = []
    offset = 0
    base_url = f'https://{SERVER_PREFIX}.api.mailchimp.com/3.0'
    auth = ('anystring', API_KEY)
    url = f'{base_url}/lists/{AUDIENCE_A_ID}/segments/{segment_id}/members'
    print(f'📥 Fetching members from {tag_name} segment...')
    while True:
        try:
            params = {
                'count': GET_BATCH_SIZE,
                'offset': offset
            }
            # Without a timeout a stalled connection would block until the
            # job-level timeout kills the whole run.
            response = requests.get(url, auth=auth, params=params, timeout=60)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            print(f'❌ Error fetching members from {tag_name} segment: {str(e)}')
            if hasattr(e, 'response') and hasattr(e.response, 'text'):
                print(f'Response: {e.response.text}')
            raise
        batch_emails = [member['email_address'] for member in data['members']]
        all_emails.extend(batch_emails)
        # Stop when everything is collected (without revealing counts). An
        # empty page also ends the loop: if the segment shrinks while it is
        # being read, total_items may never be reached and the loop would
        # otherwise request the same empty page forever.
        if not batch_emails or len(all_emails) >= data['total_items']:
            break
        offset = len(all_emails)
        time.sleep(2)  # Rate limiting pause
    print(f'✅ Successfully collected members from {tag_name} segment')
    return all_emails
def combine_operations():
    '''Build the per-address tag plan for every configured segment.

    For each entry of SEGMENTS the segment members are fetched and the segment
    tag is queued for addition on each address. Adding Active also queues
    removal of Lapsed, and adding Lapsed queues removal of Active.

    Returns:
        A dict mapping an email address to a dict with the lists add_tags and
        remove_tags.
    '''
    # Tag being added -> tag that has to be removed at the same time.
    opposite = {'Active': 'Lapsed', 'Lapsed': 'Active'}
    combined = {}
    for entry in SEGMENTS:
        tag = entry['TAG_NAME']
        for address in get_segment_members(entry['SEGMENT_ID'], tag):
            plan = combined.setdefault(address, {'add_tags': [], 'remove_tags': []})
            plan['add_tags'].append(tag)
            if tag in opposite:
                plan['remove_tags'].append(opposite[tag])
    return combined
def execute_tagging_operations(operations):
    '''Submit the tag changes for AUDIENCE_B_ID through the Mailchimp batch endpoint.

    The addresses are split into chunks of at most BATCH_SIZE. Each chunk is
    sent as one POST to /batches containing one tag update per address.
    A success message means the batch was accepted for processing; this
    function does not poll for the outcome of the batch.

    Args:
        operations: dict mapping an email address to a dict with the lists
            add_tags and remove_tags.

    Exits the process with status 1 if the submission of any batch failed.
    '''
    base_url = f'https://{SERVER_PREFIX}.api.mailchimp.com/3.0'
    auth = ('anystring', API_KEY)
    operation_list = list(operations.items())
    total_emails = len(operation_list)
    # A configured size of zero or less would divide by zero below.
    batch_size = max(1, BATCH_SIZE)
    total_batches = (total_emails + batch_size - 1) // batch_size
    print('🚀 Starting tagging operations...')
    print('🔄 Processing in batches with rate limiting...')
    successful_batches = 0
    failed_batches = 0
    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        batch_operations = operation_list[start_idx:start_idx + batch_size]
        is_last_batch = batch_num == total_batches - 1
        print(f'📦 Processing batch {batch_num + 1}/{total_batches}...')
        try:
            api_operations = []
            for email, ops in batch_operations:
                subscriber_hash = md5_hash(email)
                tags = [{'name': tag, 'status': 'active'} for tag in ops['add_tags']]
                tags += [{'name': tag, 'status': 'inactive'} for tag in ops['remove_tags']]
                if tags:
                    api_operations.append({
                        'method': 'POST',
                        'path': f'/lists/{AUDIENCE_B_ID}/members/{subscriber_hash}/tags',
                        # The body of a batch operation is sent as a JSON string.
                        'body': json.dumps({'tags': tags})
                    })
            if api_operations:
                # The timeout keeps a stalled connection from blocking the run.
                response = requests.post(
                    f'{base_url}/batches',
                    auth=auth,
                    json={'operations': api_operations},
                    timeout=60
                )
                response.raise_for_status()
                batch_id = response.json()['id']
                print(f' ✅ Batch submitted successfully (ID: {batch_id})')
                successful_batches += 1
                if not is_last_batch:
                    print(f' ⏳ Waiting {BATCH_DELAY}s before next batch...')
                    time.sleep(BATCH_DELAY)
        except requests.exceptions.RequestException as e:
            print(f' ❌ Error processing batch: {str(e)}')
            if hasattr(e, 'response') and hasattr(e.response, 'text'):
                print(f' Response: {e.response.text}')
            failed_batches += 1
            # Back off before the next batch; nothing follows the last one.
            if not is_last_batch:
                print(f' ⏳ Waiting {BATCH_DELAY * 2}s after error...')
                time.sleep(BATCH_DELAY * 2)
    print('🎉 Tagging operations complete!')
    print(f'✅ Successful batches: {successful_batches}')
    if failed_batches > 0:
        print(f'❌ Failed batches: {failed_batches}')
        sys.exit(1)
    else:
        print('✅ All operations completed successfully')
def print_summary(operations):
    '''Print a short report that reveals tag names but no member counts.

    Lists the distinct TAG_NAME values of SEGMENTS in sorted order and warns
    when at least one address in operations has more than one tag to add.

    Args:
        operations: dict mapping an email address to a dict with an add_tags list.
    '''
    rule = '='*40
    print('\\n' + rule)
    print('📊 OPERATION SUMMARY')
    print(rule)
    # Show configured tags without revealing counts
    print('🏷️ Configured tags:')
    for tag in sorted({segment['TAG_NAME'] for segment in SEGMENTS}):
        print(f' • {tag}')
    # Check for overlaps without revealing numbers
    for ops in operations.values():
        if len(ops['add_tags']) > 1:
            print('⚠️ Some contacts appear in multiple segments')
            break
    print(rule)
def main():
    '''Run the sync: log the configuration, collect members, then submit tags.

    Any exception is logged with its traceback and turned into exit status 1.
    A SystemExit raised by a called function is not caught and passes through.
    '''
    try:
        print('🚀 Starting Mailchimp segment sync...')
        # gmtime makes the printed time really UTC; strftime alone formats
        # the local time of the runner.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())
        print(f'📅 Running at: {timestamp}')
        print(f'🔧 Configuration: {len(SEGMENTS)} segments configured')
        for i, segment in enumerate(SEGMENTS, 1):
            # Values are printed unmodified so that the log masking of GitHub
            # Actions still applies to anything stored as a secret. Altering
            # them (for example by inserting spaces) would leak the secrets
            # into the run log. Settings that should be readable in the log
            # belong in repository variables instead of secrets.
            segment_id = segment['SEGMENT_ID']
            tag_name = segment['TAG_NAME']
            print(f' 📋 Segment {i}:')
            print(f' ID: {segment_id}')
            print(f' Tag: {tag_name}')
        operations = combine_operations()
        print_summary(operations)
        execute_tagging_operations(operations)
        print('\\n🎉 Script completed successfully!')
    except Exception as e:
        print(f'\\n💥 Script failed with error: {str(e)}')
        import traceback
        traceback.print_exc()
        sys.exit(1)

# Entry point of the inline script.
if __name__ == '__main__':
    main()
"
- name: Report status
  if: always()
  run: |
    # Every step runs in a fresh shell, so the exit status of an earlier
    # step is not visible here. The job status from the Actions context
    # reflects whether any previous step failed.
    if [ "${{ job.status }}" = "success" ]; then
      echo "✅ Mailchimp sync completed successfully"
    else
      echo "❌ Mailchimp sync failed"
      exit 1
    fi